multi-profile decoding

This commit is contained in:
Jakob Ketterl 2020-04-23 00:21:59 +02:00
parent 0120b33a25
commit 5ab2f02f63
3 changed files with 99 additions and 78 deletions

View File

@ -5,7 +5,7 @@ import threading
import wave import wave
import subprocess import subprocess
import os import os
from multiprocessing.connection import Pipe from multiprocessing.connection import Pipe, wait
from datetime import datetime, timedelta from datetime import datetime, timedelta
from queue import Queue, Full from queue import Queue, Full
@ -13,7 +13,7 @@ from queue import Queue, Full
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) #logger.setLevel(logging.INFO)
class QueueJob(object): class QueueJob(object):
@ -117,7 +117,7 @@ class AudioChopperProfile(ABC):
return 3 return 3
class AudioChopper(threading.Thread, metaclass=ABCMeta): class AudioWriter(object):
def __init__(self, dsp, source, profile: AudioChopperProfile): def __init__(self, dsp, source, profile: AudioChopperProfile):
self.dsp = dsp self.dsp = dsp
self.source = source self.source = source
@ -128,12 +128,12 @@ class AudioChopper(threading.Thread, metaclass=ABCMeta):
self.switchingLock = threading.Lock() self.switchingLock = threading.Lock()
self.timer = None self.timer = None
(self.outputReader, self.outputWriter) = Pipe() (self.outputReader, self.outputWriter) = Pipe()
self.doRun = True
super().__init__()
def getWaveFile(self): def getWaveFile(self):
filename = "{tmp_dir}/openwebrx-audiochopper-{id}-{timestamp}.wav".format( filename = "{tmp_dir}/openwebrx-audiochopper-{id}-{timestamp}.wav".format(
tmp_dir=self.tmp_dir, id=id(self), timestamp=datetime.utcnow().strftime(self.profile.getFileTimestampFormat()) tmp_dir=self.tmp_dir,
id=id(self.profile),
timestamp=datetime.utcnow().strftime(self.profile.getFileTimestampFormat()),
) )
wavefile = wave.open(filename, "wb") wavefile = wave.open(filename, "wb")
wavefile.setnchannels(1) wavefile.setnchannels(1)
@ -158,10 +158,9 @@ class AudioChopper(threading.Thread, metaclass=ABCMeta):
def _scheduleNextSwitch(self): def _scheduleNextSwitch(self):
self.cancelTimer() self.cancelTimer()
if self.doRun: delta = self.getNextDecodingTime() - datetime.utcnow()
delta = self.getNextDecodingTime() - datetime.utcnow() self.timer = threading.Timer(delta.total_seconds(), self.switchFiles)
self.timer = threading.Timer(delta.total_seconds(), self.switchFiles) self.timer.start()
self.timer.start()
def switchFiles(self): def switchFiles(self):
self.switchingLock.acquire() self.switchingLock.acquire()
@ -197,20 +196,16 @@ class AudioChopper(threading.Thread, metaclass=ABCMeta):
decoder.kill() decoder.kill()
os.unlink(job.file) os.unlink(job.file)
def run(self) -> None: def start(self):
logger.debug("WSJT chopper starting up")
(self.wavefilename, self.wavefile) = self.getWaveFile() (self.wavefilename, self.wavefile) = self.getWaveFile()
self._scheduleNextSwitch() self._scheduleNextSwitch()
while self.doRun:
data = self.source.read(256)
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
self.switchingLock.acquire()
self.wavefile.writeframes(data)
self.switchingLock.release()
logger.debug("WSJT chopper shutting down") def write(self, data):
self.switchingLock.acquire()
self.wavefile.writeframes(data)
self.switchingLock.release()
def stop(self):
self.outputReader.close() self.outputReader.close()
self.outputWriter.close() self.outputWriter.close()
self.cancelTimer() self.cancelTimer()
@ -219,8 +214,33 @@ class AudioChopper(threading.Thread, metaclass=ABCMeta):
except Exception: except Exception:
logger.exception("error removing undecoded file") logger.exception("error removing undecoded file")
class AudioChopper(threading.Thread, metaclass=ABCMeta):
def __init__(self, dsp, source, *profiles: AudioChopperProfile):
self.source = source
self.writers = [AudioWriter(dsp, source, p) for p in profiles]
self.doRun = True
super().__init__()
def run(self) -> None:
logger.debug("Audio chopper starting up")
for w in self.writers:
w.start()
while self.doRun:
data = self.source.read(256)
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
for w in self.writers:
w.write(data)
logger.debug("Audio chopper shutting down")
for w in self.writers:
w.stop()
def read(self): def read(self):
try: try:
return self.outputReader.recv() readers = wait([w.outputReader for w in self.writers])
return [r.recv() for r in readers]
except EOFError: except EOFError:
return None return None

View File

@ -26,40 +26,41 @@ class Js8NormalProfile(AudioChopperProfile):
class Js8Parser(Parser): class Js8Parser(Parser):
decoderRegex = re.compile(" ?<Decode(Started|Debug|Finished)>") decoderRegex = re.compile(" ?<Decode(Started|Debug|Finished)>")
def parse(self, raw): def parse(self, messages):
try: for raw in messages:
freq, raw_msg = raw try:
self.setDialFrequency(freq) freq, raw_msg = raw
msg = raw_msg.decode().rstrip() self.setDialFrequency(freq)
if Js8Parser.decoderRegex.match(msg): msg = raw_msg.decode().rstrip()
return if Js8Parser.decoderRegex.match(msg):
if msg.startswith(" EOF on input file"): return
return if msg.startswith(" EOF on input file"):
return
logger.debug(msg) logger.debug(msg)
frame = Js8().parse_message(msg) frame = Js8().parse_message(msg)
self.handler.write_js8_message(frame, self.dial_freq) self.handler.write_js8_message(frame, self.dial_freq)
logger.debug(frame) logger.debug(frame)
self.pushDecode() self.pushDecode()
if (isinstance(frame, Js8FrameHeartbeat) or isinstance(frame, Js8FrameCompound)) and frame.grid: if (isinstance(frame, Js8FrameHeartbeat) or isinstance(frame, Js8FrameCompound)) and frame.grid:
Map.getSharedInstance().updateLocation( Map.getSharedInstance().updateLocation(
frame.callsign, LocatorLocation(frame.grid), "JS8", self.band frame.callsign, LocatorLocation(frame.grid), "JS8", self.band
) )
PskReporter.getSharedInstance().spot({ PskReporter.getSharedInstance().spot({
"callsign": frame.callsign, "callsign": frame.callsign,
"mode": "JS8", "mode": "JS8",
"locator": frame.grid, "locator": frame.grid,
"freq": self.dial_freq + frame.freq, "freq": self.dial_freq + frame.freq,
"db": frame.db, "db": frame.db,
"timestamp": frame.timestamp, "timestamp": frame.timestamp,
"msg": str(frame) "msg": str(frame)
}) })
except Exception: except Exception:
logger.exception("error while parsing js8 message") logger.exception("error while parsing js8 message")
def pushDecode(self): def pushDecode(self):
metrics = Metrics.getSharedInstance() metrics = Metrics.getSharedInstance()

View File

@ -10,7 +10,6 @@ from abc import ABC, abstractmethod
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class Ft8Profile(AudioChopperProfile): class Ft8Profile(AudioChopperProfile):
@ -75,34 +74,35 @@ class Ft4Profile(AudioChopperProfile):
class WsjtParser(Parser): class WsjtParser(Parser):
modes = {"~": "FT8", "#": "JT65", "@": "JT9", "+": "FT4"} modes = {"~": "FT8", "#": "JT65", "@": "JT9", "+": "FT4"}
def parse(self, data): def parse(self, messages):
try: for data in messages:
freq, raw_msg = data try:
self.setDialFrequency(freq) freq, raw_msg = data
msg = raw_msg.decode().rstrip() self.setDialFrequency(freq)
# known debug messages we know to skip msg = raw_msg.decode().rstrip()
if msg.startswith("<DecodeFinished>"): # known debug messages we know to skip
return if msg.startswith("<DecodeFinished>"):
if msg.startswith(" EOF on input file"): return
return if msg.startswith(" EOF on input file"):
return
modes = list(WsjtParser.modes.keys()) modes = list(WsjtParser.modes.keys())
if msg[21] in modes or msg[19] in modes: if msg[21] in modes or msg[19] in modes:
decoder = Jt9Decoder() decoder = Jt9Decoder()
else: else:
decoder = WsprDecoder() decoder = WsprDecoder()
out = decoder.parse(msg, freq) out = decoder.parse(msg, freq)
if "mode" in out: if "mode" in out:
self.pushDecode(out["mode"]) self.pushDecode(out["mode"])
if "callsign" in out and "locator" in out: if "callsign" in out and "locator" in out:
Map.getSharedInstance().updateLocation( Map.getSharedInstance().updateLocation(
out["callsign"], LocatorLocation(out["locator"]), out["mode"], self.band out["callsign"], LocatorLocation(out["locator"]), out["mode"], self.band
) )
PskReporter.getSharedInstance().spot(out) PskReporter.getSharedInstance().spot(out)
self.handler.write_wsjt_message(out) self.handler.write_wsjt_message(out)
except ValueError: except ValueError:
logger.exception("error while parsing wsjt message") logger.exception("error while parsing wsjt message")
def pushDecode(self, mode): def pushDecode(self, mode):
metrics = Metrics.getSharedInstance() metrics = Metrics.getSharedInstance()