From 7e5ea6e0655ca8e288f66f9ca73f9327b419df57 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Fri, 14 Aug 2020 20:20:07 +0200 Subject: [PATCH] improve read pipe opening --- csdr/pipe.py | 67 +++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/csdr/pipe.py b/csdr/pipe.py index 3cc354a..bbf9a67 100644 --- a/csdr/pipe.py +++ b/csdr/pipe.py @@ -31,27 +31,23 @@ class Pipe(object): os.mkfifo(self.path) def open(self): - retries = 0 + """ + this method opens the file descriptor with an added O_NONBLOCK flag. This gives us a special behaviour for + FIFOS, when they are not opened by the opposing side: + - opening a pipe for writing will throw an OSError with errno = 6 (ENXIO). This is handled specially in the + WritingPipe class. + - opening a pipe for reading will pass through this method instantly, even if the opposing end has not been + opened yet, but the resulting file descriptor will behave as if O_NONBLOCK is set (even if we remove it + immediately here), resulting in empty reads until data is available. This is handled specially in the + ReadingPipe class. + """ def opener(path, flags): fd = os.open(path, flags | os.O_NONBLOCK) os.set_blocking(fd, True) return fd - while self.file is None and self.doOpen and retries < 10: - try: - self.file = open(self.path, self.direction, encoding=self.encoding, opener=opener) - except OSError as error: - # ENXIO = FIFO has not been opened for reading - if error.errno == 6: - time.sleep(.1) - retries += 1 - else: - raise - - # if doOpen is false, opening has been canceled, so no warning in that case. - if self.file is None and self.doOpen: - logger.warning("could not open FIFO %s", self.path) + self.file = open(self.path, self.direction, encoding=self.encoding, opener=opener) def close(self): self.doOpen = False @@ -77,9 +73,30 @@ class WritingPipe(Pipe): self.open() def open_and_dequeue(self): - super().open() + """ + This method implements a retry loop that can be interrupted in case the Pipe gets shutdown before actually + being connected. + After the pipe is opened successfully, all data that has been queued is sent in the order it was passed into + write(). + """ + retries = 0 + + while self.file is None and self.doOpen and retries < 10: + try: + super().open() + except OSError as error: + # ENXIO = FIFO has not been opened for reading + if error.errno == 6: + time.sleep(.1) + retries += 1 + else: + raise + + # if doOpen is false, opening has been canceled, so no warning in that case. if self.file is None: + if self.doOpen: + logger.warning("could not open FIFO %s", self.path) return with self.queueLock: @@ -89,9 +106,17 @@ class WritingPipe(Pipe): self.queue = None def open(self): + """ + This sends the opening operation off to a background thread. If we were to block the thread here, another pipe + may be waiting in the queue to be opened on the opposing side, resulting in a deadlock + """ threading.Thread(target=self.open_and_dequeue).start() def write(self, data): + """ + This method queues all data to be written until the file is actually opened. As soon as a file is available, + it becomes a passthrough. + """ if self.file is None: with self.queueLock: self.queue.append(data) @@ -106,10 +131,18 @@ class ReadingPipe(Pipe): super().__init__(path, "r", encoding=encoding) def open(self): + """ + This method implements an interruptible loop that waits for the file descriptor to be opened and the first + batch of data coming in using repeated select() calls. + :return: + """ if not self.doOpen: return super().open() - select.select([self.file], [], [], 10) + while self.doOpen: + (read, _, _) = select.select([self.file], [], [], 1) + if self.file in read: + break def read(self): if self.file is None: