diff --git a/owrx/audio.py b/owrx/audio.py index 90b22a5..4e81f06 100644 --- a/owrx/audio.py +++ b/owrx/audio.py @@ -5,7 +5,7 @@ import threading import wave import subprocess import os -from multiprocessing.connection import Pipe +from multiprocessing.connection import Pipe, wait from datetime import datetime, timedelta from queue import Queue, Full @@ -13,7 +13,7 @@ from queue import Queue, Full import logging logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) +#logger.setLevel(logging.INFO) class QueueJob(object): @@ -117,7 +117,7 @@ class AudioChopperProfile(ABC): return 3 -class AudioChopper(threading.Thread, metaclass=ABCMeta): +class AudioWriter(object): def __init__(self, dsp, source, profile: AudioChopperProfile): self.dsp = dsp self.source = source @@ -128,12 +128,12 @@ class AudioChopper(threading.Thread, metaclass=ABCMeta): self.switchingLock = threading.Lock() self.timer = None (self.outputReader, self.outputWriter) = Pipe() - self.doRun = True - super().__init__() def getWaveFile(self): filename = "{tmp_dir}/openwebrx-audiochopper-{id}-{timestamp}.wav".format( - tmp_dir=self.tmp_dir, id=id(self), timestamp=datetime.utcnow().strftime(self.profile.getFileTimestampFormat()) + tmp_dir=self.tmp_dir, + id=id(self.profile), + timestamp=datetime.utcnow().strftime(self.profile.getFileTimestampFormat()), ) wavefile = wave.open(filename, "wb") wavefile.setnchannels(1) @@ -158,10 +158,9 @@ class AudioChopper(threading.Thread, metaclass=ABCMeta): def _scheduleNextSwitch(self): self.cancelTimer() - if self.doRun: - delta = self.getNextDecodingTime() - datetime.utcnow() - self.timer = threading.Timer(delta.total_seconds(), self.switchFiles) - self.timer.start() + delta = self.getNextDecodingTime() - datetime.utcnow() + self.timer = threading.Timer(delta.total_seconds(), self.switchFiles) + self.timer.start() def switchFiles(self): self.switchingLock.acquire() @@ -197,20 +196,16 @@ class AudioChopper(threading.Thread, metaclass=ABCMeta): decoder.kill() os.unlink(job.file) - def run(self) -> None: - logger.debug("WSJT chopper starting up") + def start(self): (self.wavefilename, self.wavefile) = self.getWaveFile() self._scheduleNextSwitch() - while self.doRun: - data = self.source.read(256) - if data is None or (isinstance(data, bytes) and len(data) == 0): - self.doRun = False - else: - self.switchingLock.acquire() - self.wavefile.writeframes(data) - self.switchingLock.release() - logger.debug("WSJT chopper shutting down") + def write(self, data): + self.switchingLock.acquire() + self.wavefile.writeframes(data) + self.switchingLock.release() + + def stop(self): self.outputReader.close() self.outputWriter.close() self.cancelTimer() @@ -219,8 +214,33 @@ class AudioChopper(threading.Thread, metaclass=ABCMeta): except Exception: logger.exception("error removing undecoded file") + +class AudioChopper(threading.Thread, metaclass=ABCMeta): + def __init__(self, dsp, source, *profiles: AudioChopperProfile): + self.source = source + self.writers = [AudioWriter(dsp, source, p) for p in profiles] + self.doRun = True + super().__init__() + + def run(self) -> None: + logger.debug("Audio chopper starting up") + for w in self.writers: + w.start() + while self.doRun: + data = self.source.read(256) + if data is None or (isinstance(data, bytes) and len(data) == 0): + self.doRun = False + else: + for w in self.writers: + w.write(data) + + logger.debug("Audio chopper shutting down") + for w in self.writers: + w.stop() + def read(self): try: - return self.outputReader.recv() + readers = wait([w.outputReader for w in self.writers]) + return [r.recv() for r in readers] except EOFError: return None diff --git a/owrx/js8.py b/owrx/js8.py index 303a273..5611f07 100644 --- a/owrx/js8.py +++ b/owrx/js8.py @@ -26,40 +26,41 @@ class Js8NormalProfile(AudioChopperProfile): class Js8Parser(Parser): decoderRegex = re.compile(" ?") - def parse(self, raw): - try: - freq, raw_msg = raw - self.setDialFrequency(freq) - msg = raw_msg.decode().rstrip() - if Js8Parser.decoderRegex.match(msg): - return - if msg.startswith(" EOF on input file"): - return + def parse(self, messages): + for raw in messages: + try: + freq, raw_msg = raw + self.setDialFrequency(freq) + msg = raw_msg.decode().rstrip() + if Js8Parser.decoderRegex.match(msg): + return + if msg.startswith(" EOF on input file"): + return - logger.debug(msg) + logger.debug(msg) - frame = Js8().parse_message(msg) - self.handler.write_js8_message(frame, self.dial_freq) - logger.debug(frame) + frame = Js8().parse_message(msg) + self.handler.write_js8_message(frame, self.dial_freq) + logger.debug(frame) - self.pushDecode() + self.pushDecode() - if (isinstance(frame, Js8FrameHeartbeat) or isinstance(frame, Js8FrameCompound)) and frame.grid: - Map.getSharedInstance().updateLocation( - frame.callsign, LocatorLocation(frame.grid), "JS8", self.band - ) - PskReporter.getSharedInstance().spot({ - "callsign": frame.callsign, - "mode": "JS8", - "locator": frame.grid, - "freq": self.dial_freq + frame.freq, - "db": frame.db, - "timestamp": frame.timestamp, - "msg": str(frame) - }) + if (isinstance(frame, Js8FrameHeartbeat) or isinstance(frame, Js8FrameCompound)) and frame.grid: + Map.getSharedInstance().updateLocation( + frame.callsign, LocatorLocation(frame.grid), "JS8", self.band + ) + PskReporter.getSharedInstance().spot({ + "callsign": frame.callsign, + "mode": "JS8", + "locator": frame.grid, + "freq": self.dial_freq + frame.freq, + "db": frame.db, + "timestamp": frame.timestamp, + "msg": str(frame) + }) - except Exception: - logger.exception("error while parsing js8 message") + except Exception: + logger.exception("error while parsing js8 message") def pushDecode(self): metrics = Metrics.getSharedInstance() diff --git a/owrx/wsjt.py b/owrx/wsjt.py index 475e3b3..6e4ea1a 100644 --- a/owrx/wsjt.py +++ b/owrx/wsjt.py @@ -10,7 +10,6 @@ from abc import ABC, abstractmethod import logging logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) class Ft8Profile(AudioChopperProfile): @@ -75,34 +74,35 @@ class Ft4Profile(AudioChopperProfile): class WsjtParser(Parser): modes = {"~": "FT8", "#": "JT65", "@": "JT9", "+": "FT4"} - def parse(self, data): - try: - freq, raw_msg = data - self.setDialFrequency(freq) - msg = raw_msg.decode().rstrip() - # known debug messages we know to skip - if msg.startswith(""): - return - if msg.startswith(" EOF on input file"): - return + def parse(self, messages): + for data in messages: + try: + freq, raw_msg = data + self.setDialFrequency(freq) + msg = raw_msg.decode().rstrip() + # known debug messages we know to skip + if msg.startswith(""): + return + if msg.startswith(" EOF on input file"): + return - modes = list(WsjtParser.modes.keys()) - if msg[21] in modes or msg[19] in modes: - decoder = Jt9Decoder() - else: - decoder = WsprDecoder() - out = decoder.parse(msg, freq) - if "mode" in out: - self.pushDecode(out["mode"]) - if "callsign" in out and "locator" in out: - Map.getSharedInstance().updateLocation( - out["callsign"], LocatorLocation(out["locator"]), out["mode"], self.band - ) - PskReporter.getSharedInstance().spot(out) + modes = list(WsjtParser.modes.keys()) + if msg[21] in modes or msg[19] in modes: + decoder = Jt9Decoder() + else: + decoder = WsprDecoder() + out = decoder.parse(msg, freq) + if "mode" in out: + self.pushDecode(out["mode"]) + if "callsign" in out and "locator" in out: + Map.getSharedInstance().updateLocation( + out["callsign"], LocatorLocation(out["locator"]), out["mode"], self.band + ) + PskReporter.getSharedInstance().spot(out) - self.handler.write_wsjt_message(out) - except ValueError: - logger.exception("error while parsing wsjt message") + self.handler.write_wsjt_message(out) + except ValueError: + logger.exception("error while parsing wsjt message") def pushDecode(self, mode): metrics = Metrics.getSharedInstance()