open pipes in non-blocking loops, preventing thread leaks

This commit is contained in:
Jakob Ketterl 2020-08-13 23:35:49 +02:00
parent 56debcd08a
commit ddb5fe51b3

View File

@ -26,6 +26,7 @@ import os
import signal import signal
import threading import threading
import math import math
import time
from functools import partial from functools import partial
from owrx.kiss import KissClient, DirewolfConfig from owrx.kiss import KissClient, DirewolfConfig
@ -85,6 +86,7 @@ class Pipe(object):
return Pipe(path, None, encoding=encoding) return Pipe(path, None, encoding=encoding)
def __init__(self, path, direction, encoding=None): def __init__(self, path, direction, encoding=None):
self.doOpen = True
self.path = path self.path = path
self.direction = direction self.direction = direction
self.encoding = encoding self.encoding = encoding
@ -99,6 +101,7 @@ class Pipe(object):
self.file = open(self.path, self.direction, encoding=self.encoding) self.file = open(self.path, self.direction, encoding=self.encoding)
def close(self): def close(self):
self.doOpen = False
try: try:
if self.file is not None: if self.file is not None:
self.file.close() self.file.close()
@ -121,7 +124,30 @@ class WritingPipe(Pipe):
self.open() self.open()
def open_and_dequeue(self): def open_and_dequeue(self):
super().open() retries = 0
def opener(path, flags):
fd = os.open(path, flags | os.O_NONBLOCK)
os.set_blocking(fd, True)
return fd
while self.file is None and self.doOpen and retries < 10:
try:
self.file = open(self.path, self.direction, encoding=self.encoding, opener=opener)
except OSError as error:
# ENXIO = FIFO has not been opened for reading
if error.errno == 6:
time.sleep(.1)
retries += 1
else:
raise
# if doOpen is false, opening has been canceled, so no warning in that case.
if self.file is None:
if self.doOpen:
logger.warning("could not open FIFO %s", self.path)
return
with self.queueLock: with self.queueLock:
for i in self.queue: for i in self.queue:
self.file.write(i) self.file.write(i)
@ -140,11 +166,6 @@ class WritingPipe(Pipe):
self.file.flush() self.file.flush()
return r return r
def close(self):
if self.file is None:
logger.warning("queue %s never successfully opened - thread leak!", self.path)
super().close()
class ReadingPipe(Pipe): class ReadingPipe(Pipe):
def __init__(self, path, encoding=None): def __init__(self, path, encoding=None):