diff --git a/owrx/audio/chopper.py b/owrx/audio/chopper.py index 879fd0e..d26db9e 100644 --- a/owrx/audio/chopper.py +++ b/owrx/audio/chopper.py @@ -22,8 +22,7 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber): if mode is None or not isinstance(mode, AudioChopperMode): raise ValueError("Mode {} is not an audio chopper mode".format(mode_str)) self.profile_source = mode.get_profile_source() - self.writersChangedOut = None - self.writersChangedIn = None + (self.outputReader, self.outputWriter) = Pipe() super().__init__() def stop_writers(self): @@ -34,11 +33,10 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber): self.stop_writers() sorted_profiles = sorted(self.profile_source.getProfiles(), key=lambda p: p.getInterval()) groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())} - writers = [AudioWriter(self.dsp, interval, profiles) for interval, profiles in groups.items()] + writers = [AudioWriter(self.dsp, self.outputWriter, interval, profiles) for interval, profiles in groups.items()] for w in writers: w.start() self.writers = writers - self.writersChangedOut.send(None) def supports_type(self, t): return t == "audio" @@ -49,7 +47,6 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber): def run(self) -> None: logger.debug("Audio chopper starting up") - self.writersChangedIn, self.writersChangedOut = Pipe() self.setup_writers() self.profile_source.subscribe(self) while self.doRun: @@ -67,20 +64,25 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber): logger.debug("Audio chopper shutting down") self.profile_source.unsubscribe(self) self.stop_writers() - self.writersChangedOut.close() - self.writersChangedIn.close() + self.outputWriter.close() + self.outputWriter = None + + # drain messages left in the queue so that the queue can be successfully closed + # this is necessary since python keeps the file descriptors open otherwise + try: + while True: + self.outputReader.recv() + except EOFError: + pass + self.outputReader.close() + self.outputReader = None def onProfilesChanged(self): logger.debug("profile change received, resetting writers...") self.setup_writers() def read(self): - while True: - try: - readers = wait([w.outputReader for w in self.writers] + [self.writersChangedIn]) - received = [(r, r.recv()) for r in readers] - data = [d for r, d in received if r is not self.writersChangedIn] - if data: - return data - except (EOFError, OSError): - return None + try: + return self.outputReader.recv() + except (EOFError, OSError): + return None diff --git a/owrx/audio/wav.py b/owrx/audio/wav.py index c8f5a5f..cccdc96 100644 --- a/owrx/audio/wav.py +++ b/owrx/audio/wav.py @@ -38,14 +38,14 @@ class WaveFile(object): class AudioWriter(object): - def __init__(self, active_dsp, interval, profiles: List[AudioChopperProfile]): + def __init__(self, active_dsp, outputWriter, interval, profiles: List[AudioChopperProfile]): self.dsp = active_dsp + self.outputWriter = outputWriter self.interval = interval self.profiles = profiles self.wavefile = None self.switchingLock = threading.Lock() self.timer = None - (self.outputReader, self.outputWriter) = Pipe() def getWaveFile(self): tmp_dir = CoreConfig().get_temporary_directory() @@ -114,19 +114,6 @@ class AudioWriter(object): self.wavefile.writeframes(data) def stop(self): - self.outputWriter.close() - self.outputWriter = None - - # drain messages left in the queue so that the queue can be successfully closed - # this is necessary since python keeps the file descriptors open otherwise - try: - while True: - self.outputReader.recv() - except EOFError: - pass - self.outputReader.close() - self.outputReader = None - self.cancelTimer() try: self.wavefile.close() diff --git a/owrx/js8.py b/owrx/js8.py index d4bbdb9..97b1195 100644 --- a/owrx/js8.py +++ b/owrx/js8.py @@ -84,40 +84,39 @@ class Js8TurboProfile(Js8Profile): class Js8Parser(Parser): decoderRegex = re.compile(" ?") - def parse(self, messages): - for raw in messages: - try: - profile, freq, raw_msg = raw - self.setDialFrequency(freq) - msg = raw_msg.decode().rstrip() - if Js8Parser.decoderRegex.match(msg): - return - if msg.startswith(" EOF on input file"): - return + def parse(self, raw): + try: + profile, freq, raw_msg = raw + self.setDialFrequency(freq) + msg = raw_msg.decode().rstrip() + if Js8Parser.decoderRegex.match(msg): + return + if msg.startswith(" EOF on input file"): + return - frame = Js8().parse_message(msg) - self.handler.write_js8_message(frame, self.dial_freq) + frame = Js8().parse_message(msg) + self.handler.write_js8_message(frame, self.dial_freq) - self.pushDecode() + self.pushDecode() - if (isinstance(frame, Js8FrameHeartbeat) or isinstance(frame, Js8FrameCompound)) and frame.grid: - Map.getSharedInstance().updateLocation( - frame.callsign, LocatorLocation(frame.grid), "JS8", self.band - ) - ReportingEngine.getSharedInstance().spot( - { - "callsign": frame.callsign, - "mode": "JS8", - "locator": frame.grid, - "freq": self.dial_freq + frame.freq, - "db": frame.db, - "timestamp": frame.timestamp, - "msg": str(frame), - } - ) + if (isinstance(frame, Js8FrameHeartbeat) or isinstance(frame, Js8FrameCompound)) and frame.grid: + Map.getSharedInstance().updateLocation( + frame.callsign, LocatorLocation(frame.grid), "JS8", self.band + ) + ReportingEngine.getSharedInstance().spot( + { + "callsign": frame.callsign, + "mode": "JS8", + "locator": frame.grid, + "freq": self.dial_freq + frame.freq, + "db": frame.db, + "timestamp": frame.timestamp, + "msg": str(frame), + } + ) - except Exception: - logger.exception("error while parsing js8 message") + except Exception: + logger.exception("error while parsing js8 message") def pushDecode(self): metrics = Metrics.getSharedInstance() diff --git a/owrx/wsjt.py b/owrx/wsjt.py index 4bd96d5..0693046 100644 --- a/owrx/wsjt.py +++ b/owrx/wsjt.py @@ -246,44 +246,43 @@ class Q65Profile(WsjtProfile): class WsjtParser(Parser): - def parse(self, messages): - for data in messages: - try: - profile, freq, raw_msg = data - self.setDialFrequency(freq) - msg = raw_msg.decode().rstrip() - # known debug messages we know to skip - if msg.startswith(""): - return - if msg.startswith(" EOF on input file"): - return + def parse(self, data): + try: + profile, freq, raw_msg = data + self.setDialFrequency(freq) + msg = raw_msg.decode().rstrip() + # known debug messages we know to skip + if msg.startswith(""): + return + if msg.startswith(" EOF on input file"): + return - mode = profile.getMode() - if mode in ["WSPR", "FST4W"]: - messageParser = BeaconMessageParser() - else: - messageParser = QsoMessageParser() - if mode == "WSPR": - decoder = WsprDecoder(profile, messageParser) - else: - decoder = Jt9Decoder(profile, messageParser) - out = decoder.parse(msg, freq) - if isinstance(profile, Q65Profile) and not out["msg"]: - # all efforts in vain, it's just a potential signal indicator - return - out["mode"] = mode - out["interval"] = profile.getInterval() + mode = profile.getMode() + if mode in ["WSPR", "FST4W"]: + messageParser = BeaconMessageParser() + else: + messageParser = QsoMessageParser() + if mode == "WSPR": + decoder = WsprDecoder(profile, messageParser) + else: + decoder = Jt9Decoder(profile, messageParser) + out = decoder.parse(msg, freq) + if isinstance(profile, Q65Profile) and not out["msg"]: + # all efforts in vain, it's just a potential signal indicator + return + out["mode"] = mode + out["interval"] = profile.getInterval() - self.pushDecode(mode) - if "callsign" in out and "locator" in out: - Map.getSharedInstance().updateLocation( - out["callsign"], LocatorLocation(out["locator"]), mode, self.band - ) - ReportingEngine.getSharedInstance().spot(out) + self.pushDecode(mode) + if "callsign" in out and "locator" in out: + Map.getSharedInstance().updateLocation( + out["callsign"], LocatorLocation(out["locator"]), mode, self.band + ) + ReportingEngine.getSharedInstance().spot(out) - self.handler.write_wsjt_message(out) - except Exception: - logger.exception("Exception while parsing wsjt message") + self.handler.write_wsjt_message(out) + except Exception: + logger.exception("Exception while parsing wsjt message") def pushDecode(self, mode): metrics = Metrics.getSharedInstance()