merge AudioHandler and AudioChopper; split audio module

This commit is contained in:
Jakob Ketterl 2021-04-11 14:40:28 +02:00
parent 1f91908e06
commit 9351e4793c
7 changed files with 374 additions and 367 deletions

View File

@ -31,7 +31,7 @@ from functools import partial
from csdr.output import Output from csdr.output import Output
from owrx.kiss import KissClient, DirewolfConfig, DirewolfConfigSubscriber from owrx.kiss import KissClient, DirewolfConfig, DirewolfConfigSubscriber
from owrx.audio.handler import AudioHandler from owrx.audio.chopper import AudioChopper
from csdr.pipe import Pipe from csdr.pipe import Pipe
@ -374,10 +374,10 @@ class Dsp(DirewolfConfigSubscriber):
self.secondary_processes_running = True self.secondary_processes_running = True
if self.isWsjtMode() or self.isJs8(): if self.isWsjtMode() or self.isJs8():
handler = AudioHandler(self, self.get_secondary_demodulator()) chopper = AudioChopper(self, self.get_secondary_demodulator())
handler.send_output("audio", self.secondary_process_demod.stdout.read) chopper.send_output("audio", self.secondary_process_demod.stdout.read)
output_type = "js8_demod" if self.isJs8() else "wsjt_demod" output_type = "js8_demod" if self.isJs8() else "wsjt_demod"
self.output.send_output(output_type, handler.read) self.output.send_output(output_type, chopper.read)
elif self.isPacket(): elif self.isPacket():
# we best get the ax25 packets from the kiss socket # we best get the ax25 packets from the kiss socket
kiss = KissClient(self.direwolf_config.getPort()) kiss = KissClient(self.direwolf_config.getPort())

View File

@ -9,7 +9,7 @@ from owrx.service import Services
from owrx.websocket import WebSocketConnection from owrx.websocket import WebSocketConnection
from owrx.reporting import ReportingEngine from owrx.reporting import ReportingEngine
from owrx.version import openwebrx_version from owrx.version import openwebrx_version
from owrx.audio import DecoderQueue from owrx.audio.queue import DecoderQueue
import signal import signal
import logging import logging

View File

@ -1,181 +1,4 @@
from abc import ABC, ABCMeta, abstractmethod from abc import ABC, abstractmethod
from owrx.config.core import CoreConfig
from owrx.config import Config
from owrx.metrics import Metrics, CounterMetric, DirectMetric
import threading
import wave
import subprocess
import os
from multiprocessing.connection import Pipe, wait
from datetime import datetime, timedelta
from queue import Queue, Full, Empty
from itertools import groupby
from typing import List
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class QueueJob(object):
def __init__(self, profile, writer, file, freq):
self.profile = profile
self.writer = writer
self.file = file
self.freq = freq
def run(self):
logger.debug("processing file %s", self.file)
tmp_dir = CoreConfig().get_temporary_directory()
decoder = subprocess.Popen(
["nice", "-n", "10"] + self.profile.decoder_commandline(self.file),
stdout=subprocess.PIPE,
cwd=tmp_dir,
close_fds=True,
)
try:
for line in decoder.stdout:
self.writer.send((self.profile, self.freq, line))
except (OSError, AttributeError):
decoder.stdout.flush()
# TODO uncouple parsing from the output so that decodes can still go to the map and the spotters
logger.debug("output has gone away while decoding job.")
try:
rc = decoder.wait(timeout=10)
if rc != 0:
raise RuntimeError("decoder return code: {0}".format(rc))
except subprocess.TimeoutExpired:
logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid)
decoder.kill()
raise
def unlink(self):
try:
os.unlink(self.file)
except FileNotFoundError:
pass
PoisonPill = object()
class QueueWorker(threading.Thread):
def __init__(self, queue):
self.queue = queue
self.doRun = True
super().__init__()
def run(self) -> None:
while self.doRun:
job = self.queue.get()
if job is PoisonPill:
self.stop()
else:
try:
job.run()
except Exception:
logger.exception("failed to decode job")
self.queue.onError()
finally:
job.unlink()
self.queue.task_done()
def stop(self):
self.doRun = False
class DecoderQueue(Queue):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with DecoderQueue.creationLock:
if DecoderQueue.sharedInstance is None:
DecoderQueue.sharedInstance = DecoderQueue()
return DecoderQueue.sharedInstance
@staticmethod
def stopAll():
with DecoderQueue.creationLock:
if DecoderQueue.sharedInstance is not None:
DecoderQueue.sharedInstance.stop()
DecoderQueue.sharedInstance = None
def __init__(self):
pm = Config.get()
super().__init__(pm["decoding_queue_length"])
self.workers = []
self._setWorkers(pm["decoding_queue_workers"])
self.subscriptions = [
pm.wireProperty("decoding_queue_length", self._setMaxSize),
pm.wireProperty("decoding_queue_workers", self._setWorkers),
]
metrics = Metrics.getSharedInstance()
metrics.addMetric("decoding.queue.length", DirectMetric(self.qsize))
self.inCounter = CounterMetric()
metrics.addMetric("decoding.queue.in", self.inCounter)
self.outCounter = CounterMetric()
metrics.addMetric("decoding.queue.out", self.outCounter)
self.overflowCounter = CounterMetric()
metrics.addMetric("decoding.queue.overflow", self.overflowCounter)
self.errorCounter = CounterMetric()
metrics.addMetric("decoding.queue.error", self.errorCounter)
def _setMaxSize(self, size):
if self.maxsize == size:
return
self.maxsize = size
def _setWorkers(self, workers):
while len(self.workers) > workers:
logger.debug("stopping one worker")
self.workers.pop().stop()
while len(self.workers) < workers:
logger.debug("starting one worker")
self.workers.append(self.newWorker())
def stop(self):
logger.debug("shutting down the queue")
while self.subscriptions:
self.subscriptions.pop().cancel()
try:
# purge all remaining jobs
while not self.empty():
job = self.get()
job.unlink()
self.task_done()
except Empty:
pass
# put() a PoisonPill for all active workers to shut them down
for w in self.workers:
if w.is_alive():
self.put(PoisonPill)
self.join()
def put(self, item, **kwars):
self.inCounter.inc()
try:
super(DecoderQueue, self).put(item, block=False)
except Full:
self.overflowCounter.inc()
raise
def get(self, **kwargs):
# super.get() is blocking, so it would mess up the stats to inc() first
out = super(DecoderQueue, self).get(**kwargs)
self.outCounter.inc()
return out
def newWorker(self):
worker = QueueWorker(self)
worker.start()
return worker
def onError(self):
self.errorCounter.inc()
class AudioChopperProfile(ABC): class AudioChopperProfile(ABC):
@ -190,165 +13,3 @@ class AudioChopperProfile(ABC):
@abstractmethod @abstractmethod
def decoder_commandline(self, file): def decoder_commandline(self, file):
pass pass
class WaveFile(object):
def __init__(self, filename):
self.filename = filename
self.waveFile = wave.open(filename, "wb")
self.waveFile.setnchannels(1)
self.waveFile.setsampwidth(2)
self.waveFile.setframerate(12000)
def close(self):
self.waveFile.close()
def getFileName(self):
return self.filename
def writeframes(self, data):
return self.waveFile.writeframes(data)
def unlink(self):
os.unlink(self.filename)
self.waveFile = None
class AudioWriter(object):
def __init__(self, active_dsp, interval, profiles: List[AudioChopperProfile]):
self.dsp = active_dsp
self.interval = interval
self.profiles = profiles
self.wavefile = None
self.switchingLock = threading.Lock()
self.timer = None
(self.outputReader, self.outputWriter) = Pipe()
def getWaveFile(self):
tmp_dir = CoreConfig().get_temporary_directory()
filename = "{tmp_dir}/openwebrx-audiochopper-master-{id}-{timestamp}.wav".format(
tmp_dir=tmp_dir,
id=id(self),
timestamp=datetime.utcnow().strftime("%y%m%d_%H%M%S"),
)
return WaveFile(filename)
def getNextDecodingTime(self):
t = datetime.utcnow()
zeroed = t.replace(minute=0, second=0, microsecond=0)
delta = t - zeroed
seconds = (int(delta.total_seconds() / self.interval) + 1) * self.interval
t = zeroed + timedelta(seconds=seconds)
logger.debug("scheduling: {0}".format(t))
return t
def cancelTimer(self):
if self.timer:
self.timer.cancel()
self.timer = None
def _scheduleNextSwitch(self):
self.cancelTimer()
delta = self.getNextDecodingTime() - datetime.utcnow()
self.timer = threading.Timer(delta.total_seconds(), self.switchFiles)
self.timer.start()
def switchFiles(self):
with self.switchingLock:
file = self.wavefile
self.wavefile = self.getWaveFile()
file.close()
for profile in self.profiles:
tmp_dir = CoreConfig().get_temporary_directory()
# create hardlinks for the individual profiles
filename = "{tmp_dir}/openwebrx-audiochopper-{pid}-{timestamp}.wav".format(
tmp_dir=tmp_dir,
pid=id(profile),
timestamp=datetime.utcnow().strftime(profile.getFileTimestampFormat()),
)
os.link(file.getFileName(), filename)
job = QueueJob(profile, self.outputWriter, filename, self.dsp.get_operating_freq())
try:
DecoderQueue.getSharedInstance().put(job)
except Full:
logger.warning("decoding queue overflow; dropping one file")
job.unlink()
# our master can be deleted now, the profiles will delete their hardlinked copies after processing
file.unlink()
self._scheduleNextSwitch()
def start(self):
self.wavefile = self.getWaveFile()
self._scheduleNextSwitch()
def write(self, data):
with self.switchingLock:
self.wavefile.writeframes(data)
def stop(self):
self.outputWriter.close()
self.outputWriter = None
# drain messages left in the queue so that the queue can be successfully closed
# this is necessary since python keeps the file descriptors open otherwise
try:
while True:
self.outputReader.recv()
except EOFError:
pass
self.outputReader.close()
self.outputReader = None
self.cancelTimer()
try:
self.wavefile.close()
except Exception:
logger.exception("error closing wave file")
try:
with self.switchingLock:
self.wavefile.unlink()
except Exception:
logger.exception("error removing undecoded file")
self.wavefile = None
class AudioChopper(threading.Thread, metaclass=ABCMeta):
def __init__(self, active_dsp, readfn: callable, *profiles: AudioChopperProfile):
sorted_profiles = sorted(profiles, key=lambda p: p.getInterval())
groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())}
self.readfn = readfn
self.writers = [AudioWriter(active_dsp, interval, profiles) for interval, profiles in groups.items()]
self.doRun = True
super().__init__()
def run(self) -> None:
logger.debug("Audio chopper starting up")
for w in self.writers:
w.start()
while self.doRun:
data = None
try:
data = self.readfn(256)
except ValueError:
pass
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
for w in self.writers:
w.write(data)
logger.debug("Audio chopper shutting down")
for w in self.writers:
w.stop()
def read(self):
try:
readers = wait([w.outputReader for w in self.writers])
return [r.recv() for r in readers]
except (EOFError, OSError):
return None

56
owrx/audio/chopper.py Normal file
View File

@ -0,0 +1,56 @@
from owrx.modes import Modes, AudioChopperMode
from csdr.output import Output
from itertools import groupby
from abc import ABCMeta
import threading
from owrx.audio.wav import AudioWriter
from multiprocessing.connection import wait
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class AudioChopper(threading.Thread, Output, metaclass=ABCMeta):
def __init__(self, active_dsp, mode_str: str):
mode = Modes.findByModulation(mode_str)
if mode is None or not isinstance(mode, AudioChopperMode):
raise ValueError("Mode {} is not an audio chopper mode".format(mode_str))
sorted_profiles = sorted(mode.getProfiles(), key=lambda p: p.getInterval())
groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())}
self.read_fn = None
self.writers = [AudioWriter(active_dsp, interval, profiles) for interval, profiles in groups.items()]
self.doRun = True
super().__init__()
def receive_output(self, t, read_fn):
self.read_fn = read_fn
self.start()
def run(self) -> None:
logger.debug("Audio chopper starting up")
for w in self.writers:
w.start()
while self.doRun:
data = None
try:
data = self.read_fn(256)
except ValueError:
pass
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
for w in self.writers:
w.write(data)
logger.debug("Audio chopper shutting down")
for w in self.writers:
w.stop()
def read(self):
try:
readers = wait([w.outputReader for w in self.writers])
return [r.recv() for r in readers]
except (EOFError, OSError):
return None

View File

@ -1,22 +0,0 @@
from owrx.modes import Modes, AudioChopperMode
from csdr.output import Output
from owrx.audio import AudioChopper
class AudioHandler(Output):
def __init__(self, active_dsp, mode: str):
self.dsp = active_dsp
self.mode = Modes.findByModulation(mode)
if mode is None or not isinstance(self.mode, AudioChopperMode):
raise ValueError("Mode {} is not an audio chopper mode".format(mode))
self.chopper = None
def supports_type(self, t):
return t == "audio"
def receive_output(self, t, read_fn):
self.chopper = AudioChopper(self.dsp, read_fn, *self.mode.getProfiles())
self.chopper.start()
def read(self, *args, **kwargs):
return self.chopper.read(*args, **kwargs)

172
owrx/audio/queue.py Normal file
View File

@ -0,0 +1,172 @@
from owrx.config import Config
from owrx.config.core import CoreConfig
from owrx.metrics import Metrics, CounterMetric, DirectMetric
from queue import Queue, Full, Empty
import subprocess
import os
import threading
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class QueueJob(object):
def __init__(self, profile, writer, file, freq):
self.profile = profile
self.writer = writer
self.file = file
self.freq = freq
def run(self):
logger.debug("processing file %s", self.file)
tmp_dir = CoreConfig().get_temporary_directory()
decoder = subprocess.Popen(
["nice", "-n", "10"] + self.profile.decoder_commandline(self.file),
stdout=subprocess.PIPE,
cwd=tmp_dir,
close_fds=True,
)
try:
for line in decoder.stdout:
self.writer.send((self.profile, self.freq, line))
except (OSError, AttributeError):
decoder.stdout.flush()
# TODO uncouple parsing from the output so that decodes can still go to the map and the spotters
logger.debug("output has gone away while decoding job.")
try:
rc = decoder.wait(timeout=10)
if rc != 0:
raise RuntimeError("decoder return code: {0}".format(rc))
except subprocess.TimeoutExpired:
logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid)
decoder.kill()
raise
def unlink(self):
try:
os.unlink(self.file)
except FileNotFoundError:
pass
PoisonPill = object()
class QueueWorker(threading.Thread):
def __init__(self, queue):
self.queue = queue
self.doRun = True
super().__init__()
def run(self) -> None:
while self.doRun:
job = self.queue.get()
if job is PoisonPill:
self.stop()
else:
try:
job.run()
except Exception:
logger.exception("failed to decode job")
self.queue.onError()
finally:
job.unlink()
self.queue.task_done()
def stop(self):
self.doRun = False
class DecoderQueue(Queue):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with DecoderQueue.creationLock:
if DecoderQueue.sharedInstance is None:
DecoderQueue.sharedInstance = DecoderQueue()
return DecoderQueue.sharedInstance
@staticmethod
def stopAll():
with DecoderQueue.creationLock:
if DecoderQueue.sharedInstance is not None:
DecoderQueue.sharedInstance.stop()
DecoderQueue.sharedInstance = None
def __init__(self):
pm = Config.get()
super().__init__(pm["decoding_queue_length"])
self.workers = []
self._setWorkers(pm["decoding_queue_workers"])
self.subscriptions = [
pm.wireProperty("decoding_queue_length", self._setMaxSize),
pm.wireProperty("decoding_queue_workers", self._setWorkers),
]
metrics = Metrics.getSharedInstance()
metrics.addMetric("decoding.queue.length", DirectMetric(self.qsize))
self.inCounter = CounterMetric()
metrics.addMetric("decoding.queue.in", self.inCounter)
self.outCounter = CounterMetric()
metrics.addMetric("decoding.queue.out", self.outCounter)
self.overflowCounter = CounterMetric()
metrics.addMetric("decoding.queue.overflow", self.overflowCounter)
self.errorCounter = CounterMetric()
metrics.addMetric("decoding.queue.error", self.errorCounter)
def _setMaxSize(self, size):
if self.maxsize == size:
return
self.maxsize = size
def _setWorkers(self, workers):
while len(self.workers) > workers:
logger.debug("stopping one worker")
self.workers.pop().stop()
while len(self.workers) < workers:
logger.debug("starting one worker")
self.workers.append(self.newWorker())
def stop(self):
logger.debug("shutting down the queue")
while self.subscriptions:
self.subscriptions.pop().cancel()
try:
# purge all remaining jobs
while not self.empty():
job = self.get()
job.unlink()
self.task_done()
except Empty:
pass
# put() a PoisonPill for all active workers to shut them down
for w in self.workers:
if w.is_alive():
self.put(PoisonPill)
self.join()
def put(self, item, **kwargs):
self.inCounter.inc()
try:
super(DecoderQueue, self).put(item, block=False)
except Full:
self.overflowCounter.inc()
raise
def get(self, **kwargs):
# super.get() is blocking, so it would mess up the stats to inc() first
out = super(DecoderQueue, self).get(**kwargs)
self.outCounter.inc()
return out
def newWorker(self):
worker = QueueWorker(self)
worker.start()
return worker
def onError(self):
self.errorCounter.inc()

140
owrx/audio/wav.py Normal file
View File

@ -0,0 +1,140 @@
from owrx.config.core import CoreConfig
from owrx.audio import AudioChopperProfile
from owrx.audio.queue import QueueJob, DecoderQueue
import threading
import wave
import os
from multiprocessing.connection import Pipe
from datetime import datetime, timedelta
from queue import Full
from typing import List
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class WaveFile(object):
def __init__(self, filename):
self.filename = filename
self.waveFile = wave.open(filename, "wb")
self.waveFile.setnchannels(1)
self.waveFile.setsampwidth(2)
self.waveFile.setframerate(12000)
def close(self):
self.waveFile.close()
def getFileName(self):
return self.filename
def writeframes(self, data):
return self.waveFile.writeframes(data)
def unlink(self):
os.unlink(self.filename)
self.waveFile = None
class AudioWriter(object):
def __init__(self, active_dsp, interval, profiles: List[AudioChopperProfile]):
self.dsp = active_dsp
self.interval = interval
self.profiles = profiles
self.wavefile = None
self.switchingLock = threading.Lock()
self.timer = None
(self.outputReader, self.outputWriter) = Pipe()
def getWaveFile(self):
tmp_dir = CoreConfig().get_temporary_directory()
filename = "{tmp_dir}/openwebrx-audiochopper-master-{id}-{timestamp}.wav".format(
tmp_dir=tmp_dir,
id=id(self),
timestamp=datetime.utcnow().strftime("%y%m%d_%H%M%S"),
)
return WaveFile(filename)
def getNextDecodingTime(self):
t = datetime.utcnow()
zeroed = t.replace(minute=0, second=0, microsecond=0)
delta = t - zeroed
seconds = (int(delta.total_seconds() / self.interval) + 1) * self.interval
t = zeroed + timedelta(seconds=seconds)
logger.debug("scheduling: {0}".format(t))
return t
def cancelTimer(self):
if self.timer:
self.timer.cancel()
self.timer = None
def _scheduleNextSwitch(self):
self.cancelTimer()
delta = self.getNextDecodingTime() - datetime.utcnow()
self.timer = threading.Timer(delta.total_seconds(), self.switchFiles)
self.timer.start()
def switchFiles(self):
with self.switchingLock:
file = self.wavefile
self.wavefile = self.getWaveFile()
file.close()
for profile in self.profiles:
tmp_dir = CoreConfig().get_temporary_directory()
# create hardlinks for the individual profiles
filename = "{tmp_dir}/openwebrx-audiochopper-{pid}-{timestamp}.wav".format(
tmp_dir=tmp_dir,
pid=id(profile),
timestamp=datetime.utcnow().strftime(profile.getFileTimestampFormat()),
)
os.link(file.getFileName(), filename)
job = QueueJob(profile, self.outputWriter, filename, self.dsp.get_operating_freq())
try:
DecoderQueue.getSharedInstance().put(job)
except Full:
logger.warning("decoding queue overflow; dropping one file")
job.unlink()
# our master can be deleted now, the profiles will delete their hardlinked copies after processing
file.unlink()
self._scheduleNextSwitch()
def start(self):
self.wavefile = self.getWaveFile()
self._scheduleNextSwitch()
def write(self, data):
with self.switchingLock:
self.wavefile.writeframes(data)
def stop(self):
self.outputWriter.close()
self.outputWriter = None
# drain messages left in the queue so that the queue can be successfully closed
# this is necessary since python keeps the file descriptors open otherwise
try:
while True:
self.outputReader.recv()
except EOFError:
pass
self.outputReader.close()
self.outputReader = None
self.cancelTimer()
try:
self.wavefile.close()
except Exception:
logger.exception("error closing wave file")
try:
with self.switchingLock:
self.wavefile.unlink()
except Exception:
logger.exception("error removing undecoded file")
self.wavefile = None