openwebrx-clone/owrx/audio/chopper.py

92 lines
3.0 KiB
Python

from owrx.modes import Modes, AudioChopperMode
from owrx.audio import AudioChopperProfile
from itertools import groupby
from owrx.audio import ProfileSourceSubscriber
from owrx.audio.wav import AudioWriter
from owrx.audio.queue import QueueJob
from csdr.module import ThreadModule
from pycsdr.types import Format
from abc import ABC, abstractmethod
import pickle
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class AudioChopperParser(ABC):
@abstractmethod
def parse(self, profile: AudioChopperProfile, frequency: int, line: bytes):
pass
class AudioChopper(ThreadModule, ProfileSourceSubscriber):
def __init__(self, mode_str: str, parser: AudioChopperParser):
self.parser = parser
self.dialFrequency = None
self.doRun = True
self.writers = []
mode = Modes.findByModulation(mode_str)
if mode is None or not isinstance(mode, AudioChopperMode):
raise ValueError("Mode {} is not an audio chopper mode".format(mode_str))
self.profile_source = mode.get_profile_source()
super().__init__()
def getInputFormat(self) -> Format:
return Format.SHORT
def getOutputFormat(self) -> Format:
return Format.CHAR
def stop_writers(self):
while self.writers:
self.writers.pop().stop()
def setup_writers(self):
self.stop_writers()
sorted_profiles = sorted(self.profile_source.getProfiles(), key=lambda p: p.getInterval())
groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())}
writers = [
AudioWriter(self, interval, profiles) for interval, profiles in groups.items()
]
for w in writers:
w.start()
self.writers = writers
def run(self) -> None:
logger.debug("Audio chopper starting up")
self.setup_writers()
self.profile_source.subscribe(self)
while self.doRun:
data = None
try:
data = self.reader.read()
except ValueError:
pass
if data is None:
self.doRun = False
else:
for w in self.writers:
w.write(data.tobytes())
logger.debug("Audio chopper shutting down")
self.profile_source.unsubscribe(self)
self.stop_writers()
def onProfilesChanged(self):
logger.debug("profile change received, resetting writers...")
self.setup_writers()
def setDialFrequency(self, frequency: int) -> None:
self.dialFrequency = frequency
def createJob(self, profile, filename):
return QueueJob(profile, self.dialFrequency, self, filename)
def sendResult(self, result):
for line in result.lines:
data = self.parser.parse(result.profile, result.frequency, line)
if data is not None and self.writer is not None:
self.writer.write(pickle.dumps(data))