Merge branch 'develop' into packet

This commit is contained in:
Jakob Ketterl
2019-08-11 11:53:29 +02:00
47 changed files with 3106 additions and 1055 deletions

65
owrx/bands.py Normal file
View File

@ -0,0 +1,65 @@
import json
import logging
logger = logging.getLogger(__name__)
class Band(object):
def __init__(self, dict):
self.name = dict["name"]
self.lower_bound = dict["lower_bound"]
self.upper_bound = dict["upper_bound"]
self.frequencies = []
if "frequencies" in dict:
for (mode, freqs) in dict["frequencies"].items():
if not isinstance(freqs, list):
freqs = [freqs]
for f in freqs:
if not self.inBand(f):
logger.warning(
"Frequency for {mode} on {band} is not within band limits: {frequency}".format(
mode=mode, frequency=f, band=self.name
)
)
else:
self.frequencies.append({"mode": mode, "frequency": f})
def inBand(self, freq):
return self.lower_bound <= freq <= self.upper_bound
def getName(self):
return self.name
def getDialFrequencies(self, range):
(low, hi) = range
return [e for e in self.frequencies if low <= e["frequency"] <= hi]
class Bandplan(object):
sharedInstance = None
@staticmethod
def getSharedInstance():
if Bandplan.sharedInstance is None:
Bandplan.sharedInstance = Bandplan()
return Bandplan.sharedInstance
def __init__(self):
f = open("bands.json", "r")
bands_json = json.load(f)
f.close()
self.bands = [Band(d) for d in bands_json]
def findBands(self, freq):
return [band for band in self.bands if band.inBand(freq)]
def findBand(self, freq):
bands = self.findBands(freq)
if bands:
return bands[0]
else:
return None
def collectDialFrequencies(self, range):
return [e for b in self.bands for e in b.getDialFrequencies(range)]

View File

@ -1,4 +1,5 @@
import logging
logger = logging.getLogger(__name__)
@ -15,7 +16,7 @@ class Subscription(object):
class Property(object):
def __init__(self, value = None):
def __init__(self, value=None):
self.value = value
self.subscribers = []
@ -23,7 +24,7 @@ class Property(object):
return self.value
def setValue(self, value):
if (self.value == value):
if self.value == value:
return self
self.value = value
for c in self.subscribers:
@ -36,7 +37,8 @@ class Property(object):
def wire(self, callback):
sub = Subscription(self, callback)
self.subscribers.append(sub)
if not self.value is None: sub.call(self.value)
if not self.value is None:
sub.call(self.value)
return sub
def unwire(self, sub):
@ -47,8 +49,10 @@ class Property(object):
pass
return self
class PropertyManager(object):
sharedInstance = None
@staticmethod
def getSharedInstance():
if PropertyManager.sharedInstance is None:
@ -56,9 +60,11 @@ class PropertyManager(object):
return PropertyManager.sharedInstance
def collect(self, *props):
return PropertyManager({name: self.getProperty(name) if self.hasProperty(name) else Property() for name in props})
return PropertyManager(
{name: self.getProperty(name) if self.hasProperty(name) else Property() for name in props}
)
def __init__(self, properties = None):
def __init__(self, properties=None):
self.properties = {}
self.subscribers = []
if properties is not None:
@ -67,12 +73,14 @@ class PropertyManager(object):
def add(self, name, prop):
self.properties[name] = prop
def fireCallbacks(value):
for c in self.subscribers:
try:
c.call(name, value)
except Exception as e:
logger.exception(e)
prop.wire(fireCallbacks)
return self
@ -88,7 +96,7 @@ class PropertyManager(object):
self.getProperty(name).setValue(value)
def __dict__(self):
return {k:v.getValue() for k, v in self.properties.items()}
return {k: v.getValue() for k, v in self.properties.items()}
def hasProperty(self, name):
return name in self.properties

View File

@ -1,20 +1,59 @@
from owrx.config import PropertyManager
from owrx.source import DspManager, CpuUsageThread, SdrService, ClientRegistry
from owrx.feature import FeatureDetector
from owrx.version import openwebrx_version
from owrx.bands import Bandplan
import json
from owrx.map import Map
import logging
logger = logging.getLogger(__name__)
class OpenWebRxClient(object):
config_keys = ["waterfall_colors", "waterfall_min_level", "waterfall_max_level",
"waterfall_auto_level_margin", "lfo_offset", "samp_rate", "fft_size", "fft_fps",
"audio_compression", "fft_compression", "max_clients", "start_mod",
"client_audio_buffer_size", "start_freq", "center_freq", "mathbox_waterfall_colors",
"mathbox_waterfall_history_length", "mathbox_waterfall_frequency_resolution"]
class Client(object):
def __init__(self, conn):
self.conn = conn
def protected_send(self, data):
try:
self.conn.send(data)
# these exception happen when the socket is closed
except OSError:
self.close()
except ValueError:
self.close()
def close(self):
self.conn.close()
logger.debug("connection closed")
class OpenWebRxReceiverClient(Client):
config_keys = [
"waterfall_colors",
"waterfall_min_level",
"waterfall_max_level",
"waterfall_auto_level_margin",
"lfo_offset",
"samp_rate",
"fft_size",
"fft_fps",
"audio_compression",
"fft_compression",
"max_clients",
"start_mod",
"client_audio_buffer_size",
"start_freq",
"center_freq",
"mathbox_waterfall_colors",
"mathbox_waterfall_history_length",
"mathbox_waterfall_frequency_resolution",
]
def __init__(self, conn):
super().__init__(conn)
self.dsp = None
self.sdr = None
self.configSub = None
@ -26,12 +65,23 @@ class OpenWebRxClient(object):
self.setSdr()
# send receiver info
receiver_keys = ["receiver_name", "receiver_location", "receiver_qra", "receiver_asl", "receiver_gps",
"photo_title", "photo_desc"]
receiver_keys = [
"receiver_name",
"receiver_location",
"receiver_qra",
"receiver_asl",
"receiver_gps",
"photo_title",
"photo_desc",
]
receiver_details = dict((key, pm.getPropertyValue(key)) for key in receiver_keys)
self.write_receiver_details(receiver_details)
profiles = [{"name": s.getName() + " " + p["name"], "id":sid + "|" + pid} for (sid, s) in SdrService.getSources().items() for (pid, p) in s.getProfiles().items()]
profiles = [
{"name": s.getName() + " " + p["name"], "id": sid + "|" + pid}
for (sid, s) in SdrService.getSources().items()
for (pid, p) in s.getProfiles().items()
]
self.write_profiles(profiles)
features = FeatureDetector().feature_availability()
@ -39,9 +89,9 @@ class OpenWebRxClient(object):
CpuUsageThread.getSharedInstance().add_client(self)
def setSdr(self, id = None):
def setSdr(self, id=None):
next = SdrService.getSource(id)
if (next == self.sdr):
if next == self.sdr:
return
self.stopDsp()
@ -53,14 +103,23 @@ class OpenWebRxClient(object):
self.sdr = next
# send initial config
configProps = self.sdr.getProps().collect(*OpenWebRxClient.config_keys).defaults(PropertyManager.getSharedInstance())
configProps = (
self.sdr.getProps()
.collect(*OpenWebRxReceiverClient.config_keys)
.defaults(PropertyManager.getSharedInstance())
)
def sendConfig(key, value):
config = dict((key, configProps[key]) for key in OpenWebRxClient.config_keys)
config = dict((key, configProps[key]) for key in OpenWebRxReceiverClient.config_keys)
# TODO mathematical properties? hmmmm
config["start_offset_freq"] = configProps["start_freq"] - configProps["center_freq"]
self.write_config(config)
cf = configProps["center_freq"]
srh = configProps["samp_rate"] / 2
frequencyRange = (cf - srh, cf + srh)
self.write_dial_frequendies(Bandplan.getSharedInstance().collectDialFrequencies(frequencyRange))
self.configSub = configProps.wire(sendConfig)
sendConfig(None, None)
@ -78,8 +137,7 @@ class OpenWebRxClient(object):
if self.configSub is not None:
self.configSub.cancel()
self.configSub = None
self.conn.close()
logger.debug("connection closed")
super().close()
def stopDsp(self):
if self.dsp is not None:
@ -90,8 +148,11 @@ class OpenWebRxClient(object):
def setParams(self, params):
# only the keys in the protected property manager can be overridden from the web
protected = self.sdr.getProps().collect("samp_rate", "center_freq", "rf_gain", "type", "if_gain") \
protected = (
self.sdr.getProps()
.collect("samp_rate", "center_freq", "rf_gain", "type", "if_gain")
.defaults(PropertyManager.getSharedInstance())
)
for key, value in params.items():
protected[key] = value
@ -99,41 +160,71 @@ class OpenWebRxClient(object):
for key, value in params.items():
self.dsp.setProperty(key, value)
def protected_send(self, data):
try:
self.conn.send(data)
# these exception happen when the socket is closed
except OSError:
self.close()
except ValueError:
self.close()
def write_spectrum_data(self, data):
self.protected_send(bytes([0x01]) + data)
def write_dsp_data(self, data):
self.protected_send(bytes([0x02]) + data)
def write_s_meter_level(self, level):
self.protected_send({"type":"smeter","value":level})
self.protected_send({"type": "smeter", "value": level})
def write_cpu_usage(self, usage):
self.protected_send({"type":"cpuusage","value":usage})
self.protected_send({"type": "cpuusage", "value": usage})
def write_clients(self, clients):
self.protected_send({"type":"clients","value":clients})
self.protected_send({"type": "clients", "value": clients})
def write_secondary_fft(self, data):
self.protected_send(bytes([0x03]) + data)
def write_secondary_demod(self, data):
self.protected_send(bytes([0x04]) + data)
def write_secondary_dsp_config(self, cfg):
self.protected_send({"type":"secondary_config", "value":cfg})
self.protected_send({"type": "secondary_config", "value": cfg})
def write_config(self, cfg):
self.protected_send({"type":"config","value":cfg})
self.protected_send({"type": "config", "value": cfg})
def write_receiver_details(self, details):
self.protected_send({"type":"receiver_details","value":details})
self.protected_send({"type": "receiver_details", "value": details})
def write_profiles(self, profiles):
self.protected_send({"type":"profiles","value":profiles})
self.protected_send({"type": "profiles", "value": profiles})
def write_features(self, features):
self.protected_send({"type":"features","value":features})
self.protected_send({"type": "features", "value": features})
def write_metadata(self, metadata):
self.protected_send({"type":"metadata","value":metadata})
self.protected_send({"type": "metadata", "value": metadata})
def write_wsjt_message(self, message):
self.protected_send({"type": "wsjt_message", "value": message})
def write_dial_frequendies(self, frequencies):
self.protected_send({"type": "dial_frequencies", "value": frequencies})
class MapConnection(Client):
def __init__(self, conn):
super().__init__(conn)
pm = PropertyManager.getSharedInstance()
self.write_config(pm.collect("google_maps_api_key", "receiver_gps", "map_position_retention_time").__dict__())
Map.getSharedInstance().addClient(self)
def close(self):
Map.getSharedInstance().removeClient(self)
super().close()
def write_config(self, cfg):
self.protected_send({"type": "config", "value": cfg})
def write_update(self, update):
self.protected_send({"type": "update", "value": update})
class WebSocketMessageHandler(object):
def __init__(self):
@ -142,12 +233,21 @@ class WebSocketMessageHandler(object):
self.dsp = None
def handleTextMessage(self, conn, message):
if (message[:16] == "SERVER DE CLIENT"):
# maybe put some more info in there? nothing to store yet.
self.handshake = "completed"
if message[:16] == "SERVER DE CLIENT":
meta = message[17:].split(" ")
self.handshake = {v[0]: "=".join(v[1:]) for v in map(lambda x: x.split("="), meta)}
conn.send("CLIENT DE SERVER server=openwebrx version={version}".format(version=openwebrx_version))
logger.debug("client connection intitialized")
self.client = OpenWebRxClient(conn)
if "type" in self.handshake:
if self.handshake["type"] == "receiver":
self.client = OpenWebRxReceiverClient(conn)
if self.handshake["type"] == "map":
self.client = MapConnection(conn)
# backwards compatibility
else:
self.client = OpenWebRxReceiverClient(conn)
return

View File

@ -1,20 +1,27 @@
import os
import mimetypes
import json
from datetime import datetime
from string import Template
from owrx.websocket import WebSocketConnection
from owrx.config import PropertyManager
from owrx.source import ClientRegistry
from owrx.connection import WebSocketMessageHandler
from owrx.version import openwebrx_version
from owrx.feature import FeatureDetector
from owrx.metrics import Metrics
import logging
logger = logging.getLogger(__name__)
class Controller(object):
def __init__(self, handler, matches):
def __init__(self, handler, request):
self.handler = handler
self.matches = matches
def send_response(self, content, code = 200, content_type = "text/html", last_modified: datetime = None, max_age = None):
self.request = request
def send_response(self, content, code=200, content_type="text/html", last_modified: datetime = None, max_age=None):
self.handler.send_response(code)
if content_type is not None:
self.handler.send_header("Content-Type", content_type)
@ -23,15 +30,10 @@ class Controller(object):
if max_age is not None:
self.handler.send_header("Cache-Control", "max-age: {0}".format(max_age))
self.handler.end_headers()
if (type(content) == str):
if type(content) == str:
content = content.encode()
self.handler.wfile.write(content)
def render_template(self, template, **variables):
f = open('htdocs/' + template)
data = f.read()
f.close()
self.send_response(data)
class StatusController(Controller):
def handle_request(self):
@ -47,41 +49,90 @@ class StatusController(Controller):
"asl": pm["receiver_asl"],
"loc": pm["receiver_location"],
"sw_version": openwebrx_version,
"avatar_ctime": os.path.getctime("htdocs/gfx/openwebrx-avatar.png")
"avatar_ctime": os.path.getctime("htdocs/gfx/openwebrx-avatar.png"),
}
self.send_response("\n".join(["{key}={value}".format(key = key, value = value) for key, value in vars.items()]))
self.send_response("\n".join(["{key}={value}".format(key=key, value=value) for key, value in vars.items()]))
class AssetsController(Controller):
def serve_file(self, file, content_type = None):
def serve_file(self, file, content_type=None):
try:
modified = datetime.fromtimestamp(os.path.getmtime('htdocs/' + file))
modified = datetime.fromtimestamp(os.path.getmtime("htdocs/" + file))
if "If-Modified-Since" in self.handler.headers:
client_modified = datetime.strptime(self.handler.headers["If-Modified-Since"], "%a, %d %b %Y %H:%M:%S %Z")
client_modified = datetime.strptime(
self.handler.headers["If-Modified-Since"], "%a, %d %b %Y %H:%M:%S %Z"
)
if modified <= client_modified:
self.send_response("", code = 304)
self.send_response("", code=304)
return
f = open('htdocs/' + file, 'rb')
f = open("htdocs/" + file, "rb")
data = f.read()
f.close()
if content_type is None:
(content_type, encoding) = mimetypes.MimeTypes().guess_type(file)
self.send_response(data, content_type = content_type, last_modified = modified, max_age = 3600)
self.send_response(data, content_type=content_type, last_modified=modified, max_age=3600)
except FileNotFoundError:
self.send_response("file not found", code = 404)
self.send_response("file not found", code=404)
def handle_request(self):
filename = self.matches.group(1)
filename = self.request.matches.group(1)
self.serve_file(filename)
class IndexController(AssetsController):
class TemplateController(Controller):
def render_template(self, file, **vars):
f = open("htdocs/" + file, "r")
template = Template(f.read())
f.close()
return template.safe_substitute(**vars)
def serve_template(self, file, **vars):
self.send_response(self.render_template(file, **vars), content_type="text/html")
def default_variables(self):
return {}
class WebpageController(TemplateController):
def template_variables(self):
header = self.render_template("include/header.include.html")
return {"header": header}
class IndexController(WebpageController):
def handle_request(self):
self.serve_file("index.html", content_type = "text/html")
self.serve_template("index.html", **self.template_variables())
class MapController(WebpageController):
def handle_request(self):
# TODO check if we have a google maps api key first?
self.serve_template("map.html", **self.template_variables())
class FeatureController(WebpageController):
def handle_request(self):
self.serve_template("features.html", **self.template_variables())
class ApiController(Controller):
def handle_request(self):
data = json.dumps(FeatureDetector().feature_report())
self.send_response(data, content_type="application/json")
class MetricsController(Controller):
def handle_request(self):
data = json.dumps(Metrics.getSharedInstance().getMetrics())
self.send_response(data, content_type="application/json")
class WebSocketController(Controller):
def handle_request(self):
conn = WebSocketConnection(self.handler, WebSocketMessageHandler())
conn.send("CLIENT DE SERVER openwebrx.py")
# enter read loop
conn.read_loop()

View File

@ -4,29 +4,52 @@ from functools import reduce
from operator import and_
import re
from distutils.version import LooseVersion
import inspect
import logging
logger = logging.getLogger(__name__)
class UnknownFeatureException(Exception):
pass
class FeatureDetector(object):
features = {
"core": [ "csdr", "nmux", "nc" ],
"rtl_sdr": [ "rtl_sdr" ],
"sdrplay": [ "rx_tools" ],
"hackrf": [ "hackrf_transfer" ],
"airspy": [ "airspy_rx" ],
"digital_voice_digiham": [ "digiham", "sox" ],
"digital_voice_dsd": [ "dsd", "sox", "digiham" ],
"packet": [ "direwolf" ]
"core": ["csdr", "nmux", "nc"],
"rtl_sdr": ["rtl_sdr"],
"sdrplay": ["rx_tools"],
"hackrf": ["hackrf_transfer"],
"airspy": ["airspy_rx"],
"digital_voice_digiham": ["digiham", "sox"],
"digital_voice_dsd": ["dsd", "sox", "digiham"],
"wsjt-x": ["wsjtx", "sox"],
"packet": [ "direwolf" ],
}
def feature_availability(self):
return {name: self.is_available(name) for name in FeatureDetector.features}
def feature_report(self):
def requirement_details(name):
available = self.has_requirement(name)
return {
"available": available,
# as of now, features are always enabled as soon as they are available. this may change in the future.
"enabled": available,
"description": self.get_requirement_description(name),
}
def feature_details(name):
return {
"description": "",
"available": self.is_available(name),
"requirements": {name: requirement_details(name) for name in self.get_requirements(name)},
}
return {name: feature_details(name) for name in FeatureDetector.features}
def is_available(self, feature):
return self.has_requirements(self.get_requirements(feature))
@ -34,50 +57,89 @@ class FeatureDetector(object):
try:
return FeatureDetector.features[feature]
except KeyError:
raise UnknownFeatureException("Feature \"{0}\" is not known.".format(feature))
raise UnknownFeatureException('Feature "{0}" is not known.'.format(feature))
def has_requirements(self, requirements):
passed = True
for requirement in requirements:
methodname = "has_" + requirement
if hasattr(self, methodname) and callable(getattr(self, methodname)):
passed = passed and getattr(self, methodname)()
else:
logger.error("detection of requirement {0} not implement. please fix in code!".format(requirement))
passed = passed and self.has_requirement(requirement)
return passed
def _get_requirement_method(self, requirement):
methodname = "has_" + requirement
if hasattr(self, methodname) and callable(getattr(self, methodname)):
return getattr(self, methodname)
return None
def has_requirement(self, requirement):
method = self._get_requirement_method(requirement)
if method is not None:
return method()
else:
logger.error("detection of requirement {0} not implement. please fix in code!".format(requirement))
return False
def get_requirement_description(self, requirement):
return inspect.getdoc(self._get_requirement_method(requirement))
def command_is_runnable(self, command):
return os.system("{0} 2>/dev/null >/dev/null".format(command)) != 32512
def has_csdr(self):
"""
OpenWebRX uses the demodulator and pipeline tools provided by the csdr project. Please check out [the project
page on github](https://github.com/simonyiszk/csdr) for further details and installation instructions.
"""
return self.command_is_runnable("csdr")
def has_nmux(self):
"""
Nmux is another tool provided by the csdr project. It is used for internal multiplexing of the IQ data streams.
If you're missing nmux even though you have csdr installed, please update your csdr version.
"""
return self.command_is_runnable("nmux --help")
def has_nc(self):
return self.command_is_runnable('nc --help')
"""
Nc is the client used to connect to the nmux multiplexer. It is provided by either the BSD netcat (recommended
for better performance) or GNU netcat packages. Please check your distribution package manager for options.
"""
return self.command_is_runnable("nc --help")
def has_rtl_sdr(self):
"""
The rtl-sdr command is required to read I/Q data from an RTL SDR USB-Stick. It is available in most
distribution package managers.
"""
return self.command_is_runnable("rtl_sdr --help")
def has_rx_tools(self):
"""
The rx_tools package can be used to interface with SDR devices compatible with SoapySDR. It is currently used
to connect to SDRPlay devices. Please check the following pages for more details:
* [rx_tools GitHub page](https://github.com/rxseger/rx_tools)
* [SoapySDR Project wiki](https://github.com/pothosware/SoapySDR/wiki)
* [SDRPlay homepage](https://www.sdrplay.com/)
"""
return self.command_is_runnable("rx_sdr --help")
"""
To use a HackRF, compile the HackRF host tools from its "stdout" branch:
git clone https://github.com/mossmann/hackrf/
cd hackrf
git fetch
git checkout origin/stdout
cd host
mkdir build
cd build
cmake .. -DINSTALL_UDEV_RULES=ON
make
sudo make install
"""
def has_hackrf_transfer(self):
"""
To use a HackRF, compile the HackRF host tools from its "stdout" branch:
```
git clone https://github.com/mossmann/hackrf/
cd hackrf
git fetch
git checkout origin/stdout
cd host
mkdir build
cd build
cmake .. -DINSTALL_UDEV_RULES=ON
make
sudo make install
```
"""
# TODO i don't have a hackrf, so somebody doublecheck this.
# TODO also check if it has the stdout feature
return self.command_is_runnable("hackrf_transfer --help")
@ -85,18 +147,19 @@ class FeatureDetector(object):
def command_exists(self, command):
return os.system("which {0}".format(command)) == 0
"""
To use DMR and YSF, the digiham package is required. You can find the package and installation instructions here:
https://github.com/jketterl/digiham
Please note: there is close interaction between digiham and openwebrx, so older versions will probably not work.
If you have an older verison of digiham installed, please update it along with openwebrx.
As of now, we require version 0.2 of digiham.
"""
def has_digiham(self):
"""
To use digital voice modes, the digiham package is required. You can find the package and installation
instructions [here](https://github.com/jketterl/digiham).
Please note: there is close interaction between digiham and openwebrx, so older versions will probably not work.
If you have an older verison of digiham installed, please update it along with openwebrx.
As of now, we require version 0.2 of digiham.
"""
required_version = LooseVersion("0.2")
digiham_version_regex = re.compile('^digiham version (.*)$')
digiham_version_regex = re.compile("^digiham version (.*)$")
def check_digiham_version(command):
try:
process = subprocess.Popen([command, "--version"], stdout=subprocess.PIPE)
@ -105,22 +168,52 @@ class FeatureDetector(object):
return version >= required_version
except FileNotFoundError:
return False
return reduce(and_,
map(
check_digiham_version,
["rrc_filter", "ysf_decoder", "dmr_decoder", "mbe_synthesizer", "gfsk_demodulator",
"digitalvoice_filter"]
),
True)
return reduce(
and_,
map(
check_digiham_version,
[
"rrc_filter",
"ysf_decoder",
"dmr_decoder",
"mbe_synthesizer",
"gfsk_demodulator",
"digitalvoice_filter",
],
),
True,
)
def has_dsd(self):
"""
The digital voice modes NXDN and D-Star can be decoded by the dsd project. Please note that you need the version
modified by F4EXB that provides stdin/stdout support. You can find it [here](https://github.com/f4exb/dsd).
"""
return self.command_is_runnable("dsd")
def has_sox(self):
"""
The sox audio library is used to convert between the typical 8 kHz audio sampling rate used by digital modes and
the audio sampling rate requested by the client.
It is available for most distributions through the respective package manager.
"""
return self.command_is_runnable("sox")
def has_direwolf(self):
return self.command_is_runnable("direwolf --help")
def has_airspy_rx(self):
"""
In order to use an Airspy Receiver, you need to install the airspy_rx receiver software.
"""
return self.command_is_runnable("airspy_rx --help 2> /dev/null")
def has_wsjtx(self):
"""
To decode FT8 and other digimodes, you need to install the WSJT-X software suite. Please check the
[WSJT-X homepage](https://physics.princeton.edu/pulsar/k1jt/wsjtx.html) for ready-made packages or instructions
on how to build from source.
"""
return reduce(and_, map(self.command_is_runnable, ["jt9", "wsprd"]), True)

View File

@ -1,17 +1,37 @@
from owrx.controllers import StatusController, IndexController, AssetsController, WebSocketController
from owrx.controllers import (
StatusController,
IndexController,
AssetsController,
WebSocketController,
MapController,
FeatureController,
ApiController,
MetricsController,
)
from http.server import BaseHTTPRequestHandler
import re
from urllib.parse import urlparse, parse_qs
import logging
logger = logging.getLogger(__name__)
class RequestHandler(BaseHTTPRequestHandler):
def __init__(self, request, client_address, server):
self.router = Router()
super().__init__(request, client_address, server)
def do_GET(self):
self.router.route(self)
class Request(object):
def __init__(self, query=None, matches=None):
self.query = query
self.matches = matches
class Router(object):
mappings = [
{"route": "/", "controller": IndexController},
@ -20,8 +40,13 @@ class Router(object):
{"route": "/ws/", "controller": WebSocketController},
{"regex": "(/favicon.ico)", "controller": AssetsController},
# backwards compatibility for the sdr.hu portal
{"regex": "/(gfx/openwebrx-avatar.png)", "controller": AssetsController}
{"regex": "/(gfx/openwebrx-avatar.png)", "controller": AssetsController},
{"route": "/map", "controller": MapController},
{"route": "/features", "controller": FeatureController},
{"route": "/api/features", "controller": ApiController},
{"route": "/metrics", "controller": MetricsController},
]
def find_controller(self, path):
for m in Router.mappings:
if "route" in m:
@ -32,11 +57,17 @@ class Router(object):
matches = regex.match(path)
if matches:
return (m["controller"], matches)
def route(self, handler):
res = self.find_controller(handler.path)
url = urlparse(handler.path)
res = self.find_controller(url.path)
if res is not None:
(controller, matches) = res
logger.debug("path: {0}, controller: {1}, matches: {2}".format(handler.path, controller, matches))
controller(handler, matches).handle_request()
query = parse_qs(url.query)
logger.debug(
"path: {0}, controller: {1}, query: {2}, matches: {3}".format(handler.path, controller, query, matches)
)
request = Request(query, matches)
controller(handler, request).handle_request()
else:
handler.send_error(404, "Not Found", "The page you requested could not be found.")

108
owrx/map.py Normal file
View File

@ -0,0 +1,108 @@
from datetime import datetime, timedelta
import threading, time
from owrx.config import PropertyManager
from owrx.bands import Band
import logging
logger = logging.getLogger(__name__)
class Location(object):
def __dict__(self):
return {}
class Map(object):
sharedInstance = None
@staticmethod
def getSharedInstance():
if Map.sharedInstance is None:
Map.sharedInstance = Map()
return Map.sharedInstance
def __init__(self):
self.clients = []
self.positions = {}
def removeLoop():
while True:
try:
self.removeOldPositions()
except Exception:
logger.exception("error while removing old map positions")
time.sleep(60)
threading.Thread(target=removeLoop, daemon=True).start()
super().__init__()
def broadcast(self, update):
for c in self.clients:
c.write_update(update)
def addClient(self, client):
self.clients.append(client)
client.write_update(
[
{
"callsign": callsign,
"location": record["location"].__dict__(),
"lastseen": record["updated"].timestamp() * 1000,
"mode": record["mode"],
"band": record["band"].getName() if record["band"] is not None else None,
}
for (callsign, record) in self.positions.items()
]
)
def removeClient(self, client):
try:
self.clients.remove(client)
except ValueError:
pass
def updateLocation(self, callsign, loc: Location, mode: str, band: Band = None):
ts = datetime.now()
self.positions[callsign] = {"location": loc, "updated": ts, "mode": mode, "band": band}
self.broadcast(
[
{
"callsign": callsign,
"location": loc.__dict__(),
"lastseen": ts.timestamp() * 1000,
"mode": mode,
"band": band.getName() if band is not None else None,
}
]
)
def removeLocation(self, callsign):
self.positions.pop(callsign, None)
# TODO broadcast removal to clients
def removeOldPositions(self):
pm = PropertyManager.getSharedInstance()
retention = timedelta(seconds=pm["map_position_retention_time"])
cutoff = datetime.now() - retention
to_be_removed = [callsign for (callsign, pos) in self.positions.items() if pos["updated"] < cutoff]
for callsign in to_be_removed:
self.removeLocation(callsign)
class LatLngLocation(Location):
def __init__(self, lat: float, lon: float):
self.lat = lat
self.lon = lon
def __dict__(self):
return {"type": "latlon", "lat": self.lat, "lon": self.lon}
class LocatorLocation(Location):
def __init__(self, locator: str):
self.locator = locator
def __dict__(self):
return {"type": "locator", "locator": self.locator}

View File

@ -4,36 +4,43 @@ import json
from datetime import datetime, timedelta
import logging
import threading
from owrx.map import Map, LatLngLocation
logger = logging.getLogger(__name__)
class DmrCache(object):
sharedInstance = None
@staticmethod
def getSharedInstance():
if DmrCache.sharedInstance is None:
DmrCache.sharedInstance = DmrCache()
return DmrCache.sharedInstance
def __init__(self):
self.cache = {}
self.cacheTimeout = timedelta(seconds = 86400)
self.cacheTimeout = timedelta(seconds=86400)
def isValid(self, key):
if not key in self.cache: return False
if not key in self.cache:
return False
entry = self.cache[key]
return entry["timestamp"] + self.cacheTimeout > datetime.now()
def put(self, key, value):
self.cache[key] = {
"timestamp": datetime.now(),
"data": value
}
self.cache[key] = {"timestamp": datetime.now(), "data": value}
def get(self, key):
if not self.isValid(key): return None
if not self.isValid(key):
return None
return self.cache[key]["data"]
class DmrMetaEnricher(object):
def __init__(self):
self.threads = {}
def downloadRadioIdData(self, id):
cache = DmrCache.getSharedInstance()
try:
@ -44,9 +51,12 @@ class DmrMetaEnricher(object):
except json.JSONDecodeError:
cache.put(id, None)
del self.threads[id]
def enrich(self, meta):
if not PropertyManager.getSharedInstance()["digital_voice_dmr_id_lookup"]: return None
if not "source" in meta: return None
if not PropertyManager.getSharedInstance()["digital_voice_dmr_id_lookup"]:
return None
if not "source" in meta:
return None
id = meta["source"]
cache = DmrCache.getSharedInstance()
if not cache.isValid(id):
@ -60,10 +70,17 @@ class DmrMetaEnricher(object):
return None
class YsfMetaEnricher(object):
def enrich(self, meta):
if "source" in meta and "lat" in meta and "lon" in meta:
# TODO parsing the float values should probably happen earlier
loc = LatLngLocation(float(meta["lat"]), float(meta["lon"]))
Map.getSharedInstance().updateLocation(meta["source"], loc, "YSF")
return None
class MetaParser(object):
enrichers = {
"DMR": DmrMetaEnricher()
}
enrichers = {"DMR": DmrMetaEnricher(), "YSF": YsfMetaEnricher()}
def __init__(self, handler):
self.handler = handler
@ -76,6 +93,6 @@ class MetaParser(object):
protocol = meta["protocol"]
if protocol in MetaParser.enrichers:
additional_data = MetaParser.enrichers[protocol].enrich(meta)
if additional_data is not None: meta["additional"] = additional_data
if additional_data is not None:
meta["additional"] = additional_data
self.handler.write_metadata(meta)

30
owrx/metrics.py Normal file
View File

@ -0,0 +1,30 @@
class Metrics(object):
sharedInstance = None
@staticmethod
def getSharedInstance():
if Metrics.sharedInstance is None:
Metrics.sharedInstance = Metrics()
return Metrics.sharedInstance
def __init__(self):
self.metrics = {}
def pushDecodes(self, band, mode, count=1):
if band is None:
band = "unknown"
else:
band = band.getName()
if mode is None:
mode = "unknown"
if not band in self.metrics:
self.metrics[band] = {}
if not mode in self.metrics[band]:
self.metrics[band][mode] = {"count": 0}
self.metrics[band][mode]["count"] += count
def getMetrics(self):
return self.metrics

View File

@ -4,23 +4,26 @@ import time
from owrx.config import PropertyManager
import logging
logger = logging.getLogger(__name__)
class SdrHuUpdater(threading.Thread):
def __init__(self):
self.doRun = True
super().__init__(daemon = True)
super().__init__(daemon=True)
def update(self):
pm = PropertyManager.getSharedInstance()
cmd = "wget --timeout=15 -4qO- https://sdr.hu/update --post-data \"url=http://{server_hostname}:{web_port}&apikey={sdrhu_key}\" 2>&1".format(**pm.__dict__())
cmd = 'wget --timeout=15 -4qO- https://sdr.hu/update --post-data "url=http://{server_hostname}:{web_port}&apikey={sdrhu_key}" 2>&1'.format(
**pm.__dict__()
)
logger.debug(cmd)
returned=subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).communicate()
returned=returned[0].decode('utf-8')
returned = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE).communicate()
returned = returned[0].decode("utf-8")
if "UPDATE:" in returned:
retrytime_mins = 20
value=returned.split("UPDATE:")[1].split("\n",1)[0]
value = returned.split("UPDATE:")[1].split("\n", 1)[0]
if value.startswith("SUCCESS"):
logger.info("Update succeeded!")
else:
@ -33,4 +36,4 @@ class SdrHuUpdater(threading.Thread):
def run(self):
while self.doRun:
retrytime_mins = self.update()
time.sleep(60*retrytime_mins)
time.sleep(60 * retrytime_mins)

118
owrx/service.py Normal file
View File

@ -0,0 +1,118 @@
import threading
from owrx.source import SdrService
from owrx.bands import Bandplan
from csdr import dsp, output
from owrx.wsjt import WsjtParser
from owrx.config import PropertyManager
import logging
logger = logging.getLogger(__name__)
class ServiceOutput(output):
def __init__(self, frequency):
self.frequency = frequency
def receive_output(self, t, read_fn):
parser = WsjtParser(WsjtHandler())
parser.setDialFrequency(self.frequency)
target = self.pump(read_fn, parser.parse)
threading.Thread(target=target).start()
def supports_type(self, t):
return t == "wsjt_demod"
class ServiceHandler(object):
def __init__(self, source):
self.services = []
self.source = source
self.startupTimer = None
self.source.addClient(self)
self.source.getProps().collect("center_freq", "samp_rate").wire(self.onFrequencyChange)
self.scheduleServiceStartup()
def onSdrAvailable(self):
self.scheduleServiceStartup()
def onSdrUnavailable(self):
self.stopServices()
def isSupported(self, mode):
return mode in PropertyManager.getSharedInstance()["services_decoders"]
def stopServices(self):
for service in self.services:
service.stop()
self.services = []
def startServices(self):
for service in self.services:
service.start()
def onFrequencyChange(self, key, value):
self.stopServices()
if not self.source.isAvailable():
return
self.scheduleServiceStartup()
def scheduleServiceStartup(self):
if self.startupTimer:
self.startupTimer.cancel()
self.startupTimer = threading.Timer(10, self.updateServices)
self.startupTimer.start()
def updateServices(self):
logger.debug("re-scheduling services due to sdr changes")
self.stopServices()
cf = self.source.getProps()["center_freq"]
srh = self.source.getProps()["samp_rate"] / 2
frequency_range = (cf - srh, cf + srh)
self.services = [
self.setupService(dial["mode"], dial["frequency"])
for dial in Bandplan.getSharedInstance().collectDialFrequencies(frequency_range)
if self.isSupported(dial["mode"])
]
def setupService(self, mode, frequency):
logger.debug("setting up service {0} on frequency {1}".format(mode, frequency))
d = dsp(ServiceOutput(frequency))
d.nc_port = self.source.getPort()
d.set_offset_freq(frequency - self.source.getProps()["center_freq"])
d.set_demodulator("usb")
d.set_bpf(0, 3000)
d.set_secondary_demodulator(mode)
d.set_audio_compression("none")
d.set_samp_rate(self.source.getProps()["samp_rate"])
d.start()
return d
class WsjtHandler(object):
def write_wsjt_message(self, msg):
pass
class ServiceManager(object):
sharedInstance = None
@staticmethod
def getSharedInstance():
if ServiceManager.sharedInstance is None:
ServiceManager.sharedInstance = ServiceManager()
return ServiceManager.sharedInstance
def start(self):
if not PropertyManager.getSharedInstance()["services_enabled"]:
return
for source in SdrService.getSources().values():
ServiceHandler(source)
class Service(object):
pass
class WsjtService(Service):
pass

View File

@ -2,6 +2,7 @@ import subprocess
from owrx.config import PropertyManager
from owrx.feature import FeatureDetector, UnknownFeatureException
from owrx.meta import MetaParser
from owrx.wsjt import WsjtParser
import threading
import csdr
import time
@ -13,10 +14,12 @@ import logging
logger = logging.getLogger(__name__)
class SdrService(object):
sdrProps = None
sources = {}
lastPort = None
@staticmethod
def getNextPort():
pm = PropertyManager.getSharedInstance()
@ -28,50 +31,70 @@ class SdrService(object):
if SdrService.lastPort > end:
raise IndexError("no more available ports to start more sdrs")
return SdrService.lastPort
@staticmethod
def loadProps():
if SdrService.sdrProps is None:
pm = PropertyManager.getSharedInstance()
featureDetector = FeatureDetector()
def loadIntoPropertyManager(dict: dict):
propertyManager = PropertyManager()
for (name, value) in dict.items():
propertyManager[name] = value
return propertyManager
def sdrTypeAvailable(value):
try:
if not featureDetector.is_available(value["type"]):
logger.error("The RTL source type \"{0}\" is not available. please check requirements.".format(value["type"]))
logger.error(
'The RTL source type "{0}" is not available. please check requirements.'.format(
value["type"]
)
)
return False
return True
except UnknownFeatureException:
logger.error("The RTL source type \"{0}\" is invalid. Please check your configuration".format(value["type"]))
logger.error(
'The RTL source type "{0}" is invalid. Please check your configuration'.format(value["type"])
)
return False
# transform all dictionary items into PropertyManager object, filtering out unavailable ones
SdrService.sdrProps = {
name: loadIntoPropertyManager(value) for (name, value) in pm["sdrs"].items() if sdrTypeAvailable(value)
}
logger.info("SDR sources loaded. Availables SDRs: {0}".format(", ".join(map(lambda x: x["name"], SdrService.sdrProps.values()))))
logger.info(
"SDR sources loaded. Availables SDRs: {0}".format(
", ".join(map(lambda x: x["name"], SdrService.sdrProps.values()))
)
)
@staticmethod
def getSource(id = None):
def getSource(id=None):
SdrService.loadProps()
if id is None:
# TODO: configure default sdr in config? right now it will pick the first one off the list.
id = list(SdrService.sdrProps.keys())[0]
sources = SdrService.getSources()
return sources[id]
@staticmethod
def getSources():
SdrService.loadProps()
for id in SdrService.sdrProps.keys():
if not id in SdrService.sources:
props = SdrService.sdrProps[id]
className = ''.join(x for x in props["type"].title() if x.isalnum()) + "Source"
className = "".join(x for x in props["type"].title() if x.isalnum()) + "Source"
cls = getattr(sys.modules[__name__], className)
SdrService.sources[id] = cls(props, SdrService.getNextPort())
return SdrService.sources
class SdrSourceException(Exception):
pass
class SdrSource(object):
def __init__(self, props, port):
self.props = props
@ -84,6 +107,7 @@ class SdrSource(object):
logger.debug("restarting sdr source due to property change: {0} changed to {1}".format(name, value))
self.stop()
self.start()
self.rtlProps.wire(restart)
self.port = port
self.monitor = None
@ -101,15 +125,16 @@ class SdrSource(object):
def getFormatConversion(self):
return None
def activateProfile(self, id = None):
def activateProfile(self, profile_id=None):
profiles = self.props["profiles"]
if id is None:
id = list(profiles.keys())[0]
logger.debug("activating profile {0}".format(id))
profile = profiles[id]
if profile_id is None:
profile_id = list(profiles.keys())[0]
logger.debug("activating profile {0}".format(profile_id))
profile = profiles[profile_id]
for (key, value) in profile.items():
# skip the name, that would overwrite the source name.
if key == "name": continue
if key == "name":
continue
self.props[key] = value
def getProfiles(self):
@ -133,7 +158,9 @@ class SdrSource(object):
props = self.rtlProps
start_sdr_command = self.getCommand().format(
**props.collect("samp_rate", "center_freq", "ppm", "rf_gain", "lna_gain", "rf_amp", "antenna", "if_gain").__dict__()
**props.collect(
"samp_rate", "center_freq", "ppm", "rf_gain", "lna_gain", "rf_amp", "antenna", "if_gain"
).__dict__()
)
format_conversion = self.getFormatConversion()
@ -141,36 +168,54 @@ class SdrSource(object):
start_sdr_command += " | " + format_conversion
nmux_bufcnt = nmux_bufsize = 0
while nmux_bufsize < props["samp_rate"]/4: nmux_bufsize += 4096
while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: nmux_bufcnt += 1
while nmux_bufsize < props["samp_rate"] / 4:
nmux_bufsize += 4096
while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6:
nmux_bufcnt += 1
if nmux_bufcnt == 0 or nmux_bufsize == 0:
logger.error("Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py")
logger.error(
"Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py"
)
self.modificationLock.release()
return
logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt))
cmd = start_sdr_command + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (nmux_bufsize, nmux_bufcnt, self.port)
cmd = start_sdr_command + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (
nmux_bufsize,
nmux_bufcnt,
self.port,
)
self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
logger.info("Started rtl source: " + cmd)
available = False
def wait_for_process_to_end():
rc = self.process.wait()
logger.debug("shut down with RC={0}".format(rc))
self.monitor = None
self.monitor = threading.Thread(target = wait_for_process_to_end)
self.monitor = threading.Thread(target=wait_for_process_to_end)
self.monitor.start()
while True:
retries = 1000
while retries > 0:
retries -= 1
if self.monitor is None:
break
testsock = socket.socket()
try:
testsock.connect(("127.0.0.1", self.getPort()))
testsock.close()
available = True
break
except:
time.sleep(0.1)
self.modificationLock.release()
if not available:
raise SdrSourceException("rtl source failed to start up")
for c in self.clients:
c.onSdrAvailable()
@ -200,6 +245,7 @@ class SdrSource(object):
def addClient(self, c):
self.clients.append(c)
self.start()
def removeClient(self, c):
try:
self.clients.remove(c)
@ -235,6 +281,7 @@ class RtlSdrSource(SdrSource):
def getFormatConversion(self):
return "csdr convert_u8_f"
class HackrfSource(SdrSource):
def getCommand(self):
return "hackrf_transfer -s {samp_rate} -f {center_freq} -g {rf_gain} -l{lna_gain} -a{rf_amp} -r-"
@ -242,39 +289,54 @@ class HackrfSource(SdrSource):
def getFormatConversion(self):
return "csdr convert_s8_f"
class SdrplaySource(SdrSource):
def getCommand(self):
command = "rx_sdr -F CF32 -s {samp_rate} -f {center_freq} -p {ppm}"
gainMap = { "rf_gain" : "RFGR", "if_gain" : "IFGR"}
gains = [ "{0}={{{1}}}".format(gainMap[name], name) for (name, value) in self.rtlProps.collect("rf_gain", "if_gain").__dict__().items() if value is not None ]
gainMap = {"rf_gain": "RFGR", "if_gain": "IFGR"}
gains = [
"{0}={{{1}}}".format(gainMap[name], name)
for (name, value) in self.rtlProps.collect("rf_gain", "if_gain").__dict__().items()
if value is not None
]
if gains:
command += " -g {gains}".format(gains = ",".join(gains))
command += " -g {gains}".format(gains=",".join(gains))
if self.rtlProps["antenna"] is not None:
command += " -a \"{antenna}\""
command += ' -a "{antenna}"'
command += " -"
return command
def sleepOnRestart(self):
time.sleep(1)
class AirspySource(SdrSource):
def getCommand(self):
frequency = self.props['center_freq'] / 1e6
frequency = self.props["center_freq"] / 1e6
command = "airspy_rx"
command += " -f{0}".format(frequency)
command += " -r /dev/stdout -a{samp_rate} -g {rf_gain}"
return command
def getFormatConversion(self):
return "csdr convert_s16_f"
class SpectrumThread(csdr.output):
def __init__(self, sdrSource):
self.sdrSource = sdrSource
super().__init__()
self.props = props = self.sdrSource.props.collect(
"samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor", "fft_compression",
"csdr_dynamic_bufsize", "csdr_print_bufsizes", "csdr_through"
"samp_rate",
"fft_size",
"fft_fps",
"fft_voverlap_factor",
"fft_compression",
"csdr_dynamic_bufsize",
"csdr_print_bufsizes",
"csdr_through",
"temporary_directory",
).defaults(PropertyManager.getSharedInstance())
self.dsp = dsp = csdr.dsp(self)
@ -287,14 +349,19 @@ class SpectrumThread(csdr.output):
fft_fps = props["fft_fps"]
fft_voverlap_factor = props["fft_voverlap_factor"]
dsp.set_fft_averages(int(round(1.0 * samp_rate / fft_size / fft_fps / (1.0 - fft_voverlap_factor))) if fft_voverlap_factor>0 else 0)
dsp.set_fft_averages(
int(round(1.0 * samp_rate / fft_size / fft_fps / (1.0 - fft_voverlap_factor)))
if fft_voverlap_factor > 0
else 0
)
self.subscriptions = [
props.getProperty("samp_rate").wire(dsp.set_samp_rate),
props.getProperty("fft_size").wire(dsp.set_fft_size),
props.getProperty("fft_fps").wire(dsp.set_fft_fps),
props.getProperty("fft_compression").wire(dsp.set_fft_compression),
props.collect("samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor").wire(set_fft_averages)
props.getProperty("temporary_directory").wire(dsp.set_temporary_directory),
props.collect("samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor").wire(set_fft_averages),
]
set_fft_averages(None, None)
@ -309,25 +376,15 @@ class SpectrumThread(csdr.output):
if self.sdrSource.isAvailable():
self.dsp.start()
def add_output(self, type, read_fn):
if type != "audio":
logger.error("unsupported output type received by FFT: %s", type)
return
def supports_type(self, t):
return t == "audio"
def receive_output(self, type, read_fn):
if self.props["csdr_dynamic_bufsize"]:
read_fn(8) #dummy read to skip bufsize & preamble
read_fn(8) # dummy read to skip bufsize & preamble
logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1")
def pipe():
run = True
while run:
data = read_fn()
if len(data) == 0:
run = False
else:
self.sdrSource.writeSpectrumData(data)
threading.Thread(target = pipe).start()
threading.Thread(target=self.pump(read_fn, self.sdrSource.writeSpectrumData)).start()
def stop(self):
self.dsp.stop()
@ -338,20 +395,36 @@ class SpectrumThread(csdr.output):
def onSdrAvailable(self):
self.dsp.start()
def onSdrUnavailable(self):
self.dsp.stop()
class DspManager(csdr.output):
def __init__(self, handler, sdrSource):
self.handler = handler
self.sdrSource = sdrSource
self.metaParser = MetaParser(self.handler)
self.wsjtParser = WsjtParser(self.handler)
self.localProps = self.sdrSource.getProps().collect(
"audio_compression", "fft_compression", "digimodes_fft_size", "csdr_dynamic_bufsize",
"csdr_print_bufsizes", "csdr_through", "digimodes_enable", "samp_rate", "digital_voice_unvoiced_quality",
"dmr_filter"
).defaults(PropertyManager.getSharedInstance())
self.localProps = (
self.sdrSource.getProps()
.collect(
"audio_compression",
"fft_compression",
"digimodes_fft_size",
"csdr_dynamic_bufsize",
"csdr_print_bufsizes",
"csdr_through",
"digimodes_enable",
"samp_rate",
"digital_voice_unvoiced_quality",
"dmr_filter",
"temporary_directory",
"center_freq",
)
.defaults(PropertyManager.getSharedInstance())
)
self.dsp = csdr.dsp(self)
self.dsp.nc_port = self.sdrSource.getPort()
@ -366,6 +439,9 @@ class DspManager(csdr.output):
bpf[1] = cut
self.dsp.set_bpf(*bpf)
def set_dial_freq(key, value):
self.wsjtParser.setDialFrequency(self.localProps["center_freq"] + self.localProps["offset_freq"])
self.subscriptions = [
self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression),
self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression),
@ -378,28 +454,35 @@ class DspManager(csdr.output):
self.localProps.getProperty("high_cut").wire(set_high_cut),
self.localProps.getProperty("mod").wire(self.dsp.set_demodulator),
self.localProps.getProperty("digital_voice_unvoiced_quality").wire(self.dsp.set_unvoiced_quality),
self.localProps.getProperty("dmr_filter").wire(self.dsp.set_dmr_filter)
self.localProps.getProperty("dmr_filter").wire(self.dsp.set_dmr_filter),
self.localProps.getProperty("temporary_directory").wire(self.dsp.set_temporary_directory),
self.localProps.collect("center_freq", "offset_freq").wire(set_dial_freq),
]
self.dsp.set_offset_freq(0)
self.dsp.set_bpf(-4000,4000)
self.dsp.set_bpf(-4000, 4000)
self.dsp.csdr_dynamic_bufsize = self.localProps["csdr_dynamic_bufsize"]
self.dsp.csdr_print_bufsizes = self.localProps["csdr_print_bufsizes"]
self.dsp.csdr_through = self.localProps["csdr_through"]
if (self.localProps["digimodes_enable"]):
if self.localProps["digimodes_enable"]:
def set_secondary_mod(mod):
if mod == False: mod = None
if mod == False:
mod = None
self.dsp.set_secondary_demodulator(mod)
if mod is not None:
self.handler.write_secondary_dsp_config({
"secondary_fft_size":self.localProps["digimodes_fft_size"],
"if_samp_rate":self.dsp.if_samp_rate(),
"secondary_bw":self.dsp.secondary_bw()
})
self.handler.write_secondary_dsp_config(
{
"secondary_fft_size": self.localProps["digimodes_fft_size"],
"if_samp_rate": self.dsp.if_samp_rate(),
"secondary_bw": self.dsp.secondary_bw(),
}
)
self.subscriptions += [
self.localProps.getProperty("secondary_mod").wire(set_secondary_mod),
self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq)
self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq),
]
self.sdrSource.addClient(self)
@ -410,30 +493,19 @@ class DspManager(csdr.output):
if self.sdrSource.isAvailable():
self.dsp.start()
def add_output(self, t, read_fn):
def receive_output(self, t, read_fn):
logger.debug("adding new output of type %s", t)
writers = {
"audio": self.handler.write_dsp_data,
"smeter": self.handler.write_s_meter_level,
"secondary_fft": self.handler.write_secondary_fft,
"secondary_demod": self.handler.write_secondary_demod,
"meta": self.metaParser.parse
"meta": self.metaParser.parse,
"wsjt_demod": self.wsjtParser.parse,
}
write = writers[t]
def pump(read, write):
def copy():
run = True
while run:
data = read()
if data is None or (isinstance(data, bytes) and len(data) == 0):
logger.warning("zero read on {0}".format(t))
run = False
else:
write(data)
return copy
threading.Thread(target=pump(read_fn, write)).start()
threading.Thread(target=self.pump(read_fn, write)).start()
def stop(self):
self.dsp.stop()
@ -453,8 +525,10 @@ class DspManager(csdr.output):
logger.debug("received onSdrUnavailable, shutting down DspSource")
self.dsp.stop()
class CpuUsageThread(threading.Thread):
sharedInstance = None
@staticmethod
def getSharedInstance():
if CpuUsageThread.sharedInstance is None:
@ -482,21 +556,23 @@ class CpuUsageThread(threading.Thread):
def get_cpu_usage(self):
try:
f = open("/proc/stat","r")
f = open("/proc/stat", "r")
except:
return 0 #Workaround, possibly we're on a Mac
return 0 # Workaround, possibly we're on a Mac
line = ""
while not "cpu " in line: line=f.readline()
while not "cpu " in line:
line = f.readline()
f.close()
spl = line.split(" ")
worktime = int(spl[2]) + int(spl[3]) + int(spl[4])
idletime = int(spl[5])
dworktime = (worktime - self.last_worktime)
didletime = (idletime - self.last_idletime)
rate = float(dworktime) / (didletime+dworktime)
dworktime = worktime - self.last_worktime
didletime = idletime - self.last_idletime
rate = float(dworktime) / (didletime + dworktime)
self.last_worktime = worktime
self.last_idletime = idletime
if (self.last_worktime==0): return 0
if self.last_worktime == 0:
return 0
return rate
def add_client(self, c):
@ -514,11 +590,14 @@ class CpuUsageThread(threading.Thread):
CpuUsageThread.sharedInstance = None
self.doRun = False
class TooManyClientsException(Exception):
pass
class ClientRegistry(object):
sharedInstance = None
@staticmethod
def getSharedInstance():
if ClientRegistry.sharedInstance is None:
@ -549,4 +628,4 @@ class ClientRegistry(object):
self.clients.remove(client)
except ValueError:
pass
self.broadcast()
self.broadcast()

View File

@ -1 +1 @@
openwebrx_version = "v0.18"
openwebrx_version = "v0.18"

View File

@ -3,48 +3,76 @@ import hashlib
import json
import logging
logger = logging.getLogger(__name__)
class WebSocketConnection(object):
def __init__(self, handler, messageHandler):
self.handler = handler
self.messageHandler = messageHandler
my_headers = self.handler.headers.items()
my_header_keys = list(map(lambda x:x[0],my_headers))
h_key_exists = lambda x:my_header_keys.count(x)
h_value = lambda x:my_headers[my_header_keys.index(x)][1]
if (not h_key_exists("Upgrade")) or not (h_value("Upgrade")=="websocket") or (not h_key_exists("Sec-WebSocket-Key")):
my_header_keys = list(map(lambda x: x[0], my_headers))
h_key_exists = lambda x: my_header_keys.count(x)
h_value = lambda x: my_headers[my_header_keys.index(x)][1]
if (
(not h_key_exists("Upgrade"))
or not (h_value("Upgrade") == "websocket")
or (not h_key_exists("Sec-WebSocket-Key"))
):
raise WebSocketException
ws_key = h_value("Sec-WebSocket-Key")
shakey = hashlib.sha1()
shakey.update("{ws_key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11".format(ws_key = ws_key).encode())
shakey.update("{ws_key}258EAFA5-E914-47DA-95CA-C5AB0DC85B11".format(ws_key=ws_key).encode())
ws_key_toreturn = base64.b64encode(shakey.digest())
self.handler.wfile.write("HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: {0}\r\nCQ-CQ-de: HA5KFU\r\n\r\n".format(ws_key_toreturn.decode()).encode())
self.handler.wfile.write(
"HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: {0}\r\nCQ-CQ-de: HA5KFU\r\n\r\n".format(
ws_key_toreturn.decode()
).encode()
)
def get_header(self, size, opcode):
ws_first_byte = 0b10000000 | (opcode & 0x0F)
if (size > 125):
return bytes([ws_first_byte, 126, (size>>8) & 0xff, size & 0xff])
if size > 2 ** 16 - 1:
# frame size can be increased up to 2^64 by setting the size to 127
# anything beyond that would need to be segmented into frames. i don't really think we'll need more.
return bytes(
[
ws_first_byte,
127,
(size >> 56) & 0xFF,
(size >> 48) & 0xFF,
(size >> 40) & 0xFF,
(size >> 32) & 0xFF,
(size >> 24) & 0xFF,
(size >> 16) & 0xFF,
(size >> 8) & 0xFF,
size & 0xFF,
]
)
elif size > 125:
# up to 2^16 can be sent using the extended payload size field by putting the size to 126
return bytes([ws_first_byte, 126, (size >> 8) & 0xFF, size & 0xFF])
else:
# 256 bytes binary message in a single unmasked frame
# 125 bytes binary message in a single unmasked frame
return bytes([ws_first_byte, size])
def send(self, data):
# convenience
if (type(data) == dict):
if type(data) == dict:
# allow_nan = False disallows NaN and Infinty to be encoded. Browser JSON will not parse them anyway.
data = json.dumps(data, allow_nan = False)
data = json.dumps(data, allow_nan=False)
# string-type messages are sent as text frames
if (type(data) == str):
if type(data) == str:
header = self.get_header(len(data), 1)
data_to_send = header + data.encode('utf-8')
data_to_send = header + data.encode("utf-8")
# anything else as binary
else:
header = self.get_header(len(data), 2)
data_to_send = header + data
written = self.handler.wfile.write(data_to_send)
if (written != len(data_to_send)):
if written != len(data_to_send):
logger.error("incomplete write! closing socket!")
self.close()
else:
@ -52,25 +80,25 @@ class WebSocketConnection(object):
def read_loop(self):
open = True
while (open):
while open:
header = self.handler.rfile.read(2)
opcode = header[0] & 0x0F
length = header[1] & 0x7F
mask = (header[1] & 0x80) >> 7
if (length == 126):
if length == 126:
header = self.handler.rfile.read(2)
length = (header[0] << 8) + header[1]
if (mask):
if mask:
masking_key = self.handler.rfile.read(4)
data = self.handler.rfile.read(length)
if (mask):
if mask:
data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)])
if (opcode == 1):
message = data.decode('utf-8')
if opcode == 1:
message = data.decode("utf-8")
self.messageHandler.handleTextMessage(self, message)
elif (opcode == 2):
elif opcode == 2:
self.messageHandler.handleBinaryMessage(self, data)
elif (opcode == 8):
elif opcode == 8:
open = False
self.messageHandler.handleClose(self)
else:

277
owrx/wsjt.py Normal file
View File

@ -0,0 +1,277 @@
import threading
import wave
from datetime import datetime, timedelta, date, timezone
import time
import sched
import subprocess
import os
from multiprocessing.connection import Pipe
from owrx.map import Map, LocatorLocation
import re
from owrx.config import PropertyManager
from owrx.bands import Bandplan
from owrx.metrics import Metrics
import logging
logger = logging.getLogger(__name__)
class WsjtChopper(threading.Thread):
def __init__(self, source):
self.source = source
self.tmp_dir = PropertyManager.getSharedInstance()["temporary_directory"]
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock = threading.Lock()
self.scheduler = sched.scheduler(time.time, time.sleep)
self.fileQueue = []
(self.outputReader, self.outputWriter) = Pipe()
self.doRun = True
super().__init__()
def getWaveFile(self):
filename = "{tmp_dir}/openwebrx-wsjtchopper-{id}-{timestamp}.wav".format(
tmp_dir=self.tmp_dir, id=id(self), timestamp=datetime.utcnow().strftime(self.fileTimestampFormat)
)
wavefile = wave.open(filename, "wb")
wavefile.setnchannels(1)
wavefile.setsampwidth(2)
wavefile.setframerate(12000)
return (filename, wavefile)
def getNextDecodingTime(self):
t = datetime.now()
zeroed = t.replace(minute=0, second=0, microsecond=0)
delta = t - zeroed
seconds = (int(delta.total_seconds() / self.interval) + 1) * self.interval
t = zeroed + timedelta(seconds=seconds)
logger.debug("scheduling: {0}".format(t))
return t.timestamp()
def startScheduler(self):
self._scheduleNextSwitch()
threading.Thread(target=self.scheduler.run).start()
def emptyScheduler(self):
for event in self.scheduler.queue:
self.scheduler.cancel(event)
def _scheduleNextSwitch(self):
self.scheduler.enterabs(self.getNextDecodingTime(), 1, self.switchFiles)
def switchFiles(self):
self.switchingLock.acquire()
file = self.wavefile
filename = self.wavefilename
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock.release()
file.close()
self.fileQueue.append(filename)
self._scheduleNextSwitch()
def decoder_commandline(self, file):
"""
must be overridden in child classes
"""
return []
def decode(self):
def decode_and_unlink(file):
decoder = subprocess.Popen(self.decoder_commandline(file), stdout=subprocess.PIPE, cwd=self.tmp_dir)
while True:
line = decoder.stdout.readline()
if line is None or (isinstance(line, bytes) and len(line) == 0):
break
self.outputWriter.send(line)
rc = decoder.wait()
if rc != 0:
logger.warning("decoder return code: %i", rc)
os.unlink(file)
if self.fileQueue:
file = self.fileQueue.pop()
logger.debug("processing file {0}".format(file))
threading.Thread(target=decode_and_unlink, args=[file]).start()
def run(self) -> None:
logger.debug("WSJT chopper starting up")
self.startScheduler()
while self.doRun:
data = self.source.read(256)
if data is None or (isinstance(data, bytes) and len(data) == 0):
self.doRun = False
else:
self.switchingLock.acquire()
self.wavefile.writeframes(data)
self.switchingLock.release()
self.decode()
logger.debug("WSJT chopper shutting down")
self.outputReader.close()
self.outputWriter.close()
self.emptyScheduler()
try:
os.unlink(self.wavefilename)
except Exception:
logger.exception("error removing undecoded file")
def read(self):
try:
return self.outputReader.recv()
except EOFError:
return None
class Ft8Chopper(WsjtChopper):
def __init__(self, source):
self.interval = 15
self.fileTimestampFormat = "%y%m%d_%H%M%S"
super().__init__(source)
def decoder_commandline(self, file):
# TODO expose decoding quality parameters through config
return ["jt9", "--ft8", "-d", "3", file]
class WsprChopper(WsjtChopper):
def __init__(self, source):
self.interval = 120
self.fileTimestampFormat = "%y%m%d_%H%M"
super().__init__(source)
def decoder_commandline(self, file):
# TODO expose decoding quality parameters through config
return ["wsprd", "-d", file]
class Jt65Chopper(WsjtChopper):
def __init__(self, source):
self.interval = 60
self.fileTimestampFormat = "%y%m%d_%H%M"
super().__init__(source)
def decoder_commandline(self, file):
# TODO expose decoding quality parameters through config
return ["jt9", "--jt65", "-d", "3", file]
class Jt9Chopper(WsjtChopper):
def __init__(self, source):
self.interval = 60
self.fileTimestampFormat = "%y%m%d_%H%M"
super().__init__(source)
def decoder_commandline(self, file):
# TODO expose decoding quality parameters through config
return ["jt9", "--jt9", "-d", "3", file]
class Ft4Chopper(WsjtChopper):
def __init__(self, source):
self.interval = 7.5
self.fileTimestampFormat = "%y%m%d_%H%M%S"
super().__init__(source)
def decoder_commandline(self, file):
# TODO expose decoding quality parameters through config
return ["jt9", "--ft4", "-d", "3", file]
class WsjtParser(object):
locator_pattern = re.compile(".*\\s([A-Z0-9]+)\\s([A-R]{2}[0-9]{2})$")
wspr_splitter_pattern = re.compile("([A-Z0-9]*)\\s([A-R]{2}[0-9]{2})\\s([0-9]+)")
def __init__(self, handler):
self.handler = handler
self.dial_freq = None
self.band = None
modes = {"~": "FT8", "#": "JT65", "@": "JT9", "+": "FT4"}
def parse(self, data):
try:
msg = data.decode().rstrip()
# known debug messages we know to skip
if msg.startswith("<DecodeFinished>"):
return
if msg.startswith(" EOF on input file"):
return
modes = list(WsjtParser.modes.keys())
if msg[21] in modes or msg[19] in modes:
out = self.parse_from_jt9(msg)
else:
out = self.parse_from_wsprd(msg)
self.handler.write_wsjt_message(out)
except ValueError:
logger.exception("error while parsing wsjt message")
def parse_timestamp(self, instring, dateformat):
ts = datetime.strptime(instring, dateformat)
return int(datetime.combine(date.today(), ts.time()).replace(tzinfo=timezone.utc).timestamp() * 1000)
def parse_from_jt9(self, msg):
# ft8 sample
# '222100 -15 -0.0 508 ~ CQ EA7MJ IM66'
# jt65 sample
# '2352 -7 0.4 1801 # R0WAS R2ABM KO85'
# '0003 -4 0.4 1762 # CQ R2ABM KO85'
modes = list(WsjtParser.modes.keys())
if msg[19] in modes:
dateformat = "%H%M"
else:
dateformat = "%H%M%S"
timestamp = self.parse_timestamp(msg[0 : len(dateformat)], dateformat)
msg = msg[len(dateformat) + 1 :]
modeChar = msg[14:15]
mode = WsjtParser.modes[modeChar] if modeChar in WsjtParser.modes else "unknown"
wsjt_msg = msg[17:53].strip()
self.parseLocator(wsjt_msg, mode)
Metrics.getSharedInstance().pushDecodes(self.band, mode)
return {
"timestamp": timestamp,
"db": float(msg[0:3]),
"dt": float(msg[4:8]),
"freq": int(msg[9:13]),
"mode": mode,
"msg": wsjt_msg,
}
def parseLocator(self, msg, mode):
m = WsjtParser.locator_pattern.match(msg)
if m is None:
return
# this is a valid locator in theory, but it's somewhere in the arctic ocean, near the north pole, so it's very
# likely this just means roger roger goodbye.
if m.group(2) == "RR73":
return
Map.getSharedInstance().updateLocation(m.group(1), LocatorLocation(m.group(2)), mode, self.band)
def parse_from_wsprd(self, msg):
# wspr sample
# '2600 -24 0.4 0.001492 -1 G8AXA JO01 33'
# '0052 -29 2.6 0.001486 0 G02CWT IO92 23'
wsjt_msg = msg[29:].strip()
self.parseWsprMessage(wsjt_msg)
Metrics.getSharedInstance().pushDecodes(self.band, "WSPR")
return {
"timestamp": self.parse_timestamp(msg[0:4], "%H%M"),
"db": float(msg[5:8]),
"dt": float(msg[9:13]),
"freq": float(msg[14:24]),
"drift": int(msg[25:28]),
"mode": "WSPR",
"msg": wsjt_msg,
}
def parseWsprMessage(self, msg):
m = WsjtParser.wspr_splitter_pattern.match(msg)
if m is None:
return
Map.getSharedInstance().updateLocation(m.group(1), LocatorLocation(m.group(2)), "WSPR", self.band)
def setDialFrequency(self, freq):
self.dial_freq = freq
self.band = Bandplan.getSharedInstance().findBand(freq)