From 292fe80acfa1696edc69a056242d969adf3f4c63 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Sat, 21 Dec 2019 20:58:28 +0100 Subject: [PATCH] break apart the ever-growing owrx/source.py --- owrx/__main__.py | 2 +- owrx/client.py | 50 ++ owrx/connection.py | 5 +- owrx/controllers.py | 2 +- owrx/cpu.py | 73 +++ owrx/dsp.py | 152 ++++++ owrx/fft.py | 92 ++++ owrx/sdr.py | 90 ++++ owrx/service.py | 5 +- owrx/source.py | 975 --------------------------------------- owrx/source/__init__.py | 286 ++++++++++++ owrx/source/airspy.py | 23 + owrx/source/airspyhf.py | 6 + owrx/source/connector.py | 63 +++ owrx/source/fifi.py | 9 + owrx/source/hackrf.py | 9 + owrx/source/resampler.py | 98 ++++ owrx/source/rtl_sdr.py | 16 + owrx/source/sdrplay.py | 21 + owrx/source/soapy.py | 46 ++ 20 files changed, 1043 insertions(+), 980 deletions(-) create mode 100644 owrx/client.py create mode 100644 owrx/cpu.py create mode 100644 owrx/dsp.py create mode 100644 owrx/fft.py create mode 100644 owrx/sdr.py delete mode 100644 owrx/source.py create mode 100644 owrx/source/__init__.py create mode 100644 owrx/source/airspy.py create mode 100644 owrx/source/airspyhf.py create mode 100644 owrx/source/connector.py create mode 100644 owrx/source/fifi.py create mode 100644 owrx/source/hackrf.py create mode 100644 owrx/source/resampler.py create mode 100644 owrx/source/rtl_sdr.py create mode 100644 owrx/source/sdrplay.py create mode 100644 owrx/source/soapy.py diff --git a/owrx/__main__.py b/owrx/__main__.py index 499731d..d4ef1b6 100644 --- a/owrx/__main__.py +++ b/owrx/__main__.py @@ -2,7 +2,7 @@ from http.server import HTTPServer from owrx.http import RequestHandler from owrx.config import PropertyManager from owrx.feature import FeatureDetector -from owrx.source import SdrService +from owrx.sdr import SdrService from socketserver import ThreadingMixIn from owrx.sdrhu import SdrHuUpdater from owrx.service import Services diff --git a/owrx/client.py b/owrx/client.py new file mode 100644 index 0000000..4fda3d4 --- /dev/null +++ b/owrx/client.py @@ -0,0 +1,50 @@ +from owrx.config import PropertyManager +from owrx.metrics import Metrics, DirectMetric +import threading + +import logging + +logger = logging.getLogger(__name__) + + +class TooManyClientsException(Exception): + pass + + +class ClientRegistry(object): + sharedInstance = None + creationLock = threading.Lock() + + @staticmethod + def getSharedInstance(): + with ClientRegistry.creationLock: + if ClientRegistry.sharedInstance is None: + ClientRegistry.sharedInstance = ClientRegistry() + return ClientRegistry.sharedInstance + + def __init__(self): + self.clients = [] + Metrics.getSharedInstance().addMetric("openwebrx.users", DirectMetric(self.clientCount)) + super().__init__() + + def broadcast(self): + n = self.clientCount() + for c in self.clients: + c.write_clients(n) + + def addClient(self, client): + pm = PropertyManager.getSharedInstance() + if len(self.clients) >= pm["max_clients"]: + raise TooManyClientsException() + self.clients.append(client) + self.broadcast() + + def clientCount(self): + return len(self.clients) + + def removeClient(self, client): + try: + self.clients.remove(client) + except ValueError: + pass + self.broadcast() diff --git a/owrx/connection.py b/owrx/connection.py index ce05f07..58e91d9 100644 --- a/owrx/connection.py +++ b/owrx/connection.py @@ -1,5 +1,8 @@ from owrx.config import PropertyManager -from owrx.source import DspManager, CpuUsageThread, SdrService, ClientRegistry +from owrx.dsp import DspManager +from owrx.cpu import CpuUsageThread +from owrx.sdr import SdrService +from owrx.client import ClientRegistry from owrx.feature import FeatureDetector from owrx.version import openwebrx_version from owrx.bands import Bandplan diff --git a/owrx/controllers.py b/owrx/controllers.py index 93932bb..3b636a2 100644 --- a/owrx/controllers.py +++ b/owrx/controllers.py @@ -6,7 +6,7 @@ from datetime import datetime from string import Template from owrx.websocket import WebSocketConnection from owrx.config import PropertyManager -from owrx.source import ClientRegistry +from owrx.client import ClientRegistry from owrx.connection import WebSocketMessageHandler from owrx.version import openwebrx_version from owrx.feature import FeatureDetector diff --git a/owrx/cpu.py b/owrx/cpu.py new file mode 100644 index 0000000..28c9066 --- /dev/null +++ b/owrx/cpu.py @@ -0,0 +1,73 @@ +import threading + +import logging + +logger = logging.getLogger(__name__) + + +class CpuUsageThread(threading.Thread): + sharedInstance = None + + @staticmethod + def getSharedInstance(): + if CpuUsageThread.sharedInstance is None: + CpuUsageThread.sharedInstance = CpuUsageThread() + return CpuUsageThread.sharedInstance + + def __init__(self): + self.clients = [] + self.doRun = True + self.last_worktime = 0 + self.last_idletime = 0 + self.endEvent = threading.Event() + super().__init__() + + def run(self): + while self.doRun: + try: + cpu_usage = self.get_cpu_usage() + except: + cpu_usage = 0 + for c in self.clients: + c.write_cpu_usage(cpu_usage) + self.endEvent.wait(timeout=3) + logger.debug("cpu usage thread shut down") + + def get_cpu_usage(self): + try: + f = open("/proc/stat", "r") + except: + return 0 # Workaround, possibly we're on a Mac + line = "" + while not "cpu " in line: + line = f.readline() + f.close() + spl = line.split(" ") + worktime = int(spl[2]) + int(spl[3]) + int(spl[4]) + idletime = int(spl[5]) + dworktime = worktime - self.last_worktime + didletime = idletime - self.last_idletime + rate = float(dworktime) / (didletime + dworktime) + self.last_worktime = worktime + self.last_idletime = idletime + if self.last_worktime == 0: + return 0 + return rate + + def add_client(self, c): + self.clients.append(c) + if not self.is_alive(): + self.start() + + def remove_client(self, c): + try: + self.clients.remove(c) + except ValueError: + pass + if not self.clients: + self.shutdown() + + def shutdown(self): + CpuUsageThread.sharedInstance = None + self.doRun = False + self.endEvent.set() diff --git a/owrx/dsp.py b/owrx/dsp.py new file mode 100644 index 0000000..c7f7b76 --- /dev/null +++ b/owrx/dsp.py @@ -0,0 +1,152 @@ +from owrx.config import PropertyManager +from owrx.meta import MetaParser +from owrx.wsjt import WsjtParser +from owrx.aprs import AprsParser +from owrx.source import SdrSource +from csdr import csdr +import threading + +import logging + +logger = logging.getLogger(__name__) + + +class DspManager(csdr.output): + def __init__(self, handler, sdrSource): + self.handler = handler + self.sdrSource = sdrSource + self.metaParser = MetaParser(self.handler) + self.wsjtParser = WsjtParser(self.handler) + self.aprsParser = AprsParser(self.handler) + + self.localProps = ( + self.sdrSource.getProps() + .collect( + "audio_compression", + "fft_compression", + "digimodes_fft_size", + "csdr_dynamic_bufsize", + "csdr_print_bufsizes", + "csdr_through", + "digimodes_enable", + "samp_rate", + "digital_voice_unvoiced_quality", + "dmr_filter", + "temporary_directory", + "center_freq", + ) + .defaults(PropertyManager.getSharedInstance()) + ) + + self.dsp = csdr.dsp(self) + self.dsp.nc_port = self.sdrSource.getPort() + + def set_low_cut(cut): + bpf = self.dsp.get_bpf() + bpf[0] = cut + self.dsp.set_bpf(*bpf) + + def set_high_cut(cut): + bpf = self.dsp.get_bpf() + bpf[1] = cut + self.dsp.set_bpf(*bpf) + + def set_dial_freq(key, value): + freq = self.localProps["center_freq"] + self.localProps["offset_freq"] + self.wsjtParser.setDialFrequency(freq) + self.aprsParser.setDialFrequency(freq) + self.metaParser.setDialFrequency(freq) + + self.subscriptions = [ + self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression), + self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression), + self.localProps.getProperty("digimodes_fft_size").wire(self.dsp.set_secondary_fft_size), + self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate), + self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate), + self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq), + self.localProps.getProperty("squelch_level").wire(self.dsp.set_squelch_level), + self.localProps.getProperty("low_cut").wire(set_low_cut), + self.localProps.getProperty("high_cut").wire(set_high_cut), + self.localProps.getProperty("mod").wire(self.dsp.set_demodulator), + self.localProps.getProperty("digital_voice_unvoiced_quality").wire(self.dsp.set_unvoiced_quality), + self.localProps.getProperty("dmr_filter").wire(self.dsp.set_dmr_filter), + self.localProps.getProperty("temporary_directory").wire(self.dsp.set_temporary_directory), + self.localProps.collect("center_freq", "offset_freq").wire(set_dial_freq), + ] + + self.dsp.set_offset_freq(0) + self.dsp.set_bpf(-4000, 4000) + self.dsp.csdr_dynamic_bufsize = self.localProps["csdr_dynamic_bufsize"] + self.dsp.csdr_print_bufsizes = self.localProps["csdr_print_bufsizes"] + self.dsp.csdr_through = self.localProps["csdr_through"] + + if self.localProps["digimodes_enable"]: + + def set_secondary_mod(mod): + if mod == False: + mod = None + self.dsp.set_secondary_demodulator(mod) + if mod is not None: + self.handler.write_secondary_dsp_config( + { + "secondary_fft_size": self.localProps["digimodes_fft_size"], + "if_samp_rate": self.dsp.if_samp_rate(), + "secondary_bw": self.dsp.secondary_bw(), + } + ) + + self.subscriptions += [ + self.localProps.getProperty("secondary_mod").wire(set_secondary_mod), + self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq), + ] + + self.sdrSource.addClient(self) + + super().__init__() + + def start(self): + if self.sdrSource.isAvailable(): + self.dsp.start() + + def receive_output(self, t, read_fn): + logger.debug("adding new output of type %s", t) + writers = { + "audio": self.handler.write_dsp_data, + "smeter": self.handler.write_s_meter_level, + "secondary_fft": self.handler.write_secondary_fft, + "secondary_demod": self.handler.write_secondary_demod, + "meta": self.metaParser.parse, + "wsjt_demod": self.wsjtParser.parse, + "packet_demod": self.aprsParser.parse, + } + write = writers[t] + + threading.Thread(target=self.pump(read_fn, write)).start() + + def stop(self): + self.dsp.stop() + self.sdrSource.removeClient(self) + for sub in self.subscriptions: + sub.cancel() + self.subscriptions = [] + + def setProperty(self, prop, value): + self.localProps.getProperty(prop).setValue(value) + + def getClientClass(self): + return SdrSource.CLIENT_USER + + def onStateChange(self, state): + if state == SdrSource.STATE_RUNNING: + logger.debug("received STATE_RUNNING, attempting DspSource restart") + self.dsp.start() + elif state == SdrSource.STATE_STOPPING: + logger.debug("received STATE_STOPPING, shutting down DspSource") + self.dsp.stop() + elif state == SdrSource.STATE_FAILED: + logger.debug("received STATE_FAILED, shutting down DspSource") + self.dsp.stop() + self.handler.handleSdrFailure("sdr device failed") + + def onBusyStateChange(self, state): + pass diff --git a/owrx/fft.py b/owrx/fft.py new file mode 100644 index 0000000..246f110 --- /dev/null +++ b/owrx/fft.py @@ -0,0 +1,92 @@ +from owrx.config import PropertyManager +from csdr import csdr +import threading +from owrx.source import SdrSource + +import logging + +logger = logging.getLogger(__name__) + + +class SpectrumThread(csdr.output): + def __init__(self, sdrSource): + self.sdrSource = sdrSource + super().__init__() + + self.props = props = self.sdrSource.props.collect( + "samp_rate", + "fft_size", + "fft_fps", + "fft_voverlap_factor", + "fft_compression", + "csdr_dynamic_bufsize", + "csdr_print_bufsizes", + "csdr_through", + "temporary_directory", + ).defaults(PropertyManager.getSharedInstance()) + + self.dsp = dsp = csdr.dsp(self) + dsp.nc_port = self.sdrSource.getPort() + dsp.set_demodulator("fft") + + def set_fft_averages(key, value): + samp_rate = props["samp_rate"] + fft_size = props["fft_size"] + fft_fps = props["fft_fps"] + fft_voverlap_factor = props["fft_voverlap_factor"] + + dsp.set_fft_averages( + int(round(1.0 * samp_rate / fft_size / fft_fps / (1.0 - fft_voverlap_factor))) + if fft_voverlap_factor > 0 + else 0 + ) + + self.subscriptions = [ + props.getProperty("samp_rate").wire(dsp.set_samp_rate), + props.getProperty("fft_size").wire(dsp.set_fft_size), + props.getProperty("fft_fps").wire(dsp.set_fft_fps), + props.getProperty("fft_compression").wire(dsp.set_fft_compression), + props.getProperty("temporary_directory").wire(dsp.set_temporary_directory), + props.collect("samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor").wire(set_fft_averages), + ] + + set_fft_averages(None, None) + + dsp.csdr_dynamic_bufsize = props["csdr_dynamic_bufsize"] + dsp.csdr_print_bufsizes = props["csdr_print_bufsizes"] + dsp.csdr_through = props["csdr_through"] + logger.debug("Spectrum thread initialized successfully.") + + def start(self): + self.sdrSource.addClient(self) + if self.sdrSource.isAvailable(): + self.dsp.start() + + def supports_type(self, t): + return t == "audio" + + def receive_output(self, type, read_fn): + if self.props["csdr_dynamic_bufsize"]: + read_fn(8) # dummy read to skip bufsize & preamble + logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1") + + threading.Thread(target=self.pump(read_fn, self.sdrSource.writeSpectrumData)).start() + + def stop(self): + self.dsp.stop() + self.sdrSource.removeClient(self) + for c in self.subscriptions: + c.cancel() + self.subscriptions = [] + + def getClientClass(self): + return SdrSource.CLIENT_USER + + def onStateChange(self, state): + if state in [SdrSource.STATE_STOPPING, SdrSource.STATE_FAILED]: + self.dsp.stop() + elif state == SdrSource.STATE_RUNNING: + self.dsp.start() + + def onBusyStateChange(self, state): + pass diff --git a/owrx/sdr.py b/owrx/sdr.py new file mode 100644 index 0000000..71bda62 --- /dev/null +++ b/owrx/sdr.py @@ -0,0 +1,90 @@ +from owrx.config import PropertyManager +from owrx.feature import FeatureDetector, UnknownFeatureException +import sys + +import logging + +logger = logging.getLogger(__name__) + + +class SdrService(object): + sdrProps = None + sources = {} + lastPort = None + + @staticmethod + def getNextPort(): + pm = PropertyManager.getSharedInstance() + (start, end) = pm["iq_port_range"] + if SdrService.lastPort is None: + SdrService.lastPort = start + else: + SdrService.lastPort += 1 + if SdrService.lastPort > end: + raise IndexError("no more available ports to start more sdrs") + return SdrService.lastPort + + @staticmethod + def loadProps(): + if SdrService.sdrProps is None: + pm = PropertyManager.getSharedInstance() + featureDetector = FeatureDetector() + + def loadIntoPropertyManager(dict: dict): + propertyManager = PropertyManager() + for (name, value) in dict.items(): + propertyManager[name] = value + return propertyManager + + def sdrTypeAvailable(value): + try: + if not featureDetector.is_available(value["type"]): + logger.error( + 'The RTL source type "{0}" is not available. please check requirements.'.format( + value["type"] + ) + ) + return False + return True + except UnknownFeatureException: + logger.error( + 'The RTL source type "{0}" is invalid. Please check your configuration'.format(value["type"]) + ) + return False + + # transform all dictionary items into PropertyManager object, filtering out unavailable ones + SdrService.sdrProps = { + name: loadIntoPropertyManager(value) for (name, value) in pm["sdrs"].items() if sdrTypeAvailable(value) + } + logger.info( + "SDR sources loaded. Availables SDRs: {0}".format( + ", ".join(map(lambda x: x["name"], SdrService.sdrProps.values())) + ) + ) + + @staticmethod + def getSource(id=None): + SdrService.loadProps() + sources = SdrService.getSources() + if not sources: + return None + if id is None: + # TODO: configure default sdr in config? right now it will pick the first one off the list. + id = list(sources.keys())[0] + + if not id in sources: + return None + return sources[id] + + @staticmethod + def getSources(): + SdrService.loadProps() + for id in SdrService.sdrProps.keys(): + if not id in SdrService.sources: + props = SdrService.sdrProps[id] + sdrType = props["type"] + className = "".join(x for x in sdrType.title() if x.isalnum()) + "Source" + module = __import__("owrx.source.{0}".format(sdrType), fromlist=[className]) + cls = getattr(module, className) + SdrService.sources[id] = cls(id, props, SdrService.getNextPort()) + return {key: s for key, s in SdrService.sources.items() if not s.isFailed()} diff --git a/owrx/service.py b/owrx/service.py index 4e68200..aa65079 100644 --- a/owrx/service.py +++ b/owrx/service.py @@ -1,13 +1,14 @@ import threading from owrx.socket import getAvailablePort from datetime import datetime, timezone, timedelta -from owrx.source import SdrService, SdrSource +from owrx.source import SdrSource +from owrx.sdr import SdrService from owrx.bands import Bandplan from csdr.csdr import dsp, output from owrx.wsjt import WsjtParser from owrx.aprs import AprsParser from owrx.config import PropertyManager -from owrx.source import Resampler +from owrx.source.resampler import Resampler from owrx.feature import FeatureDetector import logging diff --git a/owrx/source.py b/owrx/source.py deleted file mode 100644 index 5593895..0000000 --- a/owrx/source.py +++ /dev/null @@ -1,975 +0,0 @@ -import subprocess -from owrx.config import PropertyManager -from owrx.feature import FeatureDetector, UnknownFeatureException -from owrx.meta import MetaParser -from owrx.wsjt import WsjtParser -from owrx.aprs import AprsParser -from owrx.metrics import Metrics, DirectMetric -from owrx.socket import getAvailablePort -import threading -from csdr import csdr -import time -import os -import signal -import sys -import socket -import logging -import shlex - -logger = logging.getLogger(__name__) - - -class SdrService(object): - sdrProps = None - sources = {} - lastPort = None - - @staticmethod - def getNextPort(): - pm = PropertyManager.getSharedInstance() - (start, end) = pm["iq_port_range"] - if SdrService.lastPort is None: - SdrService.lastPort = start - else: - SdrService.lastPort += 1 - if SdrService.lastPort > end: - raise IndexError("no more available ports to start more sdrs") - return SdrService.lastPort - - @staticmethod - def loadProps(): - if SdrService.sdrProps is None: - pm = PropertyManager.getSharedInstance() - featureDetector = FeatureDetector() - - def loadIntoPropertyManager(dict: dict): - propertyManager = PropertyManager() - for (name, value) in dict.items(): - propertyManager[name] = value - return propertyManager - - def sdrTypeAvailable(value): - try: - if not featureDetector.is_available(value["type"]): - logger.error( - 'The RTL source type "{0}" is not available. please check requirements.'.format( - value["type"] - ) - ) - return False - return True - except UnknownFeatureException: - logger.error( - 'The RTL source type "{0}" is invalid. Please check your configuration'.format(value["type"]) - ) - return False - - # transform all dictionary items into PropertyManager object, filtering out unavailable ones - SdrService.sdrProps = { - name: loadIntoPropertyManager(value) for (name, value) in pm["sdrs"].items() if sdrTypeAvailable(value) - } - logger.info( - "SDR sources loaded. Availables SDRs: {0}".format( - ", ".join(map(lambda x: x["name"], SdrService.sdrProps.values())) - ) - ) - - @staticmethod - def getSource(id=None): - SdrService.loadProps() - sources = SdrService.getSources() - if not sources: - return None - if id is None: - # TODO: configure default sdr in config? right now it will pick the first one off the list. - id = list(sources.keys())[0] - - if not id in sources: - return None - return sources[id] - - @staticmethod - def getSources(): - SdrService.loadProps() - for id in SdrService.sdrProps.keys(): - if not id in SdrService.sources: - props = SdrService.sdrProps[id] - className = "".join(x for x in props["type"].title() if x.isalnum()) + "Source" - cls = getattr(sys.modules[__name__], className) - SdrService.sources[id] = cls(id, props, SdrService.getNextPort()) - return {key: s for key, s in SdrService.sources.items() if not s.isFailed()} - - -class SdrSource(object): - STATE_STOPPED = 0 - STATE_STARTING = 1 - STATE_RUNNING = 2 - STATE_STOPPING = 3 - STATE_TUNING = 4 - STATE_FAILED = 5 - - BUSYSTATE_IDLE = 0 - BUSYSTATE_BUSY = 1 - - CLIENT_INACTIVE = 0 - CLIENT_BACKGROUND = 1 - CLIENT_USER = 2 - - def __init__(self, id, props, port): - self.id = id - self.props = props - self.profile_id = None - self.activateProfile() - self.rtlProps = self.props.collect(*self.getEventNames()).defaults(PropertyManager.getSharedInstance()) - self.wireEvents() - - self.port = port - self.monitor = None - self.clients = [] - self.spectrumClients = [] - self.spectrumThread = None - self.process = None - self.modificationLock = threading.Lock() - self.failed = False - self.state = SdrSource.STATE_STOPPED - self.busyState = SdrSource.BUSYSTATE_IDLE - - def getEventNames(self): - return [ - "samp_rate", - "nmux_memory", - "center_freq", - "ppm", - "rf_gain", - "lna_gain", - "rf_amp", - "antenna", - "if_gain", - "lfo_offset", - ] - - def wireEvents(self): - def restart(name, value): - logger.debug("restarting sdr source due to property change: {0} changed to {1}".format(name, value)) - self.stop() - self.start() - - self.rtlProps.wire(restart) - - # override this in subclasses - def getCommand(self): - pass - - # override this in subclasses, if necessary - def getFormatConversion(self): - return None - - def activateProfile(self, profile_id=None): - profiles = self.props["profiles"] - if profile_id is None: - profile_id = list(profiles.keys())[0] - if profile_id == self.profile_id: - return - logger.debug("activating profile {0}".format(profile_id)) - self.profile_id = profile_id - profile = profiles[profile_id] - self.props["profile_id"] = profile_id - for (key, value) in profile.items(): - # skip the name, that would overwrite the source name. - if key == "name": - continue - self.props[key] = value - - def getId(self): - return self.id - - def getProfileId(self): - return self.profile_id - - def getProfiles(self): - return self.props["profiles"] - - def getName(self): - return self.props["name"] - - def getProps(self): - return self.props - - def getPort(self): - return self.port - - def useNmux(self): - return True - - def getCommandValues(self): - dict = self.rtlProps.collect(*self.getEventNames()).__dict__() - if "lfo_offset" in dict and dict["lfo_offset"] is not None: - dict["tuner_freq"] = dict["center_freq"] + dict["lfo_offset"] - else: - dict["tuner_freq"] = dict["center_freq"] - return dict - - def start(self): - self.modificationLock.acquire() - if self.monitor: - self.modificationLock.release() - return - - props = self.rtlProps - - cmd = self.getCommand().format(**self.getCommandValues()) - - format_conversion = self.getFormatConversion() - if format_conversion is not None: - cmd += " | " + format_conversion - - if self.useNmux(): - nmux_bufcnt = nmux_bufsize = 0 - while nmux_bufsize < props["samp_rate"] / 4: - nmux_bufsize += 4096 - while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: - nmux_bufcnt += 1 - if nmux_bufcnt == 0 or nmux_bufsize == 0: - logger.error( - "Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py" - ) - self.modificationLock.release() - return - logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt)) - cmd = cmd + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % ( - nmux_bufsize, - nmux_bufcnt, - self.port, - ) - - # don't use shell mode for commands without piping - if "|" in cmd: - self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) - else: - # preexec_fn can go as soon as there's no piped commands left - # the os.killpg call must be replaced with something more reasonable at the same time - self.process = subprocess.Popen(shlex.split(cmd), preexec_fn=os.setpgrp) - logger.info("Started rtl source: " + cmd) - - available = False - - def wait_for_process_to_end(): - rc = self.process.wait() - logger.debug("shut down with RC={0}".format(rc)) - self.monitor = None - - self.monitor = threading.Thread(target=wait_for_process_to_end) - self.monitor.start() - - retries = 1000 - while retries > 0: - retries -= 1 - if self.monitor is None: - break - testsock = socket.socket() - try: - testsock.connect(("127.0.0.1", self.getPort())) - testsock.close() - available = True - break - except: - time.sleep(0.1) - - if not available: - self.failed = True - - self.postStart() - - self.modificationLock.release() - - self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING) - - def postStart(self): - pass - - def isAvailable(self): - return self.monitor is not None - - def isFailed(self): - return self.failed - - def stop(self): - self.setState(SdrSource.STATE_STOPPING) - - self.modificationLock.acquire() - - if self.process is not None: - try: - os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) - except ProcessLookupError: - # been killed by something else, ignore - pass - if self.monitor: - self.monitor.join() - self.sleepOnRestart() - self.modificationLock.release() - - self.setState(SdrSource.STATE_STOPPED) - - def sleepOnRestart(self): - pass - - def hasClients(self, *args): - clients = [c for c in self.clients if c.getClientClass() in args] - return len(clients) > 0 - - def addClient(self, c): - self.clients.append(c) - hasUsers = self.hasClients(SdrSource.CLIENT_USER) - hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND) - if hasUsers or hasBackgroundTasks: - self.start() - self.setBusyState(SdrSource.BUSYSTATE_BUSY if hasUsers else SdrSource.BUSYSTATE_IDLE) - - def removeClient(self, c): - try: - self.clients.remove(c) - except ValueError: - pass - - hasUsers = self.hasClients(SdrSource.CLIENT_USER) - hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND) - self.setBusyState(SdrSource.BUSYSTATE_BUSY if hasUsers else SdrSource.BUSYSTATE_IDLE) - if not hasUsers and not hasBackgroundTasks: - self.stop() - - def addSpectrumClient(self, c): - self.spectrumClients.append(c) - if self.spectrumThread is None: - self.spectrumThread = SpectrumThread(self) - self.spectrumThread.start() - - def removeSpectrumClient(self, c): - try: - self.spectrumClients.remove(c) - except ValueError: - pass - if not self.spectrumClients and self.spectrumThread is not None: - self.spectrumThread.stop() - self.spectrumThread = None - - def writeSpectrumData(self, data): - for c in self.spectrumClients: - c.write_spectrum_data(data) - - def setState(self, state): - if state == self.state: - return - self.state = state - for c in self.clients: - c.onStateChange(state) - - def setBusyState(self, state): - if state == self.busyState: - return - self.busyState = state - for c in self.clients: - c.onBusyStateChange(state) - - -class Resampler(SdrSource): - def __init__(self, props, port, sdr): - sdrProps = sdr.getProps() - self.shift = (sdrProps["center_freq"] - props["center_freq"]) / sdrProps["samp_rate"] - self.decimation = int(float(sdrProps["samp_rate"]) / props["samp_rate"]) - if_samp_rate = sdrProps["samp_rate"] / self.decimation - self.transition_bw = 0.15 * (if_samp_rate / float(sdrProps["samp_rate"])) - props["samp_rate"] = if_samp_rate - - self.sdr = sdr - super().__init__(None, props, port) - - def start(self): - if self.isFailed(): - return - - self.modificationLock.acquire() - if self.monitor: - self.modificationLock.release() - return - - self.setState(SdrSource.STATE_STARTING) - - props = self.rtlProps - - resampler_command = [ - "nc -v 127.0.0.1 {nc_port}".format(nc_port=self.sdr.getPort()), - "csdr shift_addition_cc {shift}".format(shift=self.shift), - "csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING".format( - decimation=self.decimation, ddc_transition_bw=self.transition_bw - ), - ] - - nmux_bufcnt = nmux_bufsize = 0 - while nmux_bufsize < props["samp_rate"] / 4: - nmux_bufsize += 4096 - while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: - nmux_bufcnt += 1 - if nmux_bufcnt == 0 or nmux_bufsize == 0: - logger.error( - "Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py" - ) - self.modificationLock.release() - return - logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt)) - resampler_command += [ - "nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (nmux_bufsize, nmux_bufcnt, self.port) - ] - cmd = " | ".join(resampler_command) - logger.debug("resampler command: %s", cmd) - self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) - logger.info("Started resampler source: " + cmd) - - available = False - - def wait_for_process_to_end(): - rc = self.process.wait() - logger.debug("shut down with RC={0}".format(rc)) - self.monitor = None - - self.monitor = threading.Thread(target=wait_for_process_to_end) - self.monitor.start() - - retries = 1000 - while retries > 0: - retries -= 1 - if self.monitor is None: - break - testsock = socket.socket() - try: - testsock.connect(("127.0.0.1", self.getPort())) - testsock.close() - available = True - break - except: - time.sleep(0.1) - - if not available: - self.failed = True - - self.modificationLock.release() - - self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING) - - def activateProfile(self, profile_id=None): - pass - - -class FifiSdrSource(SdrSource): - def getCommand(self): - return "arecord -D hw:2,0 -f S16_LE -r {samp_rate} -c2 -" - - def getFormatConversion(self): - return "csdr convert_s16_f | csdr gain_ff 30" - - -class ConnectorSource(SdrSource): - def __init__(self, id, props, port): - super().__init__(id, props, port) - self.controlSocket = None - self.controlPort = getAvailablePort() - - def getEventNames(self): - return [ - "samp_rate", - "center_freq", - "ppm", - "rf_gain", - "device", - "iqswap", - "lfo_offset", - "rtltcp_compat", - ] - - def sendControlMessage(self, prop, value): - logger.debug("sending property change over control socket: {0} changed to {1}".format(prop, value)) - self.controlSocket.sendall("{prop}:{value}\n".format(prop=prop, value=value).encode()) - - def wireEvents(self): - def reconfigure(prop, value): - if self.monitor is None: - return - if ( - (prop == "center_freq" or prop == "lfo_offset") - and "lfo_offset" in self.rtlProps - and self.rtlProps["lfo_offset"] is not None - ): - freq = self.rtlProps["center_freq"] + self.rtlProps["lfo_offset"] - self.sendControlMessage("center_freq", freq) - else: - self.sendControlMessage(prop, value) - - self.rtlProps.wire(reconfigure) - - def postStart(self): - logger.debug("opening control socket...") - self.controlSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.controlSocket.connect(("localhost", self.controlPort)) - - def stop(self): - super().stop() - if self.controlSocket: - self.controlSocket.close() - self.controlSocket = None - - def getFormatConversion(self): - return None - - def useNmux(self): - return False - - -class RtlSdrSource(ConnectorSource): - def getCommand(self): - cmd = ( - "rtl_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort) - + " -s {samp_rate} -f {tuner_freq} -g {rf_gain} -P {ppm}" - ) - if "device" in self.rtlProps and self.rtlProps["device"] is not None: - cmd += ' -d "{device}"' - if self.rtlProps["iqswap"]: - cmd += " -i" - if self.rtlProps["rtltcp_compat"]: - cmd += " -r" - return cmd - - -class SoapyConnectorSource(ConnectorSource): - """ - must be implemented by child classes to be able to build a driver-based device selector by default. - return value must be the corresponding soapy driver identifier. - """ - def getDriver(self): - pass - - def parseDeviceString(self, dstr): - - def decodeComponent(c): - kv = c.split("=", 1) - if len(kv) < 2: - return c - else: - return {kv[0]: kv[1]} - - return [decodeComponent(c) for c in dstr.split(",")] - - def encodeDeviceString(self, dobj): - - def encodeComponent(c): - if isinstance(c, str): - return c - else: - return ",".join(["{0}={1}".format(key, value) for key, value in c.items()]) - - return ",".join([encodeComponent(c) for c in dobj]) - - """ - this method always attempts to inject a driver= part into the soapysdr query, depending on what connector was used. - this prevents the soapy_connector from using the wrong device in scenarios where there's no same-type sdrs. - """ - def getCommandValues(self): - values = super().getCommandValues() - if "device" in values and values["device"] is not None: - parsed = self.parseDeviceString(values["device"]) - parsed = [v for v in parsed if "driver" not in v] - parsed += [{"driver": self.getDriver()}] - values["device"] = self.encodeDeviceString(parsed) - else: - values["device"] = "driver={0}".format(self.getDriver()) - return values - - -class SdrplaySource(SoapyConnectorSource): - def getDriver(self): - return "sdrplay" - - def getEventNames(self): - return super().getEventNames() + ["antenna"] - - def getCommand(self): - cmd = ( - "soapy_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort) - + ' -s {samp_rate} -f {tuner_freq} -g "{rf_gain}" -P {ppm} -a "{antenna}" -d "{device}"' - ) - values = self.getCommandValues() - if values["iqswap"]: - cmd += " -i" - if self.rtlProps["rtltcp_compat"]: - cmd += " -r" - return cmd - - -class AirspySource(SoapyConnectorSource): - def getDriver(self): - return "airspy" - - def getEventNames(self): - return super().getEventNames() + ["bias_tee"] - - def getCommand(self): - cmd = ( - "soapy_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort) - + ' -s {samp_rate} -f {tuner_freq} -g "{rf_gain}" -P {ppm} -d "{device}"' - ) - values = self.getCommandValues() - if values["iqswap"]: - cmd += " -i" - if self.rtlProps["rtltcp_compat"]: - cmd += " -r" - if values["bias_tee"]: - cmd += " -t biastee=true" - return cmd - - -class AirspyhfSource(AirspySource): - def getDriver(self): - return "airspyhf" - - -class HackrfSource(SdrSource): - def getCommand(self): - return "hackrf_transfer -s {samp_rate} -f {tuner_freq} -g {rf_gain} -l{lna_gain} -a{rf_amp} -r-" - - def getFormatConversion(self): - return "csdr convert_s8_f" - - -class SpectrumThread(csdr.output): - def __init__(self, sdrSource): - self.sdrSource = sdrSource - super().__init__() - - self.props = props = self.sdrSource.props.collect( - "samp_rate", - "fft_size", - "fft_fps", - "fft_voverlap_factor", - "fft_compression", - "csdr_dynamic_bufsize", - "csdr_print_bufsizes", - "csdr_through", - "temporary_directory", - ).defaults(PropertyManager.getSharedInstance()) - - self.dsp = dsp = csdr.dsp(self) - dsp.nc_port = self.sdrSource.getPort() - dsp.set_demodulator("fft") - - def set_fft_averages(key, value): - samp_rate = props["samp_rate"] - fft_size = props["fft_size"] - fft_fps = props["fft_fps"] - fft_voverlap_factor = props["fft_voverlap_factor"] - - dsp.set_fft_averages( - int(round(1.0 * samp_rate / fft_size / fft_fps / (1.0 - fft_voverlap_factor))) - if fft_voverlap_factor > 0 - else 0 - ) - - self.subscriptions = [ - props.getProperty("samp_rate").wire(dsp.set_samp_rate), - props.getProperty("fft_size").wire(dsp.set_fft_size), - props.getProperty("fft_fps").wire(dsp.set_fft_fps), - props.getProperty("fft_compression").wire(dsp.set_fft_compression), - props.getProperty("temporary_directory").wire(dsp.set_temporary_directory), - props.collect("samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor").wire(set_fft_averages), - ] - - set_fft_averages(None, None) - - dsp.csdr_dynamic_bufsize = props["csdr_dynamic_bufsize"] - dsp.csdr_print_bufsizes = props["csdr_print_bufsizes"] - dsp.csdr_through = props["csdr_through"] - logger.debug("Spectrum thread initialized successfully.") - - def start(self): - self.sdrSource.addClient(self) - if self.sdrSource.isAvailable(): - self.dsp.start() - - def supports_type(self, t): - return t == "audio" - - def receive_output(self, type, read_fn): - if self.props["csdr_dynamic_bufsize"]: - read_fn(8) # dummy read to skip bufsize & preamble - logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1") - - threading.Thread(target=self.pump(read_fn, self.sdrSource.writeSpectrumData)).start() - - def stop(self): - self.dsp.stop() - self.sdrSource.removeClient(self) - for c in self.subscriptions: - c.cancel() - self.subscriptions = [] - - def getClientClass(self): - return SdrSource.CLIENT_USER - - def onStateChange(self, state): - if state in [SdrSource.STATE_STOPPING, SdrSource.STATE_FAILED]: - self.dsp.stop() - elif state == SdrSource.STATE_RUNNING: - self.dsp.start() - - def onBusyStateChange(self, state): - pass - - -class DspManager(csdr.output): - def __init__(self, handler, sdrSource): - self.handler = handler - self.sdrSource = sdrSource - self.metaParser = MetaParser(self.handler) - self.wsjtParser = WsjtParser(self.handler) - self.aprsParser = AprsParser(self.handler) - - self.localProps = ( - self.sdrSource.getProps() - .collect( - "audio_compression", - "fft_compression", - "digimodes_fft_size", - "csdr_dynamic_bufsize", - "csdr_print_bufsizes", - "csdr_through", - "digimodes_enable", - "samp_rate", - "digital_voice_unvoiced_quality", - "dmr_filter", - "temporary_directory", - "center_freq", - ) - .defaults(PropertyManager.getSharedInstance()) - ) - - self.dsp = csdr.dsp(self) - self.dsp.nc_port = self.sdrSource.getPort() - - def set_low_cut(cut): - bpf = self.dsp.get_bpf() - bpf[0] = cut - self.dsp.set_bpf(*bpf) - - def set_high_cut(cut): - bpf = self.dsp.get_bpf() - bpf[1] = cut - self.dsp.set_bpf(*bpf) - - def set_dial_freq(key, value): - freq = self.localProps["center_freq"] + self.localProps["offset_freq"] - self.wsjtParser.setDialFrequency(freq) - self.aprsParser.setDialFrequency(freq) - self.metaParser.setDialFrequency(freq) - - self.subscriptions = [ - self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression), - self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression), - self.localProps.getProperty("digimodes_fft_size").wire(self.dsp.set_secondary_fft_size), - self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate), - self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate), - self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq), - self.localProps.getProperty("squelch_level").wire(self.dsp.set_squelch_level), - self.localProps.getProperty("low_cut").wire(set_low_cut), - self.localProps.getProperty("high_cut").wire(set_high_cut), - self.localProps.getProperty("mod").wire(self.dsp.set_demodulator), - self.localProps.getProperty("digital_voice_unvoiced_quality").wire(self.dsp.set_unvoiced_quality), - self.localProps.getProperty("dmr_filter").wire(self.dsp.set_dmr_filter), - self.localProps.getProperty("temporary_directory").wire(self.dsp.set_temporary_directory), - self.localProps.collect("center_freq", "offset_freq").wire(set_dial_freq), - ] - - self.dsp.set_offset_freq(0) - self.dsp.set_bpf(-4000, 4000) - self.dsp.csdr_dynamic_bufsize = self.localProps["csdr_dynamic_bufsize"] - self.dsp.csdr_print_bufsizes = self.localProps["csdr_print_bufsizes"] - self.dsp.csdr_through = self.localProps["csdr_through"] - - if self.localProps["digimodes_enable"]: - - def set_secondary_mod(mod): - if mod == False: - mod = None - self.dsp.set_secondary_demodulator(mod) - if mod is not None: - self.handler.write_secondary_dsp_config( - { - "secondary_fft_size": self.localProps["digimodes_fft_size"], - "if_samp_rate": self.dsp.if_samp_rate(), - "secondary_bw": self.dsp.secondary_bw(), - } - ) - - self.subscriptions += [ - self.localProps.getProperty("secondary_mod").wire(set_secondary_mod), - self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq), - ] - - self.sdrSource.addClient(self) - - super().__init__() - - def start(self): - if self.sdrSource.isAvailable(): - self.dsp.start() - - def receive_output(self, t, read_fn): - logger.debug("adding new output of type %s", t) - writers = { - "audio": self.handler.write_dsp_data, - "smeter": self.handler.write_s_meter_level, - "secondary_fft": self.handler.write_secondary_fft, - "secondary_demod": self.handler.write_secondary_demod, - "meta": self.metaParser.parse, - "wsjt_demod": self.wsjtParser.parse, - "packet_demod": self.aprsParser.parse, - } - write = writers[t] - - threading.Thread(target=self.pump(read_fn, write)).start() - - def stop(self): - self.dsp.stop() - self.sdrSource.removeClient(self) - for sub in self.subscriptions: - sub.cancel() - self.subscriptions = [] - - def setProperty(self, prop, value): - self.localProps.getProperty(prop).setValue(value) - - def getClientClass(self): - return SdrSource.CLIENT_USER - - def onStateChange(self, state): - if state == SdrSource.STATE_RUNNING: - logger.debug("received STATE_RUNNING, attempting DspSource restart") - self.dsp.start() - elif state == SdrSource.STATE_STOPPING: - logger.debug("received STATE_STOPPING, shutting down DspSource") - self.dsp.stop() - elif state == SdrSource.STATE_FAILED: - logger.debug("received STATE_FAILED, shutting down DspSource") - self.dsp.stop() - self.handler.handleSdrFailure("sdr device failed") - - def onBusyStateChange(self, state): - pass - - -class CpuUsageThread(threading.Thread): - sharedInstance = None - - @staticmethod - def getSharedInstance(): - if CpuUsageThread.sharedInstance is None: - CpuUsageThread.sharedInstance = CpuUsageThread() - return CpuUsageThread.sharedInstance - - def __init__(self): - self.clients = [] - self.doRun = True - self.last_worktime = 0 - self.last_idletime = 0 - self.endEvent = threading.Event() - super().__init__() - - def run(self): - while self.doRun: - try: - cpu_usage = self.get_cpu_usage() - except: - cpu_usage = 0 - for c in self.clients: - c.write_cpu_usage(cpu_usage) - self.endEvent.wait(timeout=3) - logger.debug("cpu usage thread shut down") - - def get_cpu_usage(self): - try: - f = open("/proc/stat", "r") - except: - return 0 # Workaround, possibly we're on a Mac - line = "" - while not "cpu " in line: - line = f.readline() - f.close() - spl = line.split(" ") - worktime = int(spl[2]) + int(spl[3]) + int(spl[4]) - idletime = int(spl[5]) - dworktime = worktime - self.last_worktime - didletime = idletime - self.last_idletime - rate = float(dworktime) / (didletime + dworktime) - self.last_worktime = worktime - self.last_idletime = idletime - if self.last_worktime == 0: - return 0 - return rate - - def add_client(self, c): - self.clients.append(c) - if not self.is_alive(): - self.start() - - def remove_client(self, c): - try: - self.clients.remove(c) - except ValueError: - pass - if not self.clients: - self.shutdown() - - def shutdown(self): - CpuUsageThread.sharedInstance = None - self.doRun = False - self.endEvent.set() - - -class TooManyClientsException(Exception): - pass - - -class ClientRegistry(object): - sharedInstance = None - creationLock = threading.Lock() - - @staticmethod - def getSharedInstance(): - with ClientRegistry.creationLock: - if ClientRegistry.sharedInstance is None: - ClientRegistry.sharedInstance = ClientRegistry() - return ClientRegistry.sharedInstance - - def __init__(self): - self.clients = [] - Metrics.getSharedInstance().addMetric("openwebrx.users", DirectMetric(self.clientCount)) - super().__init__() - - def broadcast(self): - n = self.clientCount() - for c in self.clients: - c.write_clients(n) - - def addClient(self, client): - pm = PropertyManager.getSharedInstance() - if len(self.clients) >= pm["max_clients"]: - raise TooManyClientsException() - self.clients.append(client) - self.broadcast() - - def clientCount(self): - return len(self.clients) - - def removeClient(self, client): - try: - self.clients.remove(client) - except ValueError: - pass - self.broadcast() diff --git a/owrx/source/__init__.py b/owrx/source/__init__.py new file mode 100644 index 0000000..a3c61e5 --- /dev/null +++ b/owrx/source/__init__.py @@ -0,0 +1,286 @@ +from owrx.config import PropertyManager +import threading +import subprocess +import os +import socket +import shlex +import time +import signal + +import logging + +logger = logging.getLogger(__name__) + + +class SdrSource(object): + STATE_STOPPED = 0 + STATE_STARTING = 1 + STATE_RUNNING = 2 + STATE_STOPPING = 3 + STATE_TUNING = 4 + STATE_FAILED = 5 + + BUSYSTATE_IDLE = 0 + BUSYSTATE_BUSY = 1 + + CLIENT_INACTIVE = 0 + CLIENT_BACKGROUND = 1 + CLIENT_USER = 2 + + def __init__(self, id, props, port): + self.id = id + self.props = props + self.profile_id = None + self.activateProfile() + self.rtlProps = self.props.collect(*self.getEventNames()).defaults(PropertyManager.getSharedInstance()) + self.wireEvents() + + self.port = port + self.monitor = None + self.clients = [] + self.spectrumClients = [] + self.spectrumThread = None + self.process = None + self.modificationLock = threading.Lock() + self.failed = False + self.state = SdrSource.STATE_STOPPED + self.busyState = SdrSource.BUSYSTATE_IDLE + + def getEventNames(self): + return [ + "samp_rate", + "nmux_memory", + "center_freq", + "ppm", + "rf_gain", + "lna_gain", + "rf_amp", + "antenna", + "if_gain", + "lfo_offset", + ] + + def wireEvents(self): + def restart(name, value): + logger.debug("restarting sdr source due to property change: {0} changed to {1}".format(name, value)) + self.stop() + self.start() + + self.rtlProps.wire(restart) + + # override this in subclasses + def getCommand(self): + pass + + # override this in subclasses, if necessary + def getFormatConversion(self): + return None + + def activateProfile(self, profile_id=None): + profiles = self.props["profiles"] + if profile_id is None: + profile_id = list(profiles.keys())[0] + if profile_id == self.profile_id: + return + logger.debug("activating profile {0}".format(profile_id)) + self.profile_id = profile_id + profile = profiles[profile_id] + self.props["profile_id"] = profile_id + for (key, value) in profile.items(): + # skip the name, that would overwrite the source name. + if key == "name": + continue + self.props[key] = value + + def getId(self): + return self.id + + def getProfileId(self): + return self.profile_id + + def getProfiles(self): + return self.props["profiles"] + + def getName(self): + return self.props["name"] + + def getProps(self): + return self.props + + def getPort(self): + return self.port + + def useNmux(self): + return True + + def getCommandValues(self): + dict = self.rtlProps.collect(*self.getEventNames()).__dict__() + if "lfo_offset" in dict and dict["lfo_offset"] is not None: + dict["tuner_freq"] = dict["center_freq"] + dict["lfo_offset"] + else: + dict["tuner_freq"] = dict["center_freq"] + return dict + + def start(self): + self.modificationLock.acquire() + if self.monitor: + self.modificationLock.release() + return + + props = self.rtlProps + + cmd = self.getCommand().format(**self.getCommandValues()) + + format_conversion = self.getFormatConversion() + if format_conversion is not None: + cmd += " | " + format_conversion + + if self.useNmux(): + nmux_bufcnt = nmux_bufsize = 0 + while nmux_bufsize < props["samp_rate"] / 4: + nmux_bufsize += 4096 + while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: + nmux_bufcnt += 1 + if nmux_bufcnt == 0 or nmux_bufsize == 0: + logger.error( + "Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py" + ) + self.modificationLock.release() + return + logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt)) + cmd = cmd + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % ( + nmux_bufsize, + nmux_bufcnt, + self.port, + ) + + # don't use shell mode for commands without piping + if "|" in cmd: + self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) + else: + # preexec_fn can go as soon as there's no piped commands left + # the os.killpg call must be replaced with something more reasonable at the same time + self.process = subprocess.Popen(shlex.split(cmd), preexec_fn=os.setpgrp) + logger.info("Started rtl source: " + cmd) + + available = False + + def wait_for_process_to_end(): + rc = self.process.wait() + logger.debug("shut down with RC={0}".format(rc)) + self.monitor = None + + self.monitor = threading.Thread(target=wait_for_process_to_end) + self.monitor.start() + + retries = 1000 + while retries > 0: + retries -= 1 + if self.monitor is None: + break + testsock = socket.socket() + try: + testsock.connect(("127.0.0.1", self.getPort())) + testsock.close() + available = True + break + except: + time.sleep(0.1) + + if not available: + self.failed = True + + self.postStart() + + self.modificationLock.release() + + self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING) + + def postStart(self): + pass + + def isAvailable(self): + return self.monitor is not None + + def isFailed(self): + return self.failed + + def stop(self): + self.setState(SdrSource.STATE_STOPPING) + + self.modificationLock.acquire() + + if self.process is not None: + try: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + except ProcessLookupError: + # been killed by something else, ignore + pass + if self.monitor: + self.monitor.join() + self.sleepOnRestart() + self.modificationLock.release() + + self.setState(SdrSource.STATE_STOPPED) + + def sleepOnRestart(self): + pass + + def hasClients(self, *args): + clients = [c for c in self.clients if c.getClientClass() in args] + return len(clients) > 0 + + def addClient(self, c): + self.clients.append(c) + hasUsers = self.hasClients(SdrSource.CLIENT_USER) + hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND) + if hasUsers or hasBackgroundTasks: + self.start() + self.setBusyState(SdrSource.BUSYSTATE_BUSY if hasUsers else SdrSource.BUSYSTATE_IDLE) + + def removeClient(self, c): + try: + self.clients.remove(c) + except ValueError: + pass + + hasUsers = self.hasClients(SdrSource.CLIENT_USER) + hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND) + self.setBusyState(SdrSource.BUSYSTATE_BUSY if hasUsers else SdrSource.BUSYSTATE_IDLE) + if not hasUsers and not hasBackgroundTasks: + self.stop() + + def addSpectrumClient(self, c): + self.spectrumClients.append(c) + if self.spectrumThread is None: + # local import due to circular depencency + from owrx.fft import SpectrumThread + self.spectrumThread = SpectrumThread(self) + self.spectrumThread.start() + + def removeSpectrumClient(self, c): + try: + self.spectrumClients.remove(c) + except ValueError: + pass + if not self.spectrumClients and self.spectrumThread is not None: + self.spectrumThread.stop() + self.spectrumThread = None + + def writeSpectrumData(self, data): + for c in self.spectrumClients: + c.write_spectrum_data(data) + + def setState(self, state): + if state == self.state: + return + self.state = state + for c in self.clients: + c.onStateChange(state) + + def setBusyState(self, state): + if state == self.busyState: + return + self.busyState = state + for c in self.clients: + c.onBusyStateChange(state) diff --git a/owrx/source/airspy.py b/owrx/source/airspy.py new file mode 100644 index 0000000..d0cd928 --- /dev/null +++ b/owrx/source/airspy.py @@ -0,0 +1,23 @@ +from .soapy import SoapyConnectorSource + + +class AirspySource(SoapyConnectorSource): + def getDriver(self): + return "airspy" + + def getEventNames(self): + return super().getEventNames() + ["bias_tee"] + + def getCommand(self): + cmd = ( + "soapy_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort) + + ' -s {samp_rate} -f {tuner_freq} -g "{rf_gain}" -P {ppm} -d "{device}"' + ) + values = self.getCommandValues() + if values["iqswap"]: + cmd += " -i" + if self.rtlProps["rtltcp_compat"]: + cmd += " -r" + if values["bias_tee"]: + cmd += " -t biastee=true" + return cmd diff --git a/owrx/source/airspyhf.py b/owrx/source/airspyhf.py new file mode 100644 index 0000000..2324205 --- /dev/null +++ b/owrx/source/airspyhf.py @@ -0,0 +1,6 @@ +from .airspy import AirspySource + + +class AirspyhfSource(AirspySource): + def getDriver(self): + return "airspyhf" diff --git a/owrx/source/connector.py b/owrx/source/connector.py new file mode 100644 index 0000000..5c794a0 --- /dev/null +++ b/owrx/source/connector.py @@ -0,0 +1,63 @@ +from . import SdrSource +from owrx.socket import getAvailablePort +import socket + +import logging + +logger = logging.getLogger(__name__) + + +class ConnectorSource(SdrSource): + def __init__(self, id, props, port): + super().__init__(id, props, port) + self.controlSocket = None + self.controlPort = getAvailablePort() + + def getEventNames(self): + return [ + "samp_rate", + "center_freq", + "ppm", + "rf_gain", + "device", + "iqswap", + "lfo_offset", + "rtltcp_compat", + ] + + def sendControlMessage(self, prop, value): + logger.debug("sending property change over control socket: {0} changed to {1}".format(prop, value)) + self.controlSocket.sendall("{prop}:{value}\n".format(prop=prop, value=value).encode()) + + def wireEvents(self): + def reconfigure(prop, value): + if self.monitor is None: + return + if ( + (prop == "center_freq" or prop == "lfo_offset") + and "lfo_offset" in self.rtlProps + and self.rtlProps["lfo_offset"] is not None + ): + freq = self.rtlProps["center_freq"] + self.rtlProps["lfo_offset"] + self.sendControlMessage("center_freq", freq) + else: + self.sendControlMessage(prop, value) + + self.rtlProps.wire(reconfigure) + + def postStart(self): + logger.debug("opening control socket...") + self.controlSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.controlSocket.connect(("localhost", self.controlPort)) + + def stop(self): + super().stop() + if self.controlSocket: + self.controlSocket.close() + self.controlSocket = None + + def getFormatConversion(self): + return None + + def useNmux(self): + return False diff --git a/owrx/source/fifi.py b/owrx/source/fifi.py new file mode 100644 index 0000000..7f2852d --- /dev/null +++ b/owrx/source/fifi.py @@ -0,0 +1,9 @@ +from . import SdrSource + + +class FifiSdrSource(SdrSource): + def getCommand(self): + return "arecord -D hw:2,0 -f S16_LE -r {samp_rate} -c2 -" + + def getFormatConversion(self): + return "csdr convert_s16_f | csdr gain_ff 30" diff --git a/owrx/source/hackrf.py b/owrx/source/hackrf.py new file mode 100644 index 0000000..c27505f --- /dev/null +++ b/owrx/source/hackrf.py @@ -0,0 +1,9 @@ +from . import SdrSource + + +class HackrfSource(SdrSource): + def getCommand(self): + return "hackrf_transfer -s {samp_rate} -f {tuner_freq} -g {rf_gain} -l{lna_gain} -a{rf_amp} -r-" + + def getFormatConversion(self): + return "csdr convert_s8_f" diff --git a/owrx/source/resampler.py b/owrx/source/resampler.py new file mode 100644 index 0000000..ba50ee5 --- /dev/null +++ b/owrx/source/resampler.py @@ -0,0 +1,98 @@ +from . import SdrSource +import subprocess +import threading +import os +import socket +import time + +import logging + +logger = logging.getLogger(__name__) + + +class Resampler(SdrSource): + def __init__(self, props, port, sdr): + sdrProps = sdr.getProps() + self.shift = (sdrProps["center_freq"] - props["center_freq"]) / sdrProps["samp_rate"] + self.decimation = int(float(sdrProps["samp_rate"]) / props["samp_rate"]) + if_samp_rate = sdrProps["samp_rate"] / self.decimation + self.transition_bw = 0.15 * (if_samp_rate / float(sdrProps["samp_rate"])) + props["samp_rate"] = if_samp_rate + + self.sdr = sdr + super().__init__(None, props, port) + + def start(self): + if self.isFailed(): + return + + self.modificationLock.acquire() + if self.monitor: + self.modificationLock.release() + return + + self.setState(SdrSource.STATE_STARTING) + + props = self.rtlProps + + resampler_command = [ + "nc -v 127.0.0.1 {nc_port}".format(nc_port=self.sdr.getPort()), + "csdr shift_addition_cc {shift}".format(shift=self.shift), + "csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING".format( + decimation=self.decimation, ddc_transition_bw=self.transition_bw + ), + ] + + nmux_bufcnt = nmux_bufsize = 0 + while nmux_bufsize < props["samp_rate"] / 4: + nmux_bufsize += 4096 + while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: + nmux_bufcnt += 1 + if nmux_bufcnt == 0 or nmux_bufsize == 0: + logger.error( + "Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py" + ) + self.modificationLock.release() + return + logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt)) + resampler_command += [ + "nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (nmux_bufsize, nmux_bufcnt, self.port) + ] + cmd = " | ".join(resampler_command) + logger.debug("resampler command: %s", cmd) + self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) + logger.info("Started resampler source: " + cmd) + + available = False + + def wait_for_process_to_end(): + rc = self.process.wait() + logger.debug("shut down with RC={0}".format(rc)) + self.monitor = None + + self.monitor = threading.Thread(target=wait_for_process_to_end) + self.monitor.start() + + retries = 1000 + while retries > 0: + retries -= 1 + if self.monitor is None: + break + testsock = socket.socket() + try: + testsock.connect(("127.0.0.1", self.getPort())) + testsock.close() + available = True + break + except: + time.sleep(0.1) + + if not available: + self.failed = True + + self.modificationLock.release() + + self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING) + + def activateProfile(self, profile_id=None): + pass diff --git a/owrx/source/rtl_sdr.py b/owrx/source/rtl_sdr.py new file mode 100644 index 0000000..b4199b1 --- /dev/null +++ b/owrx/source/rtl_sdr.py @@ -0,0 +1,16 @@ +from .connector import ConnectorSource + + +class RtlSdrSource(ConnectorSource): + def getCommand(self): + cmd = ( + "rtl_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort) + + " -s {samp_rate} -f {tuner_freq} -g {rf_gain} -P {ppm}" + ) + if "device" in self.rtlProps and self.rtlProps["device"] is not None: + cmd += ' -d "{device}"' + if self.rtlProps["iqswap"]: + cmd += " -i" + if self.rtlProps["rtltcp_compat"]: + cmd += " -r" + return cmd diff --git a/owrx/source/sdrplay.py b/owrx/source/sdrplay.py new file mode 100644 index 0000000..6ad0c23 --- /dev/null +++ b/owrx/source/sdrplay.py @@ -0,0 +1,21 @@ +from .soapy import SoapyConnectorSource + + +class SdrplaySource(SoapyConnectorSource): + def getDriver(self): + return "sdrplay" + + def getEventNames(self): + return super().getEventNames() + ["antenna"] + + def getCommand(self): + cmd = ( + "soapy_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort) + + ' -s {samp_rate} -f {tuner_freq} -g "{rf_gain}" -P {ppm} -a "{antenna}" -d "{device}"' + ) + values = self.getCommandValues() + if values["iqswap"]: + cmd += " -i" + if self.rtlProps["rtltcp_compat"]: + cmd += " -r" + return cmd diff --git a/owrx/source/soapy.py b/owrx/source/soapy.py new file mode 100644 index 0000000..34b2d09 --- /dev/null +++ b/owrx/source/soapy.py @@ -0,0 +1,46 @@ +from .connector import ConnectorSource + + +class SoapyConnectorSource(ConnectorSource): + """ + must be implemented by child classes to be able to build a driver-based device selector by default. + return value must be the corresponding soapy driver identifier. + """ + def getDriver(self): + pass + + def parseDeviceString(self, dstr): + + def decodeComponent(c): + kv = c.split("=", 1) + if len(kv) < 2: + return c + else: + return {kv[0]: kv[1]} + + return [decodeComponent(c) for c in dstr.split(",")] + + def encodeDeviceString(self, dobj): + + def encodeComponent(c): + if isinstance(c, str): + return c + else: + return ",".join(["{0}={1}".format(key, value) for key, value in c.items()]) + + return ",".join([encodeComponent(c) for c in dobj]) + + """ + this method always attempts to inject a driver= part into the soapysdr query, depending on what connector was used. + this prevents the soapy_connector from using the wrong device in scenarios where there's no same-type sdrs. + """ + def getCommandValues(self): + values = super().getCommandValues() + if "device" in values and values["device"] is not None: + parsed = self.parseDeviceString(values["device"]) + parsed = [v for v in parsed if "driver" not in v] + parsed += [{"driver": self.getDriver()}] + values["device"] = self.encodeDeviceString(parsed) + else: + values["device"] = "driver={0}".format(self.getDriver()) + return values