From a0eeea8fe3186ef3f7dfac6e34bd88bb7fec3420 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Sun, 17 Jan 2021 17:49:03 +0100 Subject: [PATCH] improve queue shutdown to avoid stale files --- owrx/__main__.py | 2 ++ owrx/audio.py | 62 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/owrx/__main__.py b/owrx/__main__.py index 517c5ad..908767f 100644 --- a/owrx/__main__.py +++ b/owrx/__main__.py @@ -13,6 +13,7 @@ from owrx.service import Services from owrx.websocket import WebSocketConnection from owrx.reporting import ReportingEngine from owrx.version import openwebrx_version +from owrx.audio import DecoderQueue class ThreadedHttpServer(ThreadingMixIn, HTTPServer): @@ -68,3 +69,4 @@ Support and info: https://groups.io/g/openwebrx WebSocketConnection.closeAll() Services.stop() ReportingEngine.stopAll() + DecoderQueue.stopAll() diff --git a/owrx/audio.py b/owrx/audio.py index d6ba3f5..9a5b07f 100644 --- a/owrx/audio.py +++ b/owrx/audio.py @@ -7,7 +7,7 @@ import subprocess import os from multiprocessing.connection import Pipe, wait from datetime import datetime, timedelta -from queue import Queue, Full +from queue import Queue, Full, Empty import logging @@ -32,22 +32,30 @@ class QueueJob(object): pass +PoisonPill = object() + + class QueueWorker(threading.Thread): def __init__(self, queue): self.queue = queue self.doRun = True - super().__init__(daemon=True) + super().__init__() def run(self) -> None: while self.doRun: job = self.queue.get() - try: - job.run() - except Exception: - logger.exception("failed to decode job") - self.queue.onError() - finally: - job.unlink() + if job is PoisonPill: + self.doRun = False + # put the poison pill back on the queue for the next worker + self.queue.put(PoisonPill) + else: + try: + job.run() + except Exception: + logger.exception("failed to decode job") + self.queue.onError() + finally: + job.unlink() self.queue.task_done() @@ -64,6 +72,13 @@ class DecoderQueue(Queue): DecoderQueue.sharedInstance = DecoderQueue(maxsize=pm["decoding_queue_length"], workers=pm["decoding_queue_workers"]) return DecoderQueue.sharedInstance + @staticmethod + def stopAll(): + with DecoderQueue.creationLock: + if DecoderQueue.sharedInstance is not None: + DecoderQueue.sharedInstance.stop() + DecoderQueue.sharedInstance = None + def __init__(self, maxsize, workers): super().__init__(maxsize) metrics = Metrics.getSharedInstance() @@ -78,6 +93,18 @@ class DecoderQueue(Queue): metrics.addMetric("decoding.queue.error", self.errorCounter) self.workers = [self.newWorker() for _ in range(0, workers)] + def stop(self): + logger.debug("shutting down the queue") + try: + # purge all remaining jobs + while not self.empty(): + job = self.get() + job.unlink() + except Empty: + pass + # put() PoisonPill to tell workers to shut down + self.put(PoisonPill) + def put(self, item, **kwars): self.inCounter.inc() try: @@ -161,11 +188,10 @@ class AudioWriter(object): self.timer.start() def switchFiles(self): - self.switchingLock.acquire() - file = self.wavefile - filename = self.wavefilename - (self.wavefilename, self.wavefile) = self.getWaveFile() - self.switchingLock.release() + with self.switchingLock: + file = self.wavefile + filename = self.wavefilename + (self.wavefilename, self.wavefile) = self.getWaveFile() file.close() job = QueueJob(self, filename, self.dsp.get_operating_freq()) @@ -205,9 +231,8 @@ class AudioWriter(object): self._scheduleNextSwitch() def write(self, data): - self.switchingLock.acquire() - self.wavefile.writeframes(data) - self.switchingLock.release() + with self.switchingLock: + self.wavefile.writeframes(data) def stop(self): self.outputWriter.close() @@ -229,7 +254,8 @@ class AudioWriter(object): except Exception: logger.exception("error closing wave file") try: - os.unlink(self.wavefilename) + with self.switchingLock: + os.unlink(self.wavefilename) except Exception: logger.exception("error removing undecoded file") self.wavefile = None