refactor sources to be more flexible

This commit is contained in:
Jakob Ketterl
2019-12-28 00:26:45 +01:00
parent ca4d9771cc
commit 8371d3b67a
12 changed files with 283 additions and 233 deletions

View File

@ -6,13 +6,15 @@ import socket
import shlex
import time
import signal
from abc import ABC, abstractmethod
from owrx.command import CommandMapper
import logging
logger = logging.getLogger(__name__)
class SdrSource(object):
class SdrSource(ABC):
STATE_STOPPED = 0
STATE_STARTING = 1
STATE_RUNNING = 2
@ -34,6 +36,7 @@ class SdrSource(object):
self.activateProfile()
self.rtlProps = self.props.collect(*self.getEventNames()).defaults(PropertyManager.getSharedInstance())
self.wireEvents()
self.commandMapper = CommandMapper()
self.port = port
self.monitor = None
@ -49,32 +52,24 @@ class SdrSource(object):
def getEventNames(self):
return [
"samp_rate",
"nmux_memory",
"center_freq",
"ppm",
"rf_gain",
"lna_gain",
"rf_amp",
"antenna",
"if_gain",
"lfo_offset",
]
def wireEvents(self):
def restart(name, value):
logger.debug("restarting sdr source due to property change: {0} changed to {1}".format(name, value))
self.stop()
self.start()
def getCommandMapper(self):
return self.commandMapper
self.rtlProps.wire(restart)
# override this in subclasses
def getCommand(self):
@abstractmethod
def onPropertyChange(self, name, value):
pass
# override this in subclasses, if necessary
def getFormatConversion(self):
return None
def wireEvents(self):
self.rtlProps.wire(self.onPropertyChange)
def getCommand(self):
return [self.getCommandMapper().map(self.getCommandValues())]
def activateProfile(self, profile_id=None):
profiles = self.props["profiles"]
@ -113,9 +108,6 @@ class SdrSource(object):
def getPort(self):
return self.port
def useNmux(self):
return True
def getCommandValues(self):
dict = self.rtlProps.collect(*self.getEventNames()).__dict__()
if "lfo_offset" in dict and dict["lfo_offset"] is not None:
@ -125,81 +117,58 @@ class SdrSource(object):
return dict
def start(self):
self.modificationLock.acquire()
if self.monitor:
self.modificationLock.release()
return
props = self.rtlProps
cmd = self.getCommand().format(**self.getCommandValues())
format_conversion = self.getFormatConversion()
if format_conversion is not None:
cmd += " | " + format_conversion
if self.useNmux():
nmux_bufcnt = nmux_bufsize = 0
while nmux_bufsize < props["samp_rate"] / 4:
nmux_bufsize += 4096
while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6:
nmux_bufcnt += 1
if nmux_bufcnt == 0 or nmux_bufsize == 0:
logger.error(
"Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py"
)
self.modificationLock.release()
with self.modificationLock:
if self.monitor:
return
logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt))
cmd = cmd + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (
nmux_bufsize,
nmux_bufcnt,
self.port,
)
# don't use shell mode for commands without piping
if "|" in cmd:
self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
else:
# preexec_fn can go as soon as there's no piped commands left
# the os.killpg call must be replaced with something more reasonable at the same time
self.process = subprocess.Popen(shlex.split(cmd), preexec_fn=os.setpgrp)
logger.info("Started rtl source: " + cmd)
cmd = self.getCommand()
cmd = [c for c in cmd if c is not None]
available = False
# don't use shell mode for commands without piping
if len(cmd) > 1:
# multiple commands with pipes
cmd = "|".join(cmd)
self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
else:
# single command
cmd = cmd[0]
# preexec_fn can go as soon as there's no piped commands left
# the os.killpg call must be replaced with something more reasonable at the same time
self.process = subprocess.Popen(shlex.split(cmd), preexec_fn=os.setpgrp)
logger.info("Started rtl source: " + cmd)
def wait_for_process_to_end():
rc = self.process.wait()
logger.debug("shut down with RC={0}".format(rc))
self.monitor = None
available = False
self.monitor = threading.Thread(target=wait_for_process_to_end)
self.monitor.start()
def wait_for_process_to_end():
rc = self.process.wait()
logger.debug("shut down with RC={0}".format(rc))
self.monitor = None
self.monitor = threading.Thread(target=wait_for_process_to_end)
self.monitor.start()
retries = 1000
while retries > 0:
retries -= 1
if self.monitor is None:
break
testsock = socket.socket()
try:
testsock.connect(("127.0.0.1", self.getPort()))
testsock.close()
available = True
break
except:
time.sleep(0.1)
if not available:
self.failed = True
retries = 1000
while retries > 0:
retries -= 1
if self.monitor is None:
break
testsock = socket.socket()
try:
testsock.connect(("127.0.0.1", self.getPort()))
testsock.close()
available = True
break
except:
time.sleep(0.1)
if not available:
self.failed = True
try:
self.postStart()
except Exception:
logger.exception("Exception during postStart()")
self.failed = True
self.modificationLock.release()
self.postStart()
except Exception:
logger.exception("Exception during postStart()")
self.failed = True
self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING)
@ -215,24 +184,19 @@ class SdrSource(object):
def stop(self):
self.setState(SdrSource.STATE_STOPPING)
self.modificationLock.acquire()
with self.modificationLock:
if self.process is not None:
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
if self.monitor:
self.monitor.join()
self.sleepOnRestart()
self.modificationLock.release()
if self.process is not None:
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
if self.monitor:
self.monitor.join()
self.setState(SdrSource.STATE_STOPPED)
def sleepOnRestart(self):
pass
def hasClients(self, *args):
clients = [c for c in self.clients if c.getClientClass() in args]
return len(clients) > 0