use threading to uncouble the queues

This commit is contained in:
Jakob Ketterl 2020-02-28 16:13:53 +01:00
parent b8c71109b8
commit 10523dbbd7

View File

@ -110,15 +110,36 @@ class Pipe(object):
class WritingPipe(Pipe): class WritingPipe(Pipe):
def __init__(self, path, encoding=None): def __init__(self, path, encoding=None):
self.queue = []
self.queueLock = threading.Lock()
super().__init__(path, "w", encoding=encoding) super().__init__(path, "w", encoding=encoding)
self.open()
def open_and_dequeue(self):
super().open()
with self.queueLock:
for i in self.queue:
self.file.write(i)
self.file.flush()
self.queue = None
def open(self):
threading.Thread(target=self.open_and_dequeue).start()
def write(self, data): def write(self, data):
if self.file is None: if self.file is None:
self.open() with self.queueLock:
r =self.file.write(data) self.queue.append(data)
return
r = self.file.write(data)
self.file.flush() self.file.flush()
return r return r
def close(self):
if self.file is None:
logger.warning("queue %s never successfully opened - thread leak!", self.path)
super().close()
class ReadingPipe(Pipe): class ReadingPipe(Pipe):
def __init__(self, path, encoding=None): def __init__(self, path, encoding=None):