173 lines
5.3 KiB
Python
173 lines
5.3 KiB
Python
|
from owrx.config import Config
|
||
|
from owrx.config.core import CoreConfig
|
||
|
from owrx.metrics import Metrics, CounterMetric, DirectMetric
|
||
|
from queue import Queue, Full, Empty
|
||
|
import subprocess
|
||
|
import os
|
||
|
import threading
|
||
|
|
||
|
import logging
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
logger.setLevel(logging.INFO)
|
||
|
|
||
|
|
||
|
class QueueJob(object):
|
||
|
def __init__(self, profile, writer, file, freq):
|
||
|
self.profile = profile
|
||
|
self.writer = writer
|
||
|
self.file = file
|
||
|
self.freq = freq
|
||
|
|
||
|
def run(self):
|
||
|
logger.debug("processing file %s", self.file)
|
||
|
tmp_dir = CoreConfig().get_temporary_directory()
|
||
|
decoder = subprocess.Popen(
|
||
|
["nice", "-n", "10"] + self.profile.decoder_commandline(self.file),
|
||
|
stdout=subprocess.PIPE,
|
||
|
cwd=tmp_dir,
|
||
|
close_fds=True,
|
||
|
)
|
||
|
try:
|
||
|
for line in decoder.stdout:
|
||
|
self.writer.send((self.profile, self.freq, line))
|
||
|
except (OSError, AttributeError):
|
||
|
decoder.stdout.flush()
|
||
|
# TODO uncouple parsing from the output so that decodes can still go to the map and the spotters
|
||
|
logger.debug("output has gone away while decoding job.")
|
||
|
try:
|
||
|
rc = decoder.wait(timeout=10)
|
||
|
if rc != 0:
|
||
|
raise RuntimeError("decoder return code: {0}".format(rc))
|
||
|
except subprocess.TimeoutExpired:
|
||
|
logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid)
|
||
|
decoder.kill()
|
||
|
raise
|
||
|
|
||
|
def unlink(self):
|
||
|
try:
|
||
|
os.unlink(self.file)
|
||
|
except FileNotFoundError:
|
||
|
pass
|
||
|
|
||
|
|
||
|
PoisonPill = object()
|
||
|
|
||
|
|
||
|
class QueueWorker(threading.Thread):
|
||
|
def __init__(self, queue):
|
||
|
self.queue = queue
|
||
|
self.doRun = True
|
||
|
super().__init__()
|
||
|
|
||
|
def run(self) -> None:
|
||
|
while self.doRun:
|
||
|
job = self.queue.get()
|
||
|
if job is PoisonPill:
|
||
|
self.stop()
|
||
|
else:
|
||
|
try:
|
||
|
job.run()
|
||
|
except Exception:
|
||
|
logger.exception("failed to decode job")
|
||
|
self.queue.onError()
|
||
|
finally:
|
||
|
job.unlink()
|
||
|
|
||
|
self.queue.task_done()
|
||
|
|
||
|
def stop(self):
|
||
|
self.doRun = False
|
||
|
|
||
|
|
||
|
class DecoderQueue(Queue):
|
||
|
sharedInstance = None
|
||
|
creationLock = threading.Lock()
|
||
|
|
||
|
@staticmethod
|
||
|
def getSharedInstance():
|
||
|
with DecoderQueue.creationLock:
|
||
|
if DecoderQueue.sharedInstance is None:
|
||
|
DecoderQueue.sharedInstance = DecoderQueue()
|
||
|
return DecoderQueue.sharedInstance
|
||
|
|
||
|
@staticmethod
|
||
|
def stopAll():
|
||
|
with DecoderQueue.creationLock:
|
||
|
if DecoderQueue.sharedInstance is not None:
|
||
|
DecoderQueue.sharedInstance.stop()
|
||
|
DecoderQueue.sharedInstance = None
|
||
|
|
||
|
def __init__(self):
|
||
|
pm = Config.get()
|
||
|
super().__init__(pm["decoding_queue_length"])
|
||
|
self.workers = []
|
||
|
self._setWorkers(pm["decoding_queue_workers"])
|
||
|
self.subscriptions = [
|
||
|
pm.wireProperty("decoding_queue_length", self._setMaxSize),
|
||
|
pm.wireProperty("decoding_queue_workers", self._setWorkers),
|
||
|
]
|
||
|
metrics = Metrics.getSharedInstance()
|
||
|
metrics.addMetric("decoding.queue.length", DirectMetric(self.qsize))
|
||
|
self.inCounter = CounterMetric()
|
||
|
metrics.addMetric("decoding.queue.in", self.inCounter)
|
||
|
self.outCounter = CounterMetric()
|
||
|
metrics.addMetric("decoding.queue.out", self.outCounter)
|
||
|
self.overflowCounter = CounterMetric()
|
||
|
metrics.addMetric("decoding.queue.overflow", self.overflowCounter)
|
||
|
self.errorCounter = CounterMetric()
|
||
|
metrics.addMetric("decoding.queue.error", self.errorCounter)
|
||
|
|
||
|
def _setMaxSize(self, size):
|
||
|
if self.maxsize == size:
|
||
|
return
|
||
|
self.maxsize = size
|
||
|
|
||
|
def _setWorkers(self, workers):
|
||
|
while len(self.workers) > workers:
|
||
|
logger.debug("stopping one worker")
|
||
|
self.workers.pop().stop()
|
||
|
while len(self.workers) < workers:
|
||
|
logger.debug("starting one worker")
|
||
|
self.workers.append(self.newWorker())
|
||
|
|
||
|
def stop(self):
|
||
|
logger.debug("shutting down the queue")
|
||
|
while self.subscriptions:
|
||
|
self.subscriptions.pop().cancel()
|
||
|
try:
|
||
|
# purge all remaining jobs
|
||
|
while not self.empty():
|
||
|
job = self.get()
|
||
|
job.unlink()
|
||
|
self.task_done()
|
||
|
except Empty:
|
||
|
pass
|
||
|
# put() a PoisonPill for all active workers to shut them down
|
||
|
for w in self.workers:
|
||
|
if w.is_alive():
|
||
|
self.put(PoisonPill)
|
||
|
self.join()
|
||
|
|
||
|
def put(self, item, **kwargs):
|
||
|
self.inCounter.inc()
|
||
|
try:
|
||
|
super(DecoderQueue, self).put(item, block=False)
|
||
|
except Full:
|
||
|
self.overflowCounter.inc()
|
||
|
raise
|
||
|
|
||
|
def get(self, **kwargs):
|
||
|
# super.get() is blocking, so it would mess up the stats to inc() first
|
||
|
out = super(DecoderQueue, self).get(**kwargs)
|
||
|
self.outCounter.inc()
|
||
|
return out
|
||
|
|
||
|
def newWorker(self):
|
||
|
worker = QueueWorker(self)
|
||
|
worker.start()
|
||
|
return worker
|
||
|
|
||
|
def onError(self):
|
||
|
self.errorCounter.inc()
|