From 8371d3b67a4597c2dbde994bbffc70dec8e7636f Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Sat, 28 Dec 2019 00:26:45 +0100 Subject: [PATCH] refactor sources to be more flexible --- owrx/command.py | 66 +++++++++++++++ owrx/connection.py | 2 +- owrx/source/__init__.py | 170 +++++++++++++++------------------------ owrx/source/airspy.py | 6 +- owrx/source/connector.py | 57 +++++++------ owrx/source/direct.py | 53 ++++++++++++ owrx/source/fifi.py | 18 ++++- owrx/source/hackrf.py | 22 ++++- owrx/source/resampler.py | 75 +++-------------- owrx/source/rtl_sdr.py | 15 +--- owrx/source/sdrplay.py | 15 ---- owrx/source/soapy.py | 17 +++- 12 files changed, 283 insertions(+), 233 deletions(-) create mode 100644 owrx/command.py create mode 100644 owrx/source/direct.py diff --git a/owrx/command.py b/owrx/command.py new file mode 100644 index 0000000..c9b4a66 --- /dev/null +++ b/owrx/command.py @@ -0,0 +1,66 @@ +from abc import ABC, abstractmethod + + +class CommandMapper(object): + def __init__(self, base=None, mappings={}, static=None): + self.base = base + self.mappings = mappings + self.static = static + + def map(self, values): + args = [self.mappings[k].map(v) for k, v in values.items() if k in self.mappings] + args = [a for a in args if a != ""] + options = " ".join(args) + command = "{0} {1}".format(self.base, options) + if self.static is not None: + command += " " + self.static + return command + + def setMapping(self, key, mapping): + self.mappings[key] = mapping + return self + + def setMappings(self, mappings): + for k, v in mappings.items(): + self.setMapping(k, v) + return self + + def setBase(self, base): + self.base = base + return self + + def setStatic(self, static): + self.static = static + return self + + +class CommandMapping(ABC): + @abstractmethod + def map(self, value): + pass + + +class Flag(CommandMapping): + def __init__(self, flag): + self.flag = flag + + def map(self, value): + if value is not None and value: + return self.flag + else: + return "" + + +class Option(CommandMapping): + def __init__(self, option): + self.option = option + + def map(self, value): + if value is not None: + if isinstance(value, str) and " " in value: + template = "{0} \"{1}\"" + else: + template = "{0} {1}" + return template.format(self.option, value) + else: + return "" diff --git a/owrx/connection.py b/owrx/connection.py index 8958d7f..8dc1f9a 100644 --- a/owrx/connection.py +++ b/owrx/connection.py @@ -248,7 +248,7 @@ class OpenWebRxReceiverClient(Client): # only the keys in the protected property manager can be overridden from the web protected = ( self.sdr.getProps() - .collect("samp_rate", "center_freq", "rf_gain", "type", "if_gain") + .collect("samp_rate", "center_freq", "rf_gain", "type") .defaults(PropertyManager.getSharedInstance()) ) for key, value in params.items(): diff --git a/owrx/source/__init__.py b/owrx/source/__init__.py index d146745..a1e0068 100644 --- a/owrx/source/__init__.py +++ b/owrx/source/__init__.py @@ -6,13 +6,15 @@ import socket import shlex import time import signal +from abc import ABC, abstractmethod +from owrx.command import CommandMapper import logging logger = logging.getLogger(__name__) -class SdrSource(object): +class SdrSource(ABC): STATE_STOPPED = 0 STATE_STARTING = 1 STATE_RUNNING = 2 @@ -34,6 +36,7 @@ class SdrSource(object): self.activateProfile() self.rtlProps = self.props.collect(*self.getEventNames()).defaults(PropertyManager.getSharedInstance()) self.wireEvents() + self.commandMapper = CommandMapper() self.port = port self.monitor = None @@ -49,32 +52,24 @@ class SdrSource(object): def getEventNames(self): return [ "samp_rate", - "nmux_memory", "center_freq", "ppm", "rf_gain", - "lna_gain", - "rf_amp", - "antenna", - "if_gain", "lfo_offset", ] - def wireEvents(self): - def restart(name, value): - logger.debug("restarting sdr source due to property change: {0} changed to {1}".format(name, value)) - self.stop() - self.start() + def getCommandMapper(self): + return self.commandMapper - self.rtlProps.wire(restart) - - # override this in subclasses - def getCommand(self): + @abstractmethod + def onPropertyChange(self, name, value): pass - # override this in subclasses, if necessary - def getFormatConversion(self): - return None + def wireEvents(self): + self.rtlProps.wire(self.onPropertyChange) + + def getCommand(self): + return [self.getCommandMapper().map(self.getCommandValues())] def activateProfile(self, profile_id=None): profiles = self.props["profiles"] @@ -113,9 +108,6 @@ class SdrSource(object): def getPort(self): return self.port - def useNmux(self): - return True - def getCommandValues(self): dict = self.rtlProps.collect(*self.getEventNames()).__dict__() if "lfo_offset" in dict and dict["lfo_offset"] is not None: @@ -125,81 +117,58 @@ class SdrSource(object): return dict def start(self): - self.modificationLock.acquire() - if self.monitor: - self.modificationLock.release() - return - - props = self.rtlProps - - cmd = self.getCommand().format(**self.getCommandValues()) - - format_conversion = self.getFormatConversion() - if format_conversion is not None: - cmd += " | " + format_conversion - - if self.useNmux(): - nmux_bufcnt = nmux_bufsize = 0 - while nmux_bufsize < props["samp_rate"] / 4: - nmux_bufsize += 4096 - while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: - nmux_bufcnt += 1 - if nmux_bufcnt == 0 or nmux_bufsize == 0: - logger.error( - "Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py" - ) - self.modificationLock.release() + with self.modificationLock: + if self.monitor: return - logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt)) - cmd = cmd + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % ( - nmux_bufsize, - nmux_bufcnt, - self.port, - ) - # don't use shell mode for commands without piping - if "|" in cmd: - self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) - else: - # preexec_fn can go as soon as there's no piped commands left - # the os.killpg call must be replaced with something more reasonable at the same time - self.process = subprocess.Popen(shlex.split(cmd), preexec_fn=os.setpgrp) - logger.info("Started rtl source: " + cmd) + cmd = self.getCommand() + cmd = [c for c in cmd if c is not None] - available = False + # don't use shell mode for commands without piping + if len(cmd) > 1: + # multiple commands with pipes + cmd = "|".join(cmd) + self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) + else: + # single command + cmd = cmd[0] + # preexec_fn can go as soon as there's no piped commands left + # the os.killpg call must be replaced with something more reasonable at the same time + self.process = subprocess.Popen(shlex.split(cmd), preexec_fn=os.setpgrp) + logger.info("Started rtl source: " + cmd) - def wait_for_process_to_end(): - rc = self.process.wait() - logger.debug("shut down with RC={0}".format(rc)) - self.monitor = None + available = False - self.monitor = threading.Thread(target=wait_for_process_to_end) - self.monitor.start() + def wait_for_process_to_end(): + rc = self.process.wait() + logger.debug("shut down with RC={0}".format(rc)) + self.monitor = None + + self.monitor = threading.Thread(target=wait_for_process_to_end) + self.monitor.start() + + retries = 1000 + while retries > 0: + retries -= 1 + if self.monitor is None: + break + testsock = socket.socket() + try: + testsock.connect(("127.0.0.1", self.getPort())) + testsock.close() + available = True + break + except: + time.sleep(0.1) + + if not available: + self.failed = True - retries = 1000 - while retries > 0: - retries -= 1 - if self.monitor is None: - break - testsock = socket.socket() try: - testsock.connect(("127.0.0.1", self.getPort())) - testsock.close() - available = True - break - except: - time.sleep(0.1) - - if not available: - self.failed = True - - try: - self.postStart() - except Exception: - logger.exception("Exception during postStart()") - self.failed = True - - self.modificationLock.release() + self.postStart() + except Exception: + logger.exception("Exception during postStart()") + self.failed = True self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING) @@ -215,24 +184,19 @@ class SdrSource(object): def stop(self): self.setState(SdrSource.STATE_STOPPING) - self.modificationLock.acquire() + with self.modificationLock: - if self.process is not None: - try: - os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) - except ProcessLookupError: - # been killed by something else, ignore - pass - if self.monitor: - self.monitor.join() - self.sleepOnRestart() - self.modificationLock.release() + if self.process is not None: + try: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + except ProcessLookupError: + # been killed by something else, ignore + pass + if self.monitor: + self.monitor.join() self.setState(SdrSource.STATE_STOPPED) - def sleepOnRestart(self): - pass - def hasClients(self, *args): clients = [c for c in self.clients if c.getClientClass() in args] return len(clients) > 0 diff --git a/owrx/source/airspy.py b/owrx/source/airspy.py index d0cd928..086d2f3 100644 --- a/owrx/source/airspy.py +++ b/owrx/source/airspy.py @@ -8,11 +8,12 @@ class AirspySource(SoapyConnectorSource): def getEventNames(self): return super().getEventNames() + ["bias_tee"] + ''' def getCommand(self): - cmd = ( + cmd = [ "soapy_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort) + ' -s {samp_rate} -f {tuner_freq} -g "{rf_gain}" -P {ppm} -d "{device}"' - ) + ] values = self.getCommandValues() if values["iqswap"]: cmd += " -i" @@ -21,3 +22,4 @@ class AirspySource(SoapyConnectorSource): if values["bias_tee"]: cmd += " -t biastee=true" return cmd + ''' diff --git a/owrx/source/connector.py b/owrx/source/connector.py index 5c794a0..0ba4476 100644 --- a/owrx/source/connector.py +++ b/owrx/source/connector.py @@ -1,6 +1,7 @@ from . import SdrSource from owrx.socket import getAvailablePort import socket +from owrx.command import CommandMapper, Flag, Option import logging @@ -12,16 +13,22 @@ class ConnectorSource(SdrSource): super().__init__(id, props, port) self.controlSocket = None self.controlPort = getAvailablePort() + self.getCommandMapper().setMappings({ + "samp_rate": Option("-s"), + "tuner_freq": Option("-f"), + "port": Option("-p"), + "controlPort": Option("-c"), + "device": Option("-d"), + "iqswap": Flag("-i"), + "rtltcp_compat": Flag("-r"), + "ppm": Option("-p"), + "rf_gain": Option("-g") + }) def getEventNames(self): - return [ - "samp_rate", - "center_freq", - "ppm", - "rf_gain", + return super().getEventNames() + [ "device", "iqswap", - "lfo_offset", "rtltcp_compat", ] @@ -29,21 +36,18 @@ class ConnectorSource(SdrSource): logger.debug("sending property change over control socket: {0} changed to {1}".format(prop, value)) self.controlSocket.sendall("{prop}:{value}\n".format(prop=prop, value=value).encode()) - def wireEvents(self): - def reconfigure(prop, value): - if self.monitor is None: - return - if ( - (prop == "center_freq" or prop == "lfo_offset") - and "lfo_offset" in self.rtlProps - and self.rtlProps["lfo_offset"] is not None - ): - freq = self.rtlProps["center_freq"] + self.rtlProps["lfo_offset"] - self.sendControlMessage("center_freq", freq) - else: - self.sendControlMessage(prop, value) - - self.rtlProps.wire(reconfigure) + def onPropertyChange(self, prop, value): + if self.monitor is None: + return + if ( + (prop == "center_freq" or prop == "lfo_offset") + and "lfo_offset" in self.rtlProps + and self.rtlProps["lfo_offset"] is not None + ): + freq = self.rtlProps["center_freq"] + self.rtlProps["lfo_offset"] + self.sendControlMessage("center_freq", freq) + else: + self.sendControlMessage(prop, value) def postStart(self): logger.debug("opening control socket...") @@ -56,8 +60,11 @@ class ConnectorSource(SdrSource): self.controlSocket.close() self.controlSocket = None - def getFormatConversion(self): - return None + def getControlPort(self): + return self.controlPort - def useNmux(self): - return False + def getCommandValues(self): + values = super().getCommandValues() + values["port"] = self.getPort() + values["controlPort"] = self.getControlPort() + return values diff --git a/owrx/source/direct.py b/owrx/source/direct.py new file mode 100644 index 0000000..a2d0a2c --- /dev/null +++ b/owrx/source/direct.py @@ -0,0 +1,53 @@ +from abc import ABCMeta +from . import SdrSource + +import logging + +logger = logging.getLogger(__name__) + + +class DirectSource(SdrSource, metaclass=ABCMeta): + def onPropertyChange(self, name, value): + logger.debug("restarting sdr source due to property change: {0} changed to {1}".format(name, value)) + self.stop() + self.sleepOnRestart() + self.start() + + def getEventNames(self): + return super().getEventNames() + [ + "nmux_memory", + ] + + def getNmuxCommand(self): + props = self.rtlProps + + nmux_bufcnt = nmux_bufsize = 0 + while nmux_bufsize < props["samp_rate"] / 4: + nmux_bufsize += 4096 + while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: + nmux_bufcnt += 1 + if nmux_bufcnt == 0 or nmux_bufsize == 0: + raise ValueError( + "Error: nmux_bufsize or nmux_bufcnt is zero. " + "These depend on nmux_memory and samp_rate options in config_webrx.py" + ) + + return "nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % ( + nmux_bufsize, + nmux_bufcnt, + self.port, + ) + + def getCommand(self): + return super().getCommand() + [ + self.getFormatConversion(), + self.getNmuxCommand(), + ] + + # override this in subclasses, if necessary + def getFormatConversion(self): + return None + + # override this in subclasses, if necessary + def sleepOnRestart(self): + pass diff --git a/owrx/source/fifi.py b/owrx/source/fifi.py index 7f2852d..07f267a 100644 --- a/owrx/source/fifi.py +++ b/owrx/source/fifi.py @@ -1,9 +1,19 @@ -from . import SdrSource +from owrx.command import Option +from .direct import DirectSource -class FifiSdrSource(SdrSource): - def getCommand(self): - return "arecord -D hw:2,0 -f S16_LE -r {samp_rate} -c2 -" +class FifiSdrSource(DirectSource): + def __init__(self, id, props, port): + super().__init__(id, props, port) + self.getCommandMapper().setBase("arecord").setMappings({ + "device": Option("-D"), + "samp_rate": Option("-r") + }).setStatic("-f S16_LE -c2 -") + + def getEventNames(self): + return super().getEventNames() + [ + "device" + ] def getFormatConversion(self): return "csdr convert_s16_f | csdr gain_ff 30" diff --git a/owrx/source/hackrf.py b/owrx/source/hackrf.py index c27505f..5a576e9 100644 --- a/owrx/source/hackrf.py +++ b/owrx/source/hackrf.py @@ -1,9 +1,23 @@ -from . import SdrSource +from .direct import DirectSource +from owrx.command import Flag, Option -class HackrfSource(SdrSource): - def getCommand(self): - return "hackrf_transfer -s {samp_rate} -f {tuner_freq} -g {rf_gain} -l{lna_gain} -a{rf_amp} -r-" +class HackrfSource(DirectSource): + def __init__(self, id, props, port): + super().__init__(id, props, port) + self.getCommandMapper().setBase("hackrf_transfer").setMappings({ + "samp_rate": Option("-s"), + "tuner_freq": Option("-f"), + "rf_gain": Option("-g"), + "lna_gain": Option("-l"), + "rf_amp": Option("-a") + }).setStatic("-r-") + + def getEventNames(self): + return super().getEventNames() + [ + "lna_gain", + "rf_amp", + ] def getFormatConversion(self): return "csdr convert_s8_f" diff --git a/owrx/source/resampler.py b/owrx/source/resampler.py index ba50ee5..f2f5e87 100644 --- a/owrx/source/resampler.py +++ b/owrx/source/resampler.py @@ -1,3 +1,4 @@ +from .direct import DirectSource from . import SdrSource import subprocess import threading @@ -10,7 +11,10 @@ import logging logger = logging.getLogger(__name__) -class Resampler(SdrSource): +class Resampler(DirectSource): + def onPropertyChange(self, name, value): + logger.warning("Resampler is unable to handle property change ({0} changed to {1})".format(name, value)) + def __init__(self, props, port, sdr): sdrProps = sdr.getProps() self.shift = (sdrProps["center_freq"] - props["center_freq"]) / sdrProps["samp_rate"] @@ -22,77 +26,16 @@ class Resampler(SdrSource): self.sdr = sdr super().__init__(None, props, port) - def start(self): - if self.isFailed(): - return - - self.modificationLock.acquire() - if self.monitor: - self.modificationLock.release() - return - - self.setState(SdrSource.STATE_STARTING) - - props = self.rtlProps - - resampler_command = [ + def getCommand(self): + return [ "nc -v 127.0.0.1 {nc_port}".format(nc_port=self.sdr.getPort()), "csdr shift_addition_cc {shift}".format(shift=self.shift), "csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING".format( decimation=self.decimation, ddc_transition_bw=self.transition_bw ), + self.getNmuxCommand() ] - nmux_bufcnt = nmux_bufsize = 0 - while nmux_bufsize < props["samp_rate"] / 4: - nmux_bufsize += 4096 - while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: - nmux_bufcnt += 1 - if nmux_bufcnt == 0 or nmux_bufsize == 0: - logger.error( - "Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py" - ) - self.modificationLock.release() - return - logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt)) - resampler_command += [ - "nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (nmux_bufsize, nmux_bufcnt, self.port) - ] - cmd = " | ".join(resampler_command) - logger.debug("resampler command: %s", cmd) - self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) - logger.info("Started resampler source: " + cmd) - - available = False - - def wait_for_process_to_end(): - rc = self.process.wait() - logger.debug("shut down with RC={0}".format(rc)) - self.monitor = None - - self.monitor = threading.Thread(target=wait_for_process_to_end) - self.monitor.start() - - retries = 1000 - while retries > 0: - retries -= 1 - if self.monitor is None: - break - testsock = socket.socket() - try: - testsock.connect(("127.0.0.1", self.getPort())) - testsock.close() - available = True - break - except: - time.sleep(0.1) - - if not available: - self.failed = True - - self.modificationLock.release() - - self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING) - def activateProfile(self, profile_id=None): + logger.warning("Resampler does not support setting profiles") pass diff --git a/owrx/source/rtl_sdr.py b/owrx/source/rtl_sdr.py index b4199b1..e7b53da 100644 --- a/owrx/source/rtl_sdr.py +++ b/owrx/source/rtl_sdr.py @@ -2,15 +2,6 @@ from .connector import ConnectorSource class RtlSdrSource(ConnectorSource): - def getCommand(self): - cmd = ( - "rtl_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort) - + " -s {samp_rate} -f {tuner_freq} -g {rf_gain} -P {ppm}" - ) - if "device" in self.rtlProps and self.rtlProps["device"] is not None: - cmd += ' -d "{device}"' - if self.rtlProps["iqswap"]: - cmd += " -i" - if self.rtlProps["rtltcp_compat"]: - cmd += " -r" - return cmd + def __init__(self, id, props, port): + super().__init__(id, props, port) + self.getCommandMapper().setBase("rtl_connector") diff --git a/owrx/source/sdrplay.py b/owrx/source/sdrplay.py index 6ad0c23..dac2983 100644 --- a/owrx/source/sdrplay.py +++ b/owrx/source/sdrplay.py @@ -4,18 +4,3 @@ from .soapy import SoapyConnectorSource class SdrplaySource(SoapyConnectorSource): def getDriver(self): return "sdrplay" - - def getEventNames(self): - return super().getEventNames() + ["antenna"] - - def getCommand(self): - cmd = ( - "soapy_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort) - + ' -s {samp_rate} -f {tuner_freq} -g "{rf_gain}" -P {ppm} -a "{antenna}" -d "{device}"' - ) - values = self.getCommandValues() - if values["iqswap"]: - cmd += " -i" - if self.rtlProps["rtltcp_compat"]: - cmd += " -r" - return cmd diff --git a/owrx/source/soapy.py b/owrx/source/soapy.py index 34b2d09..2f090a8 100644 --- a/owrx/source/soapy.py +++ b/owrx/source/soapy.py @@ -1,14 +1,29 @@ +from abc import ABCMeta, abstractmethod +from owrx.command import Option + from .connector import ConnectorSource -class SoapyConnectorSource(ConnectorSource): +class SoapyConnectorSource(ConnectorSource, metaclass=ABCMeta): + def __init__(self, id, props, port): + super().__init__(id, props, port) + self.getCommandMapper().setBase("soapy_connector").setMappings({ + "antenna": Option("-a") + }) + """ must be implemented by child classes to be able to build a driver-based device selector by default. return value must be the corresponding soapy driver identifier. """ + @abstractmethod def getDriver(self): pass + def getEventNames(self): + return super().getEventNames() + [ + "antenna", + ] + def parseDeviceString(self, dstr): def decodeComponent(c):