diff --git a/csdr/csdr.py b/csdr/csdr.py index f0d2498..651944e 100644 --- a/csdr/csdr.py +++ b/csdr/csdr.py @@ -162,6 +162,7 @@ class dsp(object): self.output_rate = 11025 self.fft_size = 1024 self.fft_fps = 5 + self.center_freq = 0 self.offset_freq = 0 self.low_cut = -4000 self.high_cut = 4000 @@ -448,18 +449,19 @@ class dsp(object): if self.isWsjtMode(): smd = self.get_secondary_demodulator() - chopper = None + chopper_cls = None if smd == "ft8": - chopper = Ft8Chopper(self.secondary_process_demod.stdout) + chopper_cls = Ft8Chopper elif smd == "wspr": - chopper = WsprChopper(self.secondary_process_demod.stdout) + chopper_cls = WsprChopper elif smd == "jt65": - chopper = Jt65Chopper(self.secondary_process_demod.stdout) + chopper_cls = Jt65Chopper elif smd == "jt9": - chopper = Jt9Chopper(self.secondary_process_demod.stdout) + chopper_cls = Jt9Chopper elif smd == "ft4": - chopper = Ft4Chopper(self.secondary_process_demod.stdout) - if chopper is not None: + chopper_cls = Ft4Chopper + if chopper_cls is not None: + chopper = chopper_cls(self, self.secondary_process_demod.stdout) chopper.start() self.output.send_output("wsjt_demod", chopper.read) elif self.isPacket(): @@ -627,6 +629,13 @@ class dsp(object): with self.modification_lock: self.pipes["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate)) + def set_center_freq(self, center_freq): + # dsp only needs to know this to be able to pass it to decoders in the form of get_operating_freq() + self.center_freq = center_freq + + def get_operating_freq(self): + return self.center_freq + self.offset_freq + def set_bpf(self, low_cut, high_cut): self.low_cut = low_cut self.high_cut = high_cut diff --git a/owrx/dsp.py b/owrx/dsp.py index f0f1fd5..32b9116 100644 --- a/owrx/dsp.py +++ b/owrx/dsp.py @@ -77,6 +77,7 @@ class DspManager(csdr.output): self.props.wireProperty("samp_rate", self.dsp.set_samp_rate), self.props.wireProperty("output_rate", self.dsp.set_output_rate), self.props.wireProperty("offset_freq", self.dsp.set_offset_freq), + self.props.wireProperty("center_freq", self.dsp.set_center_freq), self.props.wireProperty("squelch_level", self.dsp.set_squelch_level), self.props.wireProperty("low_cut", set_low_cut), self.props.wireProperty("high_cut", set_high_cut), diff --git a/owrx/service/__init__.py b/owrx/service/__init__.py index b1686dc..5498e5e 100644 --- a/owrx/service/__init__.py +++ b/owrx/service/__init__.py @@ -262,7 +262,9 @@ class ServiceHandler(object): output = WsjtServiceOutput(frequency) d = dsp(output) d.nc_port = source.getPort() - d.set_offset_freq(frequency - source.getProps()["center_freq"]) + center_freq = source.getProps()["center_freq"] + d.set_offset_freq(frequency - center_freq) + d.set_center_freq(center_freq) if mode == "packet": d.set_demodulator("nfm") d.set_bpf(-4000, 4000) diff --git a/owrx/wsjt.py b/owrx/wsjt.py index 4a30864..651c825 100644 --- a/owrx/wsjt.py +++ b/owrx/wsjt.py @@ -19,6 +19,16 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) +class QueueJob(object): + def __init__(self, decoder, file, freq): + self.decoder = decoder + self.file = file + self.freq = freq + + def run(self): + self.decoder.decode(self) + + class WsjtQueueWorker(threading.Thread): def __init__(self, queue): self.queue = queue @@ -27,10 +37,9 @@ class WsjtQueueWorker(threading.Thread): def run(self) -> None: while self.doRun: - (processor, file) = self.queue.get() + job = self.queue.get() try: - logger.debug("processing file %s", file) - processor.decode(file) + job.run() except Exception: logger.exception("failed to decode job") self.queue.onError() @@ -87,7 +96,8 @@ class WsjtQueue(Queue): class WsjtChopper(threading.Thread, metaclass=ABCMeta): - def __init__(self, source): + def __init__(self, dsp, source): + self.dsp = dsp self.source = source self.tmp_dir = Config.get()["temporary_directory"] (self.wavefilename, self.wavefile) = self.getWaveFile() @@ -135,7 +145,7 @@ class WsjtChopper(threading.Thread, metaclass=ABCMeta): file.close() try: - WsjtQueue.getSharedInstance().put((self, filename)) + WsjtQueue.getSharedInstance().put(QueueJob(self, filename, self.dsp.get_operating_freq())) except Full: logger.warning("wsjt decoding queue overflow; dropping one file") os.unlink(filename) @@ -145,15 +155,16 @@ class WsjtChopper(threading.Thread, metaclass=ABCMeta): def decoder_commandline(self, file): pass - def decode(self, file): + def decode(self, job: QueueJob): + logger.debug("processing file %s", job.file) decoder = subprocess.Popen( - ["nice", "-n", "10"] + self.decoder_commandline(file), + ["nice", "-n", "10"] + self.decoder_commandline(job.file), stdout=subprocess.PIPE, cwd=self.tmp_dir, close_fds=True, ) for line in decoder.stdout: - self.outputWriter.send(line) + self.outputWriter.send((job.freq, line)) try: rc = decoder.wait(timeout=10) if rc != 0: @@ -161,7 +172,7 @@ class WsjtChopper(threading.Thread, metaclass=ABCMeta): except subprocess.TimeoutExpired: logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid) decoder.kill() - os.unlink(file) + os.unlink(job.file) def run(self) -> None: logger.debug("WSJT chopper starting up") @@ -203,20 +214,20 @@ class WsjtChopper(threading.Thread, metaclass=ABCMeta): class Ft8Chopper(WsjtChopper): - def __init__(self, source): + def __init__(self, dsp, source): self.interval = 15 self.fileTimestampFormat = "%y%m%d_%H%M%S" - super().__init__(source) + super().__init__(dsp, source) def decoder_commandline(self, file): return ["jt9", "--ft8", "-d", str(self.decoding_depth("ft8")), file] class WsprChopper(WsjtChopper): - def __init__(self, source): + def __init__(self, dsp, source): self.interval = 120 self.fileTimestampFormat = "%y%m%d_%H%M" - super().__init__(source) + super().__init__(dsp, source) def decoder_commandline(self, file): cmd = ["wsprd"] @@ -227,30 +238,30 @@ class WsprChopper(WsjtChopper): class Jt65Chopper(WsjtChopper): - def __init__(self, source): + def __init__(self, dsp, source): self.interval = 60 self.fileTimestampFormat = "%y%m%d_%H%M" - super().__init__(source) + super().__init__(dsp, source) def decoder_commandline(self, file): return ["jt9", "--jt65", "-d", str(self.decoding_depth("jt65")), file] class Jt9Chopper(WsjtChopper): - def __init__(self, source): + def __init__(self, dsp, source): self.interval = 60 self.fileTimestampFormat = "%y%m%d_%H%M" - super().__init__(source) + super().__init__(dsp, source) def decoder_commandline(self, file): return ["jt9", "--jt9", "-d", str(self.decoding_depth("jt9")), file] class Ft4Chopper(WsjtChopper): - def __init__(self, source): + def __init__(self, dsp, source): self.interval = 7.5 self.fileTimestampFormat = "%y%m%d_%H%M%S" - super().__init__(source) + super().__init__(dsp, source) def decoder_commandline(self, file): return ["jt9", "--ft4", "-d", str(self.decoding_depth("ft4")), file] @@ -261,7 +272,9 @@ class WsjtParser(Parser): def parse(self, data): try: - msg = data.decode().rstrip() + freq, raw_msg = data + self.setDialFrequency(freq) + msg = raw_msg.decode().rstrip() # known debug messages we know to skip if msg.startswith(""): return @@ -273,7 +286,7 @@ class WsjtParser(Parser): decoder = Jt9Decoder() else: decoder = WsprDecoder() - out = decoder.parse(msg, self.dial_freq) + out = decoder.parse(msg, freq) if "mode" in out: self.pushDecode(out["mode"]) if "callsign" in out and "locator" in out: