diff --git a/owrx/reporting.py b/owrx/reporting.py deleted file mode 100644 index 32e996c..0000000 --- a/owrx/reporting.py +++ /dev/null @@ -1,58 +0,0 @@ -import threading -from abc import ABC, abstractmethod -from owrx.config import Config - - -class Reporter(ABC): - @abstractmethod - def stop(self): - pass - - @abstractmethod - def spot(self, spot): - pass - - @abstractmethod - def getSupportedModes(self): - return [] - - -class ReportingEngine(object): - creationLock = threading.Lock() - sharedInstance = None - - @staticmethod - def getSharedInstance(): - with ReportingEngine.creationLock: - if ReportingEngine.sharedInstance is None: - ReportingEngine.sharedInstance = ReportingEngine() - return ReportingEngine.sharedInstance - - @staticmethod - def stopAll(): - with ReportingEngine.creationLock: - if ReportingEngine.sharedInstance is not None: - ReportingEngine.sharedInstance.stop() - - def __init__(self): - self.reporters = [] - config = Config.get() - if "pskreporter_enabled" in config and config["pskreporter_enabled"]: - # inline import due to circular dependencies - from owrx.pskreporter import PskReporter - - self.reporters += [PskReporter()] - if "wsprnet_enabled" in config and config["wsprnet_enabled"]: - # inline import due to circular dependencies - from owrx.wsprnet import WsprnetReporter - - self.reporters += [WsprnetReporter()] - - def stop(self): - for r in self.reporters: - r.stop() - - def spot(self, spot): - for r in self.reporters: - if spot["mode"] in r.getSupportedModes(): - r.spot(spot) diff --git a/owrx/reporting/__init__.py b/owrx/reporting/__init__.py new file mode 100644 index 0000000..f65feab --- /dev/null +++ b/owrx/reporting/__init__.py @@ -0,0 +1,57 @@ +import threading +from owrx.config import Config +from owrx.reporting.reporter import Reporter +from owrx.reporting.pskreporter import PskReporter +from owrx.reporting.wsprnet import WsprnetReporter +import logging + +logger = logging.getLogger(__name__) + + +class ReportingEngine(object): + creationLock = threading.Lock() + sharedInstance = None + + reporterClasses = { + "pskreporter_enabled": PskReporter, + "wsprnet_enabled": WsprnetReporter, + } + + @staticmethod + def getSharedInstance(): + with ReportingEngine.creationLock: + if ReportingEngine.sharedInstance is None: + ReportingEngine.sharedInstance = ReportingEngine() + return ReportingEngine.sharedInstance + + @staticmethod + def stopAll(): + with ReportingEngine.creationLock: + if ReportingEngine.sharedInstance is not None: + ReportingEngine.sharedInstance.stop() + + def __init__(self): + self.reporters = [] + self.configSub = Config.get().filter(*ReportingEngine.reporterClasses.keys()).wire(self.setupReporters) + self.setupReporters() + + def setupReporters(self, *args): + config = Config.get() + for configKey, reporterClass in ReportingEngine.reporterClasses.items(): + if configKey in config and config[configKey]: + if not any(isinstance(r, reporterClass) for r in self.reporters): + self.reporters += [reporterClass()] + else: + for reporter in [r for r in self.reporters if isinstance(r, reporterClass)]: + reporter.stop() + self.reporters.remove(reporter) + + def stop(self): + for r in self.reporters: + r.stop() + self.configSub.cancel() + + def spot(self, spot): + for r in self.reporters: + if spot["mode"] in r.getSupportedModes(): + r.spot(spot) diff --git a/owrx/pskreporter.py b/owrx/reporting/pskreporter.py similarity index 98% rename from owrx/pskreporter.py rename to owrx/reporting/pskreporter.py index 6f0ee11..aa88be3 100644 --- a/owrx/pskreporter.py +++ b/owrx/reporting/pskreporter.py @@ -9,7 +9,7 @@ from owrx.config import Config from owrx.version import openwebrx_version from owrx.locator import Locator from owrx.metrics import Metrics, CounterMetric -from owrx.reporting import Reporter +from owrx.reporting.reporter import Reporter logger = logging.getLogger(__name__) @@ -22,6 +22,8 @@ class PskReporter(Reporter): def stop(self): self.cancelTimer() + with self.spotLock: + self.spots = [] def __init__(self): self.spots = [] diff --git a/owrx/reporting/reporter.py b/owrx/reporting/reporter.py new file mode 100644 index 0000000..5ccb741 --- /dev/null +++ b/owrx/reporting/reporter.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod + + +class Reporter(ABC): + @abstractmethod + def stop(self): + pass + + @abstractmethod + def spot(self, spot): + pass + + @abstractmethod + def getSupportedModes(self): + return [] diff --git a/owrx/wsprnet.py b/owrx/reporting/wsprnet.py similarity index 84% rename from owrx/wsprnet.py rename to owrx/reporting/wsprnet.py index 2a1e9d0..e744bd4 100644 --- a/owrx/wsprnet.py +++ b/owrx/reporting/wsprnet.py @@ -1,4 +1,4 @@ -from owrx.reporting import Reporter +from owrx.reporting.reporter import Reporter from owrx.version import openwebrx_version from owrx.config import Config from owrx.locator import Locator @@ -12,14 +12,13 @@ from datetime import datetime, timezone logger = logging.getLogger(__name__) +PoisonPill = object() + + class Worker(threading.Thread): def __init__(self, queue: Queue): self.queue = queue self.doRun = True - # some constants that we don't expect to change - config = Config.get() - self.callsign = config["wsprnet_callsign"] - self.locator = Locator.fromCoordinates(config["receiver_gps"]) super().__init__(daemon=True) @@ -27,8 +26,11 @@ class Worker(threading.Thread): while self.doRun: try: spot = self.queue.get() - self.uploadSpot(spot) - self.queue.task_done() + if spot is PoisonPill: + self.doRun = False + else: + self.uploadSpot(spot) + self.queue.task_done() except Exception: logger.exception("Exception while uploading WSPRNet spot") @@ -40,6 +42,7 @@ class Worker(threading.Thread): return interval def uploadSpot(self, spot): + config = Config.get() # function=wspr&date=210114&time=1732&sig=-15&dt=0.5&drift=0&tqrg=7.040019&tcall=DF2UU&tgrid=JN48&dbm=37&version=2.3.0-rc3&rcall=DD5JFK&rgrid=JN58SC&rqrg=7.040047&mode=2 # {'timestamp': 1610655960000, 'db': -23.0, 'dt': 0.3, 'freq': 7040048, 'drift': -1, 'msg': 'LA3JJ JO59 37', 'callsign': 'LA3JJ', 'locator': 'JO59', 'mode': 'WSPR'} date = datetime.fromtimestamp(spot["timestamp"] / 1000, tz=timezone.utc) @@ -57,9 +60,8 @@ class Worker(threading.Thread): "tgrid": spot["locator"], "dbm": spot["dbm"], "version": openwebrx_version, - "rcall": self.callsign, - "rgrid": self.locator, - # mode 2 = WSPR 2 minutes + "rcall": config["wsprnet_callsign"], + "rgrid": Locator.fromCoordinates(config["receiver_gps"]), "mode": self._getMode(spot), } ).encode() @@ -79,7 +81,10 @@ class WsprnetReporter(Reporter): metrics.addMetric("wsprnet.spots", self.spotCounter) def stop(self): - pass + while not self.queue.empty(): + self.queue.get(timeout=1) + self.queue.task_done() + self.queue.put(PoisonPill) def spot(self, spot): try: diff --git a/setup.py b/setup.py index bcafa1b..b0101f0 100644 --- a/setup.py +++ b/setup.py @@ -22,6 +22,7 @@ setup( "owrx.property", "owrx.form", "owrx.config", + "owrx.reporting", "csdr", "htdocs", "owrxadmin",