From 08e0a0af19b1568ee5eb62bc395b9ad1a398c6b2 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Fri, 10 May 2019 14:58:25 +0200 Subject: [PATCH] start and shutdown dsps in a more controlled manner --- owrx/controllers.py | 3 +-- owrx/source.py | 61 +++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/owrx/controllers.py b/owrx/controllers.py index 8676915..b18dfe9 100644 --- a/owrx/controllers.py +++ b/owrx/controllers.py @@ -97,7 +97,6 @@ class OpenWebRxClient(object): self.configProps.unwire(self.sendConfig) self.sdr = SdrService.getSource(id) - self.sdr.start() # send initial config self.configProps = self.sdr.getProps().collect(*OpenWebRxClient.config_keys).defaults(PropertyManager.getSharedInstance()) @@ -105,7 +104,7 @@ class OpenWebRxClient(object): self.configProps.wire(self.sendConfig) self.sendConfig(None, None) - self.sdr.spectrumThread.add_client(self) + self.sdr.getSpectrumThread().add_client(self) def startDsp(self): if self.dsp is None: diff --git a/owrx/source.py b/owrx/source.py index 247e935..4584520 100644 --- a/owrx/source.py +++ b/owrx/source.py @@ -68,11 +68,18 @@ class SdrSource(object): def restart(name, value): print("restarting sdr source due to property change: {0} changed to {1}".format(name, value)) + for c in self.clients: + c.onSdrPause() self.stop() self.start() + for c in self.clients: + c.onSdrResume() self.rtlProps.wire(restart) self.port = port self.monitor = None + self.clients = [] + self.spectrumThread = None + self.modificationLock = threading.Lock() # override these in subclasses as necessary self.command = None @@ -85,7 +92,10 @@ class SdrSource(object): return self.port def start(self): - if self.monitor: return + self.modificationLock.acquire() + if self.monitor: + self.modificationLock.release() + return props = self.rtlProps @@ -106,13 +116,13 @@ class SdrSource(object): while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: nmux_bufcnt += 1 if nmux_bufcnt == 0 or nmux_bufsize == 0: print("[RtlNmuxSource] Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py") + self.modificationLock.release() return print("[RtlNmuxSource] nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt)) cmd = start_sdr_command + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (nmux_bufsize, nmux_bufcnt, self.port) self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) print("[RtlNmuxSource] Started rtl source: " + cmd) - # TODO use this to monitor unexpected failures / shutdowns and react accordingly def wait_for_process_to_end(): rc = self.process.wait() print("[RtlNmuxSource] shut down with RC={0}".format(rc)) @@ -121,16 +131,38 @@ class SdrSource(object): self.monitor = threading.Thread(target = wait_for_process_to_end) self.monitor.start() - self.spectrumThread = SpectrumThread(self) + self.modificationLock.release() + + def getSpectrumThread(self): + if self.spectrumThread is None: + self.spectrumThread = SpectrumThread(self) + return self.spectrumThread def stop(self): - os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + self.modificationLock.acquire() + try: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + except ProcessLookupError: + # been killed by something else, ignore + pass self.monitor.join() self.sleepOnRestart() + self.modificationLock.release() def sleepOnRestart(self): pass + def addClient(self, c): + self.clients.append(c) + self.start() + def removeClient(self, c): + try: + self.clients.remove(c) + except ValueError: + pass + if not self.clients: + self.stop() + class RtlSdrSource(SdrSource): def __init__(self, props, port): super().__init__(props, port) @@ -162,12 +194,13 @@ class SpectrumThread(object): threading.Thread(target = self.run).start() def run(self): + self.sdrSource.addClient(self) props = self.sdrSource.props.collect( "samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor", "fft_compression", "csdr_dynamic_bufsize", "csdr_print_bufsizes", "csdr_through" ).defaults(PropertyManager.getSharedInstance()) - dsp = csdr.dsp() + self.dsp = dsp = csdr.dsp() dsp.nc_port = self.sdrSource.getPort() dsp.set_demodulator("fft") props.getProperty("samp_rate").wire(dsp.set_samp_rate) @@ -204,6 +237,7 @@ class SpectrumThread(object): c.write_spectrum_data(data) dsp.stop() + self.sdrSource.removeClient(self) print("spectrum thread shut down") def add_client(self, c): @@ -224,11 +258,19 @@ class SpectrumThread(object): print("shutting down spectrum thread") self.doRun = False + def onSdrPause(self): + if self.dsp is not None: + self.dsp.stop() + def onSdrResume(self): + if self.dsp is not None: + self.dsp.start() + class DspManager(object): def __init__(self, handler, sdrSource): self.doRun = True self.handler = handler self.sdrSource = sdrSource + self.sdrSource.addClient(self) self.localProps = self.sdrSource.getProps().collect( "audio_compression", "fft_compression", "digimodes_fft_size", "csdr_dynamic_bufsize", @@ -336,10 +378,19 @@ class DspManager(object): self.doRun = False self.runSecondary = False self.dsp.stop() + self.sdrSource.removeClient(self) def setProperty(self, prop, value): self.localProps.getProperty(prop).setValue(value) + def onSdrPause(self): + if self.dsp is not None: + self.dsp.stop() + + def onSdrResume(self): + if self.dsp is not None: + self.dsp.start() + class CpuUsageThread(threading.Thread): sharedInstance = None @staticmethod