various changes to stabilize sdr switchovers

This commit is contained in:
Jakob Ketterl 2019-05-10 18:30:53 +02:00
parent b17364e701
commit dd3a970497
3 changed files with 54 additions and 27 deletions

View File

@ -7,6 +7,8 @@ class Property(object):
def getValue(self): def getValue(self):
return self.value return self.value
def setValue(self, value): def setValue(self, value):
if (self.value == value):
return self
self.value = value self.value = value
for c in self.callbacks: for c in self.callbacks:
try: try:

View File

@ -1,7 +1,7 @@
import mimetypes import mimetypes
from owrx.websocket import WebSocketConnection from owrx.websocket import WebSocketConnection
from owrx.config import PropertyManager from owrx.config import PropertyManager
from owrx.source import SpectrumThread, DspManager, CpuUsageThread, SdrService from owrx.source import DspManager, CpuUsageThread, SdrService
import json import json
import os import os
from datetime import datetime from datetime import datetime

View File

@ -6,6 +6,7 @@ import time
import os import os
import signal import signal
import sys import sys
import socket
class SdrService(object): class SdrService(object):
sdrProps = None sdrProps = None
@ -76,12 +77,8 @@ class SdrSource(object):
def restart(name, value): def restart(name, value):
print("restarting sdr source due to property change: {0} changed to {1}".format(name, value)) print("restarting sdr source due to property change: {0} changed to {1}".format(name, value))
for c in self.clients:
c.onSdrPause()
self.stop() self.stop()
self.start() self.start()
for c in self.clients:
c.onSdrResume()
self.rtlProps.wire(restart) self.rtlProps.wire(restart)
self.port = port self.port = port
self.monitor = None self.monitor = None
@ -148,6 +145,16 @@ class SdrSource(object):
self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
print("[RtlNmuxSource] Started rtl source: " + cmd) print("[RtlNmuxSource] Started rtl source: " + cmd)
while True:
testsock = socket.socket()
try:
testsock.connect(("127.0.0.1", self.getPort()))
testsock.close()
break
except:
time.sleep(0.1)
def wait_for_process_to_end(): def wait_for_process_to_end():
rc = self.process.wait() rc = self.process.wait()
print("[RtlNmuxSource] shut down with RC={0}".format(rc)) print("[RtlNmuxSource] shut down with RC={0}".format(rc))
@ -158,12 +165,21 @@ class SdrSource(object):
self.modificationLock.release() self.modificationLock.release()
for c in self.clients:
c.onSdrAvailable()
def isAvailable(self):
return self.monitor is not None
def getSpectrumThread(self): def getSpectrumThread(self):
if self.spectrumThread is None: if self.spectrumThread is None:
self.spectrumThread = SpectrumThread(self) self.spectrumThread = SpectrumThread(self)
return self.spectrumThread return self.spectrumThread
def stop(self): def stop(self):
for c in self.clients:
c.onSdrUnavailable()
self.modificationLock.acquire() self.modificationLock.acquire()
try: try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
@ -214,12 +230,12 @@ class SpectrumThread(object):
self.clients = [] self.clients = []
self.doRun = False self.doRun = False
self.sdrSource = sdrSource self.sdrSource = sdrSource
self.sdrSource.addClient(self)
def start(self): def start(self):
threading.Thread(target = self.run).start() threading.Thread(target = self.run).start()
def run(self): def run(self):
self.sdrSource.addClient(self)
props = self.sdrSource.props.collect( props = self.sdrSource.props.collect(
"samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor", "fft_compression", "samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor", "fft_compression",
"csdr_dynamic_bufsize", "csdr_print_bufsizes", "csdr_through" "csdr_dynamic_bufsize", "csdr_print_bufsizes", "csdr_through"
@ -262,13 +278,12 @@ class SpectrumThread(object):
c.write_spectrum_data(data) c.write_spectrum_data(data)
dsp.stop() dsp.stop()
self.sdrSource.removeClient(self)
print("spectrum thread shut down") print("spectrum thread shut down")
def add_client(self, c): def add_client(self, c):
self.clients.append(c) self.clients.append(c)
if not self.doRun: self.doRun = self.sdrSource.isAvailable()
self.doRun = True if self.doRun:
self.start() self.start()
def remove_client(self, c): def remove_client(self, c):
@ -277,24 +292,28 @@ class SpectrumThread(object):
except ValueError: except ValueError:
pass pass
if not self.clients: if not self.clients:
self.shutdown() self.stop()
def shutdown(self): def stop(self):
print("shutting down spectrum thread") print("stopping spectrum thread")
self.doRun = False self.doRun = False
def onSdrPause(self): def onSdrAvailable(self):
if self.dsp is not None: print("Spectrum Thread: onSdrAvailable")
self.dsp.stop() self.doRun = bool(self.clients)
def onSdrResume(self): if self.doRun:
if self.dsp is not None: self.start()
self.dsp.start()
def onSdrUnavailable(self):
print("Spectrum Thread: onSdrUnavailable")
self.stop()
class DspManager(object): class DspManager(object):
def __init__(self, handler, sdrSource): def __init__(self, handler, sdrSource):
self.doRun = True self.doRun = False
self.handler = handler self.handler = handler
self.sdrSource = sdrSource self.sdrSource = sdrSource
self.dsp = None
self.sdrSource.addClient(self) self.sdrSource.addClient(self)
self.localProps = self.sdrSource.getProps().collect( self.localProps = self.sdrSource.getProps().collect(
@ -360,9 +379,11 @@ class DspManager(object):
super().__init__() super().__init__()
def start(self): def start(self):
self.dsp.start() self.doRun = self.sdrSource.isAvailable()
threading.Thread(target = self.readDspOutput).start() if self.doRun:
threading.Thread(target = self.readSMeterOutput).start() self.dsp.start()
threading.Thread(target = self.readDspOutput).start()
threading.Thread(target = self.readSMeterOutput).start()
def startSecondaryThreads(self): def startSecondaryThreads(self):
self.runSecondary = True self.runSecondary = True
@ -408,14 +429,18 @@ class DspManager(object):
def setProperty(self, prop, value): def setProperty(self, prop, value):
self.localProps.getProperty(prop).setValue(value) self.localProps.getProperty(prop).setValue(value)
def onSdrPause(self): def onSdrAvailable(self):
if not self.doRun:
self.doRun = True
if self.dsp is not None:
self.dsp.start()
threading.Thread(target = self.readDspOutput).start()
threading.Thread(target = self.readSMeterOutput).start()
def onSdrUnavailable(self):
if self.dsp is not None: if self.dsp is not None:
self.dsp.stop() self.dsp.stop()
def onSdrResume(self):
if self.dsp is not None:
self.dsp.start()
class CpuUsageThread(threading.Thread): class CpuUsageThread(threading.Thread):
sharedInstance = None sharedInstance = None
@staticmethod @staticmethod