Merge branch 'develop' into sdrplay_v3

This commit is contained in:
Jakob Ketterl
2020-05-24 14:05:36 +02:00
76 changed files with 2885 additions and 1609 deletions

View File

@ -1,3 +1,8 @@
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
from http.server import HTTPServer
from owrx.http import RequestHandler
from owrx.config import Config
@ -10,11 +15,6 @@ from owrx.websocket import WebSocketConnection
from owrx.pskreporter import PskReporter
from owrx.version import openwebrx_version
import logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
class ThreadedHttpServer(ThreadingMixIn, HTTPServer):
pass

244
owrx/audio.py Normal file
View File

@ -0,0 +1,244 @@
from abc import ABC, ABCMeta, abstractmethod
from owrx.config import Config
from owrx.metrics import Metrics, CounterMetric, DirectMetric
import threading
import wave
import subprocess
import os
from multiprocessing.connection import Pipe, wait
from datetime import datetime, timedelta
from queue import Queue, Full
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class QueueJob(object):
def __init__(self, decoder, file, freq):
self.decoder = decoder
self.file = file
self.freq = freq
def run(self):
self.decoder.decode(self)
def unlink(self):
try:
os.unlink(self.file)
except FileNotFoundError:
pass
class QueueWorker(threading.Thread):
def __init__(self, queue):
self.queue = queue
self.doRun = True
super().__init__(daemon=True)
def run(self) -> None:
while self.doRun:
job = self.queue.get()
try:
job.run()
except Exception:
logger.exception("failed to decode job")
self.queue.onError()
finally:
job.unlink()
self.queue.task_done()
class DecoderQueue(Queue):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with DecoderQueue.creationLock:
if DecoderQueue.sharedInstance is None:
pm = Config.get()
DecoderQueue.sharedInstance = DecoderQueue(maxsize=pm["decoding_queue_length"], workers=pm["decoding_queue_workers"])
return DecoderQueue.sharedInstance
def __init__(self, maxsize, workers):
super().__init__(maxsize)
metrics = Metrics.getSharedInstance()
metrics.addMetric("decoding.queue.length", DirectMetric(self.qsize))
self.inCounter = CounterMetric()
metrics.addMetric("decoding.queue.in", self.inCounter)
self.outCounter = CounterMetric()
metrics.addMetric("decoding.queue.out", self.outCounter)
self.overflowCounter = CounterMetric()
metrics.addMetric("decoding.queue.overflow", self.overflowCounter)
self.errorCounter = CounterMetric()
metrics.addMetric("decoding.queue.error", self.errorCounter)
self.workers = [self.newWorker() for _ in range(0, workers)]
def put(self, item, **kwars):
self.inCounter.inc()
try:
super(DecoderQueue, self).put(item, block=False)
except Full:
self.overflowCounter.inc()
raise
def get(self, **kwargs):
# super.get() is blocking, so it would mess up the stats to inc() first
out = super(DecoderQueue, self).get(**kwargs)
self.outCounter.inc()
return out
def newWorker(self):
worker = QueueWorker(self)
worker.start()
return worker
def onError(self):
self.errorCounter.inc()
class AudioChopperProfile(ABC):
@abstractmethod
def getInterval(self):
pass
@abstractmethod
def getFileTimestampFormat(self):
pass
@abstractmethod
def decoder_commandline(self, file):
pass
class AudioWriter(object):
def __init__(self, dsp, source, profile: AudioChopperProfile):
self.dsp = dsp
self.source = source
self.profile = profile
self.tmp_dir = Config.get()["temporary_directory"]
self.wavefile = None
self.wavefilename = None
self.switchingLock = threading.Lock()
self.timer = None
(self.outputReader, self.outputWriter) = Pipe()
def getWaveFile(self):
filename = "{tmp_dir}/openwebrx-audiochopper-{id}-{timestamp}.wav".format(
tmp_dir=self.tmp_dir,
id=id(self),
timestamp=datetime.utcnow().strftime(self.profile.getFileTimestampFormat()),
)
wavefile = wave.open(filename, "wb")
wavefile.setnchannels(1)
wavefile.setsampwidth(2)
wavefile.setframerate(12000)
return filename, wavefile
def getNextDecodingTime(self):
t = datetime.utcnow()
zeroed = t.replace(minute=0, second=0, microsecond=0)
delta = t - zeroed
interval = self.profile.getInterval()
seconds = (int(delta.total_seconds() / interval) + 1) * interval
t = zeroed + timedelta(seconds=seconds)
logger.debug("scheduling: {0}".format(t))
return t
def cancelTimer(self):
if self.timer:
self.timer.cancel()
self.timer = None
def _scheduleNextSwitch(self):
self.cancelTimer()
delta = self.getNextDecodingTime() - datetime.utcnow()
self.timer = threading.Timer(delta.total_seconds(), self.switchFiles)
self.timer.start()
def switchFiles(self):
self.switchingLock.acquire()
file = self.wavefile
filename = self.wavefilename
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock.release()
file.close()
job = QueueJob(self, filename, self.dsp.get_operating_freq())
try:
DecoderQueue.getSharedInstance().put(job)
except Full:
logger.warning("decoding queue overflow; dropping one file")
job.unlink()
self._scheduleNextSwitch()
def decode(self, job: QueueJob):
logger.debug("processing file %s", job.file)
decoder = subprocess.Popen(
["nice", "-n", "10"] + self.profile.decoder_commandline(job.file),
stdout=subprocess.PIPE,
cwd=self.tmp_dir,
close_fds=True,
)
for line in decoder.stdout:
self.outputWriter.send((job.freq, line))
try:
rc = decoder.wait(timeout=10)
if rc != 0:
logger.warning("decoder return code: %i", rc)
except subprocess.TimeoutExpired:
logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid)
decoder.kill()
def start(self):
(self.wavefilename, self.wavefile) = self.getWaveFile()
self._scheduleNextSwitch()
def write(self, data):
self.switchingLock.acquire()
self.wavefile.writeframes(data)
self.switchingLock.release()
def stop(self):
self.outputReader.close()
self.outputWriter.close()
self.cancelTimer()
try:
os.unlink(self.wavefilename)
except Exception:
logger.exception("error removing undecoded file")
class AudioChopper(threading.Thread, metaclass=ABCMeta):
def __init__(self, dsp, source, *profiles: AudioChopperProfile):
self.source = source
self.writers = [AudioWriter(dsp, source, p) for p in profiles]
self.doRun = True
super().__init__()
def run(self) -> None:
logger.debug("Audio chopper starting up")
for w in self.writers:
w.start()
while self.doRun:
data = self.source.read(256)
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
for w in self.writers:
w.write(data)
logger.debug("Audio chopper shutting down")
for w in self.writers:
w.stop()
def read(self):
try:
readers = wait([w.outputReader for w in self.writers])
return [r.recv() for r in readers]
except EOFError:
return None

View File

@ -26,6 +26,11 @@ class ConfigMigrator(ABC):
def migrate(self, config):
pass
def renameKey(self, config, old, new):
if old in config and not new in config:
config[new] = config[old]
del config[old]
class ConfigMigratorVersion1(ConfigMigrator):
def migrate(self, config):
@ -37,6 +42,9 @@ class ConfigMigratorVersion1(ConfigMigrator):
levels = config["waterfall_auto_level_margin"]
config["waterfall_auto_level_margin"] = {"min": levels[0], "max": levels[1]}
self.renameKey(config, "wsjt_queue_workers", "decoding_queue_workers")
self.renameKey(config, "wsjt_queue_length", "decoding_queue_length")
config["version"] = 2
return config

View File

@ -1,4 +1,5 @@
from owrx.config import Config
from owrx.details import ReceiverDetails
from owrx.dsp import DspManager
from owrx.cpu import CpuUsageThread
from owrx.sdr import SdrService
@ -9,10 +10,12 @@ from owrx.version import openwebrx_version
from owrx.bands import Bandplan
from owrx.bookmarks import Bookmarks
from owrx.map import Map
from owrx.locator import Locator
from owrx.property import PropertyStack
from owrx.modes import Modes, DigitalMode
from multiprocessing import Queue
from queue import Full
from js8py import Js8Frame
from abc import ABC, ABCMeta, abstractmethod
import json
import threading
@ -21,7 +24,7 @@ import logging
logger = logging.getLogger(__name__)
class Client(object):
class Client(ABC):
def __init__(self, conn):
self.conn = conn
self.multiprocessingPipe = Queue(100)
@ -50,6 +53,7 @@ class Client(object):
except Full:
self.close()
@abstractmethod
def handleTextMessage(self, conn, message):
pass
@ -60,7 +64,25 @@ class Client(object):
self.close()
class OpenWebRxReceiverClient(Client):
class OpenWebRxClient(Client, metaclass=ABCMeta):
def __init__(self, conn):
super().__init__(conn)
receiver_details = ReceiverDetails()
def send_receiver_info(*args):
receiver_info = receiver_details.__dict__()
self.write_receiver_details(receiver_info)
# TODO unsubscribe
receiver_details.wire(send_receiver_info)
send_receiver_info()
def write_receiver_details(self, details):
self.send({"type": "receiver_details", "value": details})
class OpenWebRxReceiverClient(OpenWebRxClient):
config_keys = [
"waterfall_colors",
"waterfall_min_level",
@ -68,7 +90,6 @@ class OpenWebRxReceiverClient(Client):
"waterfall_auto_level_margin",
"samp_rate",
"fft_size",
"fft_fps",
"audio_compression",
"fft_compression",
"max_clients",
@ -94,33 +115,16 @@ class OpenWebRxReceiverClient(Client):
self.close()
raise
pm = Config.get()
self.setSdr()
receiver_details = pm.filter(
"receiver_name",
"receiver_location",
"receiver_asl",
"receiver_gps",
"photo_title",
"photo_desc",
)
def send_receiver_info(*args):
receiver_info = receiver_details.__dict__()
receiver_info["locator"] = Locator.fromCoordinates(receiver_info["receiver_gps"])
self.write_receiver_details(receiver_info)
# TODO unsubscribe
receiver_details.wire(send_receiver_info)
send_receiver_info()
self.__sendProfiles()
features = FeatureDetector().feature_availability()
self.write_features(features)
modes = Modes.getModes()
self.write_modes(modes)
self.__sendProfiles()
CpuUsageThread.getSharedInstance().add_client(self)
def __sendProfiles(self):
@ -134,14 +138,19 @@ class OpenWebRxReceiverClient(Client):
def handleTextMessage(self, conn, message):
try:
message = json.loads(message)
logger.debug(message)
if "type" in message:
if message["type"] == "dspcontrol":
if "action" in message and message["action"] == "start":
self.startDsp()
if "params" in message:
params = message["params"]
self.setDspProperties(params)
dsp = self.getDsp()
if dsp is None:
logger.warning("DSP not available; discarding client data")
else:
params = message["params"]
dsp.setProperties(params)
elif message["type"] == "config":
if "params" in message:
@ -158,7 +167,7 @@ class OpenWebRxReceiverClient(Client):
if "params" in message:
self.connectionProperties = message["params"]
if self.dsp:
self.setDspProperties(self.connectionProperties)
self.getDsp().setProperties(self.connectionProperties)
else:
logger.warning("received message without type: {0}".format(message))
@ -175,6 +184,7 @@ class OpenWebRxReceiverClient(Client):
next = SdrService.getFirstSource()
if next is None:
# exit condition: no sdrs available
logger.warning("no more SDR devices available")
self.handleNoSdrsAvailable()
return
@ -190,16 +200,17 @@ class OpenWebRxReceiverClient(Client):
self.sdr = next
self.startDsp()
self.getDsp()
# keep trying until we find a suitable SDR
if self.sdr.getState() == SdrSource.STATE_FAILED:
self.write_log_message('SDR device "{0}" has failed, selecting new device'.format(self.sdr.getName()))
else:
# found a working sdr, exit the loop
if self.sdr.getState() != SdrSource.STATE_FAILED:
break
logger.warning('SDR device "%s" has failed, selecing new device', self.sdr.getName())
self.write_log_message('SDR device "{0}" has failed, selecting new device'.format(self.sdr.getName()))
# send initial config
self.setDspProperties(self.connectionProperties)
self.getDsp().setProperties(self.connectionProperties)
stack = PropertyStack()
stack.addLayer(0, self.sdr.getProps())
@ -231,9 +242,7 @@ class OpenWebRxReceiverClient(Client):
self.write_sdr_error("No SDR Devices available")
def startDsp(self):
if self.dsp is None and self.sdr is not None:
self.dsp = DspManager(self, self.sdr)
self.dsp.start()
self.getDsp().start()
def close(self):
self.stopDsp()
@ -254,6 +263,8 @@ class OpenWebRxReceiverClient(Client):
def setParams(self, params):
config = Config.get()
# allow direct configuration only if enabled in the config
if "configurable_keys" not in config:
return
keys = config["configurable_keys"]
if not keys:
return
@ -263,11 +274,15 @@ class OpenWebRxReceiverClient(Client):
stack.addLayer(1, config)
protected = stack.filter(*keys)
for key, value in params.items():
protected[key] = value
try:
protected[key] = value
except KeyError:
pass
def setDspProperties(self, params):
for key, value in params.items():
self.dsp.setProperty(key, value)
def getDsp(self):
if self.dsp is None and self.sdr is not None:
self.dsp = DspManager(self, self.sdr)
return self.dsp
def write_spectrum_data(self, data):
self.mp_send(bytes([0x01]) + data)
@ -297,9 +312,6 @@ class OpenWebRxReceiverClient(Client):
def write_config(self, cfg):
self.send({"type": "config", "value": cfg})
def write_receiver_details(self, details):
self.send({"type": "receiver_details", "value": details})
def write_profiles(self, profiles):
self.send({"type": "profiles", "value": profiles})
@ -333,8 +345,39 @@ class OpenWebRxReceiverClient(Client):
def write_backoff_message(self, reason):
self.send({"type": "backoff", "reason": reason})
def write_js8_message(self, frame: Js8Frame, freq: int):
self.send({"type": "js8_message", "value": {
"msg": str(frame),
"timestamp": frame.timestamp,
"db": frame.db,
"dt": frame.dt,
"freq": freq + frame.freq,
"thread_type": frame.thread_type,
"mode": frame.mode
}})
class MapConnection(Client):
def write_modes(self, modes):
def to_json(m):
res = {
"modulation": m.modulation,
"name": m.name,
"type": "digimode" if isinstance(m, DigitalMode) else "analog",
"requirements": m.requirements,
"squelch": m.squelch,
}
if m.bandpass is not None:
res["bandpass"] = {
"low_cut": m.bandpass.low_cut,
"high_cut": m.bandpass.high_cut
}
if isinstance(m, DigitalMode):
res["underlying"] = m.underlying
return res
self.send({"type": "modes", "value": [to_json(m) for m in modes]})
class MapConnection(OpenWebRxClient):
def __init__(self, conn):
super().__init__(conn)

View File

@ -1,6 +1,11 @@
from .template import WebpageController
from .session import SessionStorage
from owrx.config import Config
from urllib import parse
import logging
logger = logging.getLogger(__name__)
class Authentication(object):
@ -18,10 +23,11 @@ class AdminController(WebpageController):
def handle_request(self):
config = Config.get()
if not config["webadmin_enabled"]:
if "webadmin_enabled" not in config or not config["webadmin_enabled"]:
self.send_response("Web Admin is disabled", code=403)
return
if self.authentication.isAuthenticated(self.request):
super().handle_request()
else:
self.send_redirect("/login")
target = "/login?{0}".format(parse.urlencode({"ref": self.request.path}))
self.send_redirect(target)

View File

@ -1,5 +1,6 @@
from . import Controller
from owrx.feature import FeatureDetector
from owrx.details import ReceiverDetails
import json
@ -7,3 +8,8 @@ class ApiController(Controller):
def indexAction(self):
data = json.dumps(FeatureDetector().feature_report())
self.send_response(data, content_type="application/json")
def receiverDetails(self):
receiver_details = ReceiverDetails()
data = json.dumps(receiver_details.__dict__())
self.send_response(data, content_type="application/json")

View File

@ -4,13 +4,18 @@ from datetime import datetime
import mimetypes
import os
import pkg_resources
from abc import ABCMeta, abstractmethod
class AssetsController(Controller):
class AssetsController(Controller, metaclass=ABCMeta):
def getModified(self, file):
return None
return datetime.fromtimestamp(os.path.getmtime(self.getFilePath(file)))
def openFile(self, file):
return open(self.getFilePath(file), "rb")
@abstractmethod
def getFilePath(self, file):
pass
def serve_file(self, file, content_type=None):
@ -41,8 +46,8 @@ class AssetsController(Controller):
class OwrxAssetsController(AssetsController):
def openFile(self, file):
return pkg_resources.resource_stream("htdocs", file)
def getFilePath(self, file):
return pkg_resources.resource_filename("htdocs", file)
class AprsSymbolsController(AssetsController):
@ -57,8 +62,61 @@ class AprsSymbolsController(AssetsController):
def getFilePath(self, file):
return self.path + file
def getModified(self, file):
return datetime.fromtimestamp(os.path.getmtime(self.getFilePath(file)))
def openFile(self, file):
return open(self.getFilePath(file), "rb")
class CompiledAssetsController(Controller):
profiles = {
"receiver.js": [
"openwebrx.js",
"lib/jquery-3.2.1.min.js",
"lib/jquery.nanoscroller.js",
"lib/Header.js",
"lib/Demodulator.js",
"lib/DemodulatorPanel.js",
"lib/BookmarkBar.js",
"lib/BookmarkDialog.js",
"lib/AudioEngine.js",
"lib/ProgressBar.js",
"lib/Measurement.js",
"lib/FrequencyDisplay.js",
"lib/Js8Threads.js",
"lib/Modes.js",
],
"map.js": [
"lib/jquery-3.2.1.min.js",
"lib/chroma.min.js",
"lib/Header.js",
"map.js",
],
}
def indexAction(self):
profileName = self.request.matches.group(1)
if profileName not in CompiledAssetsController.profiles:
self.send_response("profile not found", code=404)
return
files = CompiledAssetsController.profiles[profileName]
files = [pkg_resources.resource_filename("htdocs", f) for f in files]
modified = self.getModified(files)
if modified is not None and "If-Modified-Since" in self.handler.headers:
client_modified = datetime.strptime(
self.handler.headers["If-Modified-Since"], "%a, %d %b %Y %H:%M:%S %Z"
)
if modified <= client_modified:
self.send_response("", code=304)
return
contents = [self.getContents(f) for f in files]
(content_type, encoding) = mimetypes.MimeTypes().guess_type(profileName)
self.send_response("\n".join(contents), content_type=content_type, last_modified=modified, max_age=3600)
def getContents(self, file):
with open(file) as f:
return f.read()
def getModified(self, files):
modified = [datetime.fromtimestamp(os.path.getmtime(f)) for f in files]
return max(*modified)

View File

@ -46,12 +46,12 @@ class SessionController(WebpageController):
if data["user"] in userlist:
user = userlist[data["user"]]
if user.password.is_valid(data["password"]):
# TODO pass the final destination
# TODO evaluate password force_change and redirect to password change
key = SessionStorage.getSharedInstance().startSession({"user": user.name})
cookie = SimpleCookie()
cookie["owrx-session"] = key
self.send_redirect("/admin", cookies=cookie)
target = self.request.query["ref"][0] if "ref" in self.request.query else "/settings"
self.send_redirect(target, cookies=cookie)
return
self.send_redirect("/login")

View File

@ -11,7 +11,10 @@ from owrx.form import (
DropdownInput,
Option,
ServicesCheckboxInput,
Js8ProfileCheckboxInput,
)
from urllib.parse import quote
import json
import logging
logger = logging.getLogger(__name__)
@ -43,6 +46,41 @@ class Section(object):
class SettingsController(AdminController):
def indexAction(self):
self.serve_template("settings.html", **self.template_variables())
class SdrSettingsController(AdminController):
def template_variables(self):
variables = super().template_variables()
variables["devices"] = self.render_devices()
return variables
def render_devices(self):
return "".join(self.render_device(key, value) for key, value in Config.get()["sdrs"].items())
def render_device(self, device_id, config):
return """
<div class="card device bg-dark text-white">
<div class="card-header">
{device_name}
</div>
<div class="card-body">
{form}
</div>
</div>
""".format(device_name=config["name"], form=self.render_form(device_id, config))
def render_form(self, device_id, config):
return """
<form class="sdrdevice" data-config="{formdata}"></form>
""".format(device_id=device_id, formdata=quote(json.dumps(config)))
def indexAction(self):
self.serve_template("sdrsettings.html", **self.template_variables())
class GeneralSettingsController(AdminController):
sections = [
Section(
"General settings",
@ -145,14 +183,23 @@ class SettingsController(AdminController):
),
),
Section(
"WSJT-X settings",
NumberInput("wsjt_queue_workers", "Number of WSJT decoding workers"),
NumberInput("wsjt_queue_length", "Maximum length of WSJT job queue"),
"Decoding settings",
NumberInput("decoding_queue_workers", "Number of decoding workers"),
NumberInput("decoding_queue_length", "Maximum length of decoding job queue"),
NumberInput(
"wsjt_decoding_depth",
"WSJT decoding depth",
"Default WSJT decoding depth",
infotext="A higher decoding depth will allow more results, but will also consume more cpu",
),
NumberInput(
"js8_decoding_depth",
"Js8Call decoding depth",
infotext="A higher decoding depth will allow more results, but will also consume more cpu",
),
Js8ProfileCheckboxInput(
"js8_enabled_profiles",
"Js8Call enabled modes"
),
),
Section(
"Background decoding",
@ -212,7 +259,7 @@ class SettingsController(AdminController):
]
def render_sections(self):
sections = "".join(section.render() for section in SettingsController.sections)
sections = "".join(section.render() for section in GeneralSettingsController.sections)
return """
<form class="settings-body" method="POST">
{sections}
@ -225,7 +272,7 @@ class SettingsController(AdminController):
)
def indexAction(self):
self.serve_template("admin.html", **self.template_variables())
self.serve_template("generalsettings.html", **self.template_variables())
def template_variables(self):
variables = super().template_variables()
@ -235,7 +282,7 @@ class SettingsController(AdminController):
def processFormData(self):
data = parse_qs(self.get_body().decode("utf-8"))
data = {
k: v for i in SettingsController.sections for k, v in i.parse(data).items()
k: v for i in GeneralSettingsController.sections for k, v in i.parse(data).items()
}
config = Config.get()
for k, v in data.items():

View File

@ -5,6 +5,7 @@ from owrx.sdr import SdrService
from owrx.config import Config
import os
import json
import pkg_resources
class StatusController(Controller):
@ -12,6 +13,7 @@ class StatusController(Controller):
pm = Config.get()
# convert to old format
gps = (pm["receiver_gps"]["lat"], pm["receiver_gps"]["lon"])
avatar_path = pkg_resources.resource_filename("htdocs", "gfx/openwebrx-avatar.png")
# TODO keys that have been left out since they are no longer simple strings: sdr_hw, bands, antenna
vars = {
"status": "active",
@ -23,7 +25,7 @@ class StatusController(Controller):
"asl": pm["receiver_asl"],
"loc": pm["receiver_location"],
"sw_version": openwebrx_version,
"avatar_ctime": os.path.getctime("htdocs/gfx/openwebrx-avatar.png"),
"avatar_ctime": os.path.getctime(avatar_path),
}
self.send_response("\n".join(["{key}={value}".format(key=key, value=value) for key, value in vars.items()]))

View File

@ -1,6 +1,7 @@
from . import Controller
import pkg_resources
from string import Template
from owrx.config import Config
class TemplateController(Controller):
@ -19,7 +20,11 @@ class TemplateController(Controller):
class WebpageController(TemplateController):
def template_variables(self):
header = self.render_template("include/header.include.html")
settingslink = ""
pm = Config.get()
if "webadmin_enabled" in pm and pm["webadmin_enabled"]:
settingslink = """<a class="button" href="settings" target="openwebrx-settings"><img src="static/gfx/openwebrx-panel-settings.png" alt="Settings"/><br/>Settings</a>"""
header = self.render_template("include/header.include.html", settingslink=settingslink)
return {"header": header}

21
owrx/details.py Normal file
View File

@ -0,0 +1,21 @@
from owrx.config import Config
from owrx.locator import Locator
from owrx.property import PropertyFilter
class ReceiverDetails(PropertyFilter):
def __init__(self):
super().__init__(
Config.get(),
"receiver_name",
"receiver_location",
"receiver_asl",
"receiver_gps",
"photo_title",
"photo_desc",
)
def __dict__(self):
receiver_info = super().__dict__()
receiver_info["locator"] = Locator.fromCoordinates(receiver_info["receiver_gps"])
return receiver_info

View File

@ -1,10 +1,11 @@
from owrx.config import Config
from owrx.meta import MetaParser
from owrx.wsjt import WsjtParser
from owrx.js8 import Js8Parser
from owrx.aprs import AprsParser
from owrx.pocsag import PocsagParser
from owrx.source import SdrSource
from owrx.property import PropertyStack, PropertyLayer
from owrx.modes import Modes
from csdr import csdr
import threading
@ -22,6 +23,7 @@ class DspManager(csdr.output):
"wsjt_demod": WsjtParser(self.handler),
"packet_demod": AprsParser(self.handler),
"pocsag_demod": PocsagParser(self.handler),
"js8_demod": Js8Parser(self.handler),
}
self.props = PropertyStack()
@ -35,6 +37,7 @@ class DspManager(csdr.output):
"offset_freq",
"mod",
"secondary_offset_freq",
"dmr_filter",
))
# properties that we inherit from the sdr
self.props.addLayer(1, self.sdrSource.getProps().filter(
@ -47,9 +50,10 @@ class DspManager(csdr.output):
"digimodes_enable",
"samp_rate",
"digital_voice_unvoiced_quality",
"dmr_filter",
"temporary_directory",
"center_freq",
"start_mod",
"start_freq",
))
self.dsp = csdr.dsp(self)
@ -70,6 +74,20 @@ class DspManager(csdr.output):
for parser in self.parsers.values():
parser.setDialFrequency(freq)
if "start_mod" in self.props:
self.dsp.set_demodulator(self.props["start_mod"])
mode = Modes.findByModulation(self.props["start_mod"])
if mode and mode.bandpass:
self.dsp.set_bpf(mode.bandpass.low_cut, mode.bandpass.high_cut)
else:
self.dsp.set_bpf(-4000, 4000)
if "start_freq" in self.props and "center_freq" in self.props:
self.dsp.set_offset_freq(self.props["start_freq"] - self.props["center_freq"])
else:
self.dsp.set_offset_freq(0)
self.subscriptions = [
self.props.wireProperty("audio_compression", self.dsp.set_audio_compression),
self.props.wireProperty("fft_compression", self.dsp.set_fft_compression),
@ -88,8 +106,6 @@ class DspManager(csdr.output):
self.props.filter("center_freq", "offset_freq").wire(set_dial_freq),
]
self.dsp.set_offset_freq(0)
self.dsp.set_bpf(-4000, 4000)
self.dsp.csdr_dynamic_bufsize = self.props["csdr_dynamic_bufsize"]
self.dsp.csdr_print_bufsizes = self.props["csdr_print_bufsizes"]
self.dsp.csdr_through = self.props["csdr_through"]
@ -114,6 +130,8 @@ class DspManager(csdr.output):
self.props.wireProperty("secondary_offset_freq", self.dsp.set_secondary_offset_freq),
]
self.startOnAvailable = False
self.sdrSource.addClient(self)
super().__init__()
@ -121,6 +139,8 @@ class DspManager(csdr.output):
def start(self):
if self.sdrSource.isAvailable():
self.dsp.start()
else:
self.startOnAvailable = True
def receive_output(self, t, read_fn):
logger.debug("adding new output of type %s", t)
@ -139,11 +159,16 @@ class DspManager(csdr.output):
def stop(self):
self.dsp.stop()
self.startOnAvailable = False
self.sdrSource.removeClient(self)
for sub in self.subscriptions:
sub.cancel()
self.subscriptions = []
def setProperties(self, props):
for k, v in props.items():
self.setProperty(k, v)
def setProperty(self, prop, value):
self.props[prop] = value
@ -153,7 +178,9 @@ class DspManager(csdr.output):
def onStateChange(self, state):
if state == SdrSource.STATE_RUNNING:
logger.debug("received STATE_RUNNING, attempting DspSource restart")
self.dsp.start()
if self.startOnAvailable:
self.dsp.start()
self.startOnAvailable = False
elif state == SdrSource.STATE_STOPPING:
logger.debug("received STATE_STOPPING, shutting down DspSource")
self.dsp.stop()

View File

@ -29,7 +29,7 @@ class FeatureDetector(object):
"airspy": ["soapy_connector", "soapy_airspy"],
"airspyhf": ["soapy_connector", "soapy_airspyhf"],
"lime_sdr": ["soapy_connector", "soapy_lime_sdr"],
"fifi_sdr": ["alsa"],
"fifi_sdr": ["alsa", "rockprog"],
"pluto_sdr": ["soapy_connector", "soapy_pluto_sdr"],
"soapy_remote": ["soapy_connector", "soapy_remote"],
"uhd": ["soapy_connector", "soapy_uhd"],
@ -40,6 +40,7 @@ class FeatureDetector(object):
"wsjt-x": ["wsjtx", "sox"],
"packet": ["direwolf", "sox"],
"pocsag": ["digiham", "sox"],
"js8call": ["js8", "sox"],
}
def feature_availability(self):
@ -246,13 +247,13 @@ class FeatureDetector(object):
def _has_soapy_driver(self, driver):
try:
process = subprocess.Popen(["SoapySDRUtil", "--info"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
factory_regex = re.compile("^Available factories\\.\\.\\. (.*)$")
factory_regex = re.compile("^Available factories\\.\\.\\. ?(.*)$")
drivers = []
for line in process.stdout:
matches = factory_regex.match(line.decode())
if matches:
drivers = [s.strip() for s in matches[1].split(", ")]
drivers = [s.strip() for s in matches.group(1).split(", ")]
return driver in drivers
except FileNotFoundError:
@ -370,9 +371,27 @@ class FeatureDetector(object):
"""
return reduce(and_, map(self.command_is_runnable, ["jt9", "wsprd"]), True)
def has_js8(self):
"""
To decode JS8, you will need to install [JS8Call](http://js8call.com/)
Please note that the `js8` command line decoder is not made available on $PATH by some JS8Call package builds.
You will need to manually make it available by either linking it to `/usr/bin` or by adding its location to
$PATH.
"""
return self.command_is_runnable("js8")
def has_alsa(self):
"""
Some SDR receivers are identifying themselves as a soundcard. In order to read their data, OpenWebRX relies
on the Alsa library. It is available as a package for most Linux distributions.
"""
return self.command_is_runnable("arecord --help")
def has_rockprog(self):
"""
The "rockprog" executable is required to send commands to your FiFiSDR. It needs to be installed separately.
You can find instructions and downloads [here](https://o28.sischa.net/fifisdr/trac/wiki/De%3Arockprog).
"""
return self.command_is_runnable("rockprog")

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from owrx.service import ServiceDetector
from owrx.modes import Modes
from owrx.config import Config
@ -196,11 +196,22 @@ class MultiCheckboxInput(Input):
class ServicesCheckboxInput(MultiCheckboxInput):
def __init__(self, id, label, infotext=None):
services = [
Option(s, s.upper()) for s in ServiceDetector.getAvailableServices()
Option(s.modulation, s.name) for s in Modes.getAvailableServices()
]
super().__init__(id, label, services, infotext)
class Js8ProfileCheckboxInput(MultiCheckboxInput):
def __init__(self, id, label, infotext=None):
profiles = [
Option("normal", "Normal (15s, 50Hz, ~16WPM)"),
Option("slow", "Slow (30s, 25Hz, ~8WPM"),
Option("fast", "Fast (10s, 80Hz, ~24WPM"),
Option("turbo", "Turbo (6s, 160Hz, ~40WPM"),
]
super().__init__(id, label, profiles, infotext)
class DropdownInput(Input):
def __init__(self, id, label, options, infotext=None):
super().__init__(id, label, infotext=infotext)

View File

@ -6,12 +6,13 @@ from owrx.controllers.template import (
)
from owrx.controllers.assets import (
OwrxAssetsController,
AprsSymbolsController
AprsSymbolsController,
CompiledAssetsController
)
from owrx.controllers.websocket import WebSocketController
from owrx.controllers.api import ApiController
from owrx.controllers.metrics import MetricsController
from owrx.controllers.settings import SettingsController
from owrx.controllers.settings import SettingsController, GeneralSettingsController, SdrSettingsController
from owrx.controllers.session import SessionController
from http.server import BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
@ -91,6 +92,7 @@ class Router(object):
StaticRoute("/status", StatusController),
StaticRoute("/status.json", StatusController, options={"action": "jsonAction"}),
RegexRoute("/static/(.+)", OwrxAssetsController),
RegexRoute("/compiled/(.+)", CompiledAssetsController),
RegexRoute("/aprs-symbols/(.+)", AprsSymbolsController),
StaticRoute("/ws/", WebSocketController),
RegexRoute("(/favicon.ico)", OwrxAssetsController),
@ -99,9 +101,12 @@ class Router(object):
StaticRoute("/map", MapController),
StaticRoute("/features", FeatureController),
StaticRoute("/api/features", ApiController),
StaticRoute("/api/receiverdetails", ApiController, options={"action": "receiverDetails"}),
StaticRoute("/metrics", MetricsController),
StaticRoute("/admin", SettingsController),
StaticRoute("/admin", SettingsController, method="POST", options={"action": "processFormData"}),
StaticRoute("/settings", SettingsController),
StaticRoute("/generalsettings", GeneralSettingsController),
StaticRoute("/generalsettings", GeneralSettingsController, method="POST", options={"action": "processFormData"}),
StaticRoute("/sdrsettings", SdrSettingsController),
StaticRoute("/login", SessionController, options={"action": "loginAction"}),
StaticRoute("/login", SessionController, method="POST", options={"action": "processLoginAction"}),
StaticRoute("/logout", SessionController, options={"action": "logoutAction"}),

132
owrx/js8.py Normal file
View File

@ -0,0 +1,132 @@
from .audio import AudioChopperProfile
from .parser import Parser
import re
from js8py import Js8
from js8py.frames import Js8FrameHeartbeat, Js8FrameCompound
from .map import Map, LocatorLocation
from .pskreporter import PskReporter
from .metrics import Metrics, CounterMetric
from .config import Config
from abc import ABCMeta, abstractmethod
import logging
logger = logging.getLogger(__name__)
class Js8Profiles(object):
@staticmethod
def getEnabledProfiles():
config = Config.get()
profiles = config["js8_enabled_profiles"] if "js8_enabled_profiles" in config else []
return [Js8Profiles.loadProfile(p) for p in profiles]
@staticmethod
def loadProfile(profileName):
className = "Js8{0}Profile".format(profileName[0].upper() + profileName[1:].lower())
return globals()[className]()
class Js8Profile(AudioChopperProfile, metaclass=ABCMeta):
def decoding_depth(self, mode):
pm = Config.get()
# return global default
if "js8_decoding_depth" in pm:
return pm["js8_decoding_depth"]
# default when no setting is provided
return 3
def getFileTimestampFormat(self):
return "%y%m%d_%H%M%S"
def decoder_commandline(self, file):
return ["js8", "--js8", "-b", self.get_sub_mode(), "-d", str(self.decoding_depth("js8")), file]
@abstractmethod
def get_sub_mode(self):
pass
class Js8NormalProfile(Js8Profile):
def getInterval(self):
return 15
def get_sub_mode(self):
return "A"
class Js8SlowProfile(Js8Profile):
def getInterval(self):
return 30
def get_sub_mode(self):
return "E"
class Js8FastProfile(Js8Profile):
def getInterval(self):
return 10
def get_sub_mode(self):
return "B"
class Js8TurboProfile(Js8Profile):
def getInterval(self):
return 6
def get_sub_mode(self):
return "C"
class Js8Parser(Parser):
decoderRegex = re.compile(" ?<Decode(Started|Debug|Finished)>")
def parse(self, messages):
for raw in messages:
try:
freq, raw_msg = raw
self.setDialFrequency(freq)
msg = raw_msg.decode().rstrip()
if Js8Parser.decoderRegex.match(msg):
return
if msg.startswith(" EOF on input file"):
return
frame = Js8().parse_message(msg)
self.handler.write_js8_message(frame, self.dial_freq)
self.pushDecode()
if (isinstance(frame, Js8FrameHeartbeat) or isinstance(frame, Js8FrameCompound)) and frame.grid:
Map.getSharedInstance().updateLocation(
frame.callsign, LocatorLocation(frame.grid), "JS8", self.band
)
PskReporter.getSharedInstance().spot({
"callsign": frame.callsign,
"mode": "JS8",
"locator": frame.grid,
"freq": self.dial_freq + frame.freq,
"db": frame.db,
"timestamp": frame.timestamp,
"msg": str(frame)
})
except Exception:
logger.exception("error while parsing js8 message")
def pushDecode(self):
metrics = Metrics.getSharedInstance()
band = "unknown"
if self.band is not None:
band = self.band.getName()
if band is None:
band = "unknown"
name = "js8call.decodes.{band}.JS8".format(band=band)
metric = metrics.getMetric(name)
if metric is None:
metric = CounterMetric()
metrics.addMetric(name, metric)
metric.inc()

90
owrx/modes.py Normal file
View File

@ -0,0 +1,90 @@
from owrx.feature import FeatureDetector
from functools import reduce
class Bandpass(object):
def __init__(self, low_cut, high_cut):
self.low_cut = low_cut
self.high_cut = high_cut
class Mode(object):
def __init__(self, modulation, name, bandpass: Bandpass = None, requirements=None, service=False, squelch=True):
self.modulation = modulation
self.name = name
self.requirements = requirements if requirements is not None else []
self.service = service
self.bandpass = bandpass
self.squelch = squelch
def is_available(self):
fd = FeatureDetector()
return reduce(lambda a, b: a and b, [fd.is_available(r) for r in self.requirements], True)
def is_service(self):
return self.service
class AnalogMode(Mode):
pass
class DigitalMode(Mode):
def __init__(
self, modulation, name, underlying, bandpass: Bandpass = None, requirements=None, service=False, squelch=True
):
super().__init__(modulation, name, bandpass, requirements, service, squelch)
self.underlying = underlying
class Modes(object):
mappings = [
AnalogMode("nfm", "FM", bandpass=Bandpass(-4000, 4000)),
AnalogMode("am", "AM", bandpass=Bandpass(-4000, 4000)),
AnalogMode("lsb", "LSB", bandpass=Bandpass(-3000, -300)),
AnalogMode("usb", "USB", bandpass=Bandpass(300, 3000)),
AnalogMode("cw", "CW", bandpass=Bandpass(700, 900)),
AnalogMode("dmr", "DMR", bandpass=Bandpass(-4000, 4000), requirements=["digital_voice_digiham"], squelch=False),
AnalogMode("dstar", "DStar", bandpass=Bandpass(-3250, 3250), requirements=["digital_voice_dsd"], squelch=False),
AnalogMode("nxdn", "NXDN", bandpass=Bandpass(-3250, 3250), requirements=["digital_voice_dsd"], squelch=False),
AnalogMode("ysf", "YSF", bandpass=Bandpass(-4000, 4000), requirements=["digital_voice_digiham"], squelch=False),
DigitalMode("bpsk31", "BPSK31", underlying=["usb"]),
DigitalMode("bpsk63", "BPSK63", underlying=["usb"]),
DigitalMode("ft8", "FT8", underlying=["usb"], requirements=["wsjt-x"], service=True),
DigitalMode("ft4", "FT4", underlying=["usb"], requirements=["wsjt-x"], service=True),
DigitalMode("jt65", "JT65", underlying=["usb"], requirements=["wsjt-x"], service=True),
DigitalMode("jt9", "JT9", underlying=["usb"], requirements=["wsjt-x"], service=True),
DigitalMode(
"wspr", "WSPR", underlying=["usb"], bandpass=Bandpass(1350, 1650), requirements=["wsjt-x"], service=True
),
DigitalMode("js8", "JS8Call", underlying=["usb"], requirements=["js8call"], service=True),
DigitalMode(
"packet", "Packet", underlying=["nfm", "usb", "lsb"], requirements=["packet"], service=True, squelch=False
),
DigitalMode(
"pocsag",
"Pocsag",
underlying=["nfm"],
bandpass=Bandpass(-6000, 6000),
requirements=["pocsag"],
squelch=False,
),
]
@staticmethod
def getModes():
return Modes.mappings
@staticmethod
def getAvailableModes():
return [m for m in Modes.getModes() if m.is_available()]
@staticmethod
def getAvailableServices():
return [m for m in Modes.getAvailableModes() if m.is_service()]
@staticmethod
def findByModulation(modulation):
modes = [m for m in Modes.getAvailableModes() if m.modulation == modulation]
if modes:
return modes[0]

View File

@ -40,6 +40,10 @@ class PropertyManager(ABC):
def __dict__(self):
pass
@abstractmethod
def __delitem__(self, key):
pass
@abstractmethod
def keys(self):
pass
@ -98,6 +102,9 @@ class PropertyLayer(PropertyManager):
def __dict__(self):
return {k: v for k, v in self.properties.items()}
def __delitem__(self, key):
return self.properties.__delitem__(key)
def keys(self):
return self.properties.keys()
@ -132,6 +139,11 @@ class PropertyFilter(PropertyManager):
def __dict__(self):
return {k: v for k, v in self.pm.__dict__().items() if k in self.props}
def __delitem__(self, key):
if key not in self.props:
raise KeyError(key)
return self.pm.__delitem__(key)
def keys(self):
return [k for k in self.pm.keys() if k in self.props]
@ -226,5 +238,9 @@ class PropertyStack(PropertyManager):
def __dict__(self):
return {k: self.__getitem__(k) for k in self.keys()}
def __delitem__(self, key):
for layer in self.layers:
layer["props"].__delitem__(key)
def keys(self):
return set([key for l in self.layers for key in l["props"].keys()])

View File

@ -30,7 +30,7 @@ class PskReporter(object):
sharedInstance = None
creationLock = threading.Lock()
interval = 300
supportedModes = ["FT8", "FT4", "JT9", "JT65"]
supportedModes = ["FT8", "FT4", "JT9", "JT65", "JS8"]
@staticmethod
def getSharedInstance():

View File

@ -5,13 +5,14 @@ from owrx.bands import Bandplan
from csdr.csdr import dsp, output
from owrx.wsjt import WsjtParser
from owrx.aprs import AprsParser
from owrx.js8 import Js8Parser
from owrx.config import Config
from owrx.source.resampler import Resampler
from owrx.feature import FeatureDetector
from owrx.property import PropertyLayer
from js8py import Js8Frame
from abc import ABCMeta, abstractmethod
from .schedule import ServiceScheduler
from functools import reduce
from owrx.modes import Modes
import logging
@ -50,28 +51,12 @@ class AprsServiceOutput(ServiceOutput):
return t == "packet_demod"
class ServiceDetector(object):
requirements = {
"ft8": ["wsjt-x"],
"ft4": ["wsjt-x"],
"jt65": ["wsjt-x"],
"jt9": ["wsjt-x"],
"wspr": ["wsjt-x"],
"packet": ["packet"],
}
class Js8ServiceOutput(ServiceOutput):
def getParser(self):
return Js8Parser(Js8Handler())
@staticmethod
def getAvailableServices():
# TODO this should be in a more central place (the frontend also needs this)
fd = FeatureDetector()
return [
name
for name, requirements in ServiceDetector.requirements.items()
if reduce(
lambda a, b: a and b, [fd.is_available(r) for r in requirements], True
)
]
def supports_type(self, t):
return t == "js8_demod"
class ServiceHandler(object):
@ -109,7 +94,7 @@ class ServiceHandler(object):
def isSupported(self, mode):
configured = Config.get()["services_decoders"]
available = ServiceDetector.getAvailableServices()
available = [m.modulation for m in Modes.getAvailableServices()]
return mode in configured and mode in available
def shutdown(self):
@ -258,6 +243,8 @@ class ServiceHandler(object):
# TODO selecting outputs will need some more intelligence here
if mode == "packet":
output = AprsServiceOutput(frequency)
elif mode == "js8":
output = Js8ServiceOutput(frequency)
else:
output = WsjtServiceOutput(frequency)
d = dsp(output)
@ -278,6 +265,7 @@ class ServiceHandler(object):
d.set_secondary_demodulator(mode)
d.set_audio_compression("none")
d.set_samp_rate(source.getProps()["samp_rate"])
d.set_temporary_directory(Config.get()['temporary_directory'])
d.set_service()
d.start()
return d
@ -293,6 +281,11 @@ class AprsHandler(object):
pass
class Js8Handler(object):
def write_js8_message(self, frame: Js8Frame, freq: int):
pass
class Services(object):
handlers = []

View File

@ -1,5 +1,6 @@
from .direct import DirectSource
from owrx.command import Flag, Option
from owrx.command import Option
import time
class HackrfSource(DirectSource):
@ -11,8 +12,12 @@ class HackrfSource(DirectSource):
"rf_gain": Option("-g"),
"lna_gain": Option("-l"),
"rf_amp": Option("-a"),
"ppm": Option("-C"),
}
).setStatic("-r-")
def getFormatConversion(self):
return ["csdr convert_s8_f"]
def sleepOnRestart(self):
time.sleep(1)

View File

@ -1,10 +1,4 @@
from .direct import DirectSource
from . import SdrSource
import subprocess
import threading
import os
import socket
import time
import logging
@ -29,7 +23,7 @@ class Resampler(DirectSource):
def getCommand(self):
return [
"nc -v 127.0.0.1 {nc_port}".format(nc_port=self.sdr.getPort()),
"csdr shift_addition_cc {shift}".format(shift=self.shift),
"csdr shift_addfast_cc {shift}".format(shift=self.shift),
"csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING".format(
decimation=self.decimation, ddc_transition_bw=self.transition_bw
),

View File

@ -9,6 +9,7 @@ class SdrplaySource(SoapyConnectorSource):
"bias_tee": "biasT_ctrl",
"rf_notch": "rfnotch_ctrl",
"dab_notch": "dabnotch_ctrl",
"if_mode": "if_mode",
}
)
return mappings

View File

@ -199,9 +199,15 @@ class WebSocketConnection(object):
data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)])
if opcode == OPCODE_TEXT_MESSAGE:
message = data.decode("utf-8")
self.messageHandler.handleTextMessage(self, message)
try:
self.messageHandler.handleTextMessage(self, message)
except Exception:
logger.exception("Exception in websocket handler handleTextMessage()")
elif opcode == OPCODE_BINARY_MESSAGE:
self.messageHandler.handleBinaryMessage(self, data)
try:
self.messageHandler.handleBinaryMessage(self, data)
except Exception:
logger.exception("Exception in websocket handler handleBinaryMessage()")
elif opcode == OPCODE_PING:
self.sendPong()
elif opcode == OPCODE_PONG:

View File

@ -1,215 +1,19 @@
import threading
import wave
from datetime import datetime, timedelta, timezone
import subprocess
import os
from multiprocessing.connection import Pipe
from datetime import datetime, timezone
from owrx.map import Map, LocatorLocation
import re
from queue import Queue, Full
from owrx.config import Config
from owrx.metrics import Metrics, CounterMetric, DirectMetric
from owrx.metrics import Metrics, CounterMetric
from owrx.pskreporter import PskReporter
from owrx.parser import Parser
from owrx.audio import AudioChopperProfile
from abc import ABC, ABCMeta, abstractmethod
from owrx.config import Config
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class QueueJob(object):
def __init__(self, decoder, file, freq):
self.decoder = decoder
self.file = file
self.freq = freq
def run(self):
self.decoder.decode(self)
class WsjtQueueWorker(threading.Thread):
def __init__(self, queue):
self.queue = queue
self.doRun = True
super().__init__(daemon=True)
def run(self) -> None:
while self.doRun:
job = self.queue.get()
try:
job.run()
except Exception:
logger.exception("failed to decode job")
self.queue.onError()
self.queue.task_done()
class WsjtQueue(Queue):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with WsjtQueue.creationLock:
if WsjtQueue.sharedInstance is None:
pm = Config.get()
WsjtQueue.sharedInstance = WsjtQueue(maxsize=pm["wsjt_queue_length"], workers=pm["wsjt_queue_workers"])
return WsjtQueue.sharedInstance
def __init__(self, maxsize, workers):
super().__init__(maxsize)
metrics = Metrics.getSharedInstance()
metrics.addMetric("wsjt.queue.length", DirectMetric(self.qsize))
self.inCounter = CounterMetric()
metrics.addMetric("wsjt.queue.in", self.inCounter)
self.outCounter = CounterMetric()
metrics.addMetric("wsjt.queue.out", self.outCounter)
self.overflowCounter = CounterMetric()
metrics.addMetric("wsjt.queue.overflow", self.overflowCounter)
self.errorCounter = CounterMetric()
metrics.addMetric("wsjt.queue.error", self.errorCounter)
self.workers = [self.newWorker() for _ in range(0, workers)]
def put(self, item):
self.inCounter.inc()
try:
super(WsjtQueue, self).put(item, block=False)
except Full:
self.overflowCounter.inc()
raise
def get(self, **kwargs):
# super.get() is blocking, so it would mess up the stats to inc() first
out = super(WsjtQueue, self).get(**kwargs)
self.outCounter.inc()
return out
def newWorker(self):
worker = WsjtQueueWorker(self)
worker.start()
return worker
def onError(self):
self.errorCounter.inc()
class WsjtChopper(threading.Thread, metaclass=ABCMeta):
def __init__(self, dsp, source):
self.dsp = dsp
self.source = source
self.tmp_dir = Config.get()["temporary_directory"]
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock = threading.Lock()
self.timer = None
(self.outputReader, self.outputWriter) = Pipe()
self.doRun = True
super().__init__()
@abstractmethod
def getInterval(self):
pass
@abstractmethod
def getFileTimestampFormat(self):
pass
def getWaveFile(self):
filename = "{tmp_dir}/openwebrx-wsjtchopper-{id}-{timestamp}.wav".format(
tmp_dir=self.tmp_dir, id=id(self), timestamp=datetime.utcnow().strftime(self.getFileTimestampFormat())
)
wavefile = wave.open(filename, "wb")
wavefile.setnchannels(1)
wavefile.setsampwidth(2)
wavefile.setframerate(12000)
return filename, wavefile
def getNextDecodingTime(self):
t = datetime.utcnow()
zeroed = t.replace(minute=0, second=0, microsecond=0)
delta = t - zeroed
interval = self.getInterval()
seconds = (int(delta.total_seconds() / interval) + 1) * interval
t = zeroed + timedelta(seconds=seconds)
logger.debug("scheduling: {0}".format(t))
return t
def cancelTimer(self):
if self.timer:
self.timer.cancel()
def _scheduleNextSwitch(self):
if self.doRun:
delta = self.getNextDecodingTime() - datetime.utcnow()
self.timer = threading.Timer(delta.total_seconds(), self.switchFiles)
self.timer.start()
def switchFiles(self):
self.switchingLock.acquire()
file = self.wavefile
filename = self.wavefilename
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock.release()
file.close()
try:
WsjtQueue.getSharedInstance().put(QueueJob(self, filename, self.dsp.get_operating_freq()))
except Full:
logger.warning("wsjt decoding queue overflow; dropping one file")
os.unlink(filename)
self._scheduleNextSwitch()
@abstractmethod
def decoder_commandline(self, file):
pass
def decode(self, job: QueueJob):
logger.debug("processing file %s", job.file)
decoder = subprocess.Popen(
["nice", "-n", "10"] + self.decoder_commandline(job.file),
stdout=subprocess.PIPE,
cwd=self.tmp_dir,
close_fds=True,
)
for line in decoder.stdout:
self.outputWriter.send((job.freq, line))
try:
rc = decoder.wait(timeout=10)
if rc != 0:
logger.warning("decoder return code: %i", rc)
except subprocess.TimeoutExpired:
logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid)
decoder.kill()
os.unlink(job.file)
def run(self) -> None:
logger.debug("WSJT chopper starting up")
self._scheduleNextSwitch()
while self.doRun:
data = self.source.read(256)
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
self.switchingLock.acquire()
self.wavefile.writeframes(data)
self.switchingLock.release()
logger.debug("WSJT chopper shutting down")
self.outputReader.close()
self.outputWriter.close()
self.cancelTimer()
try:
os.unlink(self.wavefilename)
except Exception:
logger.exception("error removing undecoded file")
def read(self):
try:
return self.outputReader.recv()
except EOFError:
return None
class WsjtProfile(AudioChopperProfile, metaclass=ABCMeta):
def decoding_depth(self, mode):
pm = Config.get()
# mode-specific setting?
@ -222,7 +26,7 @@ class WsjtChopper(threading.Thread, metaclass=ABCMeta):
return 3
class Ft8Chopper(WsjtChopper):
class Ft8Profile(WsjtProfile):
def getInterval(self):
return 15
@ -233,7 +37,7 @@ class Ft8Chopper(WsjtChopper):
return ["jt9", "--ft8", "-d", str(self.decoding_depth("ft8")), file]
class WsprChopper(WsjtChopper):
class WsprProfile(WsjtProfile):
def getInterval(self):
return 120
@ -248,7 +52,7 @@ class WsprChopper(WsjtChopper):
return cmd
class Jt65Chopper(WsjtChopper):
class Jt65Profile(WsjtProfile):
def getInterval(self):
return 60
@ -259,7 +63,7 @@ class Jt65Chopper(WsjtChopper):
return ["jt9", "--jt65", "-d", str(self.decoding_depth("jt65")), file]
class Jt9Chopper(WsjtChopper):
class Jt9Profile(WsjtProfile):
def getInterval(self):
return 60
@ -270,7 +74,7 @@ class Jt9Chopper(WsjtChopper):
return ["jt9", "--jt9", "-d", str(self.decoding_depth("jt9")), file]
class Ft4Chopper(WsjtChopper):
class Ft4Profile(WsjtProfile):
def getInterval(self):
return 7.5
@ -284,34 +88,35 @@ class Ft4Chopper(WsjtChopper):
class WsjtParser(Parser):
modes = {"~": "FT8", "#": "JT65", "@": "JT9", "+": "FT4"}
def parse(self, data):
try:
freq, raw_msg = data
self.setDialFrequency(freq)
msg = raw_msg.decode().rstrip()
# known debug messages we know to skip
if msg.startswith("<DecodeFinished>"):
return
if msg.startswith(" EOF on input file"):
return
def parse(self, messages):
for data in messages:
try:
freq, raw_msg = data
self.setDialFrequency(freq)
msg = raw_msg.decode().rstrip()
# known debug messages we know to skip
if msg.startswith("<DecodeFinished>"):
return
if msg.startswith(" EOF on input file"):
return
modes = list(WsjtParser.modes.keys())
if msg[21] in modes or msg[19] in modes:
decoder = Jt9Decoder()
else:
decoder = WsprDecoder()
out = decoder.parse(msg, freq)
if "mode" in out:
self.pushDecode(out["mode"])
if "callsign" in out and "locator" in out:
Map.getSharedInstance().updateLocation(
out["callsign"], LocatorLocation(out["locator"]), out["mode"], self.band
)
PskReporter.getSharedInstance().spot(out)
modes = list(WsjtParser.modes.keys())
if msg[21] in modes or msg[19] in modes:
decoder = Jt9Decoder()
else:
decoder = WsprDecoder()
out = decoder.parse(msg, freq)
if "mode" in out:
self.pushDecode(out["mode"])
if "callsign" in out and "locator" in out:
Map.getSharedInstance().updateLocation(
out["callsign"], LocatorLocation(out["locator"]), out["mode"], self.band
)
PskReporter.getSharedInstance().spot(out)
self.handler.write_wsjt_message(out)
except ValueError:
logger.exception("error while parsing wsjt message")
self.handler.write_wsjt_message(out)
except ValueError:
logger.exception("error while parsing wsjt message")
def pushDecode(self, mode):
metrics = Metrics.getSharedInstance()