import threading
import wave
from datetime import datetime, timedelta, date
import time
import sched
import subprocess
import os
from multiprocessing.connection import Pipe

import logging

logger = logging.getLogger(__name__)


class Ft8Chopper(threading.Thread):
    """Cut an audio stream into wav files and run the ``jt9`` decoder on them.

    The thread reads raw audio from ``source`` and appends it to a wav file
    (1 channel, 2 bytes per sample, 12000 frames per second). A scheduler
    switches to a fresh file on every 15-second wall-clock boundary; each
    finished file is handed to ``jt9 --ft8`` in a worker thread and the
    decoder's stdout lines are made available through :meth:`read`.
    """

    def __init__(self, source):
        """Prepare the first wav file, the scheduler and the output pipe.

        :param source: object with a ``read(n)`` method that returns audio
            data as ``bytes``; ``None`` or ``b""`` ends the thread.
        """
        super().__init__()
        self.source = source
        (self.wavefilename, self.wavefile) = self.getWaveFile()
        self.scheduler = sched.scheduler(time.time, time.sleep)
        # names of finished recordings that still have to be decoded
        self.fileQueue = []
        (self.outputReader, self.outputWriter) = Pipe()
        self.doRun = True

    def getWaveFile(self):
        """Open a new, timestamp-named wav file for writing.

        :return: tuple ``(filename, wavefile)``
        """
        filename = "/tmp/openwebrx-ft8chopper-{0}.wav".format(datetime.now().strftime("%Y%m%d-%H%M%S"))
        wavefile = wave.open(filename, "wb")
        wavefile.setnchannels(1)
        wavefile.setsampwidth(2)
        wavefile.setframerate(12000)
        return (filename, wavefile)

    def getNextDecodingTime(self):
        """Return the POSIX timestamp of the next 15-second boundary.

        The result is always strictly in the future: at second 15 exactly,
        the boundary returned is second 30.
        """
        t = datetime.now()
        seconds = (t.second // 15 + 1) * 15
        if seconds >= 60:
            # roll over into the next minute; replace() rejects second=60
            t = t + timedelta(minutes=1)
            seconds = 0
        t = t.replace(second=seconds, microsecond=0)
        logger.debug("scheduling: %s", t)
        return t.timestamp()

    def startScheduler(self):
        """Schedule the first file switch and run the scheduler in its own thread."""
        self._scheduleNextSwitch()
        threading.Thread(target=self.scheduler.run).start()

    def emptyScheduler(self):
        """Cancel all pending scheduler events so the scheduler thread can end."""
        # scheduler.queue is a snapshot, so cancelling while iterating is safe
        for event in self.scheduler.queue:
            try:
                self.scheduler.cancel(event)
            except ValueError:
                # the event fired between taking the snapshot and cancelling
                pass

    def _scheduleNextSwitch(self):
        """Register :meth:`switchFiles` for the next 15-second boundary."""
        self.scheduler.enterabs(self.getNextDecodingTime(), 1, self.switchFiles)

    def switchFiles(self):
        """Start a new wav file, queue the finished one and re-arm the scheduler."""
        finished = self.wavefile
        finishedName = self.wavefilename
        # install the new file before closing the old one so that the reader
        # loop in run() always finds a file to write to
        (self.wavefilename, self.wavefile) = self.getWaveFile()
        finished.close()
        self.fileQueue.append(finishedName)
        self._scheduleNextSwitch()

    def decode(self):
        """Start a decoder thread for the oldest queued recording, if any."""

        def decode_and_unlink(wavPath):
            # Runs in a worker thread: forward every stdout line of jt9 to the
            # output pipe, then remove the recording - also when jt9 could not
            # be started or the pipe has been closed in the meantime.
            try:
                # TODO expose decoding quality parameters through config
                with subprocess.Popen(["jt9", "--ft8", "-d", "3", wavPath], stdout=subprocess.PIPE) as decoder:
                    # readline() returns b"" once the decoder closes its stdout
                    for line in iter(decoder.stdout.readline, b""):
                        self.outputWriter.send(line)
                    rc = decoder.wait()
                logger.debug("decoder return code: %i", rc)
            finally:
                os.unlink(wavPath)
            self.decoder = decoder

        if self.fileQueue:
            # pop(0): decode recordings in the order they were recorded
            wavPath = self.fileQueue.pop(0)
            logger.debug("processing file %s", wavPath)
            threading.Thread(target=decode_and_unlink, args=[wavPath]).start()

    def _discardWaveFile(self):
        """Close and delete the recording in progress; it is never queued for decoding."""
        try:
            self.wavefile.close()
            os.unlink(self.wavefilename)
        except OSError:
            logger.exception("could not clean up %s", self.wavefilename)

    def run(self) -> None:
        """Copy audio from the source into the current wav file until the source ends.

        On the way out - regular end of input or an exception - the output
        pipe is closed, pending scheduler events are cancelled and the
        unfinished recording is removed.
        """
        logger.debug("FT8 chopper starting up")
        self.startScheduler()
        try:
            while self.doRun:
                data = self.source.read(256)
                if data is None or (isinstance(data, bytes) and not data):
                    logger.warning("zero read on ft8 chopper")
                    self.doRun = False
                else:
                    self.wavefile.writeframes(data)

                self.decode()
        finally:
            logger.debug("FT8 chopper shutting down")
            self.outputReader.close()
            self.outputWriter.close()
            self.emptyScheduler()
            self._discardWaveFile()

    def read(self):
        """Return the next decoder output line, blocking until one is available.

        :return: the object sent by a decoder thread (a ``bytes`` line), or
            ``None`` when the pipe signals end-of-file.

        NOTE(review): once :meth:`run` has closed ``outputReader``, ``recv()``
        on the closed connection raises ``OSError`` instead of ``EOFError``;
        that exception is deliberately not swallowed here - confirm callers
        handle it.
        """
        try:
            return self.outputReader.recv()
        except EOFError:
            return None


class WsjtParser(object):
    """Turn raw ``jt9`` output lines into dicts and pass them to a handler."""

    def __init__(self, handler):
        """
        :param handler: object providing ``write_wsjt_message(dict)``
        """
        self.handler = handler

    def parse(self, data):
        """Parse one decoder output line and forward the result to the handler.

        The line is read by fixed column positions: ``HHMMSS`` in columns
        0-5, then the values stored as ``db`` (7-9), ``dt`` (11-14) and
        ``freq`` (16-19), and the message text from column 24 on. The
        timestamp combines today's local date with the time from the line.

        Lines that cannot be parsed are logged and dropped.

        :param data: one line of decoder output as ``bytes``
        """
        try:
            msg = data.decode().rstrip()
            if not msg:
                return
            # known debug messages we know to skip
            # NOTE(review): the first prefix was an empty string in the
            # original, which matches every message and disabled parsing
            # altogether. "<DecodeFinished>" is the status marker jt9 prints
            # after each run - confirm against the deployed jt9 version.
            if msg.startswith(("<DecodeFinished>", " EOF on input file")):
                return

            stamp = datetime.strptime(msg[0:6], "%H%M%S")
            out = {
                "timestamp": datetime.combine(date.today(), stamp.time()).timestamp(),
                "db": float(msg[7:10]),
                "dt": float(msg[11:15]),
                "freq": int(msg[16:20]),
                "msg": msg[24:],
            }

            self.handler.write_wsjt_message(out)
        except ValueError:
            logger.exception("error while parsing wsjt message")