improve queue shutdown to avoid stale files

This commit is contained in:
Jakob Ketterl 2021-01-17 17:49:03 +01:00
parent 0f81964598
commit a0eeea8fe3
2 changed files with 46 additions and 18 deletions

View File

@ -13,6 +13,7 @@ from owrx.service import Services
from owrx.websocket import WebSocketConnection from owrx.websocket import WebSocketConnection
from owrx.reporting import ReportingEngine from owrx.reporting import ReportingEngine
from owrx.version import openwebrx_version from owrx.version import openwebrx_version
from owrx.audio import DecoderQueue
class ThreadedHttpServer(ThreadingMixIn, HTTPServer): class ThreadedHttpServer(ThreadingMixIn, HTTPServer):
@ -68,3 +69,4 @@ Support and info: https://groups.io/g/openwebrx
WebSocketConnection.closeAll() WebSocketConnection.closeAll()
Services.stop() Services.stop()
ReportingEngine.stopAll() ReportingEngine.stopAll()
DecoderQueue.stopAll()

View File

@ -7,7 +7,7 @@ import subprocess
import os import os
from multiprocessing.connection import Pipe, wait from multiprocessing.connection import Pipe, wait
from datetime import datetime, timedelta from datetime import datetime, timedelta
from queue import Queue, Full from queue import Queue, Full, Empty
import logging import logging
@ -32,15 +32,23 @@ class QueueJob(object):
pass pass
PoisonPill = object()
class QueueWorker(threading.Thread): class QueueWorker(threading.Thread):
def __init__(self, queue): def __init__(self, queue):
self.queue = queue self.queue = queue
self.doRun = True self.doRun = True
super().__init__(daemon=True) super().__init__()
def run(self) -> None: def run(self) -> None:
while self.doRun: while self.doRun:
job = self.queue.get() job = self.queue.get()
if job is PoisonPill:
self.doRun = False
# put the poison pill back on the queue for the next worker
self.queue.put(PoisonPill)
else:
try: try:
job.run() job.run()
except Exception: except Exception:
@ -64,6 +72,13 @@ class DecoderQueue(Queue):
DecoderQueue.sharedInstance = DecoderQueue(maxsize=pm["decoding_queue_length"], workers=pm["decoding_queue_workers"]) DecoderQueue.sharedInstance = DecoderQueue(maxsize=pm["decoding_queue_length"], workers=pm["decoding_queue_workers"])
return DecoderQueue.sharedInstance return DecoderQueue.sharedInstance
@staticmethod
def stopAll():
with DecoderQueue.creationLock:
if DecoderQueue.sharedInstance is not None:
DecoderQueue.sharedInstance.stop()
DecoderQueue.sharedInstance = None
def __init__(self, maxsize, workers): def __init__(self, maxsize, workers):
super().__init__(maxsize) super().__init__(maxsize)
metrics = Metrics.getSharedInstance() metrics = Metrics.getSharedInstance()
@ -78,6 +93,18 @@ class DecoderQueue(Queue):
metrics.addMetric("decoding.queue.error", self.errorCounter) metrics.addMetric("decoding.queue.error", self.errorCounter)
self.workers = [self.newWorker() for _ in range(0, workers)] self.workers = [self.newWorker() for _ in range(0, workers)]
def stop(self):
logger.debug("shutting down the queue")
try:
# purge all remaining jobs
while not self.empty():
job = self.get()
job.unlink()
except Empty:
pass
# put() PoisonPill to tell workers to shut down
self.put(PoisonPill)
def put(self, item, **kwars): def put(self, item, **kwars):
self.inCounter.inc() self.inCounter.inc()
try: try:
@ -161,11 +188,10 @@ class AudioWriter(object):
self.timer.start() self.timer.start()
def switchFiles(self): def switchFiles(self):
self.switchingLock.acquire() with self.switchingLock:
file = self.wavefile file = self.wavefile
filename = self.wavefilename filename = self.wavefilename
(self.wavefilename, self.wavefile) = self.getWaveFile() (self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock.release()
file.close() file.close()
job = QueueJob(self, filename, self.dsp.get_operating_freq()) job = QueueJob(self, filename, self.dsp.get_operating_freq())
@ -205,9 +231,8 @@ class AudioWriter(object):
self._scheduleNextSwitch() self._scheduleNextSwitch()
def write(self, data): def write(self, data):
self.switchingLock.acquire() with self.switchingLock:
self.wavefile.writeframes(data) self.wavefile.writeframes(data)
self.switchingLock.release()
def stop(self): def stop(self):
self.outputWriter.close() self.outputWriter.close()
@ -229,6 +254,7 @@ class AudioWriter(object):
except Exception: except Exception:
logger.exception("error closing wave file") logger.exception("error closing wave file")
try: try:
with self.switchingLock:
os.unlink(self.wavefilename) os.unlink(self.wavefilename)
except Exception: except Exception:
logger.exception("error removing undecoded file") logger.exception("error removing undecoded file")