From 22ec80c8ea6e42955e755a8291e1eb20a4efdc81 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Wed, 7 Apr 2021 18:57:42 +0200 Subject: [PATCH] make decoding queue settings work from the web config --- owrx/audio.py | 37 +++++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/owrx/audio.py b/owrx/audio.py index ef77b76..93956ea 100644 --- a/owrx/audio.py +++ b/owrx/audio.py @@ -46,7 +46,7 @@ class QueueWorker(threading.Thread): while self.doRun: job = self.queue.get() if job is PoisonPill: - self.doRun = False + self.stop() else: try: job.run() @@ -58,6 +58,9 @@ class QueueWorker(threading.Thread): self.queue.task_done() + def stop(self): + self.doRun = False + class DecoderQueue(Queue): sharedInstance = None @@ -67,10 +70,7 @@ class DecoderQueue(Queue): def getSharedInstance(): with DecoderQueue.creationLock: if DecoderQueue.sharedInstance is None: - pm = Config.get() - DecoderQueue.sharedInstance = DecoderQueue( - maxsize=pm["decoding_queue_length"], workers=pm["decoding_queue_workers"] - ) + DecoderQueue.sharedInstance = DecoderQueue() return DecoderQueue.sharedInstance @staticmethod @@ -80,8 +80,15 @@ class DecoderQueue(Queue): DecoderQueue.sharedInstance.stop() DecoderQueue.sharedInstance = None - def __init__(self, maxsize, workers): - super().__init__(maxsize) + def __init__(self): + pm = Config.get() + super().__init__(pm["decoding_queue_length"]) + self.workers = [] + self._setWorkers(pm["decoding_queue_workers"]) + self.subscriptions = [ + pm.wireProperty("decoding_queue_length", self._setMaxSize), + pm.wireProperty("decoding_queue_workers", self._setWorkers), + ] metrics = Metrics.getSharedInstance() metrics.addMetric("decoding.queue.length", DirectMetric(self.qsize)) self.inCounter = CounterMetric() @@ -92,10 +99,24 @@ class DecoderQueue(Queue): metrics.addMetric("decoding.queue.overflow", self.overflowCounter) self.errorCounter = CounterMetric() metrics.addMetric("decoding.queue.error", self.errorCounter) - self.workers = [self.newWorker() for _ in range(0, workers)] + + def _setMaxSize(self, size): + if self.maxsize == size: + return + self.maxsize = size + + def _setWorkers(self, workers): + while len(self.workers) > workers: + logger.debug("stopping one worker") + self.workers.pop().stop() + while len(self.workers) < workers: + logger.debug("starting one worker") + self.workers.append(self.newWorker()) def stop(self): logger.debug("shutting down the queue") + while self.subscriptions: + self.subscriptions.pop().cancel() try: # purge all remaining jobs while not self.empty():