diff --git a/csdr/chain/demodulator.py b/csdr/chain/demodulator.py index 74373c2..f7620f5 100644 --- a/csdr/chain/demodulator.py +++ b/csdr/chain/demodulator.py @@ -1,3 +1,4 @@ +from pycsdr.modules import Reader from csdr.chain import Chain from abc import ABC, abstractmethod @@ -10,6 +11,10 @@ class BaseDemodulatorChain(Chain): return True +class SecondaryDemodulator(Chain): + pass + + class FixedAudioRateChain(ABC): @abstractmethod def getFixedAudioRate(self): diff --git a/csdr/chain/digimodes.py b/csdr/chain/digimodes.py new file mode 100644 index 0000000..af3f392 --- /dev/null +++ b/csdr/chain/digimodes.py @@ -0,0 +1,14 @@ +from csdr.chain.demodulator import SecondaryDemodulator, FixedAudioRateChain +from owrx.audio.chopper import AudioChopper +from pycsdr.modules import Agc, Convert +from pycsdr.types import Format + + +class AudioChopperDemodulator(SecondaryDemodulator, FixedAudioRateChain): + # TODO parser typing + def __init__(self, mode: str, parser): + workers = [Convert(Format.FLOAT, Format.SHORT), AudioChopper(mode, parser)] + super().__init__(workers) + + def getFixedAudioRate(self): + return 12000 diff --git a/owrx/audio/chopper.py b/owrx/audio/chopper.py index 4842432..cb910fd 100644 --- a/owrx/audio/chopper.py +++ b/owrx/audio/chopper.py @@ -1,10 +1,10 @@ from owrx.modes import Modes, AudioChopperMode -from csdr.output import Output from itertools import groupby import threading from owrx.audio import ProfileSourceSubscriber from owrx.audio.wav import AudioWriter -from multiprocessing.connection import Pipe +from csdr.chain import Chain +import pickle import logging @@ -12,18 +12,18 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber): - def __init__(self, active_dsp, mode_str: str): - self.read_fn = None +class AudioChopper(threading.Thread, Chain, ProfileSourceSubscriber): + # TODO parser typing + def __init__(self, mode_str: str, parser): + self.parser = parser self.doRun = True - self.dsp = active_dsp self.writers = [] mode = Modes.findByModulation(mode_str) if mode is None or not isinstance(mode, AudioChopperMode): raise ValueError("Mode {} is not an audio chopper mode".format(mode_str)) self.profile_source = mode.get_profile_source() - (self.outputReader, self.outputWriter) = Pipe() super().__init__() + Chain.__init__(self, []) def stop_writers(self): while self.writers: @@ -34,19 +34,20 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber): sorted_profiles = sorted(self.profile_source.getProfiles(), key=lambda p: p.getInterval()) groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())} writers = [ - AudioWriter(self.dsp, self.outputWriter, interval, profiles) for interval, profiles in groups.items() + AudioWriter(self, interval, profiles) for interval, profiles in groups.items() ] for w in writers: w.start() self.writers = writers - def supports_type(self, t): - return t == "audio" - - def receive_output(self, t, read_fn): - self.read_fn = read_fn + def setReader(self, reader): + super().setReader(reader) self.start() + def stop(self): + self.reader.stop() + super().stop() + def run(self) -> None: logger.debug("Audio chopper starting up") self.setup_writers() @@ -54,37 +55,24 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber): while self.doRun: data = None try: - data = self.read_fn(256) + data = self.reader.read() except ValueError: pass - if data is None or (isinstance(data, bytes) and len(data) == 0): + if data is None: self.doRun = False else: for w in self.writers: - w.write(data) + w.write(data.tobytes()) logger.debug("Audio chopper shutting down") self.profile_source.unsubscribe(self) self.stop_writers() - self.outputWriter.close() - self.outputWriter = None - - # drain messages left in the queue so that the queue can be successfully closed - # this is necessary since python keeps the file descriptors open otherwise - try: - while True: - self.outputReader.recv() - except EOFError: - pass - self.outputReader.close() - self.outputReader = None def onProfilesChanged(self): logger.debug("profile change received, resetting writers...") self.setup_writers() - def read(self): - try: - return self.outputReader.recv() - except (EOFError, OSError): - return None + def send(self, profile, line): + data = self.parser.parse(profile, line) + if data is not None: + self.writer.write(pickle.dumps(data)) diff --git a/owrx/audio/queue.py b/owrx/audio/queue.py index daf27c8..79f6911 100644 --- a/owrx/audio/queue.py +++ b/owrx/audio/queue.py @@ -13,11 +13,10 @@ logger.setLevel(logging.INFO) class QueueJob(object): - def __init__(self, profile, writer, file, freq): + def __init__(self, profile, writer, file): self.profile = profile self.writer = writer self.file = file - self.freq = freq def run(self): logger.debug("processing file %s", self.file) @@ -30,7 +29,7 @@ class QueueJob(object): ) try: for line in decoder.stdout: - self.writer.send((self.profile, self.freq, line)) + self.writer.send(self.profile, line) except (OSError, AttributeError): decoder.stdout.flush() # TODO uncouple parsing from the output so that decodes can still go to the map and the spotters diff --git a/owrx/audio/wav.py b/owrx/audio/wav.py index 37af029..5e8e9e3 100644 --- a/owrx/audio/wav.py +++ b/owrx/audio/wav.py @@ -47,8 +47,7 @@ class WaveFile(object): class AudioWriter(object): - def __init__(self, active_dsp, outputWriter, interval, profiles: List[AudioChopperProfile]): - self.dsp = active_dsp + def __init__(self, outputWriter, interval, profiles: List[AudioChopperProfile]): self.outputWriter = outputWriter self.interval = interval self.profiles = profiles @@ -102,7 +101,7 @@ class AudioWriter(object): logger.exception("Error while linking job files") continue - job = QueueJob(profile, self.outputWriter, filename, self.dsp.get_operating_freq()) + job = QueueJob(profile, self.outputWriter, filename) try: DecoderQueue.getSharedInstance().put(job) except Full: diff --git a/owrx/connection.py b/owrx/connection.py index 90332cd..41b0e08 100644 --- a/owrx/connection.py +++ b/owrx/connection.py @@ -17,9 +17,11 @@ from owrx.websocket import Handler from queue import Queue, Full, Empty from js8py import Js8Frame from abc import ABCMeta, abstractmethod +from io import BytesIO import json import threading import struct +import pickle import logging @@ -417,7 +419,12 @@ class OpenWebRxReceiverClient(OpenWebRxClient, SdrSourceEventClient): self.send({"type": "metadata", "value": metadata}) def write_wsjt_message(self, message): - self.send({"type": "wsjt_message", "value": message}) + io = BytesIO(message.tobytes()) + try: + while True: + self.send({"type": "wsjt_message", "value": pickle.load(io)}) + except EOFError: + pass def write_dial_frequencies(self, frequencies): self.send({"type": "dial_frequencies", "value": frequencies}) diff --git a/owrx/dsp.py b/owrx/dsp.py index 85e328a..2f32957 100644 --- a/owrx/dsp.py +++ b/owrx/dsp.py @@ -9,12 +9,13 @@ from owrx.property.validators import OrValidator, RegexValidator, BoolValidator from owrx.modes import Modes from csdr.output import Output from csdr.chain import Chain -from csdr.chain.demodulator import BaseDemodulatorChain, FixedIfSampleRateChain, FixedAudioRateChain, HdAudio +from csdr.chain.demodulator import BaseDemodulatorChain, FixedIfSampleRateChain, FixedAudioRateChain, HdAudio, SecondaryDemodulator from csdr.chain.selector import Selector from csdr.chain.clientaudio import ClientAudioChain from csdr.chain.analog import NFm, WFm, Am, Ssb from csdr.chain.digiham import DigihamChain, Dmr, Dstar, Nxdn, Ysf from csdr.chain.fft import FftChain +from csdr.chain.digimodes import AudioChopperDemodulator from pycsdr.modules import Buffer, Writer from pycsdr.types import Format from typing import Union @@ -34,22 +35,45 @@ class ClientDemodulatorChain(Chain): self.selector = Selector(sampleRate, outputRate, 0.0) self.selector.setBandpass(-4000, 4000) self.selectorBuffer = Buffer(Format.COMPLEX_FLOAT) + self.audioBuffer = None self.demodulator = demod + self.secondaryDemodulator = None inputRate = demod.getFixedAudioRate() if isinstance(demod, FixedAudioRateChain) else outputRate oRate = hdOutputRate if isinstance(demod, HdAudio) else outputRate self.clientAudioChain = ClientAudioChain(demod.getOutputFormat(), inputRate, oRate, audioCompression) self.secondaryFftChain = None self.metaWriter = None + self.secondaryFftWriter = None + self.secondaryWriter = None self.squelchLevel = -150 super().__init__([self.selector, self.demodulator, self.clientAudioChain]) + def stop(self): + super().stop() + if self.secondaryFftChain is not None: + self.secondaryFftChain.stop() + self.secondaryFftChain = None + if self.secondaryDemodulator is not None: + self.secondaryDemodulator.stop() + self.secondaryDemodulator = None + def _connect(self, w1, w2, buffer: Union[Buffer, None] = None) -> None: if w1 is self.selector: super()._connect(w1, w2, self.selectorBuffer) + elif w2 is self.clientAudioChain: + format = w1.getOutputFormat() + if self.audioBuffer is None or self.audioBuffer.getFormat() != format: + self.audioBuffer = Buffer(format) + if self.secondaryDemodulator is not None: + self.secondaryDemodulator.setReader(self.audioBuffer.getReader()) + super()._connect(w1, w2, self.audioBuffer) else: super()._connect(w1, w2) def setDemodulator(self, demodulator: BaseDemodulatorChain): + if demodulator is self.demodulator: + return + try: self.clientAudioChain.setFormat(demodulator.getOutputFormat()) except ValueError: @@ -68,11 +92,15 @@ class ClientDemodulatorChain(Chain): if isinstance(self.demodulator, FixedIfSampleRateChain): self.selector.setOutputRate(self.demodulator.getFixedIfSampleRate()) + elif self.secondaryDemodulator is not None and isinstance(self.secondaryDemodulator, FixedAudioRateChain): + self.selector.setOutputRate(self.secondaryDemodulator.getFixedAudioRate()) else: self.selector.setOutputRate(outputRate) if isinstance(self.demodulator, FixedAudioRateChain): self.clientAudioChain.setInputRate(self.demodulator.getFixedAudioRate()) + elif self.secondaryDemodulator is not None and isinstance(self.secondaryDemodulator, FixedAudioRateChain): + self.clientAudioChain.setInputRate(self.secondaryDemodulator.getFixedAudioRate()) else: self.clientAudioChain.setInputRate(outputRate) @@ -86,6 +114,39 @@ class ClientDemodulatorChain(Chain): if self.metaWriter is not None and isinstance(demodulator, DigihamChain): demodulator.setMetaWriter(self.metaWriter) + def setSecondaryDemodulator(self, demod: Union[SecondaryDemodulator, None]): + if demod is self.secondaryDemodulator: + return + + if self.secondaryDemodulator is not None: + self.secondaryDemodulator.stop() + + self.secondaryDemodulator = demod + + if self.secondaryDemodulator is not None and isinstance(self.secondaryDemodulator, FixedAudioRateChain): + if isinstance(self.demodulator, FixedAudioRateChain) and self.demodulator.getFixedAudioRate() != self.secondaryDemodulator.getFixedAudioRate(): + raise ValueError("secondary and primary demodulator chain audio rates do not match!") + else: + rate = self.secondaryDemodulator.getFixedAudioRate() + else: + rate = self.outputRate + self.selector.setOutputRate(rate) + self.clientAudioChain.setInputRate(rate) + + if self.secondaryDemodulator is not None: + self.secondaryDemodulator.setReader(self.audioBuffer.getReader()) + self.secondaryDemodulator.setWriter(self.secondaryWriter) + + if self.secondaryDemodulator is None and self.secondaryFftChain is not None: + self.secondaryFftChain.stop() + self.secondaryFftChain = None + + if self.secondaryDemodulator is not None and self.secondaryFftChain is None: + # TODO eliminate constants + self.secondaryFftChain = FftChain(self.outputRate, 2048, 0.3, 9, "adpcm") + self.secondaryFftChain.setReader(self.selectorBuffer.getReader()) + self.secondaryFftChain.setWriter(self.secondaryFftWriter) + def setLowCut(self, lowCut): self.selector.setLowCut(lowCut) @@ -153,19 +214,21 @@ class ClientDemodulatorChain(Chain): if isinstance(self.demodulator, DigihamChain): self.demodulator.setMetaWriter(self.metaWriter) - def setSecondaryFftWriter(self, writer: Union[Writer, None]) -> None: - if writer is None: - if self.secondaryFftChain is not None: - self.secondaryFftChain.stop() - self.secondaryFftChain = None - else: - if self.secondaryFftChain is None: - # TODO eliminate constants - self.secondaryFftChain = FftChain(self.outputRate, 2048, 0.3, 9, "adpcm") - self.secondaryFftChain.setReader(self.selectorBuffer.getReader()) + def setSecondaryFftWriter(self, writer: Writer) -> None: + if writer is self.secondaryFftWriter: + return + self.secondaryFftWriter = writer + if self.secondaryFftChain is not None: self.secondaryFftChain.setWriter(writer) + def setSecondaryWriter(self, writer: Writer) -> None: + if writer is self.secondaryWriter: + return + self.secondaryWriter = writer + if self.secondaryDemodulator is not None: + self.secondaryDemodulator.setWriter(writer) + def setSecondaryFftSize(self, size: int) -> None: # TODO pass @@ -186,7 +249,6 @@ class DspManager(Output, SdrSourceEventClient): self.sdrSource = sdrSource self.parsers = { "meta": MetaParser(self.handler), - "wsjt_demod": WsjtParser(self.handler), "packet_demod": AprsParser(self.handler), "pocsag_demod": PocsagParser(self.handler), "js8_demod": Js8Parser(self.handler), @@ -260,6 +322,18 @@ class DspManager(Output, SdrSourceEventClient): self.chain.setMetaWriter(buffer) self.wireOutput("meta", buffer) + # wire secondary FFT + # TODO format is different depending on compression + buffer = Buffer(Format.CHAR) + self.chain.setSecondaryFftWriter(buffer) + self.wireOutput("secondary_fft", buffer) + + # wire secondary demodulator + buffer = Buffer(Format.CHAR) + self.chain.setSecondaryWriter(buffer) + # TODO there's multiple outputs depending on the modulation right now + self.wireOutput("wsjt_demod", buffer) + def set_dial_freq(changes): if ( "center_freq" not in self.props @@ -380,16 +454,21 @@ class DspManager(Output, SdrSourceEventClient): } ) - def setSecondaryDemodulator(self, mod): - if not mod: - self.chain.setSecondaryFftWriter(None) - else: - buffer = Buffer(Format.CHAR) - self.chain.setSecondaryFftWriter(buffer) - self.wireOutput("secondary_fft", buffer) + def _getSecondaryDemodulator(self, mod): + if isinstance(mod, SecondaryDemodulator): + return mod + # TODO add remaining modes + if mod in ["ft8"]: + return AudioChopperDemodulator(mod, WsjtParser()) + return None + def setSecondaryDemodulator(self, mod): + demodulator = self._getSecondaryDemodulator(mod) + if not demodulator: + self.chain.setSecondaryDemodulator(None) + else: self.sendSecondaryConfig() - #self.chain.setSecondaryDemodulator(mod) + self.chain.setSecondaryDemodulator(demodulator) def setAudioCompression(self, comp): try: @@ -420,6 +499,7 @@ class DspManager(Output, SdrSourceEventClient): "smeter": self.handler.write_s_meter_level, "secondary_fft": self.handler.write_secondary_fft, "secondary_demod": self.handler.write_secondary_demod, + "wsjt_demod": self.handler.write_wsjt_message, } for demod, parser in self.parsers.items(): writers[demod] = parser.parse diff --git a/owrx/wsjt.py b/owrx/wsjt.py index 0693046..b4a7d25 100644 --- a/owrx/wsjt.py +++ b/owrx/wsjt.py @@ -1,15 +1,14 @@ from datetime import datetime, timezone from typing import List - from owrx.map import Map, LocatorLocation -import re from owrx.metrics import Metrics, CounterMetric from owrx.reporting import ReportingEngine -from owrx.parser import Parser from owrx.audio import AudioChopperProfile, StaticProfileSource, ConfigWiredProfileSource from abc import ABC, ABCMeta, abstractmethod from owrx.config import Config from enum import Enum +from owrx.bands import Bandplan +import re import logging @@ -245,11 +244,13 @@ class Q65Profile(WsjtProfile): return ["jt9", "--q65", "-p", str(self.interval), "-b", self.mode.name, "-d", str(self.decoding_depth()), file] -class WsjtParser(Parser): - def parse(self, data): +class WsjtParser: + def parse(self, profile, raw_msg): try: - profile, freq, raw_msg = data - self.setDialFrequency(freq) + # TODO get the frequency back from somewhere + freq = 14074000 + band = Bandplan.getSharedInstance().findBand(freq) + msg = raw_msg.decode().rstrip() # known debug messages we know to skip if msg.startswith(""): @@ -273,29 +274,27 @@ class WsjtParser(Parser): out["mode"] = mode out["interval"] = profile.getInterval() - self.pushDecode(mode) + self.pushDecode(mode, band) if "callsign" in out and "locator" in out: Map.getSharedInstance().updateLocation( - out["callsign"], LocatorLocation(out["locator"]), mode, self.band + out["callsign"], LocatorLocation(out["locator"]), mode, band ) ReportingEngine.getSharedInstance().spot(out) - self.handler.write_wsjt_message(out) + return out except Exception: logger.exception("Exception while parsing wsjt message") - def pushDecode(self, mode): + def pushDecode(self, mode, band): metrics = Metrics.getSharedInstance() - band = "unknown" - if self.band is not None: - band = self.band.getName() - if band is None: - band = "unknown" + bandName = "unknown" + if band is not None: + bandName = band.getName() if mode is None: mode = "unknown" - name = "wsjt.decodes.{band}.{mode}".format(band=band, mode=mode) + name = "wsjt.decodes.{band}.{mode}".format(band=bandName, mode=mode) metric = metrics.getMetric(name) if metric is None: metric = CounterMetric()