handle failure of sdr devices asynchronously

This commit is contained in:
Jakob Ketterl 2020-09-19 20:45:23 +02:00
parent 31295efbff
commit ff34e793a0
2 changed files with 55 additions and 32 deletions

View File

@ -3,7 +3,7 @@ from owrx.details import ReceiverDetails
from owrx.dsp import DspManager from owrx.dsp import DspManager
from owrx.cpu import CpuUsageThread from owrx.cpu import CpuUsageThread
from owrx.sdr import SdrService from owrx.sdr import SdrService
from owrx.source import SdrSource from owrx.source import SdrSource, SdrSourceEventClient
from owrx.client import ClientRegistry, TooManyClientsException from owrx.client import ClientRegistry, TooManyClientsException
from owrx.feature import FeatureDetector from owrx.feature import FeatureDetector
from owrx.version import openwebrx_version from owrx.version import openwebrx_version
@ -107,7 +107,7 @@ class OpenWebRxClient(Client, metaclass=ABCMeta):
self.send({"type": "receiver_details", "value": details}) self.send({"type": "receiver_details", "value": details})
class OpenWebRxReceiverClient(OpenWebRxClient): class OpenWebRxReceiverClient(OpenWebRxClient, SdrSourceEventClient):
config_keys = [ config_keys = [
"waterfall_colors", "waterfall_colors",
"waterfall_min_level", "waterfall_min_level",
@ -152,6 +152,23 @@ class OpenWebRxReceiverClient(OpenWebRxClient):
CpuUsageThread.getSharedInstance().add_client(self) CpuUsageThread.getSharedInstance().add_client(self)
def onStateChange(self, state):
if state == SdrSource.STATE_RUNNING:
self.handleSdrAvailable()
elif state == SdrSource.STATE_FAILED:
self.handleSdrFailed()
def handleSdrFailed(self):
logger.warning('SDR device "%s" has failed, selecting new device', self.sdr.getName())
self.write_log_message('SDR device "{0}" has failed, selecting new device'.format(self.sdr.getName()))
self.setSdr()
def onBusyStateChange(self, state):
pass
def getClientClass(self):
return SdrSource.CLIENT_USER
def __sendProfiles(self): def __sendProfiles(self):
profiles = [ profiles = [
{"name": s.getName() + " " + p["name"], "id": sid + "|" + pid} {"name": s.getName() + " " + p["name"], "id": sid + "|" + pid}
@ -200,20 +217,14 @@ class OpenWebRxReceiverClient(OpenWebRxClient):
logger.warning("message is not json: {0}".format(message)) logger.warning("message is not json: {0}".format(message))
def setSdr(self, id=None): def setSdr(self, id=None):
while True:
next = None next = None
if id is not None: if id is not None:
next = SdrService.getSource(id) next = SdrService.getSource(id)
if next is None: if next is None:
next = SdrService.getFirstSource() next = SdrService.getFirstSource()
if next is None:
# exit condition: no sdrs available
logger.warning("no more SDR devices available")
self.handleNoSdrsAvailable()
return
# exit condition: no change # exit condition: no change
if next == self.sdr: if next == self.sdr and next is not None:
return return
self.stopDsp() self.stopDsp()
@ -222,17 +233,19 @@ class OpenWebRxReceiverClient(OpenWebRxClient):
self.configSub.cancel() self.configSub.cancel()
self.configSub = None self.configSub = None
if self.sdr is not None:
self.sdr.removeClient(self)
if next is None:
# exit condition: no sdrs available
logger.warning("no more SDR devices available")
self.handleNoSdrsAvailable()
return
self.sdr = next self.sdr = next
self.sdr.addClient(self)
self.getDsp() def handleSdrAvailable(self):
# found a working sdr, exit the loop
if self.sdr.getState() != SdrSource.STATE_FAILED:
break
logger.warning('SDR device "%s" has failed, selecing new device', self.sdr.getName())
self.write_log_message('SDR device "{0}" has failed, selecting new device'.format(self.sdr.getName()))
# send initial config # send initial config
self.getDsp().setProperties(self.connectionProperties) self.getDsp().setProperties(self.connectionProperties)
@ -261,6 +274,7 @@ class OpenWebRxReceiverClient(OpenWebRxClient):
self.__sendProfiles() self.__sendProfiles()
self.sdr.addSpectrumClient(self) self.sdr.addSpectrumClient(self)
self.startDsp()
def handleNoSdrsAvailable(self): def handleNoSdrsAvailable(self):
self.write_sdr_error("No SDR Devices available") self.write_sdr_error("No SDR Devices available")
@ -269,6 +283,8 @@ class OpenWebRxReceiverClient(OpenWebRxClient):
self.getDsp().start() self.getDsp().start()
def close(self): def close(self):
if self.sdr is not None:
self.sdr.removeClient(self)
self.stopDsp() self.stopDsp()
CpuUsageThread.getSharedInstance().remove_client(self) CpuUsageThread.getSharedInstance().remove_client(self)
ClientRegistry.getSharedInstance().removeClient(self) ClientRegistry.getSharedInstance().removeClient(self)

View File

@ -157,6 +157,9 @@ class SdrSource(ABC):
if self.monitor: if self.monitor:
return return
if self.isFailed():
return
try: try:
self.preStart() self.preStart()
except Exception: except Exception:
@ -183,16 +186,18 @@ class SdrSource(ABC):
def wait_for_process_to_end(): def wait_for_process_to_end():
rc = self.process.wait() rc = self.process.wait()
logger.debug("shut down with RC={0}".format(rc)) logger.debug("shut down with RC={0}".format(rc))
self.monitor = None
if self.getState() == SdrSource.STATE_RUNNING: if self.getState() == SdrSource.STATE_RUNNING:
self.failed = True self.failed = True
self.setState(SdrSource.STATE_FAILED) self.setState(SdrSource.STATE_FAILED)
self.monitor = None else:
self.setState(SdrSource.STATE_STOPPED)
self.monitor = threading.Thread(target=wait_for_process_to_end, name="source_monitor") self.monitor = threading.Thread(target=wait_for_process_to_end, name="source_monitor")
self.monitor.start() self.monitor.start()
retries = 1000 retries = 1000
while retries > 0: while retries > 0 and not self.isFailed():
retries -= 1 retries -= 1
if self.monitor is None: if self.monitor is None:
break break
@ -248,14 +253,13 @@ class SdrSource(ABC):
if self.monitor: if self.monitor:
self.monitor.join() self.monitor.join()
self.setState(SdrSource.STATE_STOPPED)
def hasClients(self, *args): def hasClients(self, *args):
clients = [c for c in self.clients if c.getClientClass() in args] clients = [c for c in self.clients if c.getClientClass() in args]
return len(clients) > 0 return len(clients) > 0
def addClient(self, c: SdrSourceEventClient): def addClient(self, c: SdrSourceEventClient):
self.clients.append(c) self.clients.append(c)
c.onStateChange(self.getState())
hasUsers = self.hasClients(SdrSource.CLIENT_USER) hasUsers = self.hasClients(SdrSource.CLIENT_USER)
hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND) hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND)
if hasUsers or hasBackgroundTasks: if hasUsers or hasBackgroundTasks:
@ -280,6 +284,9 @@ class SdrSource(ABC):
self.stop() self.stop()
def addSpectrumClient(self, c): def addSpectrumClient(self, c):
if c in self.spectrumClients:
return
# local import due to circular depencency # local import due to circular depencency
from owrx.fft import SpectrumThread from owrx.fft import SpectrumThread