openwebrx-clone/owrx/service/__init__.py

276 lines
9.5 KiB
Python
Raw Normal View History

2019-07-21 21:39:11 +00:00
import threading
from owrx.source import SdrSource
from owrx.sdr import SdrService
2019-07-21 21:39:11 +00:00
from owrx.bands import Bandplan
2019-12-08 16:15:48 +00:00
from csdr.csdr import dsp, output
2019-07-21 21:39:11 +00:00
from owrx.wsjt import WsjtParser
from owrx.aprs import AprsParser
from owrx.config import Config
from owrx.source.resampler import Resampler
from owrx.feature import FeatureDetector
from owrx.property import PropertyLayer
from abc import ABCMeta, abstractmethod
from .schedule import ServiceScheduler
2019-07-21 21:39:11 +00:00
import logging
logger = logging.getLogger(__name__)
class ServiceOutput(output, metaclass=ABCMeta):
2019-07-21 21:39:11 +00:00
def __init__(self, frequency):
self.frequency = frequency
@abstractmethod
def getParser(self):
# abstract method; implement in subclasses
pass
def receive_output(self, t, read_fn):
parser = self.getParser()
parser.setDialFrequency(self.frequency)
target = self.pump(read_fn, parser.parse)
2019-07-21 21:39:11 +00:00
threading.Thread(target=target).start()
class WsjtServiceOutput(ServiceOutput):
def getParser(self):
return WsjtParser(WsjtHandler())
def supports_type(self, t):
2019-08-11 09:37:45 +00:00
return t == "wsjt_demod"
2019-07-21 21:39:11 +00:00
class AprsServiceOutput(ServiceOutput):
def getParser(self):
return AprsParser(AprsHandler())
def supports_type(self, t):
return t == "packet_demod"
2019-07-21 21:39:11 +00:00
class ServiceHandler(object):
def __init__(self, source):
self.lock = threading.Lock()
2019-07-21 21:39:11 +00:00
self.services = []
self.source = source
self.startupTimer = None
2019-07-21 21:39:11 +00:00
self.source.addClient(self)
props = self.source.getProps()
props.collect("center_freq", "samp_rate").wire(self.onFrequencyChange)
if self.source.isAvailable():
self.scheduleServiceStartup()
self.scheduler = None
if "schedule" in props or "scheduler" in props:
self.scheduler = ServiceScheduler(self.source)
def getClientClass(self):
2019-11-16 14:40:12 +00:00
return SdrSource.CLIENT_INACTIVE
2019-07-21 21:39:11 +00:00
2019-11-15 21:13:00 +00:00
def onStateChange(self, state):
if state == SdrSource.STATE_RUNNING:
self.scheduleServiceStartup()
elif state == SdrSource.STATE_STOPPING:
logger.debug("sdr source becoming unavailable; stopping services.")
self.stopServices()
elif state == SdrSource.STATE_FAILED:
logger.debug("sdr source failed; stopping services.")
self.stopServices()
if self.scheduler:
self.scheduler.shutdown()
def onBusyStateChange(self, state):
pass
2019-07-21 21:39:11 +00:00
def isSupported(self, mode):
# TODO this should be in a more central place (the frontend also needs this)
requirements = {
2019-11-23 00:12:21 +00:00
"ft8": "wsjt-x",
"ft4": "wsjt-x",
"jt65": "wsjt-x",
"jt9": "wsjt-x",
"wspr": "wsjt-x",
"packet": "packet",
}
fd = FeatureDetector()
# this looks overly complicated... but i'd like modes with no requirements to be always available without
# being listed in the hash above
unavailable = [mode for mode, req in requirements.items() if not fd.is_available(req)]
configured = Config.get()["services_decoders"]
available = [mode for mode in configured if mode not in unavailable]
return mode in available
2019-07-21 21:39:11 +00:00
def shutdown(self):
self.stopServices()
self.source.removeClient(self)
if self.scheduler:
self.scheduler.shutdown()
2019-07-21 21:39:11 +00:00
def stopServices(self):
with self.lock:
services = self.services
self.services = []
2019-07-21 21:39:11 +00:00
for service in services:
service.stop()
2019-07-21 21:39:11 +00:00
def onFrequencyChange(self, key, value):
self.stopServices()
2019-07-21 21:39:11 +00:00
if not self.source.isAvailable():
return
self.scheduleServiceStartup()
def scheduleServiceStartup(self):
if self.startupTimer:
self.startupTimer.cancel()
self.startupTimer = threading.Timer(10, self.updateServices)
self.startupTimer.start()
2019-07-28 10:11:22 +00:00
def updateServices(self):
logger.debug("re-scheduling services due to sdr changes")
2019-07-21 21:39:11 +00:00
self.stopServices()
if not self.source.isAvailable():
logger.debug("sdr source is unavailable")
return
2019-07-21 21:39:11 +00:00
cf = self.source.getProps()["center_freq"]
sr = self.source.getProps()["samp_rate"]
srh = sr / 2
2019-07-21 21:39:11 +00:00
frequency_range = (cf - srh, cf + srh)
dials = [
dial
2019-08-11 09:37:45 +00:00
for dial in Bandplan.getSharedInstance().collectDialFrequencies(frequency_range)
if self.isSupported(dial["mode"])
]
2019-07-21 21:39:11 +00:00
if not dials:
logger.debug("no services available")
return
with self.lock:
self.services = []
groups = self.optimizeResampling(dials, sr)
if groups is None:
for dial in dials:
self.services.append(self.setupService(dial["mode"], dial["frequency"], self.source))
else:
for group in groups:
frequencies = sorted([f["frequency"] for f in group])
min = frequencies[0]
max = frequencies[-1]
cf = (min + max) / 2
bw = max - min
logger.debug("group center frequency: {0}, bandwidth: {1}".format(cf, bw))
resampler_props = PropertyLayer()
resampler_props["center_freq"] = cf
# TODO the + 24000 is a temporary fix since the resampling optimizer does not account for required bandwidths
resampler_props["samp_rate"] = bw + 24000
2019-12-31 14:27:33 +00:00
resampler = Resampler(resampler_props, self.source)
resampler.start()
for dial in group:
self.services.append(self.setupService(dial["mode"], dial["frequency"], resampler))
# resampler goes in after the services since it must not be shutdown as long as the services are still running
self.services.append(resampler)
def optimizeResampling(self, freqs, bandwidth):
freqs = sorted(freqs, key=lambda f: f["frequency"])
2019-09-13 21:03:05 +00:00
distances = [
{"frequency": freqs[i]["frequency"], "distance": freqs[i + 1]["frequency"] - freqs[i]["frequency"]}
for i in range(0, len(freqs) - 1)
]
distances = [d for d in distances if d["distance"] > 0]
distances = sorted(distances, key=lambda f: f["distance"], reverse=True)
def calculate_usage(num_splits):
splits = sorted([f["frequency"] for f in distances[0:num_splits]])
previous = 0
groups = []
for split in splits:
groups.append([f for f in freqs if previous < f["frequency"] <= split])
previous = split
groups.append([f for f in freqs if previous < f["frequency"]])
def get_bandwitdh(group):
freqs = sorted([f["frequency"] for f in group])
# the group will process the full BW once, plus the reduced BW once for each group member
return bandwidth + len(group) * (freqs[-1] - freqs[0] + 24000)
total_bandwidth = sum([get_bandwitdh(group) for group in groups])
2019-09-13 21:03:05 +00:00
return {"num_splits": num_splits, "total_bandwidth": total_bandwidth, "groups": groups}
usages = [calculate_usage(i) for i in range(0, len(freqs))]
2019-10-13 16:28:58 +00:00
# another possible outcome might be that it's best not to resample at all. this is a special case.
2019-09-13 21:03:05 +00:00
usages += [{"num_splits": None, "total_bandwidth": bandwidth * len(freqs), "groups": [freqs]}]
results = sorted(usages, key=lambda f: f["total_bandwidth"])
for r in results:
logger.debug("splits: {0}, total: {1}".format(r["num_splits"], r["total_bandwidth"]))
best = results[0]
if best["num_splits"] is None:
return None
return best["groups"]
def setupService(self, mode, frequency, source):
2019-07-21 21:39:11 +00:00
logger.debug("setting up service {0} on frequency {1}".format(mode, frequency))
# TODO selecting outputs will need some more intelligence here
if mode == "packet":
output = AprsServiceOutput(frequency)
else:
output = WsjtServiceOutput(frequency)
d = dsp(output)
d.nc_port = source.getPort()
d.set_offset_freq(frequency - source.getProps()["center_freq"])
2019-08-11 22:02:39 +00:00
if mode == "packet":
d.set_demodulator("nfm")
d.set_bpf(-4000, 4000)
2019-09-15 10:23:11 +00:00
elif mode == "wspr":
d.set_demodulator("usb")
# WSPR only samples between 1400 and 1600 Hz
d.set_bpf(1350, 1650)
2019-08-11 22:02:39 +00:00
else:
d.set_demodulator("usb")
d.set_bpf(0, 3000)
2019-07-21 21:39:11 +00:00
d.set_secondary_demodulator(mode)
d.set_audio_compression("none")
d.set_samp_rate(source.getProps()["samp_rate"])
d.set_service()
2019-07-21 21:39:11 +00:00
d.start()
return d
class WsjtHandler(object):
def write_wsjt_message(self, msg):
pass
class AprsHandler(object):
def write_aprs_data(self, data):
pass
2019-09-15 19:10:30 +00:00
class Services(object):
2019-10-27 11:16:17 +00:00
handlers = []
2019-11-23 00:12:21 +00:00
2019-07-21 21:39:11 +00:00
@staticmethod
2019-09-15 19:10:30 +00:00
def start():
if not Config.get()["services_enabled"]:
2019-07-28 11:29:45 +00:00
return
2019-07-21 21:39:11 +00:00
for source in SdrService.getSources().values():
props = source.getProps()
if "services" not in props or props["services"] is not False:
Services.handlers.append(ServiceHandler(source))
2019-10-27 11:16:17 +00:00
@staticmethod
def stop():
for handler in Services.handlers:
handler.shutdown()
2019-10-27 11:16:17 +00:00
Services.handlers = []