openwebrx-clone/owrx/audio/__init__.py

87 lines
2.3 KiB
Python

from owrx.config import Config
from abc import ABC, ABCMeta, abstractmethod
from typing import List
import logging
logger = logging.getLogger(__name__)
class AudioChopperProfile(ABC):
@abstractmethod
def getInterval(self):
pass
@abstractmethod
def getFileTimestampFormat(self):
pass
@abstractmethod
def decoder_commandline(self, file):
pass
class ProfileSourceSubscriber(ABC):
@abstractmethod
def onProfilesChanged(self):
pass
class ProfileSource(ABC):
def __init__(self):
self.subscribers = []
@abstractmethod
def getProfiles(self) -> List[AudioChopperProfile]:
pass
def subscribe(self, subscriber: ProfileSourceSubscriber):
if subscriber in self.subscribers:
return
self.subscribers.append(subscriber)
def unsubscribe(self, subscriber: ProfileSourceSubscriber):
if subscriber not in self.subscribers:
return
self.subscribers.remove(subscriber)
def fireProfilesChanged(self):
for sub in self.subscribers.copy():
try:
sub.onProfilesChanged()
except Exception:
logger.exception("Error while notifying profile subscriptions")
class ConfigWiredProfileSource(ProfileSource, metaclass=ABCMeta):
def __init__(self):
super().__init__()
self.configSub = None
@abstractmethod
def getPropertiesToWire(self) -> List[str]:
pass
def subscribe(self, subscriber: ProfileSourceSubscriber):
super().subscribe(subscriber)
if self.subscribers and self.configSub is None:
self.configSub = Config.get().filter(*self.getPropertiesToWire()).wire(self.fireProfilesChanged)
def unsubscribe(self, subscriber: ProfileSourceSubscriber):
super().unsubscribe(subscriber)
if not self.subscribers and self.configSub is not None:
self.configSub.cancel()
self.configSub = None
def fireProfilesChanged(self, *args):
super().fireProfilesChanged()
class StaticProfileSource(ProfileSource):
def __init__(self, profiles: List[AudioChopperProfile]):
super().__init__()
self.profiles = profiles
def getProfiles(self) -> List[AudioChopperProfile]:
return self.profiles