openwebrx-clone/owrx/wsjt.py

400 lines
12 KiB
Python
Raw Permalink Normal View History

2020-04-22 21:53:19 +00:00
from datetime import datetime, timezone
from typing import List
2019-07-06 20:21:47 +00:00
from owrx.map import Map, LocatorLocation
import re
2020-04-22 21:53:19 +00:00
from owrx.metrics import Metrics, CounterMetric
from owrx.reporting import ReportingEngine
from owrx.parser import Parser
from owrx.audio import AudioChopperProfile, StaticProfileSource, ConfigWiredProfileSource
from abc import ABC, ABCMeta, abstractmethod
from owrx.config import Config
2021-02-03 19:11:07 +00:00
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class WsjtProfile(AudioChopperProfile, metaclass=ABCMeta):
def decoding_depth(self):
pm = Config.get()
mode = self.getMode().lower()
# mode-specific setting?
if "wsjt_decoding_depths" in pm and mode in pm["wsjt_decoding_depths"]:
return pm["wsjt_decoding_depths"][mode]
# return global default
if "wsjt_decoding_depth" in pm:
return pm["wsjt_decoding_depth"]
# default when no setting is provided
return 3
def getTimestampFormat(self):
if self.getInterval() < 60:
return "%H%M%S"
return "%H%M"
def getFileTimestampFormat(self):
return "%y%m%d_" + self.getTimestampFormat()
@abstractmethod
def getMode(self):
pass
class Fst4ProfileSource(ConfigWiredProfileSource):
def getPropertiesToWire(self) -> List[str]:
return ["fst4_enabled_intervals"]
def getProfiles(self) -> List[AudioChopperProfile]:
config = Config.get()
profiles = config["fst4_enabled_intervals"] if "fst4_enabled_intervals" in config else []
return [Fst4Profile(i) for i in profiles if i in Fst4Profile.availableIntervals]
class Fst4wProfileSource(ConfigWiredProfileSource):
def getPropertiesToWire(self) -> List[str]:
return ["fst4w_enabled_intervals"]
def getProfiles(self) -> List[AudioChopperProfile]:
config = Config.get()
profiles = config["fst4w_enabled_intervals"] if "fst4w_enabled_intervals" in config else []
return [Fst4wProfile(i) for i in profiles if i in Fst4wProfile.availableIntervals]
class Q65ProfileSource(ConfigWiredProfileSource):
def getPropertiesToWire(self) -> List[str]:
return ["q65_enabled_combinations"]
def getProfiles(self) -> List[AudioChopperProfile]:
config = Config.get()
profiles = config["q65_enabled_combinations"] if "q65_enabled_combinations" in config else []
def buildProfile(modestring):
try:
mode = Q65Mode[modestring[0]]
interval = Q65Interval(int(modestring[1:]))
if interval.is_available(mode):
return Q65Profile(interval, mode)
except (ValueError, KeyError):
pass
logger.warning('"%s" is not a valid Q65 mode, or an invalid mode string, ignoring', modestring)
return None
mapped = [buildProfile(m) for m in profiles]
return [p for p in mapped if p is not None]
class WsjtProfiles(object):
2021-04-09 16:16:25 +00:00
@staticmethod
def getSource(mode: str):
2021-04-09 16:16:25 +00:00
if mode == "ft8":
return StaticProfileSource([Ft8Profile()])
2021-04-09 16:16:25 +00:00
elif mode == "wspr":
return StaticProfileSource([WsprProfile()])
2021-04-09 16:16:25 +00:00
elif mode == "jt65":
return StaticProfileSource([Jt65Profile()])
2021-04-09 16:16:25 +00:00
elif mode == "jt9":
return StaticProfileSource([Jt9Profile()])
2021-04-09 16:16:25 +00:00
elif mode == "ft4":
return StaticProfileSource([Ft4Profile()])
2021-04-09 16:16:25 +00:00
elif mode == "fst4":
return Fst4ProfileSource()
2021-04-09 16:16:25 +00:00
elif mode == "fst4w":
return Fst4wProfileSource()
2021-04-09 16:16:25 +00:00
elif mode == "q65":
return Q65ProfileSource()
2021-04-09 16:16:25 +00:00
class Ft8Profile(WsjtProfile):
2020-04-05 17:08:58 +00:00
def getInterval(self):
return 15
2019-07-13 21:16:25 +00:00
def decoder_commandline(self, file):
return ["jt9", "--ft8", "-d", str(self.decoding_depth()), file]
def getMode(self):
return "FT8"
2019-07-13 21:16:25 +00:00
class WsprProfile(WsjtProfile):
2020-04-05 17:08:58 +00:00
def getInterval(self):
return 120
2019-07-13 21:16:25 +00:00
def decoder_commandline(self, file):
2019-09-15 14:37:12 +00:00
cmd = ["wsprd"]
if self.decoding_depth() > 1:
2019-09-15 14:37:12 +00:00
cmd += ["-d"]
cmd += [file]
return cmd
2019-07-13 21:16:25 +00:00
def getMode(self):
return "WSPR"
2019-07-13 21:16:25 +00:00
class Jt65Profile(WsjtProfile):
2020-04-05 17:08:58 +00:00
def getInterval(self):
return 60
def decoder_commandline(self, file):
return ["jt9", "--jt65", "-d", str(self.decoding_depth()), file]
def getMode(self):
return "JT65"
class Jt9Profile(WsjtProfile):
2020-04-05 17:08:58 +00:00
def getInterval(self):
return 60
def decoder_commandline(self, file):
return ["jt9", "--jt9", "-d", str(self.decoding_depth()), file]
def getMode(self):
return "JT9"
class Ft4Profile(WsjtProfile):
2020-04-05 17:08:58 +00:00
def getInterval(self):
return 7.5
2019-07-20 11:38:25 +00:00
def decoder_commandline(self, file):
return ["jt9", "--ft4", "-d", str(self.decoding_depth()), file]
def getMode(self):
return "FT4"
2019-07-20 11:38:25 +00:00
2020-12-07 10:56:01 +00:00
class Fst4Profile(WsjtProfile):
2021-01-20 16:01:46 +00:00
availableIntervals = [15, 30, 60, 120, 300, 900, 1800]
2020-12-07 10:56:01 +00:00
def __init__(self, interval):
self.interval = interval
def getInterval(self):
return self.interval
def decoder_commandline(self, file):
return ["jt9", "--fst4", "-p", str(self.interval), "-d", str(self.decoding_depth()), file]
def getMode(self):
return "FST4"
2020-12-07 10:56:01 +00:00
class Fst4wProfile(WsjtProfile):
availableIntervals = [120, 300, 900, 1800]
def __init__(self, interval):
self.interval = interval
def getInterval(self):
return self.interval
def decoder_commandline(self, file):
return ["jt9", "--fst4w", "-p", str(self.interval), "-d", str(self.decoding_depth()), file]
def getMode(self):
return "FST4W"
2020-12-07 10:56:01 +00:00
2021-02-03 19:11:07 +00:00
class Q65Mode(Enum):
# value is the bandwidth multiplier according to https://physics.princeton.edu/pulsar/k1jt/Q65_Quick_Start.pdf
2021-02-03 19:11:07 +00:00
A = 1
B = 2
C = 4
D = 8
E = 16
2021-02-03 19:11:07 +00:00
def is_available(self, interval: "Q65Interval"):
return interval.is_available(self)
2021-02-03 19:11:07 +00:00
class Q65Interval(Enum):
# (interval, occupied bandwidth in mode "A")
# according to https://physics.princeton.edu/pulsar/k1jt/Q65_Quick_Start.pdf
INTERVAL_15 = (15, 433)
INTERVAL_30 = (30, 217)
INTERVAL_60 = (60, 108)
INTERVAL_120 = (120, 49)
INTERVAL_300 = (300, 19)
def __new__(cls, *args, **kwargs):
interval, occupied_bandwidth = args
obj = object.__new__(cls)
obj._value_ = interval
obj.occupied_bandwidth = occupied_bandwidth
return obj
def is_available(self, mode: Q65Mode):
# total bandwidth must not exceed the typical SSB bandwidth
return self.occupied_bandwidth * mode.value < 2700
class Q65Profile(WsjtProfile):
def __init__(self, interval: Q65Interval, mode: Q65Mode):
self.interval = interval.value
2021-02-03 19:11:07 +00:00
self.mode = mode
def getMode(self):
return "Q65"
def getInterval(self):
return self.interval
def decoder_commandline(self, file):
return ["jt9", "--q65", "-p", str(self.interval), "-b", self.mode.name, "-d", str(self.decoding_depth()), file]
class WsjtParser(Parser):
def parse(self, data):
try:
profile, freq, raw_msg = data
self.setDialFrequency(freq)
msg = raw_msg.decode().rstrip()
# known debug messages we know to skip
if msg.startswith("<DecodeFinished>"):
return
if msg.startswith(" EOF on input file"):
return
mode = profile.getMode()
if mode in ["WSPR", "FST4W"]:
messageParser = BeaconMessageParser()
else:
messageParser = QsoMessageParser()
if mode == "WSPR":
decoder = WsprDecoder(profile, messageParser)
else:
decoder = Jt9Decoder(profile, messageParser)
out = decoder.parse(msg, freq)
if isinstance(profile, Q65Profile) and not out["msg"]:
# all efforts in vain, it's just a potential signal indicator
return
out["mode"] = mode
out["interval"] = profile.getInterval()
self.pushDecode(mode)
if "callsign" in out and "locator" in out:
Map.getSharedInstance().updateLocation(
out["callsign"], LocatorLocation(out["locator"]), mode, self.band
)
ReportingEngine.getSharedInstance().spot(out)
self.handler.write_wsjt_message(out)
except Exception:
logger.exception("Exception while parsing wsjt message")
2019-07-06 20:21:47 +00:00
def pushDecode(self, mode):
metrics = Metrics.getSharedInstance()
2019-09-12 21:23:50 +00:00
band = "unknown"
if self.band is not None:
band = self.band.getName()
if band is None:
band = "unknown"
if mode is None:
mode = "unknown"
name = "wsjt.decodes.{band}.{mode}".format(band=band, mode=mode)
metric = metrics.getMetric(name)
if metric is None:
metric = CounterMetric()
metrics.addMetric(name, metric)
metric.inc()
2020-04-05 19:48:05 +00:00
class Decoder(ABC):
def __init__(self, profile, messageParser):
self.profile = profile
self.messageParser = messageParser
2020-12-09 11:02:40 +00:00
def parse_timestamp(self, instring):
dateformat = self.profile.getTimestampFormat()
remain = instring[len(dateformat) + 1 :]
2020-12-09 11:02:40 +00:00
try:
ts = datetime.strptime(instring[0 : len(dateformat)], dateformat)
2020-12-09 11:02:40 +00:00
return remain, int(
datetime.combine(datetime.utcnow().date(), ts.time()).replace(tzinfo=timezone.utc).timestamp() * 1000
)
except ValueError:
return remain, None
2020-04-05 19:48:05 +00:00
@abstractmethod
def parse(self, msg, dial_freq):
pass
class MessageParser(ABC):
@abstractmethod
def parse(self, msg):
pass
# Used in QSO-style modes (FT8, FT4, FST4)
class QsoMessageParser(MessageParser):
locator_pattern = re.compile(".*\\s([A-Z0-9/]{2,})(\\sR)?\\s([A-R]{2}[0-9]{2})$")
def parse(self, msg):
m = QsoMessageParser.locator_pattern.match(msg)
2020-12-07 10:56:01 +00:00
if m is None:
return {}
# this is a valid locator in theory, but it's somewhere in the arctic ocean, near the north pole, so it's very
# likely this just means roger roger goodbye.
2020-12-07 23:01:00 +00:00
if m.group(3) == "RR73":
2020-12-07 10:56:01 +00:00
return {"callsign": m.group(1)}
2020-12-07 23:01:00 +00:00
return {"callsign": m.group(1), "locator": m.group(3)}
# Used in propagation reporting / beacon modes (WSPR / FST4W)
class BeaconMessageParser(MessageParser):
wspr_splitter_pattern = re.compile("([A-Z0-9/]*)\\s([A-R]{2}[0-9]{2})\\s([0-9]+)")
def parse(self, msg):
m = BeaconMessageParser.wspr_splitter_pattern.match(msg)
if m is None:
return {}
return {"callsign": m.group(1), "locator": m.group(2), "dbm": m.group(3)}
2020-12-07 10:56:01 +00:00
class Jt9Decoder(Decoder):
2019-09-23 20:45:55 +00:00
def parse(self, msg, dial_freq):
2019-07-13 21:16:25 +00:00
# ft8 sample
# '222100 -15 -0.0 508 ~ CQ EA7MJ IM66'
# jt65 sample
2019-07-20 10:47:10 +00:00
# '2352 -7 0.4 1801 # R0WAS R2ABM KO85'
# '0003 -4 0.4 1762 # CQ R2ABM KO85'
2020-12-07 10:56:01 +00:00
# fst4 sample
# '**** -23 0.6 3023 ` <...> <...> R 591631 BI53PV'
2020-12-09 11:02:40 +00:00
msg, timestamp = self.parse_timestamp(msg)
wsjt_msg = msg[17:53].strip()
result = {
2019-07-20 10:47:10 +00:00
"timestamp": timestamp,
"db": float(msg[0:3]),
"dt": float(msg[4:8]),
2019-09-23 20:45:55 +00:00
"freq": dial_freq + int(msg[9:13]),
"msg": wsjt_msg,
2019-07-20 10:47:10 +00:00
}
result.update(self.messageParser.parse(wsjt_msg))
return result
2019-07-13 21:16:25 +00:00
class WsprDecoder(Decoder):
2019-09-23 20:45:55 +00:00
def parse(self, msg, dial_freq):
2019-07-13 21:16:25 +00:00
# wspr sample
# '2600 -24 0.4 0.001492 -1 G8AXA JO01 33'
2019-07-20 10:47:10 +00:00
# '0052 -29 2.6 0.001486 0 G02CWT IO92 23'
2020-12-09 11:02:40 +00:00
msg, timestamp = self.parse_timestamp(msg)
wsjt_msg = msg[24:].strip()
result = {
2020-12-09 11:02:40 +00:00
"timestamp": timestamp,
"db": float(msg[0:3]),
"dt": float(msg[4:8]),
"freq": dial_freq + int(float(msg[10:20]) * 1e6),
"drift": int(msg[20:23]),
"msg": wsjt_msg,
2019-07-20 10:47:10 +00:00
}
result.update(self.messageParser.parse(wsjt_msg))
return result