390 lines
13 KiB
Python
Raw Permalink Normal View History

import threading
import wave
from datetime import datetime, timedelta, timezone
import subprocess
2019-07-06 20:03:17 +02:00
import os
from multiprocessing.connection import Pipe
2019-07-06 22:21:47 +02:00
from owrx.map import Map, LocatorLocation
import re
2019-08-22 21:24:36 +02:00
from queue import Queue, Full
from owrx.config import PropertyManager
from owrx.bands import Bandplan
from owrx.metrics import Metrics, CounterMetric, DirectMetric
from owrx.pskreporter import PskReporter
import logging
logger = logging.getLogger(__name__)
2019-09-22 12:57:59 +02:00
logger.setLevel(logging.INFO)
class WsjtQueueWorker(threading.Thread):
def __init__(self, queue):
self.queue = queue
self.doRun = True
super().__init__(daemon=True)
def run(self) -> None:
while self.doRun:
(processor, file) = self.queue.get()
try:
logger.debug("processing file %s", file)
processor.decode(file)
except Exception:
logger.exception("failed to decode job")
2019-09-18 01:46:09 +02:00
self.queue.onError()
self.queue.task_done()
class WsjtQueue(Queue):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with WsjtQueue.creationLock:
if WsjtQueue.sharedInstance is None:
pm = PropertyManager.getSharedInstance()
WsjtQueue.sharedInstance = WsjtQueue(maxsize=pm["wsjt_queue_length"], workers=pm["wsjt_queue_workers"])
return WsjtQueue.sharedInstance
def __init__(self, maxsize, workers):
super().__init__(maxsize)
2019-09-13 00:16:36 +02:00
metrics = Metrics.getSharedInstance()
metrics.addMetric("wsjt.queue.length", DirectMetric(self.qsize))
self.inCounter = CounterMetric()
metrics.addMetric("wsjt.queue.in", self.inCounter)
self.outCounter = CounterMetric()
metrics.addMetric("wsjt.queue.out", self.outCounter)
2019-09-15 12:23:35 +02:00
self.overflowCounter = CounterMetric()
metrics.addMetric("wsjt.queue.overflow", self.overflowCounter)
2019-09-18 01:46:09 +02:00
self.errorCounter = CounterMetric()
metrics.addMetric("wsjt.queue.error", self.errorCounter)
self.workers = [self.newWorker() for _ in range(0, workers)]
def put(self, item):
2019-09-13 00:16:36 +02:00
self.inCounter.inc()
2019-09-15 12:23:35 +02:00
try:
super(WsjtQueue, self).put(item, block=False)
except Full:
self.overflowCounter.inc()
raise
2019-09-13 00:16:36 +02:00
def get(self, **kwargs):
# super.get() is blocking, so it would mess up the stats to inc() first
out = super(WsjtQueue, self).get(**kwargs)
self.outCounter.inc()
return out
def newWorker(self):
worker = WsjtQueueWorker(self)
worker.start()
return worker
2019-09-18 01:46:09 +02:00
def onError(self):
self.errorCounter.inc()
2019-07-13 23:16:25 +02:00
class WsjtChopper(threading.Thread):
def __init__(self, source):
self.source = source
self.tmp_dir = PropertyManager.getSharedInstance()["temporary_directory"]
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock = threading.Lock()
self.timer = None
2019-07-06 20:03:17 +02:00
(self.outputReader, self.outputWriter) = Pipe()
self.doRun = True
super().__init__()
def getWaveFile(self):
2019-07-13 23:16:25 +02:00
filename = "{tmp_dir}/openwebrx-wsjtchopper-{id}-{timestamp}.wav".format(
tmp_dir=self.tmp_dir, id=id(self), timestamp=datetime.utcnow().strftime(self.fileTimestampFormat)
2019-07-11 20:48:02 +02:00
)
wavefile = wave.open(filename, "wb")
wavefile.setnchannels(1)
wavefile.setsampwidth(2)
wavefile.setframerate(12000)
return filename, wavefile
def getNextDecodingTime(self):
t = datetime.utcnow()
2019-07-13 23:16:25 +02:00
zeroed = t.replace(minute=0, second=0, microsecond=0)
delta = t - zeroed
seconds = (int(delta.total_seconds() / self.interval) + 1) * self.interval
t = zeroed + timedelta(seconds=seconds)
logger.debug("scheduling: {0}".format(t))
return t
def cancelTimer(self):
if self.timer:
self.timer.cancel()
def _scheduleNextSwitch(self):
if self.doRun:
delta = self.getNextDecodingTime() - datetime.utcnow()
self.timer = threading.Timer(delta.total_seconds(), self.switchFiles)
self.timer.start()
def switchFiles(self):
self.switchingLock.acquire()
file = self.wavefile
filename = self.wavefilename
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock.release()
file.close()
2019-08-22 21:24:36 +02:00
try:
WsjtQueue.getSharedInstance().put((self, filename))
except Full:
logger.warning("wsjt decoding queue overflow; dropping one file")
os.unlink(filename)
self._scheduleNextSwitch()
2019-07-13 23:16:25 +02:00
def decoder_commandline(self, file):
2019-07-20 12:47:10 +02:00
"""
2019-07-13 23:16:25 +02:00
must be overridden in child classes
2019-07-20 12:47:10 +02:00
"""
2019-07-13 23:16:25 +02:00
return []
def decode(self, file):
2019-09-13 23:03:05 +02:00
decoder = subprocess.Popen(
self.decoder_commandline(file), stdout=subprocess.PIPE, cwd=self.tmp_dir, preexec_fn=lambda: os.nice(10)
)
while True:
line = decoder.stdout.readline()
if line is None or (isinstance(line, bytes) and len(line) == 0):
break
self.outputWriter.send(line)
rc = decoder.wait()
if rc != 0:
logger.warning("decoder return code: %i", rc)
os.unlink(file)
def run(self) -> None:
2019-07-13 23:16:25 +02:00
logger.debug("WSJT chopper starting up")
self._scheduleNextSwitch()
while self.doRun:
data = self.source.read(256)
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
self.switchingLock.acquire()
self.wavefile.writeframes(data)
self.switchingLock.release()
2019-07-13 23:16:25 +02:00
logger.debug("WSJT chopper shutting down")
2019-07-06 20:03:17 +02:00
self.outputReader.close()
self.outputWriter.close()
self.cancelTimer()
2019-07-12 19:34:04 +02:00
try:
os.unlink(self.wavefilename)
except Exception:
logger.exception("error removing undecoded file")
2019-07-06 20:03:17 +02:00
def read(self):
try:
return self.outputReader.recv()
except EOFError:
return None
2019-09-15 16:37:12 +02:00
def decoding_depth(self, mode):
pm = PropertyManager.getSharedInstance()
# mode-specific setting?
if "wsjt_decoding_depths" in pm and mode in pm["wsjt_decoding_depths"]:
2019-09-23 03:15:24 +02:00
return pm["wsjt_decoding_depths"][mode]
2019-09-15 16:37:12 +02:00
# return global default
if "wsjt_decoding_depth" in pm:
return pm["wsjt_decoding_depth"]
# default when no setting is provided
return 3
2019-07-06 20:03:17 +02:00
2019-07-13 23:16:25 +02:00
class Ft8Chopper(WsjtChopper):
def __init__(self, source):
self.interval = 15
2019-07-20 12:47:10 +02:00
self.fileTimestampFormat = "%y%m%d_%H%M%S"
2019-07-13 23:16:25 +02:00
super().__init__(source)
def decoder_commandline(self, file):
2019-09-15 16:37:12 +02:00
return ["jt9", "--ft8", "-d", str(self.decoding_depth("ft8")), file]
2019-07-13 23:16:25 +02:00
class WsprChopper(WsjtChopper):
def __init__(self, source):
self.interval = 120
2019-07-20 12:47:10 +02:00
self.fileTimestampFormat = "%y%m%d_%H%M"
2019-07-13 23:16:25 +02:00
super().__init__(source)
def decoder_commandline(self, file):
2019-09-15 16:37:12 +02:00
cmd = ["wsprd"]
if self.decoding_depth("wspr") > 1:
cmd += ["-d"]
cmd += [file]
return cmd
2019-07-13 23:16:25 +02:00
class Jt65Chopper(WsjtChopper):
def __init__(self, source):
self.interval = 60
2019-07-20 12:47:10 +02:00
self.fileTimestampFormat = "%y%m%d_%H%M"
super().__init__(source)
def decoder_commandline(self, file):
2019-09-15 16:37:12 +02:00
return ["jt9", "--jt65", "-d", str(self.decoding_depth("jt65")), file]
class Jt9Chopper(WsjtChopper):
def __init__(self, source):
self.interval = 60
2019-07-20 12:47:10 +02:00
self.fileTimestampFormat = "%y%m%d_%H%M"
super().__init__(source)
def decoder_commandline(self, file):
2019-09-15 16:37:12 +02:00
return ["jt9", "--jt9", "-d", str(self.decoding_depth("jt9")), file]
2019-07-20 13:38:25 +02:00
class Ft4Chopper(WsjtChopper):
def __init__(self, source):
self.interval = 7.5
self.fileTimestampFormat = "%y%m%d_%H%M%S"
super().__init__(source)
def decoder_commandline(self, file):
2019-09-15 16:37:12 +02:00
return ["jt9", "--ft4", "-d", str(self.decoding_depth("ft4")), file]
2019-07-20 13:38:25 +02:00
2019-07-06 20:03:17 +02:00
class WsjtParser(object):
def __init__(self, handler):
self.handler = handler
self.dial_freq = None
self.band = None
2019-07-06 20:03:17 +02:00
modes = {"~": "FT8", "#": "JT65", "@": "JT9", "+": "FT4"}
2019-07-11 23:40:09 +02:00
2019-07-06 20:03:17 +02:00
def parse(self, data):
try:
msg = data.decode().rstrip()
# known debug messages we know to skip
if msg.startswith("<DecodeFinished>"):
return
if msg.startswith(" EOF on input file"):
return
2019-07-20 12:47:10 +02:00
modes = list(WsjtParser.modes.keys())
if msg[21] in modes or msg[19] in modes:
decoder = Jt9Decoder()
2019-07-20 12:47:10 +02:00
else:
decoder = WsprDecoder()
2019-09-23 22:45:55 +02:00
out = decoder.parse(msg, self.dial_freq)
if "mode" in out:
self.pushDecode(out["mode"])
if "callsign" in out and "locator" in out:
Map.getSharedInstance().updateLocation(
out["callsign"], LocatorLocation(out["locator"]), out["mode"], self.band
)
PskReporter.getSharedInstance().spot(out)
2019-07-06 20:03:17 +02:00
self.handler.write_wsjt_message(out)
except ValueError:
logger.exception("error while parsing wsjt message")
2019-07-06 22:21:47 +02:00
def pushDecode(self, mode):
metrics = Metrics.getSharedInstance()
2019-09-12 23:23:50 +02:00
band = "unknown"
if self.band is not None:
band = self.band.getName()
if band is None:
band = "unknown"
if mode is None:
mode = "unknown"
name = "wsjt.decodes.{band}.{mode}".format(band=band, mode=mode)
metric = metrics.getMetric(name)
if metric is None:
metric = CounterMetric()
metrics.addMetric(name, metric)
metric.inc()
def setDialFrequency(self, freq):
self.dial_freq = freq
self.band = Bandplan.getSharedInstance().findBand(freq)
class Decoder(object):
def parse_timestamp(self, instring, dateformat):
ts = datetime.strptime(instring, dateformat)
2019-09-25 00:47:34 +02:00
return int(
datetime.combine(datetime.utcnow().date(), ts.time()).replace(tzinfo=timezone.utc).timestamp() * 1000
)
class Jt9Decoder(Decoder):
locator_pattern = re.compile("[A-Z0-9]+\\s([A-Z0-9]+)\\s([A-R]{2}[0-9]{2})$")
2019-09-23 22:45:55 +02:00
def parse(self, msg, dial_freq):
2019-07-13 23:16:25 +02:00
# ft8 sample
# '222100 -15 -0.0 508 ~ CQ EA7MJ IM66'
# jt65 sample
2019-07-20 12:47:10 +02:00
# '2352 -7 0.4 1801 # R0WAS R2ABM KO85'
# '0003 -4 0.4 1762 # CQ R2ABM KO85'
modes = list(WsjtParser.modes.keys())
if msg[19] in modes:
dateformat = "%H%M"
else:
2019-07-20 12:47:10 +02:00
dateformat = "%H%M%S"
timestamp = self.parse_timestamp(msg[0 : len(dateformat)], dateformat)
msg = msg[len(dateformat) + 1 :]
modeChar = msg[14:15]
2019-07-20 12:47:10 +02:00
mode = WsjtParser.modes[modeChar] if modeChar in WsjtParser.modes else "unknown"
wsjt_msg = msg[17:53].strip()
result = {
2019-07-20 12:47:10 +02:00
"timestamp": timestamp,
"db": float(msg[0:3]),
"dt": float(msg[4:8]),
2019-09-23 22:45:55 +02:00
"freq": dial_freq + int(msg[9:13]),
2019-07-20 12:47:10 +02:00
"mode": mode,
"msg": wsjt_msg,
2019-07-20 12:47:10 +02:00
}
result.update(self.parseMessage(wsjt_msg))
return result
2019-07-13 23:16:25 +02:00
def parseMessage(self, msg):
m = Jt9Decoder.locator_pattern.match(msg)
2019-07-06 22:21:47 +02:00
if m is None:
return {}
2019-07-06 22:21:47 +02:00
# this is a valid locator in theory, but it's somewhere in the arctic ocean, near the north pole, so it's very
# likely this just means roger roger goodbye.
if m.group(2) == "RR73":
return {"callsign": m.group(1)}
return {"callsign": m.group(1), "locator": m.group(2)}
2019-07-13 23:16:25 +02:00
class WsprDecoder(Decoder):
wspr_splitter_pattern = re.compile("([A-Z0-9]*)\\s([A-R]{2}[0-9]{2})\\s([0-9]+)")
2019-09-23 22:45:55 +02:00
def parse(self, msg, dial_freq):
2019-07-13 23:16:25 +02:00
# wspr sample
# '2600 -24 0.4 0.001492 -1 G8AXA JO01 33'
2019-07-20 12:47:10 +02:00
# '0052 -29 2.6 0.001486 0 G02CWT IO92 23'
2019-07-14 14:33:30 +02:00
wsjt_msg = msg[29:].strip()
result = {
2019-07-20 12:47:10 +02:00
"timestamp": self.parse_timestamp(msg[0:4], "%H%M"),
"db": float(msg[5:8]),
"dt": float(msg[9:13]),
2019-09-23 22:45:55 +02:00
"freq": dial_freq + int(float(msg[14:24]) * 1e6),
2019-07-20 12:47:10 +02:00
"drift": int(msg[25:28]),
"mode": "WSPR",
"msg": wsjt_msg,
2019-07-20 12:47:10 +02:00
}
result.update(self.parseMessage(wsjt_msg))
return result
2019-07-13 23:16:25 +02:00
def parseMessage(self, msg):
m = WsprDecoder.wspr_splitter_pattern.match(msg)
2019-07-13 23:16:25 +02:00
if m is None:
return {}
return {"callsign": m.group(1), "locator": m.group(2)}