diff --git a/owrx/metrics.py b/owrx/metrics.py index 11f503f..b1b5778 100644 --- a/owrx/metrics.py +++ b/owrx/metrics.py @@ -1,3 +1,29 @@ +class Metric(object): + def getValue(self): + return 0 + + +class CounterMetric(Metric): + def __init__(self): + self.counter = 0 + + def inc(self, increment=1): + self.counter += increment + + def getValue(self): + return { + "count": self.counter, + } + + +class DirectMetric(Metric): + def __init__(self, getter): + self.getter = getter + + def getValue(self): + return self.getter() + + class Metrics(object): sharedInstance = None @@ -10,21 +36,27 @@ class Metrics(object): def __init__(self): self.metrics = {} - def pushDecodes(self, band, mode, count=1): - if band is None: - band = "unknown" - else: - band = band.getName() + def addMetric(self, name, metric): + self.metrics[name] = metric - if mode is None: - mode = "unknown" + def hasMetric(self, name): + return name in self.metrics - if not band in self.metrics: - self.metrics[band] = {} - if not mode in self.metrics[band]: - self.metrics[band][mode] = {"count": 0} - - self.metrics[band][mode]["count"] += count + def getMetric(self, name): + if not self.hasMetric(name): + return None + return self.metrics[name] def getMetrics(self): - return self.metrics + result = {} + + for (key, metric) in self.metrics.items(): + partial = result + keys = key.split(".") + for keypart in keys[0:-1]: + if not keypart in partial: + partial[keypart] = {} + partial = partial[keypart] + partial[keys[-1]] = metric.getValue() + + return result diff --git a/owrx/wsjt.py b/owrx/wsjt.py index a12bd97..9c29070 100644 --- a/owrx/wsjt.py +++ b/owrx/wsjt.py @@ -11,7 +11,7 @@ import re from queue import Queue, Full from owrx.config import PropertyManager from owrx.bands import Bandplan -from owrx.metrics import Metrics +from owrx.metrics import Metrics, CounterMetric, DirectMetric import logging @@ -48,6 +48,7 @@ class WsjtQueue(Queue): def __init__(self, maxsize, workers): super().__init__(maxsize) self.workers = [self.newWorker() for _ in range(0, workers)] + Metrics.getSharedInstance().addMetric("wsjt.queue.length", DirectMetric(self.qsize)) def put(self, item): super(WsjtQueue, self).put(item, block=False) @@ -249,6 +250,24 @@ class WsjtParser(object): ts = datetime.strptime(instring, dateformat) return int(datetime.combine(date.today(), ts.time()).replace(tzinfo=timezone.utc).timestamp() * 1000) + def pushDecode(self, mode): + metrics = Metrics.getSharedInstance() + band = self.band.getName() + if band is None: + band = "unknown" + + if mode is None: + mode = "unknown" + + name = "wsjt.decodes.{band}.{mode}".format(band=band, mode=mode) + metric = metrics.getMetric(name) + if metric is None: + metric = CounterMetric() + metrics.addMetric(name, metric) + + metric.inc() + + def parse_from_jt9(self, msg): # ft8 sample # '222100 -15 -0.0 508 ~ CQ EA7MJ IM66' @@ -266,7 +285,8 @@ class WsjtParser(object): mode = WsjtParser.modes[modeChar] if modeChar in WsjtParser.modes else "unknown" wsjt_msg = msg[17:53].strip() self.parseLocator(wsjt_msg, mode) - Metrics.getSharedInstance().pushDecodes(self.band, mode) + + self.pushDecode(mode) return { "timestamp": timestamp, "db": float(msg[0:3]), @@ -292,7 +312,7 @@ class WsjtParser(object): # '0052 -29 2.6 0.001486 0 G02CWT IO92 23' wsjt_msg = msg[29:].strip() self.parseWsprMessage(wsjt_msg) - Metrics.getSharedInstance().pushDecodes(self.band, "WSPR") + self.pushDecode("WSPR") return { "timestamp": self.parse_timestamp(msg[0:4], "%H%M"), "db": float(msg[5:8]),