diff --git a/owrx/audio/__init__.py b/owrx/audio/__init__.py index 6971aeb..170bde3 100644 --- a/owrx/audio/__init__.py +++ b/owrx/audio/__init__.py @@ -1,4 +1,10 @@ -from abc import ABC, abstractmethod +from owrx.config import Config +from abc import ABC, ABCMeta, abstractmethod +from typing import List + +import logging + +logger = logging.getLogger(__name__) class AudioChopperProfile(ABC): @@ -13,3 +19,68 @@ class AudioChopperProfile(ABC): @abstractmethod def decoder_commandline(self, file): pass + + +class ProfileSourceSubscriber(ABC): + @abstractmethod + def onProfilesChanged(self): + pass + + +class ProfileSource(ABC): + def __init__(self): + self.subscribers = [] + + @abstractmethod + def getProfiles(self) -> List[AudioChopperProfile]: + pass + + def subscribe(self, subscriber: ProfileSourceSubscriber): + if subscriber in self.subscribers: + return + self.subscribers.append(subscriber) + + def unsubscribe(self, subscriber: ProfileSourceSubscriber): + if subscriber not in self.subscribers: + return + self.subscribers.remove(subscriber) + + def fireProfilesChanged(self): + for sub in self.subscribers.copy(): + try: + sub.onProfilesChanged() + except Exception: + logger.exception("Error while notifying profile subscriptions") + + +class ConfigWiredProfileSource(ProfileSource, metaclass=ABCMeta): + def __init__(self): + super().__init__() + self.configSub = None + + @abstractmethod + def getPropertiesToWire(self) -> List[str]: + pass + + def subscribe(self, subscriber: ProfileSourceSubscriber): + super().subscribe(subscriber) + if self.subscribers and self.configSub is None: + self.configSub = Config.get().filter(*self.getPropertiesToWire()).wire(self.fireProfilesChanged) + + def unsubscribe(self, subscriber: ProfileSourceSubscriber): + super().unsubscribe(subscriber) + if not self.subscribers and self.configSub is not None: + self.configSub.cancel() + self.configSub = None + + def fireProfilesChanged(self, *args): + super().fireProfilesChanged() + + +class StaticProfileSource(ProfileSource): + def __init__(self, profiles: List[AudioChopperProfile]): + super().__init__() + self.profiles = profiles + + def getProfiles(self) -> List[AudioChopperProfile]: + return self.profiles diff --git a/owrx/audio/chopper.py b/owrx/audio/chopper.py index 3a35436..52bbfa8 100644 --- a/owrx/audio/chopper.py +++ b/owrx/audio/chopper.py @@ -1,10 +1,10 @@ from owrx.modes import Modes, AudioChopperMode from csdr.output import Output from itertools import groupby -from abc import ABCMeta import threading +from owrx.audio import ProfileSourceSubscriber from owrx.audio.wav import AudioWriter -from multiprocessing.connection import wait +from multiprocessing.connection import Pipe, wait import logging @@ -12,26 +12,45 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -class AudioChopper(threading.Thread, Output, metaclass=ABCMeta): +class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber): def __init__(self, active_dsp, mode_str: str): + self.read_fn = None + self.doRun = True + self.dsp = active_dsp + self.writers = [] mode = Modes.findByModulation(mode_str) if mode is None or not isinstance(mode, AudioChopperMode): raise ValueError("Mode {} is not an audio chopper mode".format(mode_str)) - sorted_profiles = sorted(mode.getProfiles(), key=lambda p: p.getInterval()) - groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())} - self.read_fn = None - self.writers = [AudioWriter(active_dsp, interval, profiles) for interval, profiles in groups.items()] - self.doRun = True + self.profile_source = mode.get_profile_source() + self.writersChangedOut = None + self.writersChangedIn = None super().__init__() + def stop_writers(self): + while self.writers: + self.writers.pop().stop() + + def setup_writers(self): + self.stop_writers() + sorted_profiles = sorted(self.profile_source.getProfiles(), key=lambda p: p.getInterval()) + groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())} + self.writers = [AudioWriter(self.dsp, interval, profiles) for interval, profiles in groups.items()] + for w in self.writers: + w.start() + self.writersChangedOut.send(None) + + def supports_type(self, t): + return t == "audio" + def receive_output(self, t, read_fn): self.read_fn = read_fn self.start() def run(self) -> None: logger.debug("Audio chopper starting up") - for w in self.writers: - w.start() + self.writersChangedOut, self.writersChangedIn = Pipe() + self.setup_writers() + self.profile_source.subscribe(self) while self.doRun: data = None try: @@ -45,12 +64,22 @@ class AudioChopper(threading.Thread, Output, metaclass=ABCMeta): w.write(data) logger.debug("Audio chopper shutting down") - for w in self.writers: - w.stop() + self.profile_source.unsubscribe(self) + self.stop_writers() + self.writersChangedOut.close() + self.writersChangedIn.close() + + def onProfilesChanged(self): + logger.debug("profile change received, resetting writers...") + self.setup_writers() def read(self): - try: - readers = wait([w.outputReader for w in self.writers]) - return [r.recv() for r in readers] - except (EOFError, OSError): - return None + while True: + try: + readers = wait([w.outputReader for w in self.writers] + [self.writersChangedIn]) + received = [(r, r.recv()) for r in readers] + data = [d for r, d in received if r is not self.writersChangedIn] + if data: + return data + except (EOFError, OSError): + return None diff --git a/owrx/js8.py b/owrx/js8.py index caee60d..d4bbdb9 100644 --- a/owrx/js8.py +++ b/owrx/js8.py @@ -1,32 +1,20 @@ -from .audio import AudioChopperProfile -from .parser import Parser +from owrx.audio import AudioChopperProfile, ConfigWiredProfileSource +from owrx.parser import Parser import re from js8py import Js8 from js8py.frames import Js8FrameHeartbeat, Js8FrameCompound -from .map import Map, LocatorLocation -from .metrics import Metrics, CounterMetric -from .config import Config +from owrx.map import Map, LocatorLocation +from owrx.metrics import Metrics, CounterMetric +from owrx.config import Config from abc import ABCMeta, abstractmethod from owrx.reporting import ReportingEngine +from typing import List import logging logger = logging.getLogger(__name__) -class Js8Profiles(object): - @staticmethod - def getEnabledProfiles(): - config = Config.get() - profiles = config["js8_enabled_profiles"] if "js8_enabled_profiles" in config else [] - return [Js8Profiles.loadProfile(p) for p in profiles] - - @staticmethod - def loadProfile(profileName): - className = "Js8{0}Profile".format(profileName[0].upper() + profileName[1:].lower()) - return globals()[className]() - - class Js8Profile(AudioChopperProfile, metaclass=ABCMeta): def decoding_depth(self): pm = Config.get() @@ -47,6 +35,20 @@ class Js8Profile(AudioChopperProfile, metaclass=ABCMeta): pass +class Js8ProfileSource(ConfigWiredProfileSource): + def getPropertiesToWire(self) -> List[str]: + return ["js8_enabled_profiles"] + + def getProfiles(self) -> List[AudioChopperProfile]: + config = Config.get() + profiles = config["js8_enabled_profiles"] if "js8_enabled_profiles" in config else [] + return [self._loadProfile(p) for p in profiles] + + def _loadProfile(self, profileName): + className = "Js8{0}Profile".format(profileName[0].upper() + profileName[1:].lower()) + return globals()[className]() + + class Js8NormalProfile(Js8Profile): def getInterval(self): return 15 diff --git a/owrx/modes.py b/owrx/modes.py index f34cbc4..ede30bf 100644 --- a/owrx/modes.py +++ b/owrx/modes.py @@ -1,4 +1,5 @@ from owrx.feature import FeatureDetector +from owrx.audio import ProfileSource from functools import reduce from abc import ABCMeta, abstractmethod @@ -59,7 +60,7 @@ class AudioChopperMode(DigitalMode, metaclass=ABCMeta): super().__init__(modulation, name, ["usb"], bandpass=bandpass, requirements=requirements, service=True) @abstractmethod - def getProfiles(self): + def get_profile_source(self) -> ProfileSource: pass @@ -69,10 +70,10 @@ class WsjtMode(AudioChopperMode): requirements = ["wsjt-x"] super().__init__(modulation, name, bandpass=bandpass, requirements=requirements) - def getProfiles(self): + def get_profile_source(self) -> ProfileSource: # inline import due to circular dependencies - from owrx.wsjt import WsjtProfile - return WsjtProfile.getProfiles(self.modulation) + from owrx.wsjt import WsjtProfiles + return WsjtProfiles.getSource(self.modulation) class Js8Mode(AudioChopperMode): @@ -81,10 +82,10 @@ class Js8Mode(AudioChopperMode): requirements = ["js8call"] super().__init__(modulation, name, bandpass, requirements) - def getProfiles(self): + def get_profile_source(self) -> ProfileSource: # inline import due to circular dependencies - from owrx.js8 import Js8Profiles - return Js8Profiles.getEnabledProfiles() + from owrx.js8 import Js8ProfileSource + return Js8ProfileSource() class Modes(object): diff --git a/owrx/wsjt.py b/owrx/wsjt.py index 9cafe68..4bd96d5 100644 --- a/owrx/wsjt.py +++ b/owrx/wsjt.py @@ -1,10 +1,12 @@ from datetime import datetime, timezone +from typing import List + from owrx.map import Map, LocatorLocation import re from owrx.metrics import Metrics, CounterMetric from owrx.reporting import ReportingEngine from owrx.parser import Parser -from owrx.audio import AudioChopperProfile +from owrx.audio import AudioChopperProfile, StaticProfileSource, ConfigWiredProfileSource from abc import ABC, ABCMeta, abstractmethod from owrx.config import Config from enum import Enum @@ -39,24 +41,69 @@ class WsjtProfile(AudioChopperProfile, metaclass=ABCMeta): def getMode(self): pass + +class Fst4ProfileSource(ConfigWiredProfileSource): + def getPropertiesToWire(self) -> List[str]: + return ["fst4_enabled_intervals"] + + def getProfiles(self) -> List[AudioChopperProfile]: + config = Config.get() + profiles = config["fst4_enabled_intervals"] if "fst4_enabled_intervals" in config else [] + return [Fst4Profile(i) for i in profiles if i in Fst4Profile.availableIntervals] + + +class Fst4wProfileSource(ConfigWiredProfileSource): + def getPropertiesToWire(self) -> List[str]: + return ["fst4w_enabled_intervals"] + + def getProfiles(self) -> List[AudioChopperProfile]: + config = Config.get() + profiles = config["fst4w_enabled_intervals"] if "fst4w_enabled_intervals" in config else [] + return [Fst4wProfile(i) for i in profiles if i in Fst4wProfile.availableIntervals] + + +class Q65ProfileSource(ConfigWiredProfileSource): + def getPropertiesToWire(self) -> List[str]: + return ["q65_enabled_combinations"] + + def getProfiles(self) -> List[AudioChopperProfile]: + config = Config.get() + profiles = config["q65_enabled_combinations"] if "q65_enabled_combinations" in config else [] + + def buildProfile(modestring): + try: + mode = Q65Mode[modestring[0]] + interval = Q65Interval(int(modestring[1:])) + if interval.is_available(mode): + return Q65Profile(interval, mode) + except (ValueError, KeyError): + pass + logger.warning('"%s" is not a valid Q65 mode, or an invalid mode string, ignoring', modestring) + return None + + mapped = [buildProfile(m) for m in profiles] + return [p for p in mapped if p is not None] + + +class WsjtProfiles(object): @staticmethod - def getProfiles(mode: str): + def getSource(mode: str): if mode == "ft8": - return [Ft8Profile()] + return StaticProfileSource([Ft8Profile()]) elif mode == "wspr": - return [WsprProfile()] + return StaticProfileSource([WsprProfile()]) elif mode == "jt65": - return [Jt65Profile()] + return StaticProfileSource([Jt65Profile()]) elif mode == "jt9": - return [Jt9Profile()] + return StaticProfileSource([Jt9Profile()]) elif mode == "ft4": - return [Ft4Profile()] + return StaticProfileSource([Ft4Profile()]) elif mode == "fst4": - return Fst4Profile.getEnabledProfiles() + return Fst4ProfileSource() elif mode == "fst4w": - return Fst4wProfile.getEnabledProfiles() + return Fst4wProfileSource() elif mode == "q65": - return Q65Profile.getEnabledProfiles() + return Q65ProfileSource() class Ft8Profile(WsjtProfile): @@ -133,12 +180,6 @@ class Fst4Profile(WsjtProfile): def getMode(self): return "FST4" - @staticmethod - def getEnabledProfiles(): - config = Config.get() - profiles = config["fst4_enabled_intervals"] if "fst4_enabled_intervals" in config else [] - return [Fst4Profile(i) for i in profiles if i in Fst4Profile.availableIntervals] - class Fst4wProfile(WsjtProfile): availableIntervals = [120, 300, 900, 1800] @@ -155,12 +196,6 @@ class Fst4wProfile(WsjtProfile): def getMode(self): return "FST4W" - @staticmethod - def getEnabledProfiles(): - config = Config.get() - profiles = config["fst4w_enabled_intervals"] if "fst4w_enabled_intervals" in config else [] - return [Fst4wProfile(i) for i in profiles if i in Fst4wProfile.availableIntervals] - class Q65Mode(Enum): # value is the bandwidth multiplier according to https://physics.princeton.edu/pulsar/k1jt/Q65_Quick_Start.pdf @@ -209,25 +244,6 @@ class Q65Profile(WsjtProfile): def decoder_commandline(self, file): return ["jt9", "--q65", "-p", str(self.interval), "-b", self.mode.name, "-d", str(self.decoding_depth()), file] - @staticmethod - def getEnabledProfiles(): - config = Config.get() - profiles = config["q65_enabled_combinations"] if "q65_enabled_combinations" in config else [] - - def buildProfile(modestring): - try: - mode = Q65Mode[modestring[0]] - interval = Q65Interval(int(modestring[1:])) - if interval.is_available(mode): - return Q65Profile(interval, mode) - except (ValueError, KeyError): - pass - logger.warning('"%s" is not a valid Q65 mode, or an invalid mode string, ignoring', modestring) - return None - - mapped = [buildProfile(m) for m in profiles] - return [p for p in mapped if p is not None] - class WsjtParser(Parser): def parse(self, messages):