refactor chopper out of wsjt

This commit is contained in:
Jakob Ketterl 2020-04-22 23:53:19 +02:00
parent 9622cd6a2a
commit 0120b33a25
4 changed files with 249 additions and 231 deletions

View File

@ -29,8 +29,9 @@ import math
from functools import partial from functools import partial
from owrx.kiss import KissClient, DirewolfConfig from owrx.kiss import KissClient, DirewolfConfig
from owrx.wsjt import Ft8Chopper, WsprChopper, Jt9Chopper, Jt65Chopper, Ft4Chopper from owrx.wsjt import Ft8Profile, WsprProfile, Jt9Profile, Jt65Profile, Ft4Profile
from owrx.js8 import Js8Chopper from owrx.js8 import Js8NormalProfile
from owrx.audio import AudioChopper
import logging import logging
@ -450,23 +451,23 @@ class dsp(object):
if self.isWsjtMode(): if self.isWsjtMode():
smd = self.get_secondary_demodulator() smd = self.get_secondary_demodulator()
chopper_cls = None chopper_profile = None
output_name = "wsjt_demod" output_name = "wsjt_demod"
if smd == "ft8": if smd == "ft8":
chopper_cls = Ft8Chopper chopper_profile = Ft8Profile()
elif smd == "wspr": elif smd == "wspr":
chopper_cls = WsprChopper chopper_profile = WsprProfile()
elif smd == "jt65": elif smd == "jt65":
chopper_cls = Jt65Chopper chopper_profile = Jt65Profile()
elif smd == "jt9": elif smd == "jt9":
chopper_cls = Jt9Chopper chopper_profile = Jt9Profile()
elif smd == "ft4": elif smd == "ft4":
chopper_cls = Ft4Chopper chopper_profile = Ft4Profile()
elif smd == "js8": elif smd == "js8":
chopper_cls = Js8Chopper chopper_profile = Js8NormalProfile()
output_name = "js8_demod" output_name = "js8_demod"
if chopper_cls is not None: if chopper_profile is not None:
chopper = chopper_cls(self, self.secondary_process_demod.stdout) chopper = AudioChopper(self, self.secondary_process_demod.stdout, chopper_profile)
chopper.start() chopper.start()
self.output.send_output(output_name, chopper.read) self.output.send_output(output_name, chopper.read)
elif self.isPacket(): elif self.isPacket():

226
owrx/audio.py Normal file
View File

@ -0,0 +1,226 @@
from abc import ABC, ABCMeta, abstractmethod
from owrx.config import Config
from owrx.metrics import Metrics, CounterMetric, DirectMetric
import threading
import wave
import subprocess
import os
from multiprocessing.connection import Pipe
from datetime import datetime, timedelta
from queue import Queue, Full
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class QueueJob(object):
def __init__(self, decoder, file, freq):
self.decoder = decoder
self.file = file
self.freq = freq
def run(self):
self.decoder.decode(self)
class QueueWorker(threading.Thread):
def __init__(self, queue):
self.queue = queue
self.doRun = True
super().__init__(daemon=True)
def run(self) -> None:
while self.doRun:
job = self.queue.get()
try:
job.run()
except Exception:
logger.exception("failed to decode job")
self.queue.onError()
self.queue.task_done()
class DecoderQueue(Queue):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with DecoderQueue.creationLock:
if DecoderQueue.sharedInstance is None:
pm = Config.get()
DecoderQueue.sharedInstance = DecoderQueue(maxsize=pm["wsjt_queue_length"], workers=pm["wsjt_queue_workers"])
return DecoderQueue.sharedInstance
def __init__(self, maxsize, workers):
super().__init__(maxsize)
metrics = Metrics.getSharedInstance()
metrics.addMetric("wsjt.queue.length", DirectMetric(self.qsize))
self.inCounter = CounterMetric()
metrics.addMetric("wsjt.queue.in", self.inCounter)
self.outCounter = CounterMetric()
metrics.addMetric("wsjt.queue.out", self.outCounter)
self.overflowCounter = CounterMetric()
metrics.addMetric("wsjt.queue.overflow", self.overflowCounter)
self.errorCounter = CounterMetric()
metrics.addMetric("wsjt.queue.error", self.errorCounter)
self.workers = [self.newWorker() for _ in range(0, workers)]
def put(self, item, **kwars):
self.inCounter.inc()
try:
super(DecoderQueue, self).put(item, block=False)
except Full:
self.overflowCounter.inc()
raise
def get(self, **kwargs):
# super.get() is blocking, so it would mess up the stats to inc() first
out = super(DecoderQueue, self).get(**kwargs)
self.outCounter.inc()
return out
def newWorker(self):
worker = QueueWorker(self)
worker.start()
return worker
def onError(self):
self.errorCounter.inc()
class AudioChopperProfile(ABC):
@abstractmethod
def getInterval(self):
pass
@abstractmethod
def getFileTimestampFormat(self):
pass
@abstractmethod
def decoder_commandline(self, file):
pass
def decoding_depth(self, mode):
pm = Config.get()
# mode-specific setting?
if "wsjt_decoding_depths" in pm and mode in pm["wsjt_decoding_depths"]:
return pm["wsjt_decoding_depths"][mode]
# return global default
if "wsjt_decoding_depth" in pm:
return pm["wsjt_decoding_depth"]
# default when no setting is provided
return 3
class AudioChopper(threading.Thread, metaclass=ABCMeta):
def __init__(self, dsp, source, profile: AudioChopperProfile):
self.dsp = dsp
self.source = source
self.profile = profile
self.tmp_dir = Config.get()["temporary_directory"]
self.wavefile = None
self.wavefilename = None
self.switchingLock = threading.Lock()
self.timer = None
(self.outputReader, self.outputWriter) = Pipe()
self.doRun = True
super().__init__()
def getWaveFile(self):
filename = "{tmp_dir}/openwebrx-audiochopper-{id}-{timestamp}.wav".format(
tmp_dir=self.tmp_dir, id=id(self), timestamp=datetime.utcnow().strftime(self.profile.getFileTimestampFormat())
)
wavefile = wave.open(filename, "wb")
wavefile.setnchannels(1)
wavefile.setsampwidth(2)
wavefile.setframerate(12000)
return filename, wavefile
def getNextDecodingTime(self):
t = datetime.utcnow()
zeroed = t.replace(minute=0, second=0, microsecond=0)
delta = t - zeroed
interval = self.profile.getInterval()
seconds = (int(delta.total_seconds() / interval) + 1) * interval
t = zeroed + timedelta(seconds=seconds)
logger.debug("scheduling: {0}".format(t))
return t
def cancelTimer(self):
if self.timer:
self.timer.cancel()
self.timer = None
def _scheduleNextSwitch(self):
self.cancelTimer()
if self.doRun:
delta = self.getNextDecodingTime() - datetime.utcnow()
self.timer = threading.Timer(delta.total_seconds(), self.switchFiles)
self.timer.start()
def switchFiles(self):
self.switchingLock.acquire()
file = self.wavefile
filename = self.wavefilename
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock.release()
file.close()
try:
DecoderQueue.getSharedInstance().put(QueueJob(self, filename, self.dsp.get_operating_freq()))
except Full:
logger.warning("wsjt decoding queue overflow; dropping one file")
os.unlink(filename)
self._scheduleNextSwitch()
def decode(self, job: QueueJob):
logger.debug("processing file %s", job.file)
decoder = subprocess.Popen(
["nice", "-n", "10"] + self.profile.decoder_commandline(job.file),
stdout=subprocess.PIPE,
cwd=self.tmp_dir,
close_fds=True,
)
for line in decoder.stdout:
self.outputWriter.send((job.freq, line))
try:
rc = decoder.wait(timeout=10)
if rc != 0:
logger.warning("decoder return code: %i", rc)
except subprocess.TimeoutExpired:
logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid)
decoder.kill()
os.unlink(job.file)
def run(self) -> None:
logger.debug("WSJT chopper starting up")
(self.wavefilename, self.wavefile) = self.getWaveFile()
self._scheduleNextSwitch()
while self.doRun:
data = self.source.read(256)
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
self.switchingLock.acquire()
self.wavefile.writeframes(data)
self.switchingLock.release()
logger.debug("WSJT chopper shutting down")
self.outputReader.close()
self.outputWriter.close()
self.cancelTimer()
try:
os.unlink(self.wavefilename)
except Exception:
logger.exception("error removing undecoded file")
def read(self):
try:
return self.outputReader.recv()
except EOFError:
return None

View File

@ -1,4 +1,4 @@
from .wsjt import WsjtChopper from .audio import AudioChopperProfile
from .parser import Parser from .parser import Parser
import re import re
from js8py import Js8 from js8py import Js8
@ -12,7 +12,7 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Js8Chopper(WsjtChopper): class Js8NormalProfile(AudioChopperProfile):
def getInterval(self): def getInterval(self):
return 15 return 15

View File

@ -1,17 +1,11 @@
import threading from datetime import datetime, timezone
import wave
from datetime import datetime, timedelta, timezone
import subprocess
import os
from multiprocessing.connection import Pipe
from owrx.map import Map, LocatorLocation from owrx.map import Map, LocatorLocation
import re import re
from queue import Queue, Full from owrx.metrics import Metrics, CounterMetric
from owrx.config import Config
from owrx.metrics import Metrics, CounterMetric, DirectMetric
from owrx.pskreporter import PskReporter from owrx.pskreporter import PskReporter
from owrx.parser import Parser from owrx.parser import Parser
from abc import ABC, ABCMeta, abstractmethod from owrx.audio import AudioChopperProfile
from abc import ABC, abstractmethod
import logging import logging
@ -19,210 +13,7 @@ logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
class QueueJob(object): class Ft8Profile(AudioChopperProfile):
def __init__(self, decoder, file, freq):
self.decoder = decoder
self.file = file
self.freq = freq
def run(self):
self.decoder.decode(self)
class WsjtQueueWorker(threading.Thread):
def __init__(self, queue):
self.queue = queue
self.doRun = True
super().__init__(daemon=True)
def run(self) -> None:
while self.doRun:
job = self.queue.get()
try:
job.run()
except Exception:
logger.exception("failed to decode job")
self.queue.onError()
self.queue.task_done()
class WsjtQueue(Queue):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with WsjtQueue.creationLock:
if WsjtQueue.sharedInstance is None:
pm = Config.get()
WsjtQueue.sharedInstance = WsjtQueue(maxsize=pm["wsjt_queue_length"], workers=pm["wsjt_queue_workers"])
return WsjtQueue.sharedInstance
def __init__(self, maxsize, workers):
super().__init__(maxsize)
metrics = Metrics.getSharedInstance()
metrics.addMetric("wsjt.queue.length", DirectMetric(self.qsize))
self.inCounter = CounterMetric()
metrics.addMetric("wsjt.queue.in", self.inCounter)
self.outCounter = CounterMetric()
metrics.addMetric("wsjt.queue.out", self.outCounter)
self.overflowCounter = CounterMetric()
metrics.addMetric("wsjt.queue.overflow", self.overflowCounter)
self.errorCounter = CounterMetric()
metrics.addMetric("wsjt.queue.error", self.errorCounter)
self.workers = [self.newWorker() for _ in range(0, workers)]
def put(self, item):
self.inCounter.inc()
try:
super(WsjtQueue, self).put(item, block=False)
except Full:
self.overflowCounter.inc()
raise
def get(self, **kwargs):
# super.get() is blocking, so it would mess up the stats to inc() first
out = super(WsjtQueue, self).get(**kwargs)
self.outCounter.inc()
return out
def newWorker(self):
worker = WsjtQueueWorker(self)
worker.start()
return worker
def onError(self):
self.errorCounter.inc()
class WsjtChopper(threading.Thread, metaclass=ABCMeta):
def __init__(self, dsp, source):
self.dsp = dsp
self.source = source
self.tmp_dir = Config.get()["temporary_directory"]
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock = threading.Lock()
self.timer = None
(self.outputReader, self.outputWriter) = Pipe()
self.doRun = True
super().__init__()
@abstractmethod
def getInterval(self):
pass
@abstractmethod
def getFileTimestampFormat(self):
pass
def getWaveFile(self):
filename = "{tmp_dir}/openwebrx-wsjtchopper-{id}-{timestamp}.wav".format(
tmp_dir=self.tmp_dir, id=id(self), timestamp=datetime.utcnow().strftime(self.getFileTimestampFormat())
)
wavefile = wave.open(filename, "wb")
wavefile.setnchannels(1)
wavefile.setsampwidth(2)
wavefile.setframerate(12000)
return filename, wavefile
def getNextDecodingTime(self):
t = datetime.utcnow()
zeroed = t.replace(minute=0, second=0, microsecond=0)
delta = t - zeroed
interval = self.getInterval()
seconds = (int(delta.total_seconds() / interval) + 1) * interval
t = zeroed + timedelta(seconds=seconds)
logger.debug("scheduling: {0}".format(t))
return t
def cancelTimer(self):
if self.timer:
self.timer.cancel()
def _scheduleNextSwitch(self):
if self.doRun:
delta = self.getNextDecodingTime() - datetime.utcnow()
self.timer = threading.Timer(delta.total_seconds(), self.switchFiles)
self.timer.start()
def switchFiles(self):
self.switchingLock.acquire()
file = self.wavefile
filename = self.wavefilename
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock.release()
file.close()
try:
WsjtQueue.getSharedInstance().put(QueueJob(self, filename, self.dsp.get_operating_freq()))
except Full:
logger.warning("wsjt decoding queue overflow; dropping one file")
os.unlink(filename)
self._scheduleNextSwitch()
@abstractmethod
def decoder_commandline(self, file):
pass
def decode(self, job: QueueJob):
logger.debug("processing file %s", job.file)
decoder = subprocess.Popen(
["nice", "-n", "10"] + self.decoder_commandline(job.file),
stdout=subprocess.PIPE,
cwd=self.tmp_dir,
close_fds=True,
)
for line in decoder.stdout:
self.outputWriter.send((job.freq, line))
try:
rc = decoder.wait(timeout=10)
if rc != 0:
logger.warning("decoder return code: %i", rc)
except subprocess.TimeoutExpired:
logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid)
decoder.kill()
os.unlink(job.file)
def run(self) -> None:
logger.debug("WSJT chopper starting up")
self._scheduleNextSwitch()
while self.doRun:
data = self.source.read(256)
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
self.switchingLock.acquire()
self.wavefile.writeframes(data)
self.switchingLock.release()
logger.debug("WSJT chopper shutting down")
self.outputReader.close()
self.outputWriter.close()
self.cancelTimer()
try:
os.unlink(self.wavefilename)
except Exception:
logger.exception("error removing undecoded file")
def read(self):
try:
return self.outputReader.recv()
except EOFError:
return None
def decoding_depth(self, mode):
pm = Config.get()
# mode-specific setting?
if "wsjt_decoding_depths" in pm and mode in pm["wsjt_decoding_depths"]:
return pm["wsjt_decoding_depths"][mode]
# return global default
if "wsjt_decoding_depth" in pm:
return pm["wsjt_decoding_depth"]
# default when no setting is provided
return 3
class Ft8Chopper(WsjtChopper):
def getInterval(self): def getInterval(self):
return 15 return 15
@ -233,7 +24,7 @@ class Ft8Chopper(WsjtChopper):
return ["jt9", "--ft8", "-d", str(self.decoding_depth("ft8")), file] return ["jt9", "--ft8", "-d", str(self.decoding_depth("ft8")), file]
class WsprChopper(WsjtChopper): class WsprProfile(AudioChopperProfile):
def getInterval(self): def getInterval(self):
return 120 return 120
@ -248,7 +39,7 @@ class WsprChopper(WsjtChopper):
return cmd return cmd
class Jt65Chopper(WsjtChopper): class Jt65Profile(AudioChopperProfile):
def getInterval(self): def getInterval(self):
return 60 return 60
@ -259,7 +50,7 @@ class Jt65Chopper(WsjtChopper):
return ["jt9", "--jt65", "-d", str(self.decoding_depth("jt65")), file] return ["jt9", "--jt65", "-d", str(self.decoding_depth("jt65")), file]
class Jt9Chopper(WsjtChopper): class Jt9Profile(AudioChopperProfile):
def getInterval(self): def getInterval(self):
return 60 return 60
@ -270,7 +61,7 @@ class Jt9Chopper(WsjtChopper):
return ["jt9", "--jt9", "-d", str(self.decoding_depth("jt9")), file] return ["jt9", "--jt9", "-d", str(self.decoding_depth("jt9")), file]
class Ft4Chopper(WsjtChopper): class Ft4Profile(AudioChopperProfile):
def getInterval(self): def getInterval(self):
return 7.5 return 7.5