openwebrx-clone/owrx/service/__init__.py

404 lines
14 KiB
Python
Raw Normal View History

2019-07-21 21:39:11 +00:00
import threading
from owrx.source import SdrSourceEventClient, SdrSourceState, SdrClientClass
from owrx.sdr import SdrService
2019-07-21 21:39:11 +00:00
from owrx.bands import Bandplan
2021-04-09 16:16:25 +00:00
from csdr.output import Output
2019-07-21 21:39:11 +00:00
from owrx.wsjt import WsjtParser
from owrx.aprs import AprsParser
2020-04-14 19:27:50 +00:00
from owrx.js8 import Js8Parser
2021-02-11 18:31:44 +00:00
from owrx.config.core import CoreConfig
from owrx.config import Config
from owrx.source.resampler import Resampler
from owrx.property import PropertyLayer, PropertyDeleted
2020-04-14 19:27:50 +00:00
from js8py import Js8Frame
from abc import ABCMeta, abstractmethod
from owrx.service.schedule import ServiceScheduler
2021-08-31 19:53:15 +00:00
from owrx.service.chain import ServiceDemodulatorChain
from owrx.modes import Modes, DigitalMode
from typing import Union
from csdr.chain.demodulator import BaseDemodulatorChain, SecondaryDemodulator, FixedAudioRateChain
from csdr.chain.analog import NFm, Ssb
from csdr.chain.digimodes import AudioChopperDemodulator
2019-07-21 21:39:11 +00:00
import logging
logger = logging.getLogger(__name__)
2021-04-09 16:16:25 +00:00
class ServiceOutput(Output, metaclass=ABCMeta):
    """Base class for outputs that hand demodulated service data to a parser.

    Subclasses provide the concrete parser through getParser().
    """

    def __init__(self, frequency):
        # dial frequency that is handed to the parser in receive_output()
        self.frequency = frequency

    @abstractmethod
    def getParser(self):
        """Return the parser that consumes this output's data.

        Abstract; must be implemented in subclasses.
        """

    def receive_output(self, t, read_fn):
        """Start a background thread that pumps data from read_fn into the parser.

        :param t: output type identifier (not evaluated here)
        :param read_fn: callable handed to self.pump() as the data source
        """
        decoder = self.getParser()
        decoder.setDialFrequency(self.frequency)
        # NOTE(review): pump() is not defined in this class; presumably inherited from Output
        worker = threading.Thread(
            target=self.pump(read_fn, decoder.parse),
            name="service_output_receive",
        )
        worker.start()
2019-07-21 21:39:11 +00:00
class WsjtServiceOutput(ServiceOutput):
    """Service output that accepts "wsjt_demod" data and parses it with a WsjtParser."""

    def getParser(self):
        """Build a WsjtParser that reports to a new WsjtHandler."""
        handler = WsjtHandler()
        return WsjtParser(handler)

    def supports_type(self, t):
        """Tell whether output type t is handled here (only "wsjt_demod" is)."""
        return t == "wsjt_demod"
2019-07-21 21:39:11 +00:00
class AprsServiceOutput(ServiceOutput):
    """Service output that accepts "packet_demod" data and parses it with an AprsParser."""

    def getParser(self):
        """Build an AprsParser that reports to a new AprsHandler."""
        handler = AprsHandler()
        return AprsParser(handler)

    def supports_type(self, t):
        """Tell whether output type t is handled here (only "packet_demod" is)."""
        return t == "packet_demod"
2020-04-14 19:27:50 +00:00
class Js8ServiceOutput(ServiceOutput):
    """Service output that accepts "js8_demod" data and parses it with a Js8Parser."""

    def getParser(self):
        """Build a Js8Parser that reports to a new Js8Handler."""
        handler = Js8Handler()
        return Js8Parser(handler)

    def supports_type(self, t):
        """Tell whether output type t is handled here (only "js8_demod" is)."""
        return t == "js8_demod"
class ServiceHandler(SdrSourceEventClient):
    """Runs background decoding services for one sdr source.

    The handler registers itself as a client of the source, watches the
    source's "center_freq" / "samp_rate" properties as well as the global
    "services_decoders" setting, and (re-)builds the list of running services
    whenever one of them changes. The per-source "services" property switches
    the handler on and off; a missing property counts as enabled.
    """

    def __init__(self, source):
        """Set up the handler for the given source and start it if enabled.

        :param source: the sdr source to run services on
        """
        self.lock = threading.RLock()
        self.services = []
        self.source = source
        self.startupTimer = None
        self.activitySub = None
        # must be initialised *before* wiring "services": if that property is present, wireProperty() calls
        # _receiveEvent() right away, which may run _start() and store a subscription here. resetting it to None
        # afterwards would lose that subscription, and _stop() could never cancel it.
        self.decodersSub = None
        self.enabledSub = None
        self.running = False
        props = self.source.getProps()
        self.enabledSub = props.wireProperty("services", self._receiveEvent)
        # need to call _start() manually if property is not set since the default is True, but the initial call is only
        # made if the property is present
        if "services" not in props:
            self._start()

    def _receiveEvent(self, state):
        """React to a change of the source's "services" property.

        :param state: new property value, or PropertyDeleted
        """
        # deletion means fall back to default, which is True
        if state is PropertyDeleted:
            state = True
        if self.running == state:
            return
        if state:
            self._start()
        else:
            self._stop()

    def _start(self):
        """Register with the source, subscribe to changes and schedule the service startup."""
        self.running = True
        self.source.addClient(self)
        props = self.source.getProps()
        self.activitySub = props.filter("center_freq", "samp_rate").wire(self.onFrequencyChange)
        self.decodersSub = Config.get().wireProperty("services_decoders", self.onFrequencyChange)
        if self.source.isAvailable():
            self._scheduleServiceStartup()

    def _stop(self):
        """Cancel subscriptions and pending startup, detach from the source and stop all services."""
        if self.activitySub is not None:
            self.activitySub.cancel()
            self.activitySub = None
        if self.decodersSub is not None:
            self.decodersSub.cancel()
            self.decodersSub = None
        self._cancelStartupTimer()
        self.source.removeClient(self)
        self.stopServices()
        self.running = False

    def getClientClass(self) -> SdrClientClass:
        """Report this client as SdrClientClass.INACTIVE to the source."""
        return SdrClientClass.INACTIVE

    def onStateChange(self, state: SdrSourceState):
        """Schedule a service startup when the source is running; stop services when it is stopping."""
        if state is SdrSourceState.RUNNING:
            self._scheduleServiceStartup()
        elif state is SdrSourceState.STOPPING:
            logger.debug("sdr source becoming unavailable; stopping services.")
            self.stopServices()

    def onFail(self):
        """Stop all services after the source has failed."""
        logger.debug("sdr source failed; stopping services.")
        self.stopServices()

    def onShutdown(self):
        """Shut down the handler together with its source."""
        logger.debug("sdr source is shutting down; shutting down service handler, too.")
        self.shutdown()

    def onEnable(self):
        """Schedule a service startup once the source has been enabled."""
        self._scheduleServiceStartup()

    def isSupported(self, mode):
        """Return True if mode is both configured in "services_decoders" and an available service mode."""
        configured = Config.get()["services_decoders"]
        available = [m.modulation for m in Modes.getAvailableServices()]
        return mode in configured and mode in available

    def shutdown(self):
        """Stop the handler for good, including the subscription to the "services" property."""
        self._stop()
        if self.enabledSub is not None:
            self.enabledSub.cancel()
            self.enabledSub = None

    def stopServices(self):
        """Stop all currently running services and empty the service list."""
        # swap the list out under the lock, then stop the services without holding it
        with self.lock:
            services = self.services
            self.services = []

        for service in services:
            service.stop()

    def onFrequencyChange(self, changes):
        """Stop the running services and, if the source is available, schedule a fresh startup.

        :param changes: the change notification (not evaluated)
        """
        self.stopServices()
        if not self.source.isAvailable():
            return
        self._scheduleServiceStartup()

    def _cancelStartupTimer(self):
        """Cancel a pending startup timer, if there is one."""
        if self.startupTimer:
            self.startupTimer.cancel()
        self.startupTimer = None

    def _scheduleServiceStartup(self):
        """(Re-)arm the timer that calls updateServices() after 10 seconds."""
        self._cancelStartupTimer()
        self.startupTimer = threading.Timer(10, self.updateServices)
        self.startupTimer.start()

    def _addService(self, mode, frequency, source):
        """Set up a single service and register it, unless setupService() could not create one."""
        service = self.setupService(mode, frequency, source)
        # setupService() returns None for unsupported modes; a None entry would break stopServices()
        if service is not None:
            self.services.append(service)

    def updateServices(self):
        """Stop all services and start the ones that fit into the source's current frequency range."""
        with self.lock:
            logger.debug("re-scheduling services due to sdr changes")
            self.stopServices()
            if not self.source.isAvailable():
                logger.debug("sdr source is unavailable")
                return
            cf = self.source.getProps()["center_freq"]
            sr = self.source.getProps()["samp_rate"]
            srh = sr / 2
            frequency_range = (cf - srh, cf + srh)

            dials = [
                dial
                for dial in Bandplan.getSharedInstance().collectDialFrequencies(frequency_range)
                if self.isSupported(dial["mode"])
            ]

            if not dials:
                logger.debug("no services available")
                return

            groups = self.optimizeResampling(dials, sr)
            if groups is None:
                # resampling does not pay off; run everything directly on the source
                for dial in dials:
                    self._addService(dial["mode"], dial["frequency"], self.source)
                return

            for group in groups:
                if len(group) > 1:
                    group_cf = self.get_center_frequency(group)
                    bw = self.get_bandwidth(group)
                    logger.debug("group center frequency: {0}, bandwidth: {1}".format(group_cf, bw))
                    resampler_props = PropertyLayer()
                    resampler_props["center_freq"] = group_cf
                    resampler_props["samp_rate"] = bw
                    resampler = Resampler(resampler_props, self.source)
                    resampler.start()

                    for dial in group:
                        self._addService(dial["mode"], dial["frequency"], resampler)

                    # resampler goes in after the services since it must not be shutdown as long as the services are
                    # still running
                    self.services.append(resampler)
                else:
                    dial = group[0]
                    self._addService(dial["mode"], dial["frequency"], self.source)

    def get_min_max(self, group):
        """Return the (lowest, highest) frequency covered by the dials in group.

        The lower edge is the lowest dial frequency plus that mode's bandpass
        low_cut; the upper edge is the highest dial frequency plus that mode's
        bandpass high_cut.
        """
        by_frequency = sorted(group, key=lambda f: f["frequency"])
        lowest = by_frequency[0]
        highest = by_frequency[-1]
        low_edge = lowest["frequency"] + Modes.findByModulation(lowest["mode"]).get_bandpass().low_cut
        high_edge = highest["frequency"] + Modes.findByModulation(highest["mode"]).get_bandpass().high_cut
        return low_edge, high_edge

    def get_center_frequency(self, group):
        """Return the midpoint between the edges reported by get_min_max()."""
        low_edge, high_edge = self.get_min_max(group)
        return (low_edge + high_edge) / 2

    def get_bandwidth(self, group):
        """Return the bandwidth needed for group: its span plus 15%, but at least 25000."""
        minFreq, maxFreq = self.get_min_max(group)
        # minimum bandwidth for a resampler: 25kHz
        return max((maxFreq - minFreq) * 1.15, 25000)

    def optimizeResampling(self, freqs, bandwidth):
        """Find the grouping of dial frequencies with the lowest estimated total bandwidth.

        The dials are sorted by frequency and split at the N largest gaps
        between neighbours, for every N from 0 to len(freqs) - 1. A group with
        more than one member is rated as
        ``bandwidth + len(group) * get_bandwidth(group)``, a single-member group
        as ``bandwidth``. Running every dial without a resampler is rated as
        ``bandwidth * len(freqs)``.

        :param freqs: list of dial dicts (with "frequency" and "mode" keys)
        :param bandwidth: bandwidth of the source the dials would run on
        :return: list of groups (lists of dials), or None if using no
            resampler at all is rated best
        """
        freqs = sorted(freqs, key=lambda f: f["frequency"])
        distances = [
            {
                "frequency": lower["frequency"],
                "distance": upper["frequency"] - lower["frequency"],
            }
            for lower, upper in zip(freqs, freqs[1:])
        ]

        # dials on identical frequencies cannot be separated by a split
        distances = [d for d in distances if d["distance"] > 0]
        distances = sorted(distances, key=lambda f: f["distance"], reverse=True)

        def get_total_bandwidth(group):
            if len(group) > 1:
                return bandwidth + len(group) * self.get_bandwidth(group)
            return bandwidth

        def calculate_usage(num_splits):
            splits = sorted([f["frequency"] for f in distances[0:num_splits]])
            previous = 0
            groups = []
            for split in splits:
                groups.append([f for f in freqs if previous < f["frequency"] <= split])
                previous = split
            groups.append([f for f in freqs if previous < f["frequency"]])

            total_bandwidth = sum([get_total_bandwidth(group) for group in groups])
            return {
                "num_splits": num_splits,
                "total_bandwidth": total_bandwidth,
                "groups": groups,
            }

        usages = [calculate_usage(i) for i in range(0, len(freqs))]
        # another possible outcome might be that it's best not to resample at all. this is a special case.
        usages += [
            {
                "num_splits": None,
                "total_bandwidth": bandwidth * len(freqs),
                "groups": [freqs],
            }
        ]
        results = sorted(usages, key=lambda f: f["total_bandwidth"])

        for r in results:
            logger.debug("splits: {0}, total: {1}".format(r["num_splits"], r["total_bandwidth"]))

        best = results[0]
        if best["num_splits"] is None:
            return None
        return best["groups"]

    def setupService(self, mode, frequency, source):
        """Build the demodulator chain for one service.

        :param mode: modulation name of the service
        :param frequency: dial frequency of the service
        :param source: provider of props and sample buffer (the sdr source or a resampler)
        :return: the configured ServiceDemodulatorChain, or None if mode is not a DigitalMode
        """
        logger.debug("setting up service {0} on frequency {1}".format(mode, frequency))
        # TODO selecting outputs will need some more intelligence here
        # NOTE(review): the output is created but not attached to the chain below
        if mode == "packet":
            output = AprsServiceOutput(frequency)
        elif mode == "js8":
            output = Js8ServiceOutput(frequency)
        else:
            output = WsjtServiceOutput(frequency)

        modeObject = Modes.findByModulation(mode)
        if not isinstance(modeObject, DigitalMode):
            logger.warning("mode is not a digimode: %s", mode)
            return None

        demod = self._getDemodulator(modeObject.get_modulation(), source.getProps())
        secondaryDemod = self._getSecondaryDemodulator(modeObject.modulation)
        center_freq = source.getProps()["center_freq"]
        sampleRate = source.getProps()["samp_rate"]
        # frequency offset of the service relative to the source center, as a fraction of the sample rate
        shift = (center_freq - frequency) / sampleRate
        bandpass = modeObject.get_bandpass()

        chain = ServiceDemodulatorChain(demod, secondaryDemod, sampleRate, shift)
        chain.setBandPass(bandpass.low_cut, bandpass.high_cut)
        chain.setReader(source.getBuffer().getReader())
        return chain

    # TODO move this elsewhere
    def _getDemodulator(self, demod: Union[str, BaseDemodulatorChain], props):
        """Return a demodulator chain for demod.

        :param demod: a ready-made chain (returned as is) or a modulation name
        :param props: properties; "output_rate" is read for "nfm"
        :return: the chain, or None for an unknown modulation name
        """
        if isinstance(demod, BaseDemodulatorChain):
            return demod
        # TODO: move this to Modes
        demodChain = None
        if demod == "nfm":
            demodChain = NFm(props["output_rate"])
        elif demod in ["usb", "lsb", "cw"]:
            demodChain = Ssb()

        return demodChain

    # TODO move this elsewhere
    def _getSecondaryDemodulator(self, mod):
        """Return a secondary demodulator for mod.

        :param mod: a ready-made SecondaryDemodulator (returned as is) or a modulation name
        :return: the demodulator, or None if the modulation is not handled here
        """
        if isinstance(mod, SecondaryDemodulator):
            return mod
        # TODO add remaining modes
        if mod in ["ft8", "wspr", "jt65", "jt9", "ft4", "fst4", "fst4w", "q65"]:
            return AudioChopperDemodulator(mod, WsjtParser())
        return None
2019-07-21 21:39:11 +00:00
class WsjtHandler:
    """Handler passed to WsjtParser by the service output; discards every message."""

    def write_wsjt_message(self, msg):
        """Accept a wsjt message and ignore it."""
        return None
class AprsHandler:
    """Handler passed to AprsParser by the service output; discards all data."""

    def write_aprs_data(self, data):
        """Accept aprs data and ignore it."""
        return None
2020-04-14 19:27:50 +00:00
class Js8Handler:
    """Handler passed to Js8Parser by the service output; discards every frame."""

    def write_js8_message(self, frame: Js8Frame, freq: int):
        """Accept a js8 frame together with its frequency and ignore both."""
        return None
2019-09-15 19:10:30 +00:00
class Services(object):
    """Registry of the service handlers and schedulers of all active sdr sources.

    Both dictionaries are keyed by the key of the source in
    SdrService.getActiveSources().
    """

    handlers = {}
    schedulers = {}

    @staticmethod
    def start():
        """Subscribe to config / device changes and create a scheduler for every active source."""
        config = Config.get()
        config.wireProperty("services_enabled", Services._receiveEnabledEvent)
        activeSources = SdrService.getActiveSources()
        activeSources.wire(Services._receiveDeviceEvent)
        for key, source in activeSources.items():
            Services.schedulers[key] = ServiceScheduler(source)

    @staticmethod
    def _shutdownHandlers():
        """Shut down every registered handler and empty the handler registry."""
        # iterate over a copy since the registry is replaced afterwards
        for handler in list(Services.handlers.values()):
            handler.shutdown()
        Services.handlers = {}

    @staticmethod
    def _receiveEnabledEvent(state):
        """React to a change of the "services_enabled" setting.

        :param state: truthy to create a handler per active source, falsy to shut all handlers down
        """
        if state:
            # items() is the same accessor start() uses on the active sources
            for key, source in SdrService.getActiveSources().items():
                Services.handlers[key] = ServiceHandler(source)
        else:
            Services._shutdownHandlers()

    @staticmethod
    def _receiveDeviceEvent(changes):
        """Add or remove handlers / schedulers when active sources appear or disappear.

        :param changes: mapping of source key to source, or to PropertyDeleted for removed sources
        """
        for key, source in changes.items():
            if source is PropertyDeleted:
                if key in Services.handlers:
                    Services.handlers[key].shutdown()
                    del Services.handlers[key]
                if key in Services.schedulers:
                    Services.schedulers[key].shutdown()
                    del Services.schedulers[key]
            else:
                Services.schedulers[key] = ServiceScheduler(source)
                if Config.get()["services_enabled"]:
                    Services.handlers[key] = ServiceHandler(source)

    @staticmethod
    def stop():
        """Shut down all handlers and schedulers and empty both registries."""
        Services._shutdownHandlers()
        for scheduler in list(Services.schedulers.values()):
            scheduler.shutdown()
        Services.schedulers = {}