rewire the metrics; make queue length metric available
This commit is contained in:
parent
a11875145b
commit
bc5b16b5e3
@ -1,3 +1,29 @@
|
|||||||
|
class Metric(object):
|
||||||
|
def getValue(self):
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
class CounterMetric(Metric):
|
||||||
|
def __init__(self):
|
||||||
|
self.counter = 0
|
||||||
|
|
||||||
|
def inc(self, increment=1):
|
||||||
|
self.counter += increment
|
||||||
|
|
||||||
|
def getValue(self):
|
||||||
|
return {
|
||||||
|
"count": self.counter,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class DirectMetric(Metric):
|
||||||
|
def __init__(self, getter):
|
||||||
|
self.getter = getter
|
||||||
|
|
||||||
|
def getValue(self):
|
||||||
|
return self.getter()
|
||||||
|
|
||||||
|
|
||||||
class Metrics(object):
|
class Metrics(object):
|
||||||
sharedInstance = None
|
sharedInstance = None
|
||||||
|
|
||||||
@ -10,21 +36,27 @@ class Metrics(object):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.metrics = {}
|
self.metrics = {}
|
||||||
|
|
||||||
def pushDecodes(self, band, mode, count=1):
|
def addMetric(self, name, metric):
|
||||||
if band is None:
|
self.metrics[name] = metric
|
||||||
band = "unknown"
|
|
||||||
else:
|
|
||||||
band = band.getName()
|
|
||||||
|
|
||||||
if mode is None:
|
def hasMetric(self, name):
|
||||||
mode = "unknown"
|
return name in self.metrics
|
||||||
|
|
||||||
if not band in self.metrics:
|
def getMetric(self, name):
|
||||||
self.metrics[band] = {}
|
if not self.hasMetric(name):
|
||||||
if not mode in self.metrics[band]:
|
return None
|
||||||
self.metrics[band][mode] = {"count": 0}
|
return self.metrics[name]
|
||||||
|
|
||||||
self.metrics[band][mode]["count"] += count
|
|
||||||
|
|
||||||
def getMetrics(self):
|
def getMetrics(self):
|
||||||
return self.metrics
|
result = {}
|
||||||
|
|
||||||
|
for (key, metric) in self.metrics.items():
|
||||||
|
partial = result
|
||||||
|
keys = key.split(".")
|
||||||
|
for keypart in keys[0:-1]:
|
||||||
|
if not keypart in partial:
|
||||||
|
partial[keypart] = {}
|
||||||
|
partial = partial[keypart]
|
||||||
|
partial[keys[-1]] = metric.getValue()
|
||||||
|
|
||||||
|
return result
|
||||||
|
26
owrx/wsjt.py
26
owrx/wsjt.py
@ -11,7 +11,7 @@ import re
|
|||||||
from queue import Queue, Full
|
from queue import Queue, Full
|
||||||
from owrx.config import PropertyManager
|
from owrx.config import PropertyManager
|
||||||
from owrx.bands import Bandplan
|
from owrx.bands import Bandplan
|
||||||
from owrx.metrics import Metrics
|
from owrx.metrics import Metrics, CounterMetric, DirectMetric
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
@ -48,6 +48,7 @@ class WsjtQueue(Queue):
|
|||||||
def __init__(self, maxsize, workers):
|
def __init__(self, maxsize, workers):
|
||||||
super().__init__(maxsize)
|
super().__init__(maxsize)
|
||||||
self.workers = [self.newWorker() for _ in range(0, workers)]
|
self.workers = [self.newWorker() for _ in range(0, workers)]
|
||||||
|
Metrics.getSharedInstance().addMetric("wsjt.queue.length", DirectMetric(self.qsize))
|
||||||
|
|
||||||
def put(self, item):
|
def put(self, item):
|
||||||
super(WsjtQueue, self).put(item, block=False)
|
super(WsjtQueue, self).put(item, block=False)
|
||||||
@ -249,6 +250,24 @@ class WsjtParser(object):
|
|||||||
ts = datetime.strptime(instring, dateformat)
|
ts = datetime.strptime(instring, dateformat)
|
||||||
return int(datetime.combine(date.today(), ts.time()).replace(tzinfo=timezone.utc).timestamp() * 1000)
|
return int(datetime.combine(date.today(), ts.time()).replace(tzinfo=timezone.utc).timestamp() * 1000)
|
||||||
|
|
||||||
|
def pushDecode(self, mode):
|
||||||
|
metrics = Metrics.getSharedInstance()
|
||||||
|
band = self.band.getName()
|
||||||
|
if band is None:
|
||||||
|
band = "unknown"
|
||||||
|
|
||||||
|
if mode is None:
|
||||||
|
mode = "unknown"
|
||||||
|
|
||||||
|
name = "wsjt.decodes.{band}.{mode}".format(band=band, mode=mode)
|
||||||
|
metric = metrics.getMetric(name)
|
||||||
|
if metric is None:
|
||||||
|
metric = CounterMetric()
|
||||||
|
metrics.addMetric(name, metric)
|
||||||
|
|
||||||
|
metric.inc()
|
||||||
|
|
||||||
|
|
||||||
def parse_from_jt9(self, msg):
|
def parse_from_jt9(self, msg):
|
||||||
# ft8 sample
|
# ft8 sample
|
||||||
# '222100 -15 -0.0 508 ~ CQ EA7MJ IM66'
|
# '222100 -15 -0.0 508 ~ CQ EA7MJ IM66'
|
||||||
@ -266,7 +285,8 @@ class WsjtParser(object):
|
|||||||
mode = WsjtParser.modes[modeChar] if modeChar in WsjtParser.modes else "unknown"
|
mode = WsjtParser.modes[modeChar] if modeChar in WsjtParser.modes else "unknown"
|
||||||
wsjt_msg = msg[17:53].strip()
|
wsjt_msg = msg[17:53].strip()
|
||||||
self.parseLocator(wsjt_msg, mode)
|
self.parseLocator(wsjt_msg, mode)
|
||||||
Metrics.getSharedInstance().pushDecodes(self.band, mode)
|
|
||||||
|
self.pushDecode(mode)
|
||||||
return {
|
return {
|
||||||
"timestamp": timestamp,
|
"timestamp": timestamp,
|
||||||
"db": float(msg[0:3]),
|
"db": float(msg[0:3]),
|
||||||
@ -292,7 +312,7 @@ class WsjtParser(object):
|
|||||||
# '0052 -29 2.6 0.001486 0 G02CWT IO92 23'
|
# '0052 -29 2.6 0.001486 0 G02CWT IO92 23'
|
||||||
wsjt_msg = msg[29:].strip()
|
wsjt_msg = msg[29:].strip()
|
||||||
self.parseWsprMessage(wsjt_msg)
|
self.parseWsprMessage(wsjt_msg)
|
||||||
Metrics.getSharedInstance().pushDecodes(self.band, "WSPR")
|
self.pushDecode("WSPR")
|
||||||
return {
|
return {
|
||||||
"timestamp": self.parse_timestamp(msg[0:4], "%H%M"),
|
"timestamp": self.parse_timestamp(msg[0:4], "%H%M"),
|
||||||
"db": float(msg[5:8]),
|
"db": float(msg[5:8]),
|
||||||
|
Loading…
Reference in New Issue
Block a user