openwebrx-clone/owrx/source/__init__.py

345 lines
10 KiB
Python
Raw Normal View History

from owrx.config import Config
import threading
import subprocess
import os
import socket
import shlex
import time
import signal
2019-12-27 23:26:45 +00:00
from abc import ABC, abstractmethod
from owrx.command import CommandMapper
from owrx.socket import getAvailablePort
2020-03-24 21:13:42 +00:00
from owrx.property import PropertyStack, PropertyLayer
2021-01-02 02:12:21 +00:00
from pycsdr.modules import SocketClient, Buffer
import logging
logger = logging.getLogger(__name__)
class SdrSourceEventClient(ABC):
@abstractmethod
def onStateChange(self, state):
pass
@abstractmethod
def onBusyStateChange(self, state):
pass
def getClientClass(self):
return SdrSource.CLIENT_INACTIVE
2019-12-27 23:26:45 +00:00
class SdrSource(ABC):
STATE_STOPPED = 0
STATE_STARTING = 1
STATE_RUNNING = 2
STATE_STOPPING = 3
STATE_TUNING = 4
STATE_FAILED = 5
BUSYSTATE_IDLE = 0
BUSYSTATE_BUSY = 1
CLIENT_INACTIVE = 0
CLIENT_BACKGROUND = 1
CLIENT_USER = 2
def __init__(self, id, props):
self.id = id
2020-03-24 21:13:42 +00:00
self.commandMapper = None
self.socketClient = None
self.buffer = None
2020-03-24 21:13:42 +00:00
self.props = PropertyStack()
# layer 0 reserved for profile properties
self.props.addLayer(1, props)
self.props.addLayer(2, Config.get())
2020-03-24 21:52:17 +00:00
self.sdrProps = self.props.filter(*self.getEventNames())
2020-03-24 21:13:42 +00:00
self.profile_id = None
self.activateProfile()
self.wireEvents()
self.port = getAvailablePort()
self.monitor = None
self.clients = []
self.spectrumClients = []
self.spectrumThread = None
self.spectrumLock = threading.Lock()
self.process = None
self.modificationLock = threading.Lock()
self.failed = False
self.state = SdrSource.STATE_STOPPED
self.busyState = SdrSource.BUSYSTATE_IDLE
2019-12-31 18:14:05 +00:00
if self.isAlwaysOn():
self.start()
def isAlwaysOn(self):
return "always-on" in self.props and self.props["always-on"]
def getEventNames(self):
return [
"samp_rate",
"center_freq",
"ppm",
"rf_gain",
"lfo_offset",
] + list(self.getCommandMapper().keys())
2019-12-27 23:26:45 +00:00
def getCommandMapper(self):
2019-12-31 18:14:05 +00:00
if self.commandMapper is None:
self.commandMapper = CommandMapper()
2019-12-27 23:26:45 +00:00
return self.commandMapper
2019-12-27 23:26:45 +00:00
@abstractmethod
2020-12-30 16:18:46 +00:00
def onPropertyChange(self, changes):
pass
2019-12-27 23:26:45 +00:00
def wireEvents(self):
2020-03-24 21:52:17 +00:00
self.sdrProps.wire(self.onPropertyChange)
2019-12-27 23:26:45 +00:00
def getCommand(self):
return [self.getCommandMapper().map(self.getCommandValues())]
def activateProfile(self, profile_id=None):
profiles = self.props["profiles"]
if profile_id is None:
profile_id = list(profiles.keys())[0]
2019-12-23 20:12:28 +00:00
if profile_id not in profiles:
logger.warning("invalid profile %s for sdr %s. ignoring", profile_id, self.id)
return
if profile_id == self.profile_id:
return
logger.debug("activating profile {0}".format(profile_id))
self.props["profile_id"] = profile_id
2020-03-24 21:13:42 +00:00
profile = profiles[profile_id]
self.profile_id = profile_id
layer = PropertyLayer()
for (key, value) in profile.items():
# skip the name, that would overwrite the source name.
if key == "name":
continue
2020-03-24 21:13:42 +00:00
layer[key] = value
self.props.replaceLayer(0, layer)
def getId(self):
return self.id
def getProfileId(self):
return self.profile_id
def getProfiles(self):
return self.props["profiles"]
def getName(self):
return self.props["name"]
def getProps(self):
return self.props
def getPort(self):
return self.port
def _getSocketCLient(self):
with self.modificationLock:
if self.socketClient is None:
self.socketClient = SocketClient(self.port)
return self.socketClient
def getBuffer(self):
if self.buffer is None:
self.buffer = Buffer()
self._getSocketCLient().setOutput(self.buffer)
return self.buffer
def getCommandValues(self):
2020-03-24 21:52:17 +00:00
dict = self.sdrProps.__dict__()
if "lfo_offset" in dict and dict["lfo_offset"] is not None:
dict["tuner_freq"] = dict["center_freq"] + dict["lfo_offset"]
else:
dict["tuner_freq"] = dict["center_freq"]
return dict
def start(self):
2019-12-27 23:26:45 +00:00
with self.modificationLock:
if self.monitor:
return
if self.isFailed():
return
2019-12-31 15:20:36 +00:00
try:
self.preStart()
except Exception:
logger.exception("Exception during preStart()")
2019-12-27 23:26:45 +00:00
cmd = self.getCommand()
cmd = [c for c in cmd if c is not None]
# don't use shell mode for commands without piping
if len(cmd) > 1:
# multiple commands with pipes
cmd = "|".join(cmd)
self.process = subprocess.Popen(cmd, shell=True, start_new_session=True)
2019-12-27 23:26:45 +00:00
else:
# single command
cmd = cmd[0]
# start_new_session can go as soon as there's no piped commands left
2019-12-27 23:26:45 +00:00
# the os.killpg call must be replaced with something more reasonable at the same time
self.process = subprocess.Popen(shlex.split(cmd), start_new_session=True)
2019-12-28 15:44:45 +00:00
logger.info("Started sdr source: " + cmd)
2019-12-27 23:26:45 +00:00
available = False
def wait_for_process_to_end():
rc = self.process.wait()
logger.debug("shut down with RC={0}".format(rc))
self.monitor = None
2020-08-30 21:48:05 +00:00
if self.getState() == SdrSource.STATE_RUNNING:
self.failed = True
self.setState(SdrSource.STATE_FAILED)
else:
self.setState(SdrSource.STATE_STOPPED)
2019-12-27 23:26:45 +00:00
2020-08-14 18:22:25 +00:00
self.monitor = threading.Thread(target=wait_for_process_to_end, name="source_monitor")
2019-12-27 23:26:45 +00:00
self.monitor.start()
retries = 1000
while retries > 0 and not self.isFailed():
2019-12-27 23:26:45 +00:00
retries -= 1
if self.monitor is None:
break
testsock = socket.socket()
try:
testsock.connect(("127.0.0.1", self.getPort()))
testsock.close()
available = True
break
except:
time.sleep(0.1)
if not available:
self.failed = True
2019-12-27 23:26:45 +00:00
try:
self.postStart()
except Exception:
logger.exception("Exception during postStart()")
self.failed = True
self.setState(SdrSource.STATE_FAILED if self.failed else SdrSource.STATE_RUNNING)
2019-12-31 15:20:36 +00:00
def preStart(self):
"""
override this method in subclasses if there's anything to be done before starting up the actual SDR
"""
pass
def postStart(self):
2019-12-31 15:20:36 +00:00
"""
override this method in subclasses if there's things to do after the actual SDR has started up
"""
pass
def isAvailable(self):
return self.monitor is not None
def isFailed(self):
return self.failed
def stop(self):
self.setState(SdrSource.STATE_STOPPING)
2019-12-27 23:26:45 +00:00
with self.modificationLock:
2019-12-27 23:26:45 +00:00
if self.process is not None:
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
if self.monitor:
self.monitor.join()
if self.socketClient is not None:
self.socketClient.stop()
self.socketClient = None
def hasClients(self, *args):
clients = [c for c in self.clients if c.getClientClass() in args]
return len(clients) > 0
def addClient(self, c: SdrSourceEventClient):
self.clients.append(c)
c.onStateChange(self.getState())
hasUsers = self.hasClients(SdrSource.CLIENT_USER)
hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND)
if hasUsers or hasBackgroundTasks:
self.start()
self.setBusyState(SdrSource.BUSYSTATE_BUSY if hasUsers else SdrSource.BUSYSTATE_IDLE)
def removeClient(self, c: SdrSourceEventClient):
try:
self.clients.remove(c)
except ValueError:
pass
hasUsers = self.hasClients(SdrSource.CLIENT_USER)
self.setBusyState(SdrSource.BUSYSTATE_BUSY if hasUsers else SdrSource.BUSYSTATE_IDLE)
2019-12-31 18:14:05 +00:00
# no need to check for users if we are always-on
if self.isAlwaysOn():
return
hasBackgroundTasks = self.hasClients(SdrSource.CLIENT_BACKGROUND)
if not hasUsers and not hasBackgroundTasks:
self.stop()
def addSpectrumClient(self, c):
if c in self.spectrumClients:
return
# local import due to circular depencency
from owrx.fft import SpectrumThread
2019-12-28 00:24:07 +00:00
self.spectrumClients.append(c)
with self.spectrumLock:
if self.spectrumThread is None:
self.spectrumThread = SpectrumThread(self)
self.spectrumThread.start()
def removeSpectrumClient(self, c):
try:
self.spectrumClients.remove(c)
except ValueError:
pass
with self.spectrumLock:
if not self.spectrumClients and self.spectrumThread is not None:
self.spectrumThread.stop()
self.spectrumThread = None
def writeSpectrumData(self, data):
for c in self.spectrumClients:
c.write_spectrum_data(data)
2019-12-23 20:12:28 +00:00
def getState(self):
return self.state
def setState(self, state):
if state == self.state:
return
self.state = state
for c in self.clients:
c.onStateChange(state)
def setBusyState(self, state):
if state == self.busyState:
return
self.busyState = state
for c in self.clients:
c.onBusyStateChange(state)