diff --git a/csdr.py b/csdr.py index 066d2de..c9d17c7 100755 --- a/csdr.py +++ b/csdr.py @@ -263,6 +263,7 @@ class dsp(object): if self.get_secondary_demodulator() == "ft8": chopper = Ft8Chopper(self.secondary_process_demod.stdout) chopper.start() + self.output.add_output("wsjt_demod", chopper.read) else: self.output.add_output("secondary_demod", partial(self.secondary_process_demod.stdout.read, 1)) diff --git a/owrx/connection.py b/owrx/connection.py index 3286975..a782cfc 100644 --- a/owrx/connection.py +++ b/owrx/connection.py @@ -159,6 +159,9 @@ class OpenWebRxReceiverClient(Client): def write_metadata(self, metadata): self.protected_send({"type":"metadata","value":metadata}) + def write_wsjt_message(self, message): + self.protected_send({"type": "wsjt_message", "value": message}) + class MapConnection(Client): def __init__(self, conn): diff --git a/owrx/wsjt.py b/owrx/wsjt.py index c9fc319..cc8a7f8 100644 --- a/owrx/wsjt.py +++ b/owrx/wsjt.py @@ -1,9 +1,11 @@ import threading import wave -from datetime import datetime, timedelta +from datetime import datetime, timedelta, date import time import sched import subprocess +import os +from multiprocessing.connection import Pipe import logging logger = logging.getLogger(__name__) @@ -14,7 +16,8 @@ class Ft8Chopper(threading.Thread): self.source = source (self.wavefilename, self.wavefile) = self.getWaveFile() self.scheduler = sched.scheduler(time.time, time.sleep) - self.queue = [] + self.fileQueue = [] + (self.outputReader, self.outputWriter) = Pipe() self.doRun = True super().__init__() @@ -53,15 +56,28 @@ class Ft8Chopper(threading.Thread): (self.wavefilename, self.wavefile) = self.getWaveFile() file.close() - self.queue.append(filename) + self.fileQueue.append(filename) self._scheduleNextSwitch() def decode(self): - if self.queue: - file = self.queue.pop() - logger.debug("processing file {0}".format(file)) + def decode_and_unlink(file): #TODO expose decoding quality parameters through config - self.decoder = subprocess.Popen(["jt9", "--ft8", "-d", "3", file]) + decoder = subprocess.Popen(["jt9", "--ft8", "-d", "3", file], stdout=subprocess.PIPE) + while True: + line = decoder.stdout.readline() + if line is None or (isinstance(line, bytes) and len(line) == 0): + break + self.outputWriter.send(line) + rc = decoder.wait() + logger.debug("decoder return code: %i", rc) + os.unlink(file) + + self.decoder = decoder + + if self.fileQueue: + file = self.fileQueue.pop() + logger.debug("processing file {0}".format(file)) + threading.Thread(target=decode_and_unlink, args=[file]).start() def run(self) -> None: logger.debug("FT8 chopper starting up") @@ -76,4 +92,38 @@ class Ft8Chopper(threading.Thread): self.decode() logger.debug("FT8 chopper shutting down") + self.outputReader.close() + self.outputWriter.close() self.emptyScheduler() + + def read(self): + try: + return self.outputReader.recv() + except EOFError: + return None + + +class WsjtParser(object): + def __init__(self, handler): + self.handler = handler + + def parse(self, data): + try: + msg = data.decode().rstrip() + # known debug messages we know to skip + if msg.startswith(""): + return + if msg.startswith(" EOF on input file"): + return + + out = {} + time = datetime.strptime(msg[0:6], "%H%M%S") + out["timestamp"] = datetime.combine(date.today(), time.time()).timestamp() + out["db"] = float(msg[7:10]) + out["dt"] = float(msg[11:15]) + out["freq"] = int(msg[16:20]) + out["msg"] = msg[24:] + + self.handler.write_wsjt_message(out) + except ValueError: + logger.exception("error while parsing wsjt message")