openwebrx-clone/owrx/audio.py

271 lines
8.3 KiB
Python
Raw Normal View History

2020-04-22 21:53:19 +00:00
from abc import ABC, ABCMeta, abstractmethod
from owrx.config import Config
from owrx.metrics import Metrics, CounterMetric, DirectMetric
import threading
import wave
import subprocess
import os
2020-04-22 22:21:59 +00:00
from multiprocessing.connection import Pipe, wait
2020-04-22 21:53:19 +00:00
from datetime import datetime, timedelta
from queue import Queue, Full
import logging
logger = logging.getLogger(__name__)
2020-04-22 22:34:49 +00:00
logger.setLevel(logging.INFO)
2020-04-22 21:53:19 +00:00
class QueueJob(object):
def __init__(self, decoder, file, freq):
self.decoder = decoder
self.file = file
self.freq = freq
def run(self):
self.decoder.decode(self)
def unlink(self):
try:
os.unlink(self.file)
except FileNotFoundError:
pass
2020-04-22 21:53:19 +00:00
class QueueWorker(threading.Thread):
def __init__(self, queue):
self.queue = queue
self.doRun = True
super().__init__(daemon=True)
def run(self) -> None:
while self.doRun:
job = self.queue.get()
try:
job.run()
except Exception:
logger.exception("failed to decode job")
self.queue.onError()
finally:
job.unlink()
2020-04-22 21:53:19 +00:00
self.queue.task_done()
class DecoderQueue(Queue):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with DecoderQueue.creationLock:
if DecoderQueue.sharedInstance is None:
pm = Config.get()
DecoderQueue.sharedInstance = DecoderQueue(maxsize=pm["decoding_queue_length"], workers=pm["decoding_queue_workers"])
2020-04-22 21:53:19 +00:00
return DecoderQueue.sharedInstance
def __init__(self, maxsize, workers):
super().__init__(maxsize)
metrics = Metrics.getSharedInstance()
metrics.addMetric("decoding.queue.length", DirectMetric(self.qsize))
2020-04-22 21:53:19 +00:00
self.inCounter = CounterMetric()
metrics.addMetric("decoding.queue.in", self.inCounter)
2020-04-22 21:53:19 +00:00
self.outCounter = CounterMetric()
metrics.addMetric("decoding.queue.out", self.outCounter)
2020-04-22 21:53:19 +00:00
self.overflowCounter = CounterMetric()
metrics.addMetric("decoding.queue.overflow", self.overflowCounter)
2020-04-22 21:53:19 +00:00
self.errorCounter = CounterMetric()
metrics.addMetric("decoding.queue.error", self.errorCounter)
2020-04-22 21:53:19 +00:00
self.workers = [self.newWorker() for _ in range(0, workers)]
def put(self, item, **kwars):
self.inCounter.inc()
try:
super(DecoderQueue, self).put(item, block=False)
except Full:
self.overflowCounter.inc()
raise
def get(self, **kwargs):
# super.get() is blocking, so it would mess up the stats to inc() first
out = super(DecoderQueue, self).get(**kwargs)
self.outCounter.inc()
return out
def newWorker(self):
worker = QueueWorker(self)
worker.start()
return worker
def onError(self):
self.errorCounter.inc()
class AudioChopperProfile(ABC):
@abstractmethod
def getInterval(self):
pass
@abstractmethod
def getFileTimestampFormat(self):
pass
@abstractmethod
def decoder_commandline(self, file):
pass
2020-04-22 22:21:59 +00:00
class AudioWriter(object):
2020-04-22 21:53:19 +00:00
def __init__(self, dsp, source, profile: AudioChopperProfile):
self.dsp = dsp
self.source = source
self.profile = profile
self.tmp_dir = Config.get()["temporary_directory"]
self.wavefile = None
self.wavefilename = None
self.switchingLock = threading.Lock()
self.timer = None
(self.outputReader, self.outputWriter) = Pipe()
def getWaveFile(self):
filename = "{tmp_dir}/openwebrx-audiochopper-{id}-{timestamp}.wav".format(
2020-04-22 22:21:59 +00:00
tmp_dir=self.tmp_dir,
2020-04-22 22:34:49 +00:00
id=id(self),
2020-04-22 22:21:59 +00:00
timestamp=datetime.utcnow().strftime(self.profile.getFileTimestampFormat()),
2020-04-22 21:53:19 +00:00
)
wavefile = wave.open(filename, "wb")
wavefile.setnchannels(1)
wavefile.setsampwidth(2)
wavefile.setframerate(12000)
return filename, wavefile
def getNextDecodingTime(self):
t = datetime.utcnow()
zeroed = t.replace(minute=0, second=0, microsecond=0)
delta = t - zeroed
interval = self.profile.getInterval()
seconds = (int(delta.total_seconds() / interval) + 1) * interval
t = zeroed + timedelta(seconds=seconds)
logger.debug("scheduling: {0}".format(t))
return t
def cancelTimer(self):
if self.timer:
self.timer.cancel()
self.timer = None
def _scheduleNextSwitch(self):
self.cancelTimer()
2020-04-22 22:21:59 +00:00
delta = self.getNextDecodingTime() - datetime.utcnow()
self.timer = threading.Timer(delta.total_seconds(), self.switchFiles)
self.timer.start()
2020-04-22 21:53:19 +00:00
def switchFiles(self):
self.switchingLock.acquire()
file = self.wavefile
filename = self.wavefilename
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock.release()
file.close()
job = QueueJob(self, filename, self.dsp.get_operating_freq())
2020-04-22 21:53:19 +00:00
try:
DecoderQueue.getSharedInstance().put(job)
2020-04-22 21:53:19 +00:00
except Full:
logger.warning("decoding queue overflow; dropping one file")
job.unlink()
2020-04-22 21:53:19 +00:00
self._scheduleNextSwitch()
def decode(self, job: QueueJob):
logger.debug("processing file %s", job.file)
decoder = subprocess.Popen(
["nice", "-n", "10"] + self.profile.decoder_commandline(job.file),
stdout=subprocess.PIPE,
cwd=self.tmp_dir,
close_fds=True,
)
try:
for line in decoder.stdout:
self.outputWriter.send((job.freq, line))
except OSError:
decoder.stdout.flush()
# TODO uncouple parsing from the output so that decodes can still go to the map and the spotters
logger.debug("output has gone away while decoding job.")
2020-04-22 21:53:19 +00:00
try:
rc = decoder.wait(timeout=10)
if rc != 0:
logger.warning("decoder return code: %i", rc)
except subprocess.TimeoutExpired:
logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid)
decoder.kill()
2020-04-22 22:21:59 +00:00
def start(self):
2020-04-22 21:53:19 +00:00
(self.wavefilename, self.wavefile) = self.getWaveFile()
self._scheduleNextSwitch()
2020-04-22 22:21:59 +00:00
def write(self, data):
self.switchingLock.acquire()
self.wavefile.writeframes(data)
self.switchingLock.release()
def stop(self):
2020-04-22 21:53:19 +00:00
self.outputWriter.close()
self.outputWriter = None
# drain messages left in the queue so that the queue can be successfully closed
# this is necessary since python keeps the file descriptors open otherwise
try:
while True:
self.outputReader.recv()
except EOFError:
pass
self.outputReader.close()
self.outputReader = None
2020-04-22 21:53:19 +00:00
self.cancelTimer()
try:
self.wavefile.close()
except Exception:
logger.exception("error closing wave file")
2020-04-22 21:53:19 +00:00
try:
os.unlink(self.wavefilename)
except Exception:
logger.exception("error removing undecoded file")
self.wavefile = None
self.wavefilename = None
2020-04-22 21:53:19 +00:00
2020-04-22 22:21:59 +00:00
class AudioChopper(threading.Thread, metaclass=ABCMeta):
def __init__(self, dsp, source, *profiles: AudioChopperProfile):
self.source = source
self.writers = [AudioWriter(dsp, source, p) for p in profiles]
self.doRun = True
super().__init__()
def run(self) -> None:
logger.debug("Audio chopper starting up")
for w in self.writers:
w.start()
while self.doRun:
2020-08-06 18:06:04 +00:00
data = None
try:
data = self.source.read(256)
except ValueError:
pass
2020-04-22 22:21:59 +00:00
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
for w in self.writers:
w.write(data)
logger.debug("Audio chopper shutting down")
for w in self.writers:
w.stop()
2020-04-22 21:53:19 +00:00
def read(self):
try:
2020-04-22 22:21:59 +00:00
readers = wait([w.outputReader for w in self.writers])
return [r.recv() for r in readers]
2020-07-19 17:00:26 +00:00
except (EOFError, OSError):
2020-04-22 21:53:19 +00:00
return None