pass the frequency along with the job, refs #22 #61

This commit is contained in:
Jakob Ketterl 2020-04-05 16:35:46 +02:00
parent 366def0235
commit d06e9151b9
4 changed files with 54 additions and 29 deletions

View File

@ -162,6 +162,7 @@ class dsp(object):
self.output_rate = 11025 self.output_rate = 11025
self.fft_size = 1024 self.fft_size = 1024
self.fft_fps = 5 self.fft_fps = 5
self.center_freq = 0
self.offset_freq = 0 self.offset_freq = 0
self.low_cut = -4000 self.low_cut = -4000
self.high_cut = 4000 self.high_cut = 4000
@ -448,18 +449,19 @@ class dsp(object):
if self.isWsjtMode(): if self.isWsjtMode():
smd = self.get_secondary_demodulator() smd = self.get_secondary_demodulator()
chopper = None chopper_cls = None
if smd == "ft8": if smd == "ft8":
chopper = Ft8Chopper(self.secondary_process_demod.stdout) chopper_cls = Ft8Chopper
elif smd == "wspr": elif smd == "wspr":
chopper = WsprChopper(self.secondary_process_demod.stdout) chopper_cls = WsprChopper
elif smd == "jt65": elif smd == "jt65":
chopper = Jt65Chopper(self.secondary_process_demod.stdout) chopper_cls = Jt65Chopper
elif smd == "jt9": elif smd == "jt9":
chopper = Jt9Chopper(self.secondary_process_demod.stdout) chopper_cls = Jt9Chopper
elif smd == "ft4": elif smd == "ft4":
chopper = Ft4Chopper(self.secondary_process_demod.stdout) chopper_cls = Ft4Chopper
if chopper is not None: if chopper_cls is not None:
chopper = chopper_cls(self, self.secondary_process_demod.stdout)
chopper.start() chopper.start()
self.output.send_output("wsjt_demod", chopper.read) self.output.send_output("wsjt_demod", chopper.read)
elif self.isPacket(): elif self.isPacket():
@ -627,6 +629,13 @@ class dsp(object):
with self.modification_lock: with self.modification_lock:
self.pipes["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate)) self.pipes["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate))
def set_center_freq(self, center_freq):
# dsp only needs to know this to be able to pass it to decoders in the form of get_operating_freq()
self.center_freq = center_freq
def get_operating_freq(self):
return self.center_freq + self.offset_freq
def set_bpf(self, low_cut, high_cut): def set_bpf(self, low_cut, high_cut):
self.low_cut = low_cut self.low_cut = low_cut
self.high_cut = high_cut self.high_cut = high_cut

View File

@ -77,6 +77,7 @@ class DspManager(csdr.output):
self.props.wireProperty("samp_rate", self.dsp.set_samp_rate), self.props.wireProperty("samp_rate", self.dsp.set_samp_rate),
self.props.wireProperty("output_rate", self.dsp.set_output_rate), self.props.wireProperty("output_rate", self.dsp.set_output_rate),
self.props.wireProperty("offset_freq", self.dsp.set_offset_freq), self.props.wireProperty("offset_freq", self.dsp.set_offset_freq),
self.props.wireProperty("center_freq", self.dsp.set_center_freq),
self.props.wireProperty("squelch_level", self.dsp.set_squelch_level), self.props.wireProperty("squelch_level", self.dsp.set_squelch_level),
self.props.wireProperty("low_cut", set_low_cut), self.props.wireProperty("low_cut", set_low_cut),
self.props.wireProperty("high_cut", set_high_cut), self.props.wireProperty("high_cut", set_high_cut),

View File

@ -262,7 +262,9 @@ class ServiceHandler(object):
output = WsjtServiceOutput(frequency) output = WsjtServiceOutput(frequency)
d = dsp(output) d = dsp(output)
d.nc_port = source.getPort() d.nc_port = source.getPort()
d.set_offset_freq(frequency - source.getProps()["center_freq"]) center_freq = source.getProps()["center_freq"]
d.set_offset_freq(frequency - center_freq)
d.set_center_freq(center_freq)
if mode == "packet": if mode == "packet":
d.set_demodulator("nfm") d.set_demodulator("nfm")
d.set_bpf(-4000, 4000) d.set_bpf(-4000, 4000)

View File

@ -19,6 +19,16 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
class QueueJob(object):
def __init__(self, decoder, file, freq):
self.decoder = decoder
self.file = file
self.freq = freq
def run(self):
self.decoder.decode(self)
class WsjtQueueWorker(threading.Thread): class WsjtQueueWorker(threading.Thread):
def __init__(self, queue): def __init__(self, queue):
self.queue = queue self.queue = queue
@ -27,10 +37,9 @@ class WsjtQueueWorker(threading.Thread):
def run(self) -> None: def run(self) -> None:
while self.doRun: while self.doRun:
(processor, file) = self.queue.get() job = self.queue.get()
try: try:
logger.debug("processing file %s", file) job.run()
processor.decode(file)
except Exception: except Exception:
logger.exception("failed to decode job") logger.exception("failed to decode job")
self.queue.onError() self.queue.onError()
@ -87,7 +96,8 @@ class WsjtQueue(Queue):
class WsjtChopper(threading.Thread, metaclass=ABCMeta): class WsjtChopper(threading.Thread, metaclass=ABCMeta):
def __init__(self, source): def __init__(self, dsp, source):
self.dsp = dsp
self.source = source self.source = source
self.tmp_dir = Config.get()["temporary_directory"] self.tmp_dir = Config.get()["temporary_directory"]
(self.wavefilename, self.wavefile) = self.getWaveFile() (self.wavefilename, self.wavefile) = self.getWaveFile()
@ -135,7 +145,7 @@ class WsjtChopper(threading.Thread, metaclass=ABCMeta):
file.close() file.close()
try: try:
WsjtQueue.getSharedInstance().put((self, filename)) WsjtQueue.getSharedInstance().put(QueueJob(self, filename, self.dsp.get_operating_freq()))
except Full: except Full:
logger.warning("wsjt decoding queue overflow; dropping one file") logger.warning("wsjt decoding queue overflow; dropping one file")
os.unlink(filename) os.unlink(filename)
@ -145,15 +155,16 @@ class WsjtChopper(threading.Thread, metaclass=ABCMeta):
def decoder_commandline(self, file): def decoder_commandline(self, file):
pass pass
def decode(self, file): def decode(self, job: QueueJob):
logger.debug("processing file %s", job.file)
decoder = subprocess.Popen( decoder = subprocess.Popen(
["nice", "-n", "10"] + self.decoder_commandline(file), ["nice", "-n", "10"] + self.decoder_commandline(job.file),
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
cwd=self.tmp_dir, cwd=self.tmp_dir,
close_fds=True, close_fds=True,
) )
for line in decoder.stdout: for line in decoder.stdout:
self.outputWriter.send(line) self.outputWriter.send((job.freq, line))
try: try:
rc = decoder.wait(timeout=10) rc = decoder.wait(timeout=10)
if rc != 0: if rc != 0:
@ -161,7 +172,7 @@ class WsjtChopper(threading.Thread, metaclass=ABCMeta):
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid) logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid)
decoder.kill() decoder.kill()
os.unlink(file) os.unlink(job.file)
def run(self) -> None: def run(self) -> None:
logger.debug("WSJT chopper starting up") logger.debug("WSJT chopper starting up")
@ -203,20 +214,20 @@ class WsjtChopper(threading.Thread, metaclass=ABCMeta):
class Ft8Chopper(WsjtChopper): class Ft8Chopper(WsjtChopper):
def __init__(self, source): def __init__(self, dsp, source):
self.interval = 15 self.interval = 15
self.fileTimestampFormat = "%y%m%d_%H%M%S" self.fileTimestampFormat = "%y%m%d_%H%M%S"
super().__init__(source) super().__init__(dsp, source)
def decoder_commandline(self, file): def decoder_commandline(self, file):
return ["jt9", "--ft8", "-d", str(self.decoding_depth("ft8")), file] return ["jt9", "--ft8", "-d", str(self.decoding_depth("ft8")), file]
class WsprChopper(WsjtChopper): class WsprChopper(WsjtChopper):
def __init__(self, source): def __init__(self, dsp, source):
self.interval = 120 self.interval = 120
self.fileTimestampFormat = "%y%m%d_%H%M" self.fileTimestampFormat = "%y%m%d_%H%M"
super().__init__(source) super().__init__(dsp, source)
def decoder_commandline(self, file): def decoder_commandline(self, file):
cmd = ["wsprd"] cmd = ["wsprd"]
@ -227,30 +238,30 @@ class WsprChopper(WsjtChopper):
class Jt65Chopper(WsjtChopper): class Jt65Chopper(WsjtChopper):
def __init__(self, source): def __init__(self, dsp, source):
self.interval = 60 self.interval = 60
self.fileTimestampFormat = "%y%m%d_%H%M" self.fileTimestampFormat = "%y%m%d_%H%M"
super().__init__(source) super().__init__(dsp, source)
def decoder_commandline(self, file): def decoder_commandline(self, file):
return ["jt9", "--jt65", "-d", str(self.decoding_depth("jt65")), file] return ["jt9", "--jt65", "-d", str(self.decoding_depth("jt65")), file]
class Jt9Chopper(WsjtChopper): class Jt9Chopper(WsjtChopper):
def __init__(self, source): def __init__(self, dsp, source):
self.interval = 60 self.interval = 60
self.fileTimestampFormat = "%y%m%d_%H%M" self.fileTimestampFormat = "%y%m%d_%H%M"
super().__init__(source) super().__init__(dsp, source)
def decoder_commandline(self, file): def decoder_commandline(self, file):
return ["jt9", "--jt9", "-d", str(self.decoding_depth("jt9")), file] return ["jt9", "--jt9", "-d", str(self.decoding_depth("jt9")), file]
class Ft4Chopper(WsjtChopper): class Ft4Chopper(WsjtChopper):
def __init__(self, source): def __init__(self, dsp, source):
self.interval = 7.5 self.interval = 7.5
self.fileTimestampFormat = "%y%m%d_%H%M%S" self.fileTimestampFormat = "%y%m%d_%H%M%S"
super().__init__(source) super().__init__(dsp, source)
def decoder_commandline(self, file): def decoder_commandline(self, file):
return ["jt9", "--ft4", "-d", str(self.decoding_depth("ft4")), file] return ["jt9", "--ft4", "-d", str(self.decoding_depth("ft4")), file]
@ -261,7 +272,9 @@ class WsjtParser(Parser):
def parse(self, data): def parse(self, data):
try: try:
msg = data.decode().rstrip() freq, raw_msg = data
self.setDialFrequency(freq)
msg = raw_msg.decode().rstrip()
# known debug messages we know to skip # known debug messages we know to skip
if msg.startswith("<DecodeFinished>"): if msg.startswith("<DecodeFinished>"):
return return
@ -273,7 +286,7 @@ class WsjtParser(Parser):
decoder = Jt9Decoder() decoder = Jt9Decoder()
else: else:
decoder = WsprDecoder() decoder = WsprDecoder()
out = decoder.parse(msg, self.dial_freq) out = decoder.parse(msg, freq)
if "mode" in out: if "mode" in out:
self.pushDecode(out["mode"]) self.pushDecode(out["mode"])
if "callsign" in out and "locator" in out: if "callsign" in out and "locator" in out: