clean up wsjt remainders in absctract code

This commit is contained in:
Jakob Ketterl 2020-04-25 16:22:40 +02:00
parent a828f61c72
commit 978eea400d
5 changed files with 46 additions and 32 deletions

View File

@ -285,16 +285,17 @@ google_maps_api_key = ""
# in seconds; default: 2 hours # in seconds; default: 2 hours
map_position_retention_time = 2 * 60 * 60 map_position_retention_time = 2 * 60 * 60
# wsjt decoder queue configuration # decoder queue configuration
# due to the nature of the wsjt operating modes (ft8, ft8, jt9, jt65 and wspr), the data is recorded for a given amount # due to the nature of some operating modes (ft8, ft8, jt9, jt65, wspr and js8), the data is recorded for a given amount
# of time (6.5 seconds up to 2 minutes) and decoded at the end. this can lead to very high peak loads. # of time (6 seconds up to 2 minutes) and decoded at the end. this can lead to very high peak loads.
# to mitigate this, the recordings will be queued and processed in sequence. # to mitigate this, the recordings will be queued and processed in sequence.
# the number of workers will limit the total amount of work (one worker will losely occupy one cpu / thread) # the number of workers will limit the total amount of work (one worker will losely occupy one cpu / thread)
wsjt_queue_workers = 2 decoding_queue_workers = 2
# the maximum queue length will cause decodes to be dumped if the workers cannot keep up # the maximum queue length will cause decodes to be dumped if the workers cannot keep up
# if you are running background services, make sure this number is high enough to accept the task influx during peaks # if you are running background services, make sure this number is high enough to accept the task influx during peaks
# i.e. this should be higher than the number of wsjt services running at the same time # i.e. this should be higher than the number of decoding services running at the same time
wsjt_queue_length = 10 decoding_queue_length = 10
# wsjt decoding depth will allow more results, but will also consume more cpu # wsjt decoding depth will allow more results, but will also consume more cpu
wsjt_decoding_depth = 3 wsjt_decoding_depth = 3
# can also be set for each mode separately # can also be set for each mode separately
@ -303,6 +304,8 @@ wsjt_decoding_depths = {"jt65": 1}
# JS8 comes in different speeds: normal, slow, fast, turbo. This setting controls which ones are enabled. # JS8 comes in different speeds: normal, slow, fast, turbo. This setting controls which ones are enabled.
js8_enabled_profiles = ["normal", "slow"] js8_enabled_profiles = ["normal", "slow"]
# JS8 decoding depth; higher value will get more results, but will also consume more cpu
js8_decoding_depth = 3
temporary_directory = "/tmp" temporary_directory = "/tmp"

View File

@ -52,21 +52,21 @@ class DecoderQueue(Queue):
with DecoderQueue.creationLock: with DecoderQueue.creationLock:
if DecoderQueue.sharedInstance is None: if DecoderQueue.sharedInstance is None:
pm = Config.get() pm = Config.get()
DecoderQueue.sharedInstance = DecoderQueue(maxsize=pm["wsjt_queue_length"], workers=pm["wsjt_queue_workers"]) DecoderQueue.sharedInstance = DecoderQueue(maxsize=pm["decoding_queue_length"], workers=pm["decoding_queue_workers"])
return DecoderQueue.sharedInstance return DecoderQueue.sharedInstance
def __init__(self, maxsize, workers): def __init__(self, maxsize, workers):
super().__init__(maxsize) super().__init__(maxsize)
metrics = Metrics.getSharedInstance() metrics = Metrics.getSharedInstance()
metrics.addMetric("wsjt.queue.length", DirectMetric(self.qsize)) metrics.addMetric("decoding.queue.length", DirectMetric(self.qsize))
self.inCounter = CounterMetric() self.inCounter = CounterMetric()
metrics.addMetric("wsjt.queue.in", self.inCounter) metrics.addMetric("decoding.queue.in", self.inCounter)
self.outCounter = CounterMetric() self.outCounter = CounterMetric()
metrics.addMetric("wsjt.queue.out", self.outCounter) metrics.addMetric("decoding.queue.out", self.outCounter)
self.overflowCounter = CounterMetric() self.overflowCounter = CounterMetric()
metrics.addMetric("wsjt.queue.overflow", self.overflowCounter) metrics.addMetric("decoding.queue.overflow", self.overflowCounter)
self.errorCounter = CounterMetric() self.errorCounter = CounterMetric()
metrics.addMetric("wsjt.queue.error", self.errorCounter) metrics.addMetric("decoding.queue.error", self.errorCounter)
self.workers = [self.newWorker() for _ in range(0, workers)] self.workers = [self.newWorker() for _ in range(0, workers)]
def put(self, item, **kwars): def put(self, item, **kwars):
@ -105,17 +105,6 @@ class AudioChopperProfile(ABC):
def decoder_commandline(self, file): def decoder_commandline(self, file):
pass pass
def decoding_depth(self, mode):
pm = Config.get()
# mode-specific setting?
if "wsjt_decoding_depths" in pm and mode in pm["wsjt_decoding_depths"]:
return pm["wsjt_decoding_depths"][mode]
# return global default
if "wsjt_decoding_depth" in pm:
return pm["wsjt_decoding_depth"]
# default when no setting is provided
return 3
class AudioWriter(object): class AudioWriter(object):
def __init__(self, dsp, source, profile: AudioChopperProfile): def __init__(self, dsp, source, profile: AudioChopperProfile):
@ -173,7 +162,7 @@ class AudioWriter(object):
try: try:
DecoderQueue.getSharedInstance().put(QueueJob(self, filename, self.dsp.get_operating_freq())) DecoderQueue.getSharedInstance().put(QueueJob(self, filename, self.dsp.get_operating_freq()))
except Full: except Full:
logger.warning("wsjt decoding queue overflow; dropping one file") logger.warning("decoding queue overflow; dropping one file")
os.unlink(filename) os.unlink(filename)
self._scheduleNextSwitch() self._scheduleNextSwitch()

View File

@ -146,8 +146,8 @@ class SettingsController(AdminController):
), ),
Section( Section(
"WSJT-X settings", "WSJT-X settings",
NumberInput("wsjt_queue_workers", "Number of WSJT decoding workers"), NumberInput("decoding_queue_workers", "Number of decoding workers"),
NumberInput("wsjt_queue_length", "Maximum length of WSJT job queue"), NumberInput("decoding_queue_length", "Maximum length of decoding job queue"),
NumberInput( NumberInput(
"wsjt_decoding_depth", "wsjt_decoding_depth",
"WSJT decoding depth", "WSJT decoding depth",

View File

@ -28,6 +28,14 @@ class Js8Profiles(object):
class Js8Profile(AudioChopperProfile, metaclass=ABCMeta): class Js8Profile(AudioChopperProfile, metaclass=ABCMeta):
def decoding_depth(self, mode):
pm = Config.get()
# return global default
if "js8_decoding_depth" in pm:
return pm["js8_decoding_depth"]
# default when no setting is provided
return 3
def getFileTimestampFormat(self): def getFileTimestampFormat(self):
return "%y%m%d_%H%M%S" return "%y%m%d_%H%M%S"

View File

@ -5,14 +5,28 @@ from owrx.metrics import Metrics, CounterMetric
from owrx.pskreporter import PskReporter from owrx.pskreporter import PskReporter
from owrx.parser import Parser from owrx.parser import Parser
from owrx.audio import AudioChopperProfile from owrx.audio import AudioChopperProfile
from abc import ABC, abstractmethod from abc import ABC, ABCMeta, abstractmethod
from owrx.config import Config
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Ft8Profile(AudioChopperProfile): class WsjtProfile(AudioChopperProfile, metaclass=ABCMeta):
def decoding_depth(self, mode):
pm = Config.get()
# mode-specific setting?
if "wsjt_decoding_depths" in pm and mode in pm["wsjt_decoding_depths"]:
return pm["wsjt_decoding_depths"][mode]
# return global default
if "wsjt_decoding_depth" in pm:
return pm["wsjt_decoding_depth"]
# default when no setting is provided
return 3
class Ft8Profile(WsjtProfile):
def getInterval(self): def getInterval(self):
return 15 return 15
@ -23,7 +37,7 @@ class Ft8Profile(AudioChopperProfile):
return ["jt9", "--ft8", "-d", str(self.decoding_depth("ft8")), file] return ["jt9", "--ft8", "-d", str(self.decoding_depth("ft8")), file]
class WsprProfile(AudioChopperProfile): class WsprProfile(WsjtProfile):
def getInterval(self): def getInterval(self):
return 120 return 120
@ -38,7 +52,7 @@ class WsprProfile(AudioChopperProfile):
return cmd return cmd
class Jt65Profile(AudioChopperProfile): class Jt65Profile(WsjtProfile):
def getInterval(self): def getInterval(self):
return 60 return 60
@ -49,7 +63,7 @@ class Jt65Profile(AudioChopperProfile):
return ["jt9", "--jt65", "-d", str(self.decoding_depth("jt65")), file] return ["jt9", "--jt65", "-d", str(self.decoding_depth("jt65")), file]
class Jt9Profile(AudioChopperProfile): class Jt9Profile(WsjtProfile):
def getInterval(self): def getInterval(self):
return 60 return 60
@ -60,7 +74,7 @@ class Jt9Profile(AudioChopperProfile):
return ["jt9", "--jt9", "-d", str(self.decoding_depth("jt9")), file] return ["jt9", "--jt9", "-d", str(self.decoding_depth("jt9")), file]
class Ft4Profile(AudioChopperProfile): class Ft4Profile(WsjtProfile):
def getInterval(self): def getInterval(self):
return 7.5 return 7.5