start and shutdown dsps in a more controlled manner

This commit is contained in:
Jakob Ketterl 2019-05-10 14:58:25 +02:00
parent b3d5f924c3
commit 08e0a0af19
2 changed files with 57 additions and 7 deletions

View File

@ -97,7 +97,6 @@ class OpenWebRxClient(object):
self.configProps.unwire(self.sendConfig) self.configProps.unwire(self.sendConfig)
self.sdr = SdrService.getSource(id) self.sdr = SdrService.getSource(id)
self.sdr.start()
# send initial config # send initial config
self.configProps = self.sdr.getProps().collect(*OpenWebRxClient.config_keys).defaults(PropertyManager.getSharedInstance()) self.configProps = self.sdr.getProps().collect(*OpenWebRxClient.config_keys).defaults(PropertyManager.getSharedInstance())
@ -105,7 +104,7 @@ class OpenWebRxClient(object):
self.configProps.wire(self.sendConfig) self.configProps.wire(self.sendConfig)
self.sendConfig(None, None) self.sendConfig(None, None)
self.sdr.spectrumThread.add_client(self) self.sdr.getSpectrumThread().add_client(self)
def startDsp(self): def startDsp(self):
if self.dsp is None: if self.dsp is None:

View File

@ -68,11 +68,18 @@ class SdrSource(object):
def restart(name, value): def restart(name, value):
print("restarting sdr source due to property change: {0} changed to {1}".format(name, value)) print("restarting sdr source due to property change: {0} changed to {1}".format(name, value))
for c in self.clients:
c.onSdrPause()
self.stop() self.stop()
self.start() self.start()
for c in self.clients:
c.onSdrResume()
self.rtlProps.wire(restart) self.rtlProps.wire(restart)
self.port = port self.port = port
self.monitor = None self.monitor = None
self.clients = []
self.spectrumThread = None
self.modificationLock = threading.Lock()
# override these in subclasses as necessary # override these in subclasses as necessary
self.command = None self.command = None
@ -85,7 +92,10 @@ class SdrSource(object):
return self.port return self.port
def start(self): def start(self):
if self.monitor: return self.modificationLock.acquire()
if self.monitor:
self.modificationLock.release()
return
props = self.rtlProps props = self.rtlProps
@ -106,13 +116,13 @@ class SdrSource(object):
while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: nmux_bufcnt += 1 while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: nmux_bufcnt += 1
if nmux_bufcnt == 0 or nmux_bufsize == 0: if nmux_bufcnt == 0 or nmux_bufsize == 0:
print("[RtlNmuxSource] Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py") print("[RtlNmuxSource] Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py")
self.modificationLock.release()
return return
print("[RtlNmuxSource] nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt)) print("[RtlNmuxSource] nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt))
cmd = start_sdr_command + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (nmux_bufsize, nmux_bufcnt, self.port) cmd = start_sdr_command + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (nmux_bufsize, nmux_bufcnt, self.port)
self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
print("[RtlNmuxSource] Started rtl source: " + cmd) print("[RtlNmuxSource] Started rtl source: " + cmd)
# TODO use this to monitor unexpected failures / shutdowns and react accordingly
def wait_for_process_to_end(): def wait_for_process_to_end():
rc = self.process.wait() rc = self.process.wait()
print("[RtlNmuxSource] shut down with RC={0}".format(rc)) print("[RtlNmuxSource] shut down with RC={0}".format(rc))
@ -121,16 +131,38 @@ class SdrSource(object):
self.monitor = threading.Thread(target = wait_for_process_to_end) self.monitor = threading.Thread(target = wait_for_process_to_end)
self.monitor.start() self.monitor.start()
self.spectrumThread = SpectrumThread(self) self.modificationLock.release()
def getSpectrumThread(self):
if self.spectrumThread is None:
self.spectrumThread = SpectrumThread(self)
return self.spectrumThread
def stop(self): def stop(self):
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) self.modificationLock.acquire()
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
self.monitor.join() self.monitor.join()
self.sleepOnRestart() self.sleepOnRestart()
self.modificationLock.release()
def sleepOnRestart(self): def sleepOnRestart(self):
pass pass
def addClient(self, c):
self.clients.append(c)
self.start()
def removeClient(self, c):
try:
self.clients.remove(c)
except ValueError:
pass
if not self.clients:
self.stop()
class RtlSdrSource(SdrSource): class RtlSdrSource(SdrSource):
def __init__(self, props, port): def __init__(self, props, port):
super().__init__(props, port) super().__init__(props, port)
@ -162,12 +194,13 @@ class SpectrumThread(object):
threading.Thread(target = self.run).start() threading.Thread(target = self.run).start()
def run(self): def run(self):
self.sdrSource.addClient(self)
props = self.sdrSource.props.collect( props = self.sdrSource.props.collect(
"samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor", "fft_compression", "samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor", "fft_compression",
"csdr_dynamic_bufsize", "csdr_print_bufsizes", "csdr_through" "csdr_dynamic_bufsize", "csdr_print_bufsizes", "csdr_through"
).defaults(PropertyManager.getSharedInstance()) ).defaults(PropertyManager.getSharedInstance())
dsp = csdr.dsp() self.dsp = dsp = csdr.dsp()
dsp.nc_port = self.sdrSource.getPort() dsp.nc_port = self.sdrSource.getPort()
dsp.set_demodulator("fft") dsp.set_demodulator("fft")
props.getProperty("samp_rate").wire(dsp.set_samp_rate) props.getProperty("samp_rate").wire(dsp.set_samp_rate)
@ -204,6 +237,7 @@ class SpectrumThread(object):
c.write_spectrum_data(data) c.write_spectrum_data(data)
dsp.stop() dsp.stop()
self.sdrSource.removeClient(self)
print("spectrum thread shut down") print("spectrum thread shut down")
def add_client(self, c): def add_client(self, c):
@ -224,11 +258,19 @@ class SpectrumThread(object):
print("shutting down spectrum thread") print("shutting down spectrum thread")
self.doRun = False self.doRun = False
def onSdrPause(self):
if self.dsp is not None:
self.dsp.stop()
def onSdrResume(self):
if self.dsp is not None:
self.dsp.start()
class DspManager(object): class DspManager(object):
def __init__(self, handler, sdrSource): def __init__(self, handler, sdrSource):
self.doRun = True self.doRun = True
self.handler = handler self.handler = handler
self.sdrSource = sdrSource self.sdrSource = sdrSource
self.sdrSource.addClient(self)
self.localProps = self.sdrSource.getProps().collect( self.localProps = self.sdrSource.getProps().collect(
"audio_compression", "fft_compression", "digimodes_fft_size", "csdr_dynamic_bufsize", "audio_compression", "fft_compression", "digimodes_fft_size", "csdr_dynamic_bufsize",
@ -336,10 +378,19 @@ class DspManager(object):
self.doRun = False self.doRun = False
self.runSecondary = False self.runSecondary = False
self.dsp.stop() self.dsp.stop()
self.sdrSource.removeClient(self)
def setProperty(self, prop, value): def setProperty(self, prop, value):
self.localProps.getProperty(prop).setValue(value) self.localProps.getProperty(prop).setValue(value)
def onSdrPause(self):
if self.dsp is not None:
self.dsp.stop()
def onSdrResume(self):
if self.dsp is not None:
self.dsp.start()
class CpuUsageThread(threading.Thread): class CpuUsageThread(threading.Thread):
sharedInstance = None sharedInstance = None
@staticmethod @staticmethod