make decoding queue settings work from the web config

This commit is contained in:
Jakob Ketterl 2021-04-07 18:57:42 +02:00
parent 5487861da1
commit 22ec80c8ea

View File

@ -46,7 +46,7 @@ class QueueWorker(threading.Thread):
while self.doRun: while self.doRun:
job = self.queue.get() job = self.queue.get()
if job is PoisonPill: if job is PoisonPill:
self.doRun = False self.stop()
else: else:
try: try:
job.run() job.run()
@ -58,6 +58,9 @@ class QueueWorker(threading.Thread):
self.queue.task_done() self.queue.task_done()
def stop(self):
self.doRun = False
class DecoderQueue(Queue): class DecoderQueue(Queue):
sharedInstance = None sharedInstance = None
@ -67,10 +70,7 @@ class DecoderQueue(Queue):
def getSharedInstance(): def getSharedInstance():
with DecoderQueue.creationLock: with DecoderQueue.creationLock:
if DecoderQueue.sharedInstance is None: if DecoderQueue.sharedInstance is None:
pm = Config.get() DecoderQueue.sharedInstance = DecoderQueue()
DecoderQueue.sharedInstance = DecoderQueue(
maxsize=pm["decoding_queue_length"], workers=pm["decoding_queue_workers"]
)
return DecoderQueue.sharedInstance return DecoderQueue.sharedInstance
@staticmethod @staticmethod
@ -80,8 +80,15 @@ class DecoderQueue(Queue):
DecoderQueue.sharedInstance.stop() DecoderQueue.sharedInstance.stop()
DecoderQueue.sharedInstance = None DecoderQueue.sharedInstance = None
def __init__(self, maxsize, workers): def __init__(self):
super().__init__(maxsize) pm = Config.get()
super().__init__(pm["decoding_queue_length"])
self.workers = []
self._setWorkers(pm["decoding_queue_workers"])
self.subscriptions = [
pm.wireProperty("decoding_queue_length", self._setMaxSize),
pm.wireProperty("decoding_queue_workers", self._setWorkers),
]
metrics = Metrics.getSharedInstance() metrics = Metrics.getSharedInstance()
metrics.addMetric("decoding.queue.length", DirectMetric(self.qsize)) metrics.addMetric("decoding.queue.length", DirectMetric(self.qsize))
self.inCounter = CounterMetric() self.inCounter = CounterMetric()
@ -92,10 +99,24 @@ class DecoderQueue(Queue):
metrics.addMetric("decoding.queue.overflow", self.overflowCounter) metrics.addMetric("decoding.queue.overflow", self.overflowCounter)
self.errorCounter = CounterMetric() self.errorCounter = CounterMetric()
metrics.addMetric("decoding.queue.error", self.errorCounter) metrics.addMetric("decoding.queue.error", self.errorCounter)
self.workers = [self.newWorker() for _ in range(0, workers)]
def _setMaxSize(self, size):
if self.maxsize == size:
return
self.maxsize = size
def _setWorkers(self, workers):
while len(self.workers) > workers:
logger.debug("stopping one worker")
self.workers.pop().stop()
while len(self.workers) < workers:
logger.debug("starting one worker")
self.workers.append(self.newWorker())
def stop(self): def stop(self):
logger.debug("shutting down the queue") logger.debug("shutting down the queue")
while self.subscriptions:
self.subscriptions.pop().cancel()
try: try:
# purge all remaining jobs # purge all remaining jobs
while not self.empty(): while not self.empty():