From dd3a970497365304561e8642e20ae22e9c493c7c Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Fri, 10 May 2019 18:30:53 +0200 Subject: [PATCH] various changes to stabilize sdr switchovers --- owrx/config.py | 2 ++ owrx/controllers.py | 2 +- owrx/source.py | 77 ++++++++++++++++++++++++++++++--------------- 3 files changed, 54 insertions(+), 27 deletions(-) diff --git a/owrx/config.py b/owrx/config.py index 3424a41..14780fb 100644 --- a/owrx/config.py +++ b/owrx/config.py @@ -7,6 +7,8 @@ class Property(object): def getValue(self): return self.value def setValue(self, value): + if (self.value == value): + return self self.value = value for c in self.callbacks: try: diff --git a/owrx/controllers.py b/owrx/controllers.py index 239ee72..d033b2a 100644 --- a/owrx/controllers.py +++ b/owrx/controllers.py @@ -1,7 +1,7 @@ import mimetypes from owrx.websocket import WebSocketConnection from owrx.config import PropertyManager -from owrx.source import SpectrumThread, DspManager, CpuUsageThread, SdrService +from owrx.source import DspManager, CpuUsageThread, SdrService import json import os from datetime import datetime diff --git a/owrx/source.py b/owrx/source.py index 650c15b..9475f67 100644 --- a/owrx/source.py +++ b/owrx/source.py @@ -6,6 +6,7 @@ import time import os import signal import sys +import socket class SdrService(object): sdrProps = None @@ -76,12 +77,8 @@ class SdrSource(object): def restart(name, value): print("restarting sdr source due to property change: {0} changed to {1}".format(name, value)) - for c in self.clients: - c.onSdrPause() self.stop() self.start() - for c in self.clients: - c.onSdrResume() self.rtlProps.wire(restart) self.port = port self.monitor = None @@ -148,6 +145,16 @@ class SdrSource(object): self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) print("[RtlNmuxSource] Started rtl source: " + cmd) + while True: + testsock = socket.socket() + try: + testsock.connect(("127.0.0.1", self.getPort())) + testsock.close() + break + except: + time.sleep(0.1) + + def wait_for_process_to_end(): rc = self.process.wait() print("[RtlNmuxSource] shut down with RC={0}".format(rc)) @@ -158,12 +165,21 @@ class SdrSource(object): self.modificationLock.release() + for c in self.clients: + c.onSdrAvailable() + + def isAvailable(self): + return self.monitor is not None + def getSpectrumThread(self): if self.spectrumThread is None: self.spectrumThread = SpectrumThread(self) return self.spectrumThread def stop(self): + for c in self.clients: + c.onSdrUnavailable() + self.modificationLock.acquire() try: os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) @@ -214,12 +230,12 @@ class SpectrumThread(object): self.clients = [] self.doRun = False self.sdrSource = sdrSource + self.sdrSource.addClient(self) def start(self): threading.Thread(target = self.run).start() def run(self): - self.sdrSource.addClient(self) props = self.sdrSource.props.collect( "samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor", "fft_compression", "csdr_dynamic_bufsize", "csdr_print_bufsizes", "csdr_through" @@ -262,13 +278,12 @@ class SpectrumThread(object): c.write_spectrum_data(data) dsp.stop() - self.sdrSource.removeClient(self) print("spectrum thread shut down") def add_client(self, c): self.clients.append(c) - if not self.doRun: - self.doRun = True + self.doRun = self.sdrSource.isAvailable() + if self.doRun: self.start() def remove_client(self, c): @@ -277,24 +292,28 @@ class SpectrumThread(object): except ValueError: pass if not self.clients: - self.shutdown() + self.stop() - def shutdown(self): - print("shutting down spectrum thread") + def stop(self): + print("stopping spectrum thread") self.doRun = False - def onSdrPause(self): - if self.dsp is not None: - self.dsp.stop() - def onSdrResume(self): - if self.dsp is not None: - self.dsp.start() + def onSdrAvailable(self): + print("Spectrum Thread: onSdrAvailable") + self.doRun = bool(self.clients) + if self.doRun: + self.start() + + def onSdrUnavailable(self): + print("Spectrum Thread: onSdrUnavailable") + self.stop() class DspManager(object): def __init__(self, handler, sdrSource): - self.doRun = True + self.doRun = False self.handler = handler self.sdrSource = sdrSource + self.dsp = None self.sdrSource.addClient(self) self.localProps = self.sdrSource.getProps().collect( @@ -360,9 +379,11 @@ class DspManager(object): super().__init__() def start(self): - self.dsp.start() - threading.Thread(target = self.readDspOutput).start() - threading.Thread(target = self.readSMeterOutput).start() + self.doRun = self.sdrSource.isAvailable() + if self.doRun: + self.dsp.start() + threading.Thread(target = self.readDspOutput).start() + threading.Thread(target = self.readSMeterOutput).start() def startSecondaryThreads(self): self.runSecondary = True @@ -408,14 +429,18 @@ class DspManager(object): def setProperty(self, prop, value): self.localProps.getProperty(prop).setValue(value) - def onSdrPause(self): + def onSdrAvailable(self): + if not self.doRun: + self.doRun = True + if self.dsp is not None: + self.dsp.start() + threading.Thread(target = self.readDspOutput).start() + threading.Thread(target = self.readSMeterOutput).start() + + def onSdrUnavailable(self): if self.dsp is not None: self.dsp.stop() - def onSdrResume(self): - if self.dsp is not None: - self.dsp.start() - class CpuUsageThread(threading.Thread): sharedInstance = None @staticmethod