diff --git a/csdr/csdr.py b/csdr/csdr.py index ac4dee3..07feb01 100644 --- a/csdr/csdr.py +++ b/csdr/csdr.py @@ -29,8 +29,9 @@ import math from functools import partial from owrx.kiss import KissClient, DirewolfConfig -from owrx.wsjt import Ft8Chopper, WsprChopper, Jt9Chopper, Jt65Chopper, Ft4Chopper -from owrx.js8 import Js8Chopper +from owrx.wsjt import Ft8Profile, WsprProfile, Jt9Profile, Jt65Profile, Ft4Profile +from owrx.js8 import Js8NormalProfile +from owrx.audio import AudioChopper import logging @@ -450,23 +451,23 @@ class dsp(object): if self.isWsjtMode(): smd = self.get_secondary_demodulator() - chopper_cls = None + chopper_profile = None output_name = "wsjt_demod" if smd == "ft8": - chopper_cls = Ft8Chopper + chopper_profile = Ft8Profile() elif smd == "wspr": - chopper_cls = WsprChopper + chopper_profile = WsprProfile() elif smd == "jt65": - chopper_cls = Jt65Chopper + chopper_profile = Jt65Profile() elif smd == "jt9": - chopper_cls = Jt9Chopper + chopper_profile = Jt9Profile() elif smd == "ft4": - chopper_cls = Ft4Chopper + chopper_profile = Ft4Profile() elif smd == "js8": - chopper_cls = Js8Chopper + chopper_profile = Js8NormalProfile() output_name = "js8_demod" - if chopper_cls is not None: - chopper = chopper_cls(self, self.secondary_process_demod.stdout) + if chopper_profile is not None: + chopper = AudioChopper(self, self.secondary_process_demod.stdout, chopper_profile) chopper.start() self.output.send_output(output_name, chopper.read) elif self.isPacket(): diff --git a/owrx/audio.py b/owrx/audio.py new file mode 100644 index 0000000..90b22a5 --- /dev/null +++ b/owrx/audio.py @@ -0,0 +1,226 @@ +from abc import ABC, ABCMeta, abstractmethod +from owrx.config import Config +from owrx.metrics import Metrics, CounterMetric, DirectMetric +import threading +import wave +import subprocess +import os +from multiprocessing.connection import Pipe +from datetime import datetime, timedelta +from queue import Queue, Full + + +import logging + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + + +class QueueJob(object): + def __init__(self, decoder, file, freq): + self.decoder = decoder + self.file = file + self.freq = freq + + def run(self): + self.decoder.decode(self) + + +class QueueWorker(threading.Thread): + def __init__(self, queue): + self.queue = queue + self.doRun = True + super().__init__(daemon=True) + + def run(self) -> None: + while self.doRun: + job = self.queue.get() + try: + job.run() + except Exception: + logger.exception("failed to decode job") + self.queue.onError() + self.queue.task_done() + + +class DecoderQueue(Queue): + sharedInstance = None + creationLock = threading.Lock() + + @staticmethod + def getSharedInstance(): + with DecoderQueue.creationLock: + if DecoderQueue.sharedInstance is None: + pm = Config.get() + DecoderQueue.sharedInstance = DecoderQueue(maxsize=pm["wsjt_queue_length"], workers=pm["wsjt_queue_workers"]) + return DecoderQueue.sharedInstance + + def __init__(self, maxsize, workers): + super().__init__(maxsize) + metrics = Metrics.getSharedInstance() + metrics.addMetric("wsjt.queue.length", DirectMetric(self.qsize)) + self.inCounter = CounterMetric() + metrics.addMetric("wsjt.queue.in", self.inCounter) + self.outCounter = CounterMetric() + metrics.addMetric("wsjt.queue.out", self.outCounter) + self.overflowCounter = CounterMetric() + metrics.addMetric("wsjt.queue.overflow", self.overflowCounter) + self.errorCounter = CounterMetric() + metrics.addMetric("wsjt.queue.error", self.errorCounter) + self.workers = [self.newWorker() for _ in range(0, workers)] + + def put(self, item, **kwars): + self.inCounter.inc() + try: + super(DecoderQueue, self).put(item, block=False) + except Full: + self.overflowCounter.inc() + raise + + def get(self, **kwargs): + # super.get() is blocking, so it would mess up the stats to inc() first + out = super(DecoderQueue, self).get(**kwargs) + self.outCounter.inc() + return out + + def newWorker(self): + worker = QueueWorker(self) + worker.start() + return worker + + def onError(self): + self.errorCounter.inc() + + +class AudioChopperProfile(ABC): + @abstractmethod + def getInterval(self): + pass + + @abstractmethod + def getFileTimestampFormat(self): + pass + + @abstractmethod + def decoder_commandline(self, file): + pass + + def decoding_depth(self, mode): + pm = Config.get() + # mode-specific setting? + if "wsjt_decoding_depths" in pm and mode in pm["wsjt_decoding_depths"]: + return pm["wsjt_decoding_depths"][mode] + # return global default + if "wsjt_decoding_depth" in pm: + return pm["wsjt_decoding_depth"] + # default when no setting is provided + return 3 + + +class AudioChopper(threading.Thread, metaclass=ABCMeta): + def __init__(self, dsp, source, profile: AudioChopperProfile): + self.dsp = dsp + self.source = source + self.profile = profile + self.tmp_dir = Config.get()["temporary_directory"] + self.wavefile = None + self.wavefilename = None + self.switchingLock = threading.Lock() + self.timer = None + (self.outputReader, self.outputWriter) = Pipe() + self.doRun = True + super().__init__() + + def getWaveFile(self): + filename = "{tmp_dir}/openwebrx-audiochopper-{id}-{timestamp}.wav".format( + tmp_dir=self.tmp_dir, id=id(self), timestamp=datetime.utcnow().strftime(self.profile.getFileTimestampFormat()) + ) + wavefile = wave.open(filename, "wb") + wavefile.setnchannels(1) + wavefile.setsampwidth(2) + wavefile.setframerate(12000) + return filename, wavefile + + def getNextDecodingTime(self): + t = datetime.utcnow() + zeroed = t.replace(minute=0, second=0, microsecond=0) + delta = t - zeroed + interval = self.profile.getInterval() + seconds = (int(delta.total_seconds() / interval) + 1) * interval + t = zeroed + timedelta(seconds=seconds) + logger.debug("scheduling: {0}".format(t)) + return t + + def cancelTimer(self): + if self.timer: + self.timer.cancel() + self.timer = None + + def _scheduleNextSwitch(self): + self.cancelTimer() + if self.doRun: + delta = self.getNextDecodingTime() - datetime.utcnow() + self.timer = threading.Timer(delta.total_seconds(), self.switchFiles) + self.timer.start() + + def switchFiles(self): + self.switchingLock.acquire() + file = self.wavefile + filename = self.wavefilename + (self.wavefilename, self.wavefile) = self.getWaveFile() + self.switchingLock.release() + + file.close() + try: + DecoderQueue.getSharedInstance().put(QueueJob(self, filename, self.dsp.get_operating_freq())) + except Full: + logger.warning("wsjt decoding queue overflow; dropping one file") + os.unlink(filename) + self._scheduleNextSwitch() + + def decode(self, job: QueueJob): + logger.debug("processing file %s", job.file) + decoder = subprocess.Popen( + ["nice", "-n", "10"] + self.profile.decoder_commandline(job.file), + stdout=subprocess.PIPE, + cwd=self.tmp_dir, + close_fds=True, + ) + for line in decoder.stdout: + self.outputWriter.send((job.freq, line)) + try: + rc = decoder.wait(timeout=10) + if rc != 0: + logger.warning("decoder return code: %i", rc) + except subprocess.TimeoutExpired: + logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid) + decoder.kill() + os.unlink(job.file) + + def run(self) -> None: + logger.debug("WSJT chopper starting up") + (self.wavefilename, self.wavefile) = self.getWaveFile() + self._scheduleNextSwitch() + while self.doRun: + data = self.source.read(256) + if data is None or (isinstance(data, bytes) and len(data) == 0): + self.doRun = False + else: + self.switchingLock.acquire() + self.wavefile.writeframes(data) + self.switchingLock.release() + + logger.debug("WSJT chopper shutting down") + self.outputReader.close() + self.outputWriter.close() + self.cancelTimer() + try: + os.unlink(self.wavefilename) + except Exception: + logger.exception("error removing undecoded file") + + def read(self): + try: + return self.outputReader.recv() + except EOFError: + return None diff --git a/owrx/js8.py b/owrx/js8.py index 796e4eb..303a273 100644 --- a/owrx/js8.py +++ b/owrx/js8.py @@ -1,4 +1,4 @@ -from .wsjt import WsjtChopper +from .audio import AudioChopperProfile from .parser import Parser import re from js8py import Js8 @@ -12,7 +12,7 @@ import logging logger = logging.getLogger(__name__) -class Js8Chopper(WsjtChopper): +class Js8NormalProfile(AudioChopperProfile): def getInterval(self): return 15 diff --git a/owrx/wsjt.py b/owrx/wsjt.py index 982e475..475e3b3 100644 --- a/owrx/wsjt.py +++ b/owrx/wsjt.py @@ -1,17 +1,11 @@ -import threading -import wave -from datetime import datetime, timedelta, timezone -import subprocess -import os -from multiprocessing.connection import Pipe +from datetime import datetime, timezone from owrx.map import Map, LocatorLocation import re -from queue import Queue, Full -from owrx.config import Config -from owrx.metrics import Metrics, CounterMetric, DirectMetric +from owrx.metrics import Metrics, CounterMetric from owrx.pskreporter import PskReporter from owrx.parser import Parser -from abc import ABC, ABCMeta, abstractmethod +from owrx.audio import AudioChopperProfile +from abc import ABC, abstractmethod import logging @@ -19,210 +13,7 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -class QueueJob(object): - def __init__(self, decoder, file, freq): - self.decoder = decoder - self.file = file - self.freq = freq - - def run(self): - self.decoder.decode(self) - - -class WsjtQueueWorker(threading.Thread): - def __init__(self, queue): - self.queue = queue - self.doRun = True - super().__init__(daemon=True) - - def run(self) -> None: - while self.doRun: - job = self.queue.get() - try: - job.run() - except Exception: - logger.exception("failed to decode job") - self.queue.onError() - self.queue.task_done() - - -class WsjtQueue(Queue): - sharedInstance = None - creationLock = threading.Lock() - - @staticmethod - def getSharedInstance(): - with WsjtQueue.creationLock: - if WsjtQueue.sharedInstance is None: - pm = Config.get() - WsjtQueue.sharedInstance = WsjtQueue(maxsize=pm["wsjt_queue_length"], workers=pm["wsjt_queue_workers"]) - return WsjtQueue.sharedInstance - - def __init__(self, maxsize, workers): - super().__init__(maxsize) - metrics = Metrics.getSharedInstance() - metrics.addMetric("wsjt.queue.length", DirectMetric(self.qsize)) - self.inCounter = CounterMetric() - metrics.addMetric("wsjt.queue.in", self.inCounter) - self.outCounter = CounterMetric() - metrics.addMetric("wsjt.queue.out", self.outCounter) - self.overflowCounter = CounterMetric() - metrics.addMetric("wsjt.queue.overflow", self.overflowCounter) - self.errorCounter = CounterMetric() - metrics.addMetric("wsjt.queue.error", self.errorCounter) - self.workers = [self.newWorker() for _ in range(0, workers)] - - def put(self, item): - self.inCounter.inc() - try: - super(WsjtQueue, self).put(item, block=False) - except Full: - self.overflowCounter.inc() - raise - - def get(self, **kwargs): - # super.get() is blocking, so it would mess up the stats to inc() first - out = super(WsjtQueue, self).get(**kwargs) - self.outCounter.inc() - return out - - def newWorker(self): - worker = WsjtQueueWorker(self) - worker.start() - return worker - - def onError(self): - self.errorCounter.inc() - - -class WsjtChopper(threading.Thread, metaclass=ABCMeta): - def __init__(self, dsp, source): - self.dsp = dsp - self.source = source - self.tmp_dir = Config.get()["temporary_directory"] - (self.wavefilename, self.wavefile) = self.getWaveFile() - self.switchingLock = threading.Lock() - self.timer = None - (self.outputReader, self.outputWriter) = Pipe() - self.doRun = True - super().__init__() - - @abstractmethod - def getInterval(self): - pass - - @abstractmethod - def getFileTimestampFormat(self): - pass - - def getWaveFile(self): - filename = "{tmp_dir}/openwebrx-wsjtchopper-{id}-{timestamp}.wav".format( - tmp_dir=self.tmp_dir, id=id(self), timestamp=datetime.utcnow().strftime(self.getFileTimestampFormat()) - ) - wavefile = wave.open(filename, "wb") - wavefile.setnchannels(1) - wavefile.setsampwidth(2) - wavefile.setframerate(12000) - return filename, wavefile - - def getNextDecodingTime(self): - t = datetime.utcnow() - zeroed = t.replace(minute=0, second=0, microsecond=0) - delta = t - zeroed - interval = self.getInterval() - seconds = (int(delta.total_seconds() / interval) + 1) * interval - t = zeroed + timedelta(seconds=seconds) - logger.debug("scheduling: {0}".format(t)) - return t - - def cancelTimer(self): - if self.timer: - self.timer.cancel() - - def _scheduleNextSwitch(self): - if self.doRun: - delta = self.getNextDecodingTime() - datetime.utcnow() - self.timer = threading.Timer(delta.total_seconds(), self.switchFiles) - self.timer.start() - - def switchFiles(self): - self.switchingLock.acquire() - file = self.wavefile - filename = self.wavefilename - (self.wavefilename, self.wavefile) = self.getWaveFile() - self.switchingLock.release() - - file.close() - try: - WsjtQueue.getSharedInstance().put(QueueJob(self, filename, self.dsp.get_operating_freq())) - except Full: - logger.warning("wsjt decoding queue overflow; dropping one file") - os.unlink(filename) - self._scheduleNextSwitch() - - @abstractmethod - def decoder_commandline(self, file): - pass - - def decode(self, job: QueueJob): - logger.debug("processing file %s", job.file) - decoder = subprocess.Popen( - ["nice", "-n", "10"] + self.decoder_commandline(job.file), - stdout=subprocess.PIPE, - cwd=self.tmp_dir, - close_fds=True, - ) - for line in decoder.stdout: - self.outputWriter.send((job.freq, line)) - try: - rc = decoder.wait(timeout=10) - if rc != 0: - logger.warning("decoder return code: %i", rc) - except subprocess.TimeoutExpired: - logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid) - decoder.kill() - os.unlink(job.file) - - def run(self) -> None: - logger.debug("WSJT chopper starting up") - self._scheduleNextSwitch() - while self.doRun: - data = self.source.read(256) - if data is None or (isinstance(data, bytes) and len(data) == 0): - self.doRun = False - else: - self.switchingLock.acquire() - self.wavefile.writeframes(data) - self.switchingLock.release() - - logger.debug("WSJT chopper shutting down") - self.outputReader.close() - self.outputWriter.close() - self.cancelTimer() - try: - os.unlink(self.wavefilename) - except Exception: - logger.exception("error removing undecoded file") - - def read(self): - try: - return self.outputReader.recv() - except EOFError: - return None - - def decoding_depth(self, mode): - pm = Config.get() - # mode-specific setting? - if "wsjt_decoding_depths" in pm and mode in pm["wsjt_decoding_depths"]: - return pm["wsjt_decoding_depths"][mode] - # return global default - if "wsjt_decoding_depth" in pm: - return pm["wsjt_decoding_depth"] - # default when no setting is provided - return 3 - - -class Ft8Chopper(WsjtChopper): +class Ft8Profile(AudioChopperProfile): def getInterval(self): return 15 @@ -233,7 +24,7 @@ class Ft8Chopper(WsjtChopper): return ["jt9", "--ft8", "-d", str(self.decoding_depth("ft8")), file] -class WsprChopper(WsjtChopper): +class WsprProfile(AudioChopperProfile): def getInterval(self): return 120 @@ -248,7 +39,7 @@ class WsprChopper(WsjtChopper): return cmd -class Jt65Chopper(WsjtChopper): +class Jt65Profile(AudioChopperProfile): def getInterval(self): return 60 @@ -259,7 +50,7 @@ class Jt65Chopper(WsjtChopper): return ["jt9", "--jt65", "-d", str(self.decoding_depth("jt65")), file] -class Jt9Chopper(WsjtChopper): +class Jt9Profile(AudioChopperProfile): def getInterval(self): return 60 @@ -270,7 +61,7 @@ class Jt9Chopper(WsjtChopper): return ["jt9", "--jt9", "-d", str(self.decoding_depth("jt9")), file] -class Ft4Chopper(WsjtChopper): +class Ft4Profile(AudioChopperProfile): def getInterval(self): return 7.5