openwebrx-clone/owrx/audio/chopper.py

91 lines
2.9 KiB
Python
Raw Permalink Normal View History

from owrx.modes import Modes, AudioChopperMode
from csdr.output import Output
from itertools import groupby
import threading
from owrx.audio import ProfileSourceSubscriber
from owrx.audio.wav import AudioWriter
2021-05-01 14:51:02 +00:00
from multiprocessing.connection import Pipe
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber):
def __init__(self, active_dsp, mode_str: str):
self.read_fn = None
self.doRun = True
self.dsp = active_dsp
self.writers = []
mode = Modes.findByModulation(mode_str)
if mode is None or not isinstance(mode, AudioChopperMode):
raise ValueError("Mode {} is not an audio chopper mode".format(mode_str))
self.profile_source = mode.get_profile_source()
(self.outputReader, self.outputWriter) = Pipe()
super().__init__()
def stop_writers(self):
while self.writers:
self.writers.pop().stop()
def setup_writers(self):
self.stop_writers()
sorted_profiles = sorted(self.profile_source.getProfiles(), key=lambda p: p.getInterval())
groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())}
2021-05-01 14:51:02 +00:00
writers = [
AudioWriter(self.dsp, self.outputWriter, interval, profiles) for interval, profiles in groups.items()
]
2021-04-11 18:10:49 +00:00
for w in writers:
w.start()
2021-04-11 18:10:49 +00:00
self.writers = writers
def supports_type(self, t):
return t == "audio"
def receive_output(self, t, read_fn):
self.read_fn = read_fn
self.start()
def run(self) -> None:
logger.debug("Audio chopper starting up")
self.setup_writers()
self.profile_source.subscribe(self)
while self.doRun:
data = None
try:
data = self.read_fn(256)
except ValueError:
pass
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
for w in self.writers:
w.write(data)
logger.debug("Audio chopper shutting down")
self.profile_source.unsubscribe(self)
self.stop_writers()
self.outputWriter.close()
self.outputWriter = None
# drain messages left in the queue so that the queue can be successfully closed
# this is necessary since python keeps the file descriptors open otherwise
try:
while True:
self.outputReader.recv()
except EOFError:
pass
self.outputReader.close()
self.outputReader = None
def onProfilesChanged(self):
logger.debug("profile change received, resetting writers...")
self.setup_writers()
def read(self):
try:
return self.outputReader.recv()
except (EOFError, OSError):
return None