diff --git a/owrx/wsjt.py b/owrx/wsjt.py index 4542cf8..7bd3c5c 100644 --- a/owrx/wsjt.py +++ b/owrx/wsjt.py @@ -8,6 +8,7 @@ import os from multiprocessing.connection import Pipe from owrx.map import Map, LocatorLocation import re +from queue import Queue from owrx.config import PropertyManager from owrx.bands import Bandplan from owrx.metrics import Metrics @@ -17,6 +18,42 @@ import logging logger = logging.getLogger(__name__) +class WsjtQueueWorker(threading.Thread): + def __init__(self, queue): + self.queue = queue + self.doRun = True + super().__init__(daemon=True) + + def run(self) -> None: + while self.doRun: + (processor, file) = self.queue.get() + logger.debug("processing file %s", file) + processor.decode(file) + self.queue.task_done() + + +class WsjtQueue(Queue): + sharedInstance = None + + @staticmethod + def getSharedInstance(): + if WsjtQueue.sharedInstance is None: + WsjtQueue.sharedInstance = WsjtQueue(maxsize=10, workers=2) + return WsjtQueue.sharedInstance + + def __init__(self, maxsize, workers): + super().__init__(maxsize) + self.workers = [self.newWorker() for _ in range(0, workers)] + + def put(self, item): + super(WsjtQueue, self).put(item, block=False) + + def newWorker(self): + worker = WsjtQueueWorker(self) + worker.start() + return worker + + class WsjtChopper(threading.Thread): def __init__(self, source): self.source = source @@ -24,7 +61,6 @@ class WsjtChopper(threading.Thread): (self.wavefilename, self.wavefile) = self.getWaveFile() self.switchingLock = threading.Lock() self.scheduler = sched.scheduler(time.time, time.sleep) - self.fileQueue = [] (self.outputReader, self.outputWriter) = Pipe() self.doRun = True super().__init__() @@ -67,7 +103,7 @@ class WsjtChopper(threading.Thread): self.switchingLock.release() file.close() - self.fileQueue.append(filename) + WsjtQueue.getSharedInstance().put((self, filename)) self._scheduleNextSwitch() def decoder_commandline(self, file): @@ -76,23 +112,17 @@ class WsjtChopper(threading.Thread): """ return [] - def decode(self): - def decode_and_unlink(file): - decoder = subprocess.Popen(self.decoder_commandline(file), stdout=subprocess.PIPE, cwd=self.tmp_dir, preexec_fn=lambda : os.nice(10)) - while True: - line = decoder.stdout.readline() - if line is None or (isinstance(line, bytes) and len(line) == 0): - break - self.outputWriter.send(line) - rc = decoder.wait() - if rc != 0: - logger.warning("decoder return code: %i", rc) - os.unlink(file) - - if self.fileQueue: - file = self.fileQueue.pop() - logger.debug("processing file {0}".format(file)) - threading.Thread(target=decode_and_unlink, args=[file]).start() + def decode(self, file): + decoder = subprocess.Popen(self.decoder_commandline(file), stdout=subprocess.PIPE, cwd=self.tmp_dir, preexec_fn=lambda : os.nice(10)) + while True: + line = decoder.stdout.readline() + if line is None or (isinstance(line, bytes) and len(line) == 0): + break + self.outputWriter.send(line) + rc = decoder.wait() + if rc != 0: + logger.warning("decoder return code: %i", rc) + os.unlink(file) def run(self) -> None: logger.debug("WSJT chopper starting up") @@ -106,7 +136,6 @@ class WsjtChopper(threading.Thread): self.wavefile.writeframes(data) self.switchingLock.release() - self.decode() logger.debug("WSJT chopper shutting down") self.outputReader.close() self.outputWriter.close()