635 lines
20 KiB
Python
635 lines
20 KiB
Python
from owrx.config import Config
|
|
import threading
|
|
import subprocess
|
|
import os
|
|
import socket
|
|
import shlex
|
|
import time
|
|
import signal
|
|
import pkgutil
|
|
from abc import ABC, abstractmethod
|
|
from owrx.command import CommandMapper
|
|
from owrx.socket import getAvailablePort
|
|
from owrx.property import PropertyStack, PropertyLayer, PropertyFilter, PropertyCarousel, PropertyDeleted
|
|
from owrx.property.filter import ByLambda
|
|
from owrx.form.input import Input, TextInput, NumberInput, CheckboxInput, ModesInput, ExponentialInput
|
|
from owrx.form.input.converter import OptionalConverter
|
|
from owrx.form.input.device import GainInput, SchedulerInput, WaterfallLevelsInput
|
|
from owrx.form.input.validator import RequiredValidator
|
|
from owrx.form.section import OptionalSection
|
|
from owrx.feature import FeatureDetector
|
|
from owrx.log import LogPipe, HistoryHandler
|
|
from typing import List
|
|
from enum import Enum
|
|
|
|
from pycsdr.modules import TcpSource, Buffer
|
|
from pycsdr.types import Format
|
|
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SdrSourceState(Enum):
|
|
STOPPED = "Stopped"
|
|
STARTING = "Starting"
|
|
RUNNING = "Running"
|
|
STOPPING = "Stopping"
|
|
TUNING = "Tuning"
|
|
|
|
def __str__(self):
|
|
return self.value
|
|
|
|
|
|
class SdrBusyState(Enum):
|
|
IDLE = 1
|
|
BUSY = 2
|
|
|
|
|
|
class SdrClientClass(Enum):
|
|
INACTIVE = 1
|
|
BACKGROUND = 2
|
|
USER = 3
|
|
|
|
|
|
class SdrSourceEventClient(object):
|
|
def onStateChange(self, state: SdrSourceState):
|
|
pass
|
|
|
|
def onBusyStateChange(self, state: SdrBusyState):
|
|
pass
|
|
|
|
def onFail(self):
|
|
pass
|
|
|
|
def onShutdown(self):
|
|
pass
|
|
|
|
def onDisable(self):
|
|
pass
|
|
|
|
def onEnable(self):
|
|
pass
|
|
|
|
def getClientClass(self) -> SdrClientClass:
|
|
return SdrClientClass.INACTIVE
|
|
|
|
|
|
class SdrProfileCarousel(PropertyCarousel):
|
|
def __init__(self, props):
|
|
super().__init__()
|
|
if "profiles" not in props:
|
|
return
|
|
|
|
for profile_id, profile in props["profiles"].items():
|
|
self.addLayer(profile_id, profile)
|
|
# activate first available profile
|
|
self.switch()
|
|
|
|
props["profiles"].wire(self.handleProfileUpdate)
|
|
|
|
def addLayer(self, profile_id, profile):
|
|
profile_stack = PropertyStack()
|
|
profile_stack.addLayer(0, PropertyLayer(profile_id=profile_id).readonly())
|
|
profile_stack.addLayer(1, profile)
|
|
super().addLayer(profile_id, profile_stack)
|
|
|
|
def handleProfileUpdate(self, changes):
|
|
for profile_id, profile in changes.items():
|
|
if profile is PropertyDeleted:
|
|
self.removeLayer(profile_id)
|
|
else:
|
|
self.addLayer(profile_id, profile)
|
|
|
|
def _getDefaultLayer(self):
|
|
# return the first available profile, or the default empty layer if we don't have any
|
|
if self.layers:
|
|
return next(iter(self.layers.values()))
|
|
return super()._getDefaultLayer()
|
|
|
|
|
|
class SdrSource(ABC):
|
|
def __init__(self, id, props):
|
|
self.id = id
|
|
|
|
self.commandMapper = None
|
|
self.tcpSource = None
|
|
self.buffer = None
|
|
self.logger = logger.getChild(id) if id is not None else logger
|
|
self.logger.addHandler(HistoryHandler.getHandler(self.logger.name))
|
|
self.stdoutPipe = None
|
|
self.stderrPipe = None
|
|
|
|
self.props = PropertyStack()
|
|
|
|
# layer 0 reserved for profile properties
|
|
self.profileCarousel = SdrProfileCarousel(props)
|
|
# prevent profile names from overriding the device name
|
|
self.props.addLayer(0, PropertyFilter(self.profileCarousel, ByLambda(lambda x: x != "name")))
|
|
|
|
# props from our device config
|
|
self.props.addLayer(1, props)
|
|
|
|
# the sdr_id is constant, so we put it in a separate layer
|
|
# this is used to detect device changes, that are then sent to the client
|
|
self.props.addLayer(2, PropertyLayer(sdr_id=id).readonly())
|
|
|
|
# finally, accept global config properties from the top-level config
|
|
self.props.addLayer(3, Config.get())
|
|
|
|
self.sdrProps = self.props.filter(*self.getEventNames())
|
|
|
|
self.wireEvents()
|
|
|
|
self.port = getAvailablePort()
|
|
self.monitor = None
|
|
self.clients = []
|
|
self.spectrumClients = []
|
|
self.spectrumThread = None
|
|
self.spectrumLock = threading.Lock()
|
|
self.process = None
|
|
self.modificationLock = threading.Lock()
|
|
self.state = SdrSourceState.STOPPED
|
|
self.enabled = "enabled" not in props or props["enabled"]
|
|
props.filter("enabled").wire(self._handleEnableChanged)
|
|
self.failed = False
|
|
self.busyState = SdrBusyState.IDLE
|
|
|
|
self.validateProfiles()
|
|
|
|
if self.isAlwaysOn() and self.isEnabled():
|
|
self.start()
|
|
|
|
def isEnabled(self):
|
|
return self.enabled
|
|
|
|
def _handleEnableChanged(self, changes):
|
|
if "enabled" in changes and changes["enabled"] is not PropertyDeleted:
|
|
self.enabled = changes["enabled"]
|
|
else:
|
|
self.enabled = True
|
|
if not self.enabled:
|
|
self.stop()
|
|
for c in self.clients.copy():
|
|
if self.isEnabled():
|
|
c.onEnable()
|
|
else:
|
|
c.onDisable()
|
|
|
|
def isFailed(self):
|
|
return self.failed
|
|
|
|
def fail(self):
|
|
self.failed = True
|
|
for c in self.clients.copy():
|
|
c.onFail()
|
|
|
|
def validateProfiles(self):
|
|
props = PropertyStack()
|
|
props.addLayer(1, self.props)
|
|
for id, p in self.props["profiles"].items():
|
|
props.replaceLayer(0, p)
|
|
if "center_freq" not in props:
|
|
self.logger.warning('Profile "%s" does not specify a center_freq', id)
|
|
continue
|
|
if "samp_rate" not in props:
|
|
self.logger.warning('Profile "%s" does not specify a samp_rate', id)
|
|
continue
|
|
if "start_freq" in props:
|
|
start_freq = props["start_freq"]
|
|
srh = props["samp_rate"] / 2
|
|
center_freq = props["center_freq"]
|
|
if start_freq < center_freq - srh or start_freq > center_freq + srh:
|
|
self.logger.warning('start_freq for profile "%s" is out of range', id)
|
|
|
|
def isAlwaysOn(self):
|
|
return "always-on" in self.props and self.props["always-on"]
|
|
|
|
def getEventNames(self):
|
|
return [
|
|
"samp_rate",
|
|
"center_freq",
|
|
"ppm",
|
|
"rf_gain",
|
|
"lfo_offset",
|
|
] + list(self.getCommandMapper().keys())
|
|
|
|
def getCommandMapper(self):
|
|
if self.commandMapper is None:
|
|
self.commandMapper = CommandMapper()
|
|
return self.commandMapper
|
|
|
|
@abstractmethod
|
|
def onPropertyChange(self, changes):
|
|
pass
|
|
|
|
def wireEvents(self):
|
|
self.sdrProps.wire(self.onPropertyChange)
|
|
|
|
def getCommand(self):
|
|
return [self.getCommandMapper().map(self.getCommandValues())]
|
|
|
|
def activateProfile(self, profile_id):
|
|
self.logger.debug("activating profile {0} for {1}".format(profile_id, self.getId()))
|
|
try:
|
|
self.profileCarousel.switch(profile_id)
|
|
except KeyError:
|
|
self.logger.warning("invalid profile %s for sdr %s. ignoring", profile_id, self.getId())
|
|
|
|
def getId(self):
|
|
return self.id
|
|
|
|
def getProfileId(self):
|
|
return self.props["profile_id"]
|
|
|
|
def getProfiles(self):
|
|
return self.props["profiles"]
|
|
|
|
def getName(self):
|
|
return self.props["name"]
|
|
|
|
def getProps(self):
|
|
return self.props
|
|
|
|
def getPort(self):
|
|
return self.port
|
|
|
|
def _getTcpSourceFormat(self):
|
|
return Format.COMPLEX_FLOAT
|
|
|
|
def _getTcpSource(self):
|
|
with self.modificationLock:
|
|
if self.tcpSource is None:
|
|
self.tcpSource = TcpSource(self.port, self._getTcpSourceFormat())
|
|
return self.tcpSource
|
|
|
|
def getBuffer(self):
|
|
if self.buffer is None:
|
|
self.buffer = Buffer(Format.COMPLEX_FLOAT)
|
|
self._getTcpSource().setWriter(self.buffer)
|
|
return self.buffer
|
|
|
|
def getCommandValues(self):
|
|
dict = self.sdrProps.__dict__()
|
|
if "lfo_offset" in dict and dict["lfo_offset"] is not None:
|
|
dict["tuner_freq"] = dict["center_freq"] + dict["lfo_offset"]
|
|
else:
|
|
dict["tuner_freq"] = dict["center_freq"]
|
|
return dict
|
|
|
|
def start(self):
|
|
with self.modificationLock:
|
|
if self.monitor:
|
|
return
|
|
|
|
if self.isFailed():
|
|
return
|
|
|
|
try:
|
|
self.preStart()
|
|
except Exception:
|
|
self.logger.exception("Exception during preStart()")
|
|
|
|
cmd = self.getCommand()
|
|
cmd = [c for c in cmd if c is not None]
|
|
|
|
self.stdoutPipe = LogPipe(logging.INFO, self.logger, "STDOUT")
|
|
self.stderrPipe = LogPipe(logging.WARNING, self.logger, "STDERR")
|
|
|
|
# don't use shell mode for commands without piping
|
|
if len(cmd) > 1:
|
|
# multiple commands with pipes
|
|
cmd = "|".join(cmd)
|
|
self.process = subprocess.Popen(
|
|
cmd,
|
|
shell=True,
|
|
start_new_session=True,
|
|
stdout=self.stdoutPipe,
|
|
stderr=self.stderrPipe
|
|
)
|
|
else:
|
|
# single command
|
|
cmd = cmd[0]
|
|
# start_new_session can go as soon as there's no piped commands left
|
|
# the os.killpg call must be replaced with something more reasonable at the same time
|
|
self.process = subprocess.Popen(
|
|
shlex.split(cmd),
|
|
start_new_session=True,
|
|
stdout=self.stdoutPipe,
|
|
stderr=self.stderrPipe
|
|
)
|
|
self.logger.info("Started sdr source: " + cmd)
|
|
|
|
available = False
|
|
failed = False
|
|
|
|
def wait_for_process_to_end():
|
|
nonlocal failed
|
|
rc = self.process.wait()
|
|
self.logger.debug("shut down with RC={0}".format(rc))
|
|
self.process = None
|
|
self.monitor = None
|
|
self.stdoutPipe.close()
|
|
self.stdoutPipe = None
|
|
self.stderrPipe.close()
|
|
self.stderrPipe = None
|
|
if self.getState() is SdrSourceState.RUNNING:
|
|
self.fail()
|
|
else:
|
|
failed = True
|
|
self.setState(SdrSourceState.STOPPED)
|
|
|
|
self.monitor = threading.Thread(target=wait_for_process_to_end, name="source_monitor")
|
|
self.monitor.start()
|
|
|
|
retries = 1000
|
|
while retries > 0 and not failed:
|
|
retries -= 1
|
|
if self.monitor is None:
|
|
break
|
|
testsock = socket.socket()
|
|
testsock.settimeout(1)
|
|
try:
|
|
testsock.connect(("127.0.0.1", self.getPort()))
|
|
testsock.close()
|
|
available = True
|
|
break
|
|
except:
|
|
time.sleep(0.1)
|
|
|
|
if not available:
|
|
failed = True
|
|
|
|
try:
|
|
self.postStart()
|
|
except Exception:
|
|
self.logger.exception("Exception during postStart()")
|
|
failed = True
|
|
|
|
if failed:
|
|
self.fail()
|
|
else:
|
|
self.setState(SdrSourceState.RUNNING)
|
|
|
|
def preStart(self):
|
|
"""
|
|
override this method in subclasses if there's anything to be done before starting up the actual SDR
|
|
"""
|
|
pass
|
|
|
|
def postStart(self):
|
|
"""
|
|
override this method in subclasses if there's things to do after the actual SDR has started up
|
|
"""
|
|
pass
|
|
|
|
def isAvailable(self):
|
|
return self.monitor is not None
|
|
|
|
def stop(self):
|
|
with self.modificationLock:
|
|
if self.process is not None:
|
|
self.setState(SdrSourceState.STOPPING)
|
|
try:
|
|
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
|
|
if self.monitor:
|
|
# wait 10 seconds for a regular shutdown
|
|
self.monitor.join(10)
|
|
# if the monitor is still running, the process still hasn't ended, so kill it
|
|
if self.monitor:
|
|
self.logger.warning("source has not shut down normally within 10 seconds, sending SIGKILL")
|
|
os.killpg(os.getpgid(self.process.pid), signal.SIGKILL)
|
|
except ProcessLookupError:
|
|
# been killed by something else, ignore
|
|
pass
|
|
except AttributeError:
|
|
# self.process has been overwritten by the monitor since we checked it, which is fine
|
|
pass
|
|
if self.monitor:
|
|
self.monitor.join()
|
|
if self.tcpSource is not None:
|
|
self.tcpSource.stop()
|
|
self.tcpSource = None
|
|
self.buffer = None
|
|
|
|
def shutdown(self):
|
|
self.stop()
|
|
for c in self.clients.copy():
|
|
c.onShutdown()
|
|
|
|
def getClients(self, *args):
|
|
if not args:
|
|
return self.clients
|
|
return [c for c in self.clients if c.getClientClass() in args]
|
|
|
|
def hasClients(self, *args):
|
|
return len(self.getClients(*args)) > 0
|
|
|
|
def addClient(self, c: SdrSourceEventClient):
|
|
if c in self.clients:
|
|
return
|
|
self.clients.append(c)
|
|
c.onStateChange(self.getState())
|
|
hasUsers = self.hasClients(SdrClientClass.USER)
|
|
hasBackgroundTasks = self.hasClients(SdrClientClass.BACKGROUND)
|
|
if hasUsers or hasBackgroundTasks:
|
|
self.start()
|
|
self.setBusyState(SdrBusyState.BUSY if hasUsers else SdrBusyState.IDLE)
|
|
|
|
def removeClient(self, c: SdrSourceEventClient):
|
|
if c not in self.clients:
|
|
return
|
|
|
|
self.clients.remove(c)
|
|
|
|
self.checkStatus()
|
|
|
|
def checkStatus(self):
|
|
hasUsers = self.hasClients(SdrClientClass.USER)
|
|
self.setBusyState(SdrBusyState.BUSY if hasUsers else SdrBusyState.IDLE)
|
|
|
|
# no need to check for users if we are always-on
|
|
if self.isAlwaysOn():
|
|
return
|
|
|
|
hasBackgroundTasks = self.hasClients(SdrClientClass.BACKGROUND)
|
|
if not hasUsers and not hasBackgroundTasks:
|
|
self.stop()
|
|
|
|
def addSpectrumClient(self, c):
|
|
if c in self.spectrumClients:
|
|
return
|
|
|
|
# local import due to circular depencency
|
|
from owrx.fft import SpectrumThread
|
|
|
|
self.spectrumClients.append(c)
|
|
with self.spectrumLock:
|
|
if self.spectrumThread is None:
|
|
self.spectrumThread = SpectrumThread(self)
|
|
self.spectrumThread.start()
|
|
|
|
def removeSpectrumClient(self, c):
|
|
try:
|
|
self.spectrumClients.remove(c)
|
|
except ValueError:
|
|
pass
|
|
with self.spectrumLock:
|
|
if not self.spectrumClients and self.spectrumThread is not None:
|
|
self.spectrumThread.stop()
|
|
self.spectrumThread = None
|
|
|
|
def writeSpectrumData(self, data):
|
|
for c in self.spectrumClients:
|
|
c.write_spectrum_data(data)
|
|
|
|
def getState(self) -> SdrSourceState:
|
|
return self.state
|
|
|
|
def setState(self, state: SdrSourceState):
|
|
if state == self.state:
|
|
return
|
|
self.state = state
|
|
for c in self.clients.copy():
|
|
c.onStateChange(state)
|
|
|
|
def setBusyState(self, state: SdrBusyState):
|
|
if state == self.busyState:
|
|
return
|
|
self.busyState = state
|
|
for c in self.clients.copy():
|
|
c.onBusyStateChange(state)
|
|
|
|
|
|
class SdrDeviceDescriptionMissing(Exception):
|
|
pass
|
|
|
|
|
|
class SdrDeviceDescription(object):
|
|
@staticmethod
|
|
def getByType(sdr_type: str) -> "SdrDeviceDescription":
|
|
try:
|
|
className = "".join(x for x in sdr_type.title() if x.isalnum()) + "DeviceDescription"
|
|
module = __import__("owrx.source.{0}".format(sdr_type), fromlist=[className])
|
|
cls = getattr(module, className)
|
|
return cls()
|
|
except (ImportError, AttributeError):
|
|
raise SdrDeviceDescriptionMissing("Device description for type {} not available".format(sdr_type))
|
|
|
|
@staticmethod
|
|
def getTypes():
|
|
def get_description(module_name):
|
|
try:
|
|
description = SdrDeviceDescription.getByType(module_name)
|
|
return description.getName()
|
|
except SdrDeviceDescriptionMissing:
|
|
return None
|
|
|
|
descriptions = {
|
|
module_name: get_description(module_name) for _, module_name, _ in pkgutil.walk_packages(__path__)
|
|
}
|
|
# filter out empty names and unavailable types
|
|
fd = FeatureDetector()
|
|
return {k: v for k, v in descriptions.items() if v is not None and fd.is_available(k)}
|
|
|
|
def getName(self):
|
|
"""
|
|
must be overridden with a textual representation of the device, to be used for device type selection
|
|
|
|
:return: str
|
|
"""
|
|
return None
|
|
|
|
def supportsPpm(self):
|
|
"""
|
|
can be overridden if the device does not support configuring PPM correction
|
|
|
|
:return: bool
|
|
"""
|
|
return True
|
|
|
|
def getDeviceInputs(self) -> List[Input]:
|
|
keys = self.getDeviceMandatoryKeys() + self.getDeviceOptionalKeys()
|
|
return [TextInput("name", "Device name", validator=RequiredValidator())] + [
|
|
i for i in self.getInputs() if i.id in keys
|
|
]
|
|
|
|
def getProfileInputs(self) -> List[Input]:
|
|
keys = self.getProfileMandatoryKeys() + self.getProfileOptionalKeys()
|
|
return [TextInput("name", "Profile name", validator=RequiredValidator())] + [
|
|
i for i in self.getInputs() if i.id in keys
|
|
]
|
|
|
|
def getInputs(self) -> List[Input]:
|
|
return [
|
|
CheckboxInput("enabled", "Enable this device", converter=OptionalConverter(defaultFormValue=True)),
|
|
GainInput("rf_gain", "Device gain", self.hasAgc()),
|
|
NumberInput(
|
|
"ppm",
|
|
"Frequency correction",
|
|
append="ppm",
|
|
),
|
|
CheckboxInput(
|
|
"always-on",
|
|
"Keep device running at all times",
|
|
infotext="Prevents shutdown of the device when idle. Useful for devices with unreliable startup.",
|
|
),
|
|
CheckboxInput(
|
|
"services",
|
|
"Run background services on this device",
|
|
),
|
|
ExponentialInput(
|
|
"lfo_offset",
|
|
"Oscillator offset",
|
|
"Hz",
|
|
infotext="Use this when the actual receiving frequency differs from the frequency to be tuned on the"
|
|
+ " device. <br/> Formula: Center frequency + oscillator offset = sdr tune frequency",
|
|
),
|
|
WaterfallLevelsInput("waterfall_levels", "Waterfall levels"),
|
|
SchedulerInput("scheduler", "Scheduler"),
|
|
ExponentialInput("center_freq", "Center frequency", "Hz"),
|
|
ExponentialInput("samp_rate", "Sample rate", "S/s"),
|
|
ExponentialInput("start_freq", "Initial frequency", "Hz"),
|
|
ModesInput("start_mod", "Initial modulation"),
|
|
NumberInput("initial_squelch_level", "Initial squelch level", append="dBFS"),
|
|
]
|
|
|
|
def hasAgc(self):
|
|
# default is True since most devices have agc. override in subclasses if agc is not available
|
|
return True
|
|
|
|
def getDeviceMandatoryKeys(self):
|
|
return ["name", "enabled"]
|
|
|
|
def getDeviceOptionalKeys(self):
|
|
keys = [
|
|
"always-on",
|
|
"services",
|
|
"rf_gain",
|
|
"lfo_offset",
|
|
"waterfall_levels",
|
|
"scheduler",
|
|
]
|
|
if self.supportsPpm():
|
|
keys += ["ppm"]
|
|
return keys
|
|
|
|
def getProfileMandatoryKeys(self):
|
|
return ["name", "center_freq", "samp_rate", "start_freq", "start_mod"]
|
|
|
|
def getProfileOptionalKeys(self):
|
|
return ["initial_squelch_level", "rf_gain", "lfo_offset", "waterfall_levels"]
|
|
|
|
def getDeviceSection(self):
|
|
return OptionalSection(
|
|
"Device settings", self.getDeviceInputs(), self.getDeviceMandatoryKeys(), self.getDeviceOptionalKeys()
|
|
)
|
|
|
|
def getProfileSection(self):
|
|
return OptionalSection(
|
|
"Profile settings",
|
|
self.getProfileInputs(),
|
|
self.getProfileMandatoryKeys(),
|
|
self.getProfileOptionalKeys(),
|
|
)
|