From ff34e793a0bde02018a25172fb4248dd147ea98d Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Sat, 19 Sep 2020 20:45:23 +0200 Subject: [PATCH] handle failure of sdr devices asynchronously --- owrx/connection.py | 72 +++++++++++++++++++++++++---------------- owrx/source/__init__.py | 15 ++++++--- 2 files changed, 55 insertions(+), 32 deletions(-) diff --git a/owrx/connection.py b/owrx/connection.py index 9672f7a..1dfd537 100644 --- a/owrx/connection.py +++ b/owrx/connection.py @@ -3,7 +3,7 @@ from owrx.details import ReceiverDetails from owrx.dsp import DspManager from owrx.cpu import CpuUsageThread from owrx.sdr import SdrService -from owrx.source import SdrSource +from owrx.source import SdrSource, SdrSourceEventClient from owrx.client import ClientRegistry, TooManyClientsException from owrx.feature import FeatureDetector from owrx.version import openwebrx_version @@ -107,7 +107,7 @@ class OpenWebRxClient(Client, metaclass=ABCMeta): self.send({"type": "receiver_details", "value": details}) -class OpenWebRxReceiverClient(OpenWebRxClient): +class OpenWebRxReceiverClient(OpenWebRxClient, SdrSourceEventClient): config_keys = [ "waterfall_colors", "waterfall_min_level", @@ -152,6 +152,23 @@ class OpenWebRxReceiverClient(OpenWebRxClient): CpuUsageThread.getSharedInstance().add_client(self) + def onStateChange(self, state): + if state == SdrSource.STATE_RUNNING: + self.handleSdrAvailable() + elif state == SdrSource.STATE_FAILED: + self.handleSdrFailed() + + def handleSdrFailed(self): + logger.warning('SDR device "%s" has failed, selecting new device', self.sdr.getName()) + self.write_log_message('SDR device "{0}" has failed, selecting new device'.format(self.sdr.getName())) + self.setSdr() + + def onBusyStateChange(self, state): + pass + + def getClientClass(self): + return SdrSource.CLIENT_USER + def __sendProfiles(self): profiles = [ {"name": s.getName() + " " + p["name"], "id": sid + "|" + pid} @@ -200,39 +217,35 @@ class OpenWebRxReceiverClient(OpenWebRxClient): logger.warning("message is not json: {0}".format(message)) def setSdr(self, id=None): - while True: - next = None - if id is not None: - next = SdrService.getSource(id) - if next is None: - next = SdrService.getFirstSource() - if next is None: - # exit condition: no sdrs available - logger.warning("no more SDR devices available") - self.handleNoSdrsAvailable() - return + next = None + if id is not None: + next = SdrService.getSource(id) + if next is None: + next = SdrService.getFirstSource() - # exit condition: no change - if next == self.sdr: - return + # exit condition: no change + if next == self.sdr and next is not None: + return - self.stopDsp() + self.stopDsp() - if self.configSub is not None: - self.configSub.cancel() - self.configSub = None + if self.configSub is not None: + self.configSub.cancel() + self.configSub = None - self.sdr = next + if self.sdr is not None: + self.sdr.removeClient(self) - self.getDsp() + if next is None: + # exit condition: no sdrs available + logger.warning("no more SDR devices available") + self.handleNoSdrsAvailable() + return - # found a working sdr, exit the loop - if self.sdr.getState() != SdrSource.STATE_FAILED: - break - - logger.warning('SDR device "%s" has failed, selecing new device', self.sdr.getName()) - self.write_log_message('SDR device "{0}" has failed, selecting new device'.format(self.sdr.getName())) + self.sdr = next + self.sdr.addClient(self) + def handleSdrAvailable(self): # send initial config self.getDsp().setProperties(self.connectionProperties) @@ -261,6 +274,7 @@ class OpenWebRxReceiverClient(OpenWebRxClient): self.__sendProfiles() self.sdr.addSpectrumClient(self) + self.startDsp() def handleNoSdrsAvailable(self): self.write_sdr_error("No SDR Devices available") @@ -269,6 +283,8 @@ class OpenWebRxReceiverClient(OpenWebRxClient): self.getDsp().start() def close(self): + if self.sdr is not None: + self.sdr.removeClient(self) self.stopDsp() CpuUsageThread.getSharedInstance().remove_client(self) ClientRegistry.getSharedInstance().removeClient(self) diff --git a/owrx/source/__init__.py b/owrx/source/__init__.py index 656edbf..c71c5a3 100644 --- a/owrx/source/__init__.py +++ b/owrx/source/__init__.py @@ -157,6 +157,9 @@ class SdrSource(ABC): if self.monitor: return + if self.isFailed(): + return + try: self.preStart() except Exception: @@ -183,16 +186,18 @@ class SdrSource(ABC): def wait_for_process_to_end(): rc = self.process.wait() logger.debug("shut down with RC={0}".format(rc)) + self.monitor = None if self.getState() == SdrSource.STATE_RUNNING: self.failed = True self.setState(SdrSource.STATE_FAILED) - self.monitor = None + else: + self.setState(SdrSource.STATE_STOPPED) self.monitor = threading.Thread(target=wait_for_process_to_end, name="source_monitor") self.monitor.start() retries = 1000 - while retries > 0: + while retries > 0 and not self.isFailed(): retries -= 1 if self.monitor is None: break @@ -248,14 +253,13 @@ class SdrSource(ABC): if self.monitor: self.monitor.join() - self.setState(SdrSource.STATE_STOPPED) - def hasClients(self, *args): clients = [c for c in self.clients if c.getClientClass() in args] return len(clients) > 0 def addClient(self, c: SdrSourceEventClient): self.clients.append(c) + c.onStateChange(self.getState()) hasUsers = self.hasClients(SdrSource.CLIENT_USER) hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND) if hasUsers or hasBackgroundTasks: @@ -280,6 +284,9 @@ class SdrSource(ABC): self.stop() def addSpectrumClient(self, c): + if c in self.spectrumClients: + return + # local import due to circular depencency from owrx.fft import SpectrumThread