break apart the ever-growing owrx/source.py

This commit is contained in:
Jakob Ketterl 2019-12-21 20:58:28 +01:00
parent 5b08dae28d
commit 292fe80acf
20 changed files with 1043 additions and 980 deletions

View File

@ -2,7 +2,7 @@ from http.server import HTTPServer
from owrx.http import RequestHandler
from owrx.config import PropertyManager
from owrx.feature import FeatureDetector
from owrx.source import SdrService
from owrx.sdr import SdrService
from socketserver import ThreadingMixIn
from owrx.sdrhu import SdrHuUpdater
from owrx.service import Services

50
owrx/client.py Normal file
View File

@ -0,0 +1,50 @@
from owrx.config import PropertyManager
from owrx.metrics import Metrics, DirectMetric
import threading
import logging
logger = logging.getLogger(__name__)
class TooManyClientsException(Exception):
pass
class ClientRegistry(object):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with ClientRegistry.creationLock:
if ClientRegistry.sharedInstance is None:
ClientRegistry.sharedInstance = ClientRegistry()
return ClientRegistry.sharedInstance
def __init__(self):
self.clients = []
Metrics.getSharedInstance().addMetric("openwebrx.users", DirectMetric(self.clientCount))
super().__init__()
def broadcast(self):
n = self.clientCount()
for c in self.clients:
c.write_clients(n)
def addClient(self, client):
pm = PropertyManager.getSharedInstance()
if len(self.clients) >= pm["max_clients"]:
raise TooManyClientsException()
self.clients.append(client)
self.broadcast()
def clientCount(self):
return len(self.clients)
def removeClient(self, client):
try:
self.clients.remove(client)
except ValueError:
pass
self.broadcast()

View File

@ -1,5 +1,8 @@
from owrx.config import PropertyManager
from owrx.source import DspManager, CpuUsageThread, SdrService, ClientRegistry
from owrx.dsp import DspManager
from owrx.cpu import CpuUsageThread
from owrx.sdr import SdrService
from owrx.client import ClientRegistry
from owrx.feature import FeatureDetector
from owrx.version import openwebrx_version
from owrx.bands import Bandplan

View File

@ -6,7 +6,7 @@ from datetime import datetime
from string import Template
from owrx.websocket import WebSocketConnection
from owrx.config import PropertyManager
from owrx.source import ClientRegistry
from owrx.client import ClientRegistry
from owrx.connection import WebSocketMessageHandler
from owrx.version import openwebrx_version
from owrx.feature import FeatureDetector

73
owrx/cpu.py Normal file
View File

@ -0,0 +1,73 @@
import threading
import logging
logger = logging.getLogger(__name__)
class CpuUsageThread(threading.Thread):
sharedInstance = None
@staticmethod
def getSharedInstance():
if CpuUsageThread.sharedInstance is None:
CpuUsageThread.sharedInstance = CpuUsageThread()
return CpuUsageThread.sharedInstance
def __init__(self):
self.clients = []
self.doRun = True
self.last_worktime = 0
self.last_idletime = 0
self.endEvent = threading.Event()
super().__init__()
def run(self):
while self.doRun:
try:
cpu_usage = self.get_cpu_usage()
except:
cpu_usage = 0
for c in self.clients:
c.write_cpu_usage(cpu_usage)
self.endEvent.wait(timeout=3)
logger.debug("cpu usage thread shut down")
def get_cpu_usage(self):
try:
f = open("/proc/stat", "r")
except:
return 0 # Workaround, possibly we're on a Mac
line = ""
while not "cpu " in line:
line = f.readline()
f.close()
spl = line.split(" ")
worktime = int(spl[2]) + int(spl[3]) + int(spl[4])
idletime = int(spl[5])
dworktime = worktime - self.last_worktime
didletime = idletime - self.last_idletime
rate = float(dworktime) / (didletime + dworktime)
self.last_worktime = worktime
self.last_idletime = idletime
if self.last_worktime == 0:
return 0
return rate
def add_client(self, c):
self.clients.append(c)
if not self.is_alive():
self.start()
def remove_client(self, c):
try:
self.clients.remove(c)
except ValueError:
pass
if not self.clients:
self.shutdown()
def shutdown(self):
CpuUsageThread.sharedInstance = None
self.doRun = False
self.endEvent.set()

152
owrx/dsp.py Normal file
View File

@ -0,0 +1,152 @@
from owrx.config import PropertyManager
from owrx.meta import MetaParser
from owrx.wsjt import WsjtParser
from owrx.aprs import AprsParser
from owrx.source import SdrSource
from csdr import csdr
import threading
import logging
logger = logging.getLogger(__name__)
class DspManager(csdr.output):
def __init__(self, handler, sdrSource):
self.handler = handler
self.sdrSource = sdrSource
self.metaParser = MetaParser(self.handler)
self.wsjtParser = WsjtParser(self.handler)
self.aprsParser = AprsParser(self.handler)
self.localProps = (
self.sdrSource.getProps()
.collect(
"audio_compression",
"fft_compression",
"digimodes_fft_size",
"csdr_dynamic_bufsize",
"csdr_print_bufsizes",
"csdr_through",
"digimodes_enable",
"samp_rate",
"digital_voice_unvoiced_quality",
"dmr_filter",
"temporary_directory",
"center_freq",
)
.defaults(PropertyManager.getSharedInstance())
)
self.dsp = csdr.dsp(self)
self.dsp.nc_port = self.sdrSource.getPort()
def set_low_cut(cut):
bpf = self.dsp.get_bpf()
bpf[0] = cut
self.dsp.set_bpf(*bpf)
def set_high_cut(cut):
bpf = self.dsp.get_bpf()
bpf[1] = cut
self.dsp.set_bpf(*bpf)
def set_dial_freq(key, value):
freq = self.localProps["center_freq"] + self.localProps["offset_freq"]
self.wsjtParser.setDialFrequency(freq)
self.aprsParser.setDialFrequency(freq)
self.metaParser.setDialFrequency(freq)
self.subscriptions = [
self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression),
self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression),
self.localProps.getProperty("digimodes_fft_size").wire(self.dsp.set_secondary_fft_size),
self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate),
self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate),
self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq),
self.localProps.getProperty("squelch_level").wire(self.dsp.set_squelch_level),
self.localProps.getProperty("low_cut").wire(set_low_cut),
self.localProps.getProperty("high_cut").wire(set_high_cut),
self.localProps.getProperty("mod").wire(self.dsp.set_demodulator),
self.localProps.getProperty("digital_voice_unvoiced_quality").wire(self.dsp.set_unvoiced_quality),
self.localProps.getProperty("dmr_filter").wire(self.dsp.set_dmr_filter),
self.localProps.getProperty("temporary_directory").wire(self.dsp.set_temporary_directory),
self.localProps.collect("center_freq", "offset_freq").wire(set_dial_freq),
]
self.dsp.set_offset_freq(0)
self.dsp.set_bpf(-4000, 4000)
self.dsp.csdr_dynamic_bufsize = self.localProps["csdr_dynamic_bufsize"]
self.dsp.csdr_print_bufsizes = self.localProps["csdr_print_bufsizes"]
self.dsp.csdr_through = self.localProps["csdr_through"]
if self.localProps["digimodes_enable"]:
def set_secondary_mod(mod):
if mod == False:
mod = None
self.dsp.set_secondary_demodulator(mod)
if mod is not None:
self.handler.write_secondary_dsp_config(
{
"secondary_fft_size": self.localProps["digimodes_fft_size"],
"if_samp_rate": self.dsp.if_samp_rate(),
"secondary_bw": self.dsp.secondary_bw(),
}
)
self.subscriptions += [
self.localProps.getProperty("secondary_mod").wire(set_secondary_mod),
self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq),
]
self.sdrSource.addClient(self)
super().__init__()
def start(self):
if self.sdrSource.isAvailable():
self.dsp.start()
def receive_output(self, t, read_fn):
logger.debug("adding new output of type %s", t)
writers = {
"audio": self.handler.write_dsp_data,
"smeter": self.handler.write_s_meter_level,
"secondary_fft": self.handler.write_secondary_fft,
"secondary_demod": self.handler.write_secondary_demod,
"meta": self.metaParser.parse,
"wsjt_demod": self.wsjtParser.parse,
"packet_demod": self.aprsParser.parse,
}
write = writers[t]
threading.Thread(target=self.pump(read_fn, write)).start()
def stop(self):
self.dsp.stop()
self.sdrSource.removeClient(self)
for sub in self.subscriptions:
sub.cancel()
self.subscriptions = []
def setProperty(self, prop, value):
self.localProps.getProperty(prop).setValue(value)
def getClientClass(self):
return SdrSource.CLIENT_USER
def onStateChange(self, state):
if state == SdrSource.STATE_RUNNING:
logger.debug("received STATE_RUNNING, attempting DspSource restart")
self.dsp.start()
elif state == SdrSource.STATE_STOPPING:
logger.debug("received STATE_STOPPING, shutting down DspSource")
self.dsp.stop()
elif state == SdrSource.STATE_FAILED:
logger.debug("received STATE_FAILED, shutting down DspSource")
self.dsp.stop()
self.handler.handleSdrFailure("sdr device failed")
def onBusyStateChange(self, state):
pass

92
owrx/fft.py Normal file
View File

@ -0,0 +1,92 @@
from owrx.config import PropertyManager
from csdr import csdr
import threading
from owrx.source import SdrSource
import logging
logger = logging.getLogger(__name__)
class SpectrumThread(csdr.output):
def __init__(self, sdrSource):
self.sdrSource = sdrSource
super().__init__()
self.props = props = self.sdrSource.props.collect(
"samp_rate",
"fft_size",
"fft_fps",
"fft_voverlap_factor",
"fft_compression",
"csdr_dynamic_bufsize",
"csdr_print_bufsizes",
"csdr_through",
"temporary_directory",
).defaults(PropertyManager.getSharedInstance())
self.dsp = dsp = csdr.dsp(self)
dsp.nc_port = self.sdrSource.getPort()
dsp.set_demodulator("fft")
def set_fft_averages(key, value):
samp_rate = props["samp_rate"]
fft_size = props["fft_size"]
fft_fps = props["fft_fps"]
fft_voverlap_factor = props["fft_voverlap_factor"]
dsp.set_fft_averages(
int(round(1.0 * samp_rate / fft_size / fft_fps / (1.0 - fft_voverlap_factor)))
if fft_voverlap_factor > 0
else 0
)
self.subscriptions = [
props.getProperty("samp_rate").wire(dsp.set_samp_rate),
props.getProperty("fft_size").wire(dsp.set_fft_size),
props.getProperty("fft_fps").wire(dsp.set_fft_fps),
props.getProperty("fft_compression").wire(dsp.set_fft_compression),
props.getProperty("temporary_directory").wire(dsp.set_temporary_directory),
props.collect("samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor").wire(set_fft_averages),
]
set_fft_averages(None, None)
dsp.csdr_dynamic_bufsize = props["csdr_dynamic_bufsize"]
dsp.csdr_print_bufsizes = props["csdr_print_bufsizes"]
dsp.csdr_through = props["csdr_through"]
logger.debug("Spectrum thread initialized successfully.")
def start(self):
self.sdrSource.addClient(self)
if self.sdrSource.isAvailable():
self.dsp.start()
def supports_type(self, t):
return t == "audio"
def receive_output(self, type, read_fn):
if self.props["csdr_dynamic_bufsize"]:
read_fn(8) # dummy read to skip bufsize & preamble
logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1")
threading.Thread(target=self.pump(read_fn, self.sdrSource.writeSpectrumData)).start()
def stop(self):
self.dsp.stop()
self.sdrSource.removeClient(self)
for c in self.subscriptions:
c.cancel()
self.subscriptions = []
def getClientClass(self):
return SdrSource.CLIENT_USER
def onStateChange(self, state):
if state in [SdrSource.STATE_STOPPING, SdrSource.STATE_FAILED]:
self.dsp.stop()
elif state == SdrSource.STATE_RUNNING:
self.dsp.start()
def onBusyStateChange(self, state):
pass

90
owrx/sdr.py Normal file
View File

@ -0,0 +1,90 @@
from owrx.config import PropertyManager
from owrx.feature import FeatureDetector, UnknownFeatureException
import sys
import logging
logger = logging.getLogger(__name__)
class SdrService(object):
sdrProps = None
sources = {}
lastPort = None
@staticmethod
def getNextPort():
pm = PropertyManager.getSharedInstance()
(start, end) = pm["iq_port_range"]
if SdrService.lastPort is None:
SdrService.lastPort = start
else:
SdrService.lastPort += 1
if SdrService.lastPort > end:
raise IndexError("no more available ports to start more sdrs")
return SdrService.lastPort
@staticmethod
def loadProps():
if SdrService.sdrProps is None:
pm = PropertyManager.getSharedInstance()
featureDetector = FeatureDetector()
def loadIntoPropertyManager(dict: dict):
propertyManager = PropertyManager()
for (name, value) in dict.items():
propertyManager[name] = value
return propertyManager
def sdrTypeAvailable(value):
try:
if not featureDetector.is_available(value["type"]):
logger.error(
'The RTL source type "{0}" is not available. please check requirements.'.format(
value["type"]
)
)
return False
return True
except UnknownFeatureException:
logger.error(
'The RTL source type "{0}" is invalid. Please check your configuration'.format(value["type"])
)
return False
# transform all dictionary items into PropertyManager object, filtering out unavailable ones
SdrService.sdrProps = {
name: loadIntoPropertyManager(value) for (name, value) in pm["sdrs"].items() if sdrTypeAvailable(value)
}
logger.info(
"SDR sources loaded. Availables SDRs: {0}".format(
", ".join(map(lambda x: x["name"], SdrService.sdrProps.values()))
)
)
@staticmethod
def getSource(id=None):
SdrService.loadProps()
sources = SdrService.getSources()
if not sources:
return None
if id is None:
# TODO: configure default sdr in config? right now it will pick the first one off the list.
id = list(sources.keys())[0]
if not id in sources:
return None
return sources[id]
@staticmethod
def getSources():
SdrService.loadProps()
for id in SdrService.sdrProps.keys():
if not id in SdrService.sources:
props = SdrService.sdrProps[id]
sdrType = props["type"]
className = "".join(x for x in sdrType.title() if x.isalnum()) + "Source"
module = __import__("owrx.source.{0}".format(sdrType), fromlist=[className])
cls = getattr(module, className)
SdrService.sources[id] = cls(id, props, SdrService.getNextPort())
return {key: s for key, s in SdrService.sources.items() if not s.isFailed()}

View File

@ -1,13 +1,14 @@
import threading
from owrx.socket import getAvailablePort
from datetime import datetime, timezone, timedelta
from owrx.source import SdrService, SdrSource
from owrx.source import SdrSource
from owrx.sdr import SdrService
from owrx.bands import Bandplan
from csdr.csdr import dsp, output
from owrx.wsjt import WsjtParser
from owrx.aprs import AprsParser
from owrx.config import PropertyManager
from owrx.source import Resampler
from owrx.source.resampler import Resampler
from owrx.feature import FeatureDetector
import logging

View File

@ -1,975 +0,0 @@
import subprocess
from owrx.config import PropertyManager
from owrx.feature import FeatureDetector, UnknownFeatureException
from owrx.meta import MetaParser
from owrx.wsjt import WsjtParser
from owrx.aprs import AprsParser
from owrx.metrics import Metrics, DirectMetric
from owrx.socket import getAvailablePort
import threading
from csdr import csdr
import time
import os
import signal
import sys
import socket
import logging
import shlex
logger = logging.getLogger(__name__)
class SdrService(object):
sdrProps = None
sources = {}
lastPort = None
@staticmethod
def getNextPort():
pm = PropertyManager.getSharedInstance()
(start, end) = pm["iq_port_range"]
if SdrService.lastPort is None:
SdrService.lastPort = start
else:
SdrService.lastPort += 1
if SdrService.lastPort > end:
raise IndexError("no more available ports to start more sdrs")
return SdrService.lastPort
@staticmethod
def loadProps():
if SdrService.sdrProps is None:
pm = PropertyManager.getSharedInstance()
featureDetector = FeatureDetector()
def loadIntoPropertyManager(dict: dict):
propertyManager = PropertyManager()
for (name, value) in dict.items():
propertyManager[name] = value
return propertyManager
def sdrTypeAvailable(value):
try:
if not featureDetector.is_available(value["type"]):
logger.error(
'The RTL source type "{0}" is not available. please check requirements.'.format(
value["type"]
)
)
return False
return True
except UnknownFeatureException:
logger.error(
'The RTL source type "{0}" is invalid. Please check your configuration'.format(value["type"])
)
return False
# transform all dictionary items into PropertyManager object, filtering out unavailable ones
SdrService.sdrProps = {
name: loadIntoPropertyManager(value) for (name, value) in pm["sdrs"].items() if sdrTypeAvailable(value)
}
logger.info(
"SDR sources loaded. Availables SDRs: {0}".format(
", ".join(map(lambda x: x["name"], SdrService.sdrProps.values()))
)
)
@staticmethod
def getSource(id=None):
SdrService.loadProps()
sources = SdrService.getSources()
if not sources:
return None
if id is None:
# TODO: configure default sdr in config? right now it will pick the first one off the list.
id = list(sources.keys())[0]
if not id in sources:
return None
return sources[id]
@staticmethod
def getSources():
SdrService.loadProps()
for id in SdrService.sdrProps.keys():
if not id in SdrService.sources:
props = SdrService.sdrProps[id]
className = "".join(x for x in props["type"].title() if x.isalnum()) + "Source"
cls = getattr(sys.modules[__name__], className)
SdrService.sources[id] = cls(id, props, SdrService.getNextPort())
return {key: s for key, s in SdrService.sources.items() if not s.isFailed()}
class SdrSource(object):
STATE_STOPPED = 0
STATE_STARTING = 1
STATE_RUNNING = 2
STATE_STOPPING = 3
STATE_TUNING = 4
STATE_FAILED = 5
BUSYSTATE_IDLE = 0
BUSYSTATE_BUSY = 1
CLIENT_INACTIVE = 0
CLIENT_BACKGROUND = 1
CLIENT_USER = 2
def __init__(self, id, props, port):
self.id = id
self.props = props
self.profile_id = None
self.activateProfile()
self.rtlProps = self.props.collect(*self.getEventNames()).defaults(PropertyManager.getSharedInstance())
self.wireEvents()
self.port = port
self.monitor = None
self.clients = []
self.spectrumClients = []
self.spectrumThread = None
self.process = None
self.modificationLock = threading.Lock()
self.failed = False
self.state = SdrSource.STATE_STOPPED
self.busyState = SdrSource.BUSYSTATE_IDLE
def getEventNames(self):
return [
"samp_rate",
"nmux_memory",
"center_freq",
"ppm",
"rf_gain",
"lna_gain",
"rf_amp",
"antenna",
"if_gain",
"lfo_offset",
]
def wireEvents(self):
def restart(name, value):
logger.debug("restarting sdr source due to property change: {0} changed to {1}".format(name, value))
self.stop()
self.start()
self.rtlProps.wire(restart)
# override this in subclasses
def getCommand(self):
pass
# override this in subclasses, if necessary
def getFormatConversion(self):
return None
def activateProfile(self, profile_id=None):
profiles = self.props["profiles"]
if profile_id is None:
profile_id = list(profiles.keys())[0]
if profile_id == self.profile_id:
return
logger.debug("activating profile {0}".format(profile_id))
self.profile_id = profile_id
profile = profiles[profile_id]
self.props["profile_id"] = profile_id
for (key, value) in profile.items():
# skip the name, that would overwrite the source name.
if key == "name":
continue
self.props[key] = value
def getId(self):
return self.id
def getProfileId(self):
return self.profile_id
def getProfiles(self):
return self.props["profiles"]
def getName(self):
return self.props["name"]
def getProps(self):
return self.props
def getPort(self):
return self.port
def useNmux(self):
return True
def getCommandValues(self):
dict = self.rtlProps.collect(*self.getEventNames()).__dict__()
if "lfo_offset" in dict and dict["lfo_offset"] is not None:
dict["tuner_freq"] = dict["center_freq"] + dict["lfo_offset"]
else:
dict["tuner_freq"] = dict["center_freq"]
return dict
def start(self):
self.modificationLock.acquire()
if self.monitor:
self.modificationLock.release()
return
props = self.rtlProps
cmd = self.getCommand().format(**self.getCommandValues())
format_conversion = self.getFormatConversion()
if format_conversion is not None:
cmd += " | " + format_conversion
if self.useNmux():
nmux_bufcnt = nmux_bufsize = 0
while nmux_bufsize < props["samp_rate"] / 4:
nmux_bufsize += 4096
while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6:
nmux_bufcnt += 1
if nmux_bufcnt == 0 or nmux_bufsize == 0:
logger.error(
"Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py"
)
self.modificationLock.release()
return
logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt))
cmd = cmd + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (
nmux_bufsize,
nmux_bufcnt,
self.port,
)
# don't use shell mode for commands without piping
if "|" in cmd:
self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
else:
# preexec_fn can go as soon as there's no piped commands left
# the os.killpg call must be replaced with something more reasonable at the same time
self.process = subprocess.Popen(shlex.split(cmd), preexec_fn=os.setpgrp)
logger.info("Started rtl source: " + cmd)
available = False
def wait_for_process_to_end():
rc = self.process.wait()
logger.debug("shut down with RC={0}".format(rc))
self.monitor = None
self.monitor = threading.Thread(target=wait_for_process_to_end)
self.monitor.start()
retries = 1000
while retries > 0:
retries -= 1
if self.monitor is None:
break
testsock = socket.socket()
try:
testsock.connect(("127.0.0.1", self.getPort()))
testsock.close()
available = True
break
except:
time.sleep(0.1)
if not available:
self.failed = True
self.postStart()
self.modificationLock.release()
self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING)
def postStart(self):
pass
def isAvailable(self):
return self.monitor is not None
def isFailed(self):
return self.failed
def stop(self):
self.setState(SdrSource.STATE_STOPPING)
self.modificationLock.acquire()
if self.process is not None:
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
if self.monitor:
self.monitor.join()
self.sleepOnRestart()
self.modificationLock.release()
self.setState(SdrSource.STATE_STOPPED)
def sleepOnRestart(self):
pass
def hasClients(self, *args):
clients = [c for c in self.clients if c.getClientClass() in args]
return len(clients) > 0
def addClient(self, c):
self.clients.append(c)
hasUsers = self.hasClients(SdrSource.CLIENT_USER)
hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND)
if hasUsers or hasBackgroundTasks:
self.start()
self.setBusyState(SdrSource.BUSYSTATE_BUSY if hasUsers else SdrSource.BUSYSTATE_IDLE)
def removeClient(self, c):
try:
self.clients.remove(c)
except ValueError:
pass
hasUsers = self.hasClients(SdrSource.CLIENT_USER)
hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND)
self.setBusyState(SdrSource.BUSYSTATE_BUSY if hasUsers else SdrSource.BUSYSTATE_IDLE)
if not hasUsers and not hasBackgroundTasks:
self.stop()
def addSpectrumClient(self, c):
self.spectrumClients.append(c)
if self.spectrumThread is None:
self.spectrumThread = SpectrumThread(self)
self.spectrumThread.start()
def removeSpectrumClient(self, c):
try:
self.spectrumClients.remove(c)
except ValueError:
pass
if not self.spectrumClients and self.spectrumThread is not None:
self.spectrumThread.stop()
self.spectrumThread = None
def writeSpectrumData(self, data):
for c in self.spectrumClients:
c.write_spectrum_data(data)
def setState(self, state):
if state == self.state:
return
self.state = state
for c in self.clients:
c.onStateChange(state)
def setBusyState(self, state):
if state == self.busyState:
return
self.busyState = state
for c in self.clients:
c.onBusyStateChange(state)
class Resampler(SdrSource):
def __init__(self, props, port, sdr):
sdrProps = sdr.getProps()
self.shift = (sdrProps["center_freq"] - props["center_freq"]) / sdrProps["samp_rate"]
self.decimation = int(float(sdrProps["samp_rate"]) / props["samp_rate"])
if_samp_rate = sdrProps["samp_rate"] / self.decimation
self.transition_bw = 0.15 * (if_samp_rate / float(sdrProps["samp_rate"]))
props["samp_rate"] = if_samp_rate
self.sdr = sdr
super().__init__(None, props, port)
def start(self):
if self.isFailed():
return
self.modificationLock.acquire()
if self.monitor:
self.modificationLock.release()
return
self.setState(SdrSource.STATE_STARTING)
props = self.rtlProps
resampler_command = [
"nc -v 127.0.0.1 {nc_port}".format(nc_port=self.sdr.getPort()),
"csdr shift_addition_cc {shift}".format(shift=self.shift),
"csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING".format(
decimation=self.decimation, ddc_transition_bw=self.transition_bw
),
]
nmux_bufcnt = nmux_bufsize = 0
while nmux_bufsize < props["samp_rate"] / 4:
nmux_bufsize += 4096
while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6:
nmux_bufcnt += 1
if nmux_bufcnt == 0 or nmux_bufsize == 0:
logger.error(
"Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py"
)
self.modificationLock.release()
return
logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt))
resampler_command += [
"nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (nmux_bufsize, nmux_bufcnt, self.port)
]
cmd = " | ".join(resampler_command)
logger.debug("resampler command: %s", cmd)
self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
logger.info("Started resampler source: " + cmd)
available = False
def wait_for_process_to_end():
rc = self.process.wait()
logger.debug("shut down with RC={0}".format(rc))
self.monitor = None
self.monitor = threading.Thread(target=wait_for_process_to_end)
self.monitor.start()
retries = 1000
while retries > 0:
retries -= 1
if self.monitor is None:
break
testsock = socket.socket()
try:
testsock.connect(("127.0.0.1", self.getPort()))
testsock.close()
available = True
break
except:
time.sleep(0.1)
if not available:
self.failed = True
self.modificationLock.release()
self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING)
def activateProfile(self, profile_id=None):
pass
class FifiSdrSource(SdrSource):
def getCommand(self):
return "arecord -D hw:2,0 -f S16_LE -r {samp_rate} -c2 -"
def getFormatConversion(self):
return "csdr convert_s16_f | csdr gain_ff 30"
class ConnectorSource(SdrSource):
def __init__(self, id, props, port):
super().__init__(id, props, port)
self.controlSocket = None
self.controlPort = getAvailablePort()
def getEventNames(self):
return [
"samp_rate",
"center_freq",
"ppm",
"rf_gain",
"device",
"iqswap",
"lfo_offset",
"rtltcp_compat",
]
def sendControlMessage(self, prop, value):
logger.debug("sending property change over control socket: {0} changed to {1}".format(prop, value))
self.controlSocket.sendall("{prop}:{value}\n".format(prop=prop, value=value).encode())
def wireEvents(self):
def reconfigure(prop, value):
if self.monitor is None:
return
if (
(prop == "center_freq" or prop == "lfo_offset")
and "lfo_offset" in self.rtlProps
and self.rtlProps["lfo_offset"] is not None
):
freq = self.rtlProps["center_freq"] + self.rtlProps["lfo_offset"]
self.sendControlMessage("center_freq", freq)
else:
self.sendControlMessage(prop, value)
self.rtlProps.wire(reconfigure)
def postStart(self):
logger.debug("opening control socket...")
self.controlSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.controlSocket.connect(("localhost", self.controlPort))
def stop(self):
super().stop()
if self.controlSocket:
self.controlSocket.close()
self.controlSocket = None
def getFormatConversion(self):
return None
def useNmux(self):
return False
class RtlSdrSource(ConnectorSource):
def getCommand(self):
cmd = (
"rtl_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort)
+ " -s {samp_rate} -f {tuner_freq} -g {rf_gain} -P {ppm}"
)
if "device" in self.rtlProps and self.rtlProps["device"] is not None:
cmd += ' -d "{device}"'
if self.rtlProps["iqswap"]:
cmd += " -i"
if self.rtlProps["rtltcp_compat"]:
cmd += " -r"
return cmd
class SoapyConnectorSource(ConnectorSource):
"""
must be implemented by child classes to be able to build a driver-based device selector by default.
return value must be the corresponding soapy driver identifier.
"""
def getDriver(self):
pass
def parseDeviceString(self, dstr):
def decodeComponent(c):
kv = c.split("=", 1)
if len(kv) < 2:
return c
else:
return {kv[0]: kv[1]}
return [decodeComponent(c) for c in dstr.split(",")]
def encodeDeviceString(self, dobj):
def encodeComponent(c):
if isinstance(c, str):
return c
else:
return ",".join(["{0}={1}".format(key, value) for key, value in c.items()])
return ",".join([encodeComponent(c) for c in dobj])
"""
this method always attempts to inject a driver= part into the soapysdr query, depending on what connector was used.
this prevents the soapy_connector from using the wrong device in scenarios where there's no same-type sdrs.
"""
def getCommandValues(self):
values = super().getCommandValues()
if "device" in values and values["device"] is not None:
parsed = self.parseDeviceString(values["device"])
parsed = [v for v in parsed if "driver" not in v]
parsed += [{"driver": self.getDriver()}]
values["device"] = self.encodeDeviceString(parsed)
else:
values["device"] = "driver={0}".format(self.getDriver())
return values
class SdrplaySource(SoapyConnectorSource):
def getDriver(self):
return "sdrplay"
def getEventNames(self):
return super().getEventNames() + ["antenna"]
def getCommand(self):
cmd = (
"soapy_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort)
+ ' -s {samp_rate} -f {tuner_freq} -g "{rf_gain}" -P {ppm} -a "{antenna}" -d "{device}"'
)
values = self.getCommandValues()
if values["iqswap"]:
cmd += " -i"
if self.rtlProps["rtltcp_compat"]:
cmd += " -r"
return cmd
class AirspySource(SoapyConnectorSource):
def getDriver(self):
return "airspy"
def getEventNames(self):
return super().getEventNames() + ["bias_tee"]
def getCommand(self):
cmd = (
"soapy_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort)
+ ' -s {samp_rate} -f {tuner_freq} -g "{rf_gain}" -P {ppm} -d "{device}"'
)
values = self.getCommandValues()
if values["iqswap"]:
cmd += " -i"
if self.rtlProps["rtltcp_compat"]:
cmd += " -r"
if values["bias_tee"]:
cmd += " -t biastee=true"
return cmd
class AirspyhfSource(AirspySource):
def getDriver(self):
return "airspyhf"
class HackrfSource(SdrSource):
def getCommand(self):
return "hackrf_transfer -s {samp_rate} -f {tuner_freq} -g {rf_gain} -l{lna_gain} -a{rf_amp} -r-"
def getFormatConversion(self):
return "csdr convert_s8_f"
class SpectrumThread(csdr.output):
def __init__(self, sdrSource):
self.sdrSource = sdrSource
super().__init__()
self.props = props = self.sdrSource.props.collect(
"samp_rate",
"fft_size",
"fft_fps",
"fft_voverlap_factor",
"fft_compression",
"csdr_dynamic_bufsize",
"csdr_print_bufsizes",
"csdr_through",
"temporary_directory",
).defaults(PropertyManager.getSharedInstance())
self.dsp = dsp = csdr.dsp(self)
dsp.nc_port = self.sdrSource.getPort()
dsp.set_demodulator("fft")
def set_fft_averages(key, value):
samp_rate = props["samp_rate"]
fft_size = props["fft_size"]
fft_fps = props["fft_fps"]
fft_voverlap_factor = props["fft_voverlap_factor"]
dsp.set_fft_averages(
int(round(1.0 * samp_rate / fft_size / fft_fps / (1.0 - fft_voverlap_factor)))
if fft_voverlap_factor > 0
else 0
)
self.subscriptions = [
props.getProperty("samp_rate").wire(dsp.set_samp_rate),
props.getProperty("fft_size").wire(dsp.set_fft_size),
props.getProperty("fft_fps").wire(dsp.set_fft_fps),
props.getProperty("fft_compression").wire(dsp.set_fft_compression),
props.getProperty("temporary_directory").wire(dsp.set_temporary_directory),
props.collect("samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor").wire(set_fft_averages),
]
set_fft_averages(None, None)
dsp.csdr_dynamic_bufsize = props["csdr_dynamic_bufsize"]
dsp.csdr_print_bufsizes = props["csdr_print_bufsizes"]
dsp.csdr_through = props["csdr_through"]
logger.debug("Spectrum thread initialized successfully.")
def start(self):
self.sdrSource.addClient(self)
if self.sdrSource.isAvailable():
self.dsp.start()
def supports_type(self, t):
return t == "audio"
def receive_output(self, type, read_fn):
if self.props["csdr_dynamic_bufsize"]:
read_fn(8) # dummy read to skip bufsize & preamble
logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1")
threading.Thread(target=self.pump(read_fn, self.sdrSource.writeSpectrumData)).start()
def stop(self):
self.dsp.stop()
self.sdrSource.removeClient(self)
for c in self.subscriptions:
c.cancel()
self.subscriptions = []
def getClientClass(self):
return SdrSource.CLIENT_USER
def onStateChange(self, state):
if state in [SdrSource.STATE_STOPPING, SdrSource.STATE_FAILED]:
self.dsp.stop()
elif state == SdrSource.STATE_RUNNING:
self.dsp.start()
def onBusyStateChange(self, state):
pass
class DspManager(csdr.output):
def __init__(self, handler, sdrSource):
self.handler = handler
self.sdrSource = sdrSource
self.metaParser = MetaParser(self.handler)
self.wsjtParser = WsjtParser(self.handler)
self.aprsParser = AprsParser(self.handler)
self.localProps = (
self.sdrSource.getProps()
.collect(
"audio_compression",
"fft_compression",
"digimodes_fft_size",
"csdr_dynamic_bufsize",
"csdr_print_bufsizes",
"csdr_through",
"digimodes_enable",
"samp_rate",
"digital_voice_unvoiced_quality",
"dmr_filter",
"temporary_directory",
"center_freq",
)
.defaults(PropertyManager.getSharedInstance())
)
self.dsp = csdr.dsp(self)
self.dsp.nc_port = self.sdrSource.getPort()
def set_low_cut(cut):
bpf = self.dsp.get_bpf()
bpf[0] = cut
self.dsp.set_bpf(*bpf)
def set_high_cut(cut):
bpf = self.dsp.get_bpf()
bpf[1] = cut
self.dsp.set_bpf(*bpf)
def set_dial_freq(key, value):
freq = self.localProps["center_freq"] + self.localProps["offset_freq"]
self.wsjtParser.setDialFrequency(freq)
self.aprsParser.setDialFrequency(freq)
self.metaParser.setDialFrequency(freq)
self.subscriptions = [
self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression),
self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression),
self.localProps.getProperty("digimodes_fft_size").wire(self.dsp.set_secondary_fft_size),
self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate),
self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate),
self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq),
self.localProps.getProperty("squelch_level").wire(self.dsp.set_squelch_level),
self.localProps.getProperty("low_cut").wire(set_low_cut),
self.localProps.getProperty("high_cut").wire(set_high_cut),
self.localProps.getProperty("mod").wire(self.dsp.set_demodulator),
self.localProps.getProperty("digital_voice_unvoiced_quality").wire(self.dsp.set_unvoiced_quality),
self.localProps.getProperty("dmr_filter").wire(self.dsp.set_dmr_filter),
self.localProps.getProperty("temporary_directory").wire(self.dsp.set_temporary_directory),
self.localProps.collect("center_freq", "offset_freq").wire(set_dial_freq),
]
self.dsp.set_offset_freq(0)
self.dsp.set_bpf(-4000, 4000)
self.dsp.csdr_dynamic_bufsize = self.localProps["csdr_dynamic_bufsize"]
self.dsp.csdr_print_bufsizes = self.localProps["csdr_print_bufsizes"]
self.dsp.csdr_through = self.localProps["csdr_through"]
if self.localProps["digimodes_enable"]:
def set_secondary_mod(mod):
if mod == False:
mod = None
self.dsp.set_secondary_demodulator(mod)
if mod is not None:
self.handler.write_secondary_dsp_config(
{
"secondary_fft_size": self.localProps["digimodes_fft_size"],
"if_samp_rate": self.dsp.if_samp_rate(),
"secondary_bw": self.dsp.secondary_bw(),
}
)
self.subscriptions += [
self.localProps.getProperty("secondary_mod").wire(set_secondary_mod),
self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq),
]
self.sdrSource.addClient(self)
super().__init__()
def start(self):
if self.sdrSource.isAvailable():
self.dsp.start()
def receive_output(self, t, read_fn):
logger.debug("adding new output of type %s", t)
writers = {
"audio": self.handler.write_dsp_data,
"smeter": self.handler.write_s_meter_level,
"secondary_fft": self.handler.write_secondary_fft,
"secondary_demod": self.handler.write_secondary_demod,
"meta": self.metaParser.parse,
"wsjt_demod": self.wsjtParser.parse,
"packet_demod": self.aprsParser.parse,
}
write = writers[t]
threading.Thread(target=self.pump(read_fn, write)).start()
def stop(self):
self.dsp.stop()
self.sdrSource.removeClient(self)
for sub in self.subscriptions:
sub.cancel()
self.subscriptions = []
def setProperty(self, prop, value):
self.localProps.getProperty(prop).setValue(value)
def getClientClass(self):
return SdrSource.CLIENT_USER
def onStateChange(self, state):
if state == SdrSource.STATE_RUNNING:
logger.debug("received STATE_RUNNING, attempting DspSource restart")
self.dsp.start()
elif state == SdrSource.STATE_STOPPING:
logger.debug("received STATE_STOPPING, shutting down DspSource")
self.dsp.stop()
elif state == SdrSource.STATE_FAILED:
logger.debug("received STATE_FAILED, shutting down DspSource")
self.dsp.stop()
self.handler.handleSdrFailure("sdr device failed")
def onBusyStateChange(self, state):
pass
class CpuUsageThread(threading.Thread):
sharedInstance = None
@staticmethod
def getSharedInstance():
if CpuUsageThread.sharedInstance is None:
CpuUsageThread.sharedInstance = CpuUsageThread()
return CpuUsageThread.sharedInstance
def __init__(self):
self.clients = []
self.doRun = True
self.last_worktime = 0
self.last_idletime = 0
self.endEvent = threading.Event()
super().__init__()
def run(self):
while self.doRun:
try:
cpu_usage = self.get_cpu_usage()
except:
cpu_usage = 0
for c in self.clients:
c.write_cpu_usage(cpu_usage)
self.endEvent.wait(timeout=3)
logger.debug("cpu usage thread shut down")
def get_cpu_usage(self):
try:
f = open("/proc/stat", "r")
except:
return 0 # Workaround, possibly we're on a Mac
line = ""
while not "cpu " in line:
line = f.readline()
f.close()
spl = line.split(" ")
worktime = int(spl[2]) + int(spl[3]) + int(spl[4])
idletime = int(spl[5])
dworktime = worktime - self.last_worktime
didletime = idletime - self.last_idletime
rate = float(dworktime) / (didletime + dworktime)
self.last_worktime = worktime
self.last_idletime = idletime
if self.last_worktime == 0:
return 0
return rate
def add_client(self, c):
self.clients.append(c)
if not self.is_alive():
self.start()
def remove_client(self, c):
try:
self.clients.remove(c)
except ValueError:
pass
if not self.clients:
self.shutdown()
def shutdown(self):
CpuUsageThread.sharedInstance = None
self.doRun = False
self.endEvent.set()
class TooManyClientsException(Exception):
pass
class ClientRegistry(object):
sharedInstance = None
creationLock = threading.Lock()
@staticmethod
def getSharedInstance():
with ClientRegistry.creationLock:
if ClientRegistry.sharedInstance is None:
ClientRegistry.sharedInstance = ClientRegistry()
return ClientRegistry.sharedInstance
def __init__(self):
self.clients = []
Metrics.getSharedInstance().addMetric("openwebrx.users", DirectMetric(self.clientCount))
super().__init__()
def broadcast(self):
n = self.clientCount()
for c in self.clients:
c.write_clients(n)
def addClient(self, client):
pm = PropertyManager.getSharedInstance()
if len(self.clients) >= pm["max_clients"]:
raise TooManyClientsException()
self.clients.append(client)
self.broadcast()
def clientCount(self):
return len(self.clients)
def removeClient(self, client):
try:
self.clients.remove(client)
except ValueError:
pass
self.broadcast()

286
owrx/source/__init__.py Normal file
View File

@ -0,0 +1,286 @@
from owrx.config import PropertyManager
import threading
import subprocess
import os
import socket
import shlex
import time
import signal
import logging
logger = logging.getLogger(__name__)
class SdrSource(object):
STATE_STOPPED = 0
STATE_STARTING = 1
STATE_RUNNING = 2
STATE_STOPPING = 3
STATE_TUNING = 4
STATE_FAILED = 5
BUSYSTATE_IDLE = 0
BUSYSTATE_BUSY = 1
CLIENT_INACTIVE = 0
CLIENT_BACKGROUND = 1
CLIENT_USER = 2
def __init__(self, id, props, port):
self.id = id
self.props = props
self.profile_id = None
self.activateProfile()
self.rtlProps = self.props.collect(*self.getEventNames()).defaults(PropertyManager.getSharedInstance())
self.wireEvents()
self.port = port
self.monitor = None
self.clients = []
self.spectrumClients = []
self.spectrumThread = None
self.process = None
self.modificationLock = threading.Lock()
self.failed = False
self.state = SdrSource.STATE_STOPPED
self.busyState = SdrSource.BUSYSTATE_IDLE
def getEventNames(self):
return [
"samp_rate",
"nmux_memory",
"center_freq",
"ppm",
"rf_gain",
"lna_gain",
"rf_amp",
"antenna",
"if_gain",
"lfo_offset",
]
def wireEvents(self):
def restart(name, value):
logger.debug("restarting sdr source due to property change: {0} changed to {1}".format(name, value))
self.stop()
self.start()
self.rtlProps.wire(restart)
# override this in subclasses
def getCommand(self):
pass
# override this in subclasses, if necessary
def getFormatConversion(self):
return None
def activateProfile(self, profile_id=None):
profiles = self.props["profiles"]
if profile_id is None:
profile_id = list(profiles.keys())[0]
if profile_id == self.profile_id:
return
logger.debug("activating profile {0}".format(profile_id))
self.profile_id = profile_id
profile = profiles[profile_id]
self.props["profile_id"] = profile_id
for (key, value) in profile.items():
# skip the name, that would overwrite the source name.
if key == "name":
continue
self.props[key] = value
def getId(self):
return self.id
def getProfileId(self):
return self.profile_id
def getProfiles(self):
return self.props["profiles"]
def getName(self):
return self.props["name"]
def getProps(self):
return self.props
def getPort(self):
return self.port
def useNmux(self):
return True
def getCommandValues(self):
dict = self.rtlProps.collect(*self.getEventNames()).__dict__()
if "lfo_offset" in dict and dict["lfo_offset"] is not None:
dict["tuner_freq"] = dict["center_freq"] + dict["lfo_offset"]
else:
dict["tuner_freq"] = dict["center_freq"]
return dict
def start(self):
self.modificationLock.acquire()
if self.monitor:
self.modificationLock.release()
return
props = self.rtlProps
cmd = self.getCommand().format(**self.getCommandValues())
format_conversion = self.getFormatConversion()
if format_conversion is not None:
cmd += " | " + format_conversion
if self.useNmux():
nmux_bufcnt = nmux_bufsize = 0
while nmux_bufsize < props["samp_rate"] / 4:
nmux_bufsize += 4096
while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6:
nmux_bufcnt += 1
if nmux_bufcnt == 0 or nmux_bufsize == 0:
logger.error(
"Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py"
)
self.modificationLock.release()
return
logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt))
cmd = cmd + " | nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (
nmux_bufsize,
nmux_bufcnt,
self.port,
)
# don't use shell mode for commands without piping
if "|" in cmd:
self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
else:
# preexec_fn can go as soon as there's no piped commands left
# the os.killpg call must be replaced with something more reasonable at the same time
self.process = subprocess.Popen(shlex.split(cmd), preexec_fn=os.setpgrp)
logger.info("Started rtl source: " + cmd)
available = False
def wait_for_process_to_end():
rc = self.process.wait()
logger.debug("shut down with RC={0}".format(rc))
self.monitor = None
self.monitor = threading.Thread(target=wait_for_process_to_end)
self.monitor.start()
retries = 1000
while retries > 0:
retries -= 1
if self.monitor is None:
break
testsock = socket.socket()
try:
testsock.connect(("127.0.0.1", self.getPort()))
testsock.close()
available = True
break
except:
time.sleep(0.1)
if not available:
self.failed = True
self.postStart()
self.modificationLock.release()
self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING)
def postStart(self):
pass
def isAvailable(self):
return self.monitor is not None
def isFailed(self):
return self.failed
def stop(self):
self.setState(SdrSource.STATE_STOPPING)
self.modificationLock.acquire()
if self.process is not None:
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
if self.monitor:
self.monitor.join()
self.sleepOnRestart()
self.modificationLock.release()
self.setState(SdrSource.STATE_STOPPED)
def sleepOnRestart(self):
pass
def hasClients(self, *args):
clients = [c for c in self.clients if c.getClientClass() in args]
return len(clients) > 0
def addClient(self, c):
self.clients.append(c)
hasUsers = self.hasClients(SdrSource.CLIENT_USER)
hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND)
if hasUsers or hasBackgroundTasks:
self.start()
self.setBusyState(SdrSource.BUSYSTATE_BUSY if hasUsers else SdrSource.BUSYSTATE_IDLE)
def removeClient(self, c):
try:
self.clients.remove(c)
except ValueError:
pass
hasUsers = self.hasClients(SdrSource.CLIENT_USER)
hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND)
self.setBusyState(SdrSource.BUSYSTATE_BUSY if hasUsers else SdrSource.BUSYSTATE_IDLE)
if not hasUsers and not hasBackgroundTasks:
self.stop()
def addSpectrumClient(self, c):
self.spectrumClients.append(c)
if self.spectrumThread is None:
# local import due to circular depencency
from owrx.fft import SpectrumThread
self.spectrumThread = SpectrumThread(self)
self.spectrumThread.start()
def removeSpectrumClient(self, c):
try:
self.spectrumClients.remove(c)
except ValueError:
pass
if not self.spectrumClients and self.spectrumThread is not None:
self.spectrumThread.stop()
self.spectrumThread = None
def writeSpectrumData(self, data):
for c in self.spectrumClients:
c.write_spectrum_data(data)
def setState(self, state):
if state == self.state:
return
self.state = state
for c in self.clients:
c.onStateChange(state)
def setBusyState(self, state):
if state == self.busyState:
return
self.busyState = state
for c in self.clients:
c.onBusyStateChange(state)

23
owrx/source/airspy.py Normal file
View File

@ -0,0 +1,23 @@
from .soapy import SoapyConnectorSource
class AirspySource(SoapyConnectorSource):
def getDriver(self):
return "airspy"
def getEventNames(self):
return super().getEventNames() + ["bias_tee"]
def getCommand(self):
cmd = (
"soapy_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort)
+ ' -s {samp_rate} -f {tuner_freq} -g "{rf_gain}" -P {ppm} -d "{device}"'
)
values = self.getCommandValues()
if values["iqswap"]:
cmd += " -i"
if self.rtlProps["rtltcp_compat"]:
cmd += " -r"
if values["bias_tee"]:
cmd += " -t biastee=true"
return cmd

6
owrx/source/airspyhf.py Normal file
View File

@ -0,0 +1,6 @@
from .airspy import AirspySource
class AirspyhfSource(AirspySource):
def getDriver(self):
return "airspyhf"

63
owrx/source/connector.py Normal file
View File

@ -0,0 +1,63 @@
from . import SdrSource
from owrx.socket import getAvailablePort
import socket
import logging
logger = logging.getLogger(__name__)
class ConnectorSource(SdrSource):
def __init__(self, id, props, port):
super().__init__(id, props, port)
self.controlSocket = None
self.controlPort = getAvailablePort()
def getEventNames(self):
return [
"samp_rate",
"center_freq",
"ppm",
"rf_gain",
"device",
"iqswap",
"lfo_offset",
"rtltcp_compat",
]
def sendControlMessage(self, prop, value):
logger.debug("sending property change over control socket: {0} changed to {1}".format(prop, value))
self.controlSocket.sendall("{prop}:{value}\n".format(prop=prop, value=value).encode())
def wireEvents(self):
def reconfigure(prop, value):
if self.monitor is None:
return
if (
(prop == "center_freq" or prop == "lfo_offset")
and "lfo_offset" in self.rtlProps
and self.rtlProps["lfo_offset"] is not None
):
freq = self.rtlProps["center_freq"] + self.rtlProps["lfo_offset"]
self.sendControlMessage("center_freq", freq)
else:
self.sendControlMessage(prop, value)
self.rtlProps.wire(reconfigure)
def postStart(self):
logger.debug("opening control socket...")
self.controlSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.controlSocket.connect(("localhost", self.controlPort))
def stop(self):
super().stop()
if self.controlSocket:
self.controlSocket.close()
self.controlSocket = None
def getFormatConversion(self):
return None
def useNmux(self):
return False

9
owrx/source/fifi.py Normal file
View File

@ -0,0 +1,9 @@
from . import SdrSource
class FifiSdrSource(SdrSource):
def getCommand(self):
return "arecord -D hw:2,0 -f S16_LE -r {samp_rate} -c2 -"
def getFormatConversion(self):
return "csdr convert_s16_f | csdr gain_ff 30"

9
owrx/source/hackrf.py Normal file
View File

@ -0,0 +1,9 @@
from . import SdrSource
class HackrfSource(SdrSource):
def getCommand(self):
return "hackrf_transfer -s {samp_rate} -f {tuner_freq} -g {rf_gain} -l{lna_gain} -a{rf_amp} -r-"
def getFormatConversion(self):
return "csdr convert_s8_f"

98
owrx/source/resampler.py Normal file
View File

@ -0,0 +1,98 @@
from . import SdrSource
import subprocess
import threading
import os
import socket
import time
import logging
logger = logging.getLogger(__name__)
class Resampler(SdrSource):
def __init__(self, props, port, sdr):
sdrProps = sdr.getProps()
self.shift = (sdrProps["center_freq"] - props["center_freq"]) / sdrProps["samp_rate"]
self.decimation = int(float(sdrProps["samp_rate"]) / props["samp_rate"])
if_samp_rate = sdrProps["samp_rate"] / self.decimation
self.transition_bw = 0.15 * (if_samp_rate / float(sdrProps["samp_rate"]))
props["samp_rate"] = if_samp_rate
self.sdr = sdr
super().__init__(None, props, port)
def start(self):
if self.isFailed():
return
self.modificationLock.acquire()
if self.monitor:
self.modificationLock.release()
return
self.setState(SdrSource.STATE_STARTING)
props = self.rtlProps
resampler_command = [
"nc -v 127.0.0.1 {nc_port}".format(nc_port=self.sdr.getPort()),
"csdr shift_addition_cc {shift}".format(shift=self.shift),
"csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING".format(
decimation=self.decimation, ddc_transition_bw=self.transition_bw
),
]
nmux_bufcnt = nmux_bufsize = 0
while nmux_bufsize < props["samp_rate"] / 4:
nmux_bufsize += 4096
while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6:
nmux_bufcnt += 1
if nmux_bufcnt == 0 or nmux_bufsize == 0:
logger.error(
"Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py"
)
self.modificationLock.release()
return
logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt))
resampler_command += [
"nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (nmux_bufsize, nmux_bufcnt, self.port)
]
cmd = " | ".join(resampler_command)
logger.debug("resampler command: %s", cmd)
self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
logger.info("Started resampler source: " + cmd)
available = False
def wait_for_process_to_end():
rc = self.process.wait()
logger.debug("shut down with RC={0}".format(rc))
self.monitor = None
self.monitor = threading.Thread(target=wait_for_process_to_end)
self.monitor.start()
retries = 1000
while retries > 0:
retries -= 1
if self.monitor is None:
break
testsock = socket.socket()
try:
testsock.connect(("127.0.0.1", self.getPort()))
testsock.close()
available = True
break
except:
time.sleep(0.1)
if not available:
self.failed = True
self.modificationLock.release()
self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING)
def activateProfile(self, profile_id=None):
pass

16
owrx/source/rtl_sdr.py Normal file
View File

@ -0,0 +1,16 @@
from .connector import ConnectorSource
class RtlSdrSource(ConnectorSource):
def getCommand(self):
cmd = (
"rtl_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort)
+ " -s {samp_rate} -f {tuner_freq} -g {rf_gain} -P {ppm}"
)
if "device" in self.rtlProps and self.rtlProps["device"] is not None:
cmd += ' -d "{device}"'
if self.rtlProps["iqswap"]:
cmd += " -i"
if self.rtlProps["rtltcp_compat"]:
cmd += " -r"
return cmd

21
owrx/source/sdrplay.py Normal file
View File

@ -0,0 +1,21 @@
from .soapy import SoapyConnectorSource
class SdrplaySource(SoapyConnectorSource):
def getDriver(self):
return "sdrplay"
def getEventNames(self):
return super().getEventNames() + ["antenna"]
def getCommand(self):
cmd = (
"soapy_connector -p {port} -c {controlPort}".format(port=self.port, controlPort=self.controlPort)
+ ' -s {samp_rate} -f {tuner_freq} -g "{rf_gain}" -P {ppm} -a "{antenna}" -d "{device}"'
)
values = self.getCommandValues()
if values["iqswap"]:
cmd += " -i"
if self.rtlProps["rtltcp_compat"]:
cmd += " -r"
return cmd

46
owrx/source/soapy.py Normal file
View File

@ -0,0 +1,46 @@
from .connector import ConnectorSource
class SoapyConnectorSource(ConnectorSource):
"""
must be implemented by child classes to be able to build a driver-based device selector by default.
return value must be the corresponding soapy driver identifier.
"""
def getDriver(self):
pass
def parseDeviceString(self, dstr):
def decodeComponent(c):
kv = c.split("=", 1)
if len(kv) < 2:
return c
else:
return {kv[0]: kv[1]}
return [decodeComponent(c) for c in dstr.split(",")]
def encodeDeviceString(self, dobj):
def encodeComponent(c):
if isinstance(c, str):
return c
else:
return ",".join(["{0}={1}".format(key, value) for key, value in c.items()])
return ",".join([encodeComponent(c) for c in dobj])
"""
this method always attempts to inject a driver= part into the soapysdr query, depending on what connector was used.
this prevents the soapy_connector from using the wrong device in scenarios where there's no same-type sdrs.
"""
def getCommandValues(self):
values = super().getCommandValues()
if "device" in values and values["device"] is not None:
parsed = self.parseDeviceString(values["device"])
parsed = [v for v in parsed if "driver" not in v]
parsed += [{"driver": self.getDriver()}]
values["device"] = self.encodeDeviceString(parsed)
else:
values["device"] = "driver={0}".format(self.getDriver())
return values