openwebrx-clone/owrx/source/soapy.py

97 lines
3.3 KiB
Python

from abc import ABCMeta, abstractmethod
from owrx.command import Option
from .connector import ConnectorSource
class SoapyConnectorSource(ConnectorSource, metaclass=ABCMeta):
def getCommandMapper(self):
return (
super()
.getCommandMapper()
.setBase("soapy_connector")
.setMappings(
{
"antenna": Option("-a"),
"soapy_settings": Option("-t"),
}
)
)
"""
must be implemented by child classes to be able to build a driver-based device selector by default.
return value must be the corresponding soapy driver identifier.
"""
@abstractmethod
def getDriver(self):
pass
def getEventNames(self):
return super().getEventNames() + list(self.getSoapySettingsMappings().keys())
def parseDeviceString(self, dstr):
def decodeComponent(c):
kv = c.split("=", 1)
if len(kv) < 2:
return c
else:
return {kv[0]: kv[1]}
return [decodeComponent(c) for c in dstr.split(",")]
def encodeDeviceString(self, dobj):
def encodeComponent(c):
if isinstance(c, str):
return c
else:
return ",".join(["{0}={1}".format(key, value) for key, value in c.items()])
return ",".join([encodeComponent(c) for c in dobj])
def buildSoapyDeviceParameters(self, parsed, values):
"""
this method always attempts to inject a driver= part into the soapysdr query, depending on what connector was used.
this prevents the soapy_connector from using the wrong device in scenarios where there's no same-type sdrs.
"""
parsed = [v for v in parsed if "driver" not in v]
parsed += [{"driver": self.getDriver()}]
return parsed
def getSoapySettingsMappings(self):
return {}
def buildSoapySettings(self, values):
settings = {}
for k, v in self.getSoapySettingsMappings().items():
if k in values and values[k] is not None:
settings[v] = self.convertSoapySettingsValue(values[k])
return settings
def convertSoapySettingsValue(self, value):
if isinstance(value, bool):
return "true" if value else "false"
return value
def getCommandValues(self):
values = super().getCommandValues()
if "device" in values and values["device"] is not None:
parsed = self.parseDeviceString(values["device"])
else:
parsed = []
modified = self.buildSoapyDeviceParameters(parsed, values)
values["device"] = self.encodeDeviceString(modified)
settings = ",".join(["{0}={1}".format(k, v) for k, v in self.buildSoapySettings(values).items()])
if len(settings):
values["soapy_settings"] = settings
return values
def onPropertyChange(self, changes):
mappings = self.getSoapySettingsMappings()
settings = {}
for prop, value in changes.items():
if prop in mappings.keys():
settings[mappings[prop]] = self.convertSoapySettingsValue(value)
if settings:
changes["settings"] = ",".join("{0}={1}".format(k, v) for k, v in settings.items())
super().onPropertyChange(changes)