diff --git a/csdr/csdr.py b/csdr/csdr.py index 95f6a0d..4ef95f6 100644 --- a/csdr/csdr.py +++ b/csdr/csdr.py @@ -110,15 +110,36 @@ class Pipe(object): class WritingPipe(Pipe): def __init__(self, path, encoding=None): + self.queue = [] + self.queueLock = threading.Lock() super().__init__(path, "w", encoding=encoding) + self.open() + + def open_and_dequeue(self): + super().open() + with self.queueLock: + for i in self.queue: + self.file.write(i) + self.file.flush() + self.queue = None + + def open(self): + threading.Thread(target=self.open_and_dequeue).start() def write(self, data): if self.file is None: - self.open() - r =self.file.write(data) + with self.queueLock: + self.queue.append(data) + return + r = self.file.write(data) self.file.flush() return r + def close(self): + if self.file is None: + logger.warning("queue %s never successfully opened - thread leak!", self.path) + super().close() + class ReadingPipe(Pipe): def __init__(self, path, encoding=None):