restore audio chopper decoding

This commit is contained in:
Jakob Ketterl
2021-08-31 16:54:37 +02:00
parent 4a4901fa38
commit 73d326037c
8 changed files with 168 additions and 77 deletions

View File

@ -1,10 +1,10 @@
from owrx.modes import Modes, AudioChopperMode
from csdr.output import Output
from itertools import groupby
import threading
from owrx.audio import ProfileSourceSubscriber
from owrx.audio.wav import AudioWriter
from multiprocessing.connection import Pipe
from csdr.chain import Chain
import pickle
import logging
@ -12,18 +12,18 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber):
def __init__(self, active_dsp, mode_str: str):
self.read_fn = None
class AudioChopper(threading.Thread, Chain, ProfileSourceSubscriber):
# TODO parser typing
def __init__(self, mode_str: str, parser):
self.parser = parser
self.doRun = True
self.dsp = active_dsp
self.writers = []
mode = Modes.findByModulation(mode_str)
if mode is None or not isinstance(mode, AudioChopperMode):
raise ValueError("Mode {} is not an audio chopper mode".format(mode_str))
self.profile_source = mode.get_profile_source()
(self.outputReader, self.outputWriter) = Pipe()
super().__init__()
Chain.__init__(self, [])
def stop_writers(self):
while self.writers:
@ -34,19 +34,20 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber):
sorted_profiles = sorted(self.profile_source.getProfiles(), key=lambda p: p.getInterval())
groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())}
writers = [
AudioWriter(self.dsp, self.outputWriter, interval, profiles) for interval, profiles in groups.items()
AudioWriter(self, interval, profiles) for interval, profiles in groups.items()
]
for w in writers:
w.start()
self.writers = writers
def supports_type(self, t):
return t == "audio"
def receive_output(self, t, read_fn):
self.read_fn = read_fn
def setReader(self, reader):
super().setReader(reader)
self.start()
def stop(self):
self.reader.stop()
super().stop()
def run(self) -> None:
logger.debug("Audio chopper starting up")
self.setup_writers()
@ -54,37 +55,24 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber):
while self.doRun:
data = None
try:
data = self.read_fn(256)
data = self.reader.read()
except ValueError:
pass
if data is None or (isinstance(data, bytes) and len(data) == 0):
if data is None:
self.doRun = False
else:
for w in self.writers:
w.write(data)
w.write(data.tobytes())
logger.debug("Audio chopper shutting down")
self.profile_source.unsubscribe(self)
self.stop_writers()
self.outputWriter.close()
self.outputWriter = None
# drain messages left in the queue so that the queue can be successfully closed
# this is necessary since python keeps the file descriptors open otherwise
try:
while True:
self.outputReader.recv()
except EOFError:
pass
self.outputReader.close()
self.outputReader = None
def onProfilesChanged(self):
logger.debug("profile change received, resetting writers...")
self.setup_writers()
def read(self):
try:
return self.outputReader.recv()
except (EOFError, OSError):
return None
def send(self, profile, line):
data = self.parser.parse(profile, line)
if data is not None:
self.writer.write(pickle.dumps(data))

View File

@ -13,11 +13,10 @@ logger.setLevel(logging.INFO)
class QueueJob(object):
def __init__(self, profile, writer, file, freq):
def __init__(self, profile, writer, file):
self.profile = profile
self.writer = writer
self.file = file
self.freq = freq
def run(self):
logger.debug("processing file %s", self.file)
@ -30,7 +29,7 @@ class QueueJob(object):
)
try:
for line in decoder.stdout:
self.writer.send((self.profile, self.freq, line))
self.writer.send(self.profile, line)
except (OSError, AttributeError):
decoder.stdout.flush()
# TODO uncouple parsing from the output so that decodes can still go to the map and the spotters

View File

@ -47,8 +47,7 @@ class WaveFile(object):
class AudioWriter(object):
def __init__(self, active_dsp, outputWriter, interval, profiles: List[AudioChopperProfile]):
self.dsp = active_dsp
def __init__(self, outputWriter, interval, profiles: List[AudioChopperProfile]):
self.outputWriter = outputWriter
self.interval = interval
self.profiles = profiles
@ -102,7 +101,7 @@ class AudioWriter(object):
logger.exception("Error while linking job files")
continue
job = QueueJob(profile, self.outputWriter, filename, self.dsp.get_operating_freq())
job = QueueJob(profile, self.outputWriter, filename)
try:
DecoderQueue.getSharedInstance().put(job)
except Full: