diff --git a/csdr/csdr.py b/csdr/csdr.py index 82e3f48..d8dd6ed 100644 --- a/csdr/csdr.py +++ b/csdr/csdr.py @@ -26,6 +26,7 @@ import os import signal import threading import math +import time from functools import partial from owrx.kiss import KissClient, DirewolfConfig @@ -85,6 +86,7 @@ class Pipe(object): return Pipe(path, None, encoding=encoding) def __init__(self, path, direction, encoding=None): + self.doOpen = True self.path = path self.direction = direction self.encoding = encoding @@ -99,6 +101,7 @@ class Pipe(object): self.file = open(self.path, self.direction, encoding=self.encoding) def close(self): + self.doOpen = False try: if self.file is not None: self.file.close() @@ -121,7 +124,30 @@ class WritingPipe(Pipe): self.open() def open_and_dequeue(self): - super().open() + retries = 0 + + def opener(path, flags): + fd = os.open(path, flags | os.O_NONBLOCK) + os.set_blocking(fd, True) + return fd + + while self.file is None and self.doOpen and retries < 10: + try: + self.file = open(self.path, self.direction, encoding=self.encoding, opener=opener) + except OSError as error: + # ENXIO = FIFO has not been opened for reading + if error.errno == 6: + time.sleep(.1) + retries += 1 + else: + raise + + # if doOpen is false, opening has been canceled, so no warning in that case. + if self.file is None: + if self.doOpen: + logger.warning("could not open FIFO %s", self.path) + return + with self.queueLock: for i in self.queue: self.file.write(i) @@ -140,11 +166,6 @@ class WritingPipe(Pipe): self.file.flush() return r - def close(self): - if self.file is None: - logger.warning("queue %s never successfully opened - thread leak!", self.path) - super().close() - class ReadingPipe(Pipe): def __init__(self, path, encoding=None):