improve read pipe opening

This commit is contained in:
Jakob Ketterl 2020-08-14 20:20:07 +02:00
parent 49383e757f
commit 7e5ea6e065

View File

@ -31,27 +31,23 @@ class Pipe(object):
os.mkfifo(self.path) os.mkfifo(self.path)
def open(self): def open(self):
retries = 0 """
this method opens the file descriptor with an added O_NONBLOCK flag. This gives us a special behaviour for
FIFOS, when they are not opened by the opposing side:
- opening a pipe for writing will throw an OSError with errno = 6 (ENXIO). This is handled specially in the
WritingPipe class.
- opening a pipe for reading will pass through this method instantly, even if the opposing end has not been
opened yet, but the resulting file descriptor will behave as if O_NONBLOCK is set (even if we remove it
immediately here), resulting in empty reads until data is available. This is handled specially in the
ReadingPipe class.
"""
def opener(path, flags): def opener(path, flags):
fd = os.open(path, flags | os.O_NONBLOCK) fd = os.open(path, flags | os.O_NONBLOCK)
os.set_blocking(fd, True) os.set_blocking(fd, True)
return fd return fd
while self.file is None and self.doOpen and retries < 10:
try:
self.file = open(self.path, self.direction, encoding=self.encoding, opener=opener) self.file = open(self.path, self.direction, encoding=self.encoding, opener=opener)
except OSError as error:
# ENXIO = FIFO has not been opened for reading
if error.errno == 6:
time.sleep(.1)
retries += 1
else:
raise
# if doOpen is false, opening has been canceled, so no warning in that case.
if self.file is None and self.doOpen:
logger.warning("could not open FIFO %s", self.path)
def close(self): def close(self):
self.doOpen = False self.doOpen = False
@ -77,9 +73,30 @@ class WritingPipe(Pipe):
self.open() self.open()
def open_and_dequeue(self): def open_and_dequeue(self):
super().open() """
This method implements a retry loop that can be interrupted in case the Pipe gets shutdown before actually
being connected.
After the pipe is opened successfully, all data that has been queued is sent in the order it was passed into
write().
"""
retries = 0
while self.file is None and self.doOpen and retries < 10:
try:
super().open()
except OSError as error:
# ENXIO = FIFO has not been opened for reading
if error.errno == 6:
time.sleep(.1)
retries += 1
else:
raise
# if doOpen is false, opening has been canceled, so no warning in that case.
if self.file is None: if self.file is None:
if self.doOpen:
logger.warning("could not open FIFO %s", self.path)
return return
with self.queueLock: with self.queueLock:
@ -89,9 +106,17 @@ class WritingPipe(Pipe):
self.queue = None self.queue = None
def open(self): def open(self):
"""
This sends the opening operation off to a background thread. If we were to block the thread here, another pipe
may be waiting in the queue to be opened on the opposing side, resulting in a deadlock
"""
threading.Thread(target=self.open_and_dequeue).start() threading.Thread(target=self.open_and_dequeue).start()
def write(self, data): def write(self, data):
"""
This method queues all data to be written until the file is actually opened. As soon as a file is available,
it becomes a passthrough.
"""
if self.file is None: if self.file is None:
with self.queueLock: with self.queueLock:
self.queue.append(data) self.queue.append(data)
@ -106,10 +131,18 @@ class ReadingPipe(Pipe):
super().__init__(path, "r", encoding=encoding) super().__init__(path, "r", encoding=encoding)
def open(self): def open(self):
"""
This method implements an interruptible loop that waits for the file descriptor to be opened and the first
batch of data coming in using repeated select() calls.
:return:
"""
if not self.doOpen: if not self.doOpen:
return return
super().open() super().open()
select.select([self.file], [], [], 10) while self.doOpen:
(read, _, _) = select.select([self.file], [], [], 1)
if self.file in read:
break
def read(self): def read(self):
if self.file is None: if self.file is None: