91 lines
3.1 KiB
Python
91 lines
3.1 KiB
Python
from abc import ABCMeta, abstractmethod
|
|
from owrx.command import Option
|
|
from .connector import ConnectorSource
|
|
|
|
|
|
class SoapyConnectorSource(ConnectorSource, metaclass=ABCMeta):
    """
    abstract base for sources that are driven through the "soapy_connector" command.

    child classes provide the soapy driver identifier via getDriver() and may expose additional
    device settings by overriding getSoapySettingsMappings().
    """

    def getCommandMapper(self):
        """
        extend the parent command mapper: use "soapy_connector" as the base command and map the
        "antenna" and "soapy_settings" values to the "-a" and "-t" options respectively.
        """
        return super().getCommandMapper().setBase("soapy_connector").setMappings(
            {
                "antenna": Option("-a"),
                "soapy_settings": Option("-t"),
            }
        )

    @abstractmethod
    def getDriver(self):
        """
        must be implemented by child classes to be able to build a driver-based device selector by default.
        return value must be the corresponding soapy driver identifier.
        """
        pass

    def getEventNames(self):
        """
        return the parent's event names, plus "antenna", plus every key of getSoapySettingsMappings().
        """
        return super().getEventNames() + [
            "antenna",
        ] + list(self.getSoapySettingsMappings().keys())

    def parseDeviceString(self, dstr):
        """
        split a comma-separated device string into a list of components.

        a "key=value" component becomes a single-entry dict (split on the first "=" only, so the value
        may itself contain "="); a component without "=" is kept as a plain string.
        """

        def decodeComponent(c):
            kv = c.split("=", 1)
            if len(kv) < 2:
                return c
            return {kv[0]: kv[1]}

        return [decodeComponent(c) for c in dstr.split(",")]

    def encodeDeviceString(self, dobj):
        """
        inverse of parseDeviceString(): join a list of plain strings and dicts back into a
        comma-separated device string. dict entries are rendered as "key=value".
        """

        def encodeComponent(c):
            if isinstance(c, str):
                return c
            return ",".join("{0}={1}".format(key, value) for key, value in c.items())

        return ",".join(encodeComponent(c) for c in dobj)

    def buildSoapyDeviceParameters(self, parsed, values):
        """
        this method always attempts to inject a driver= part into the soapysdr query, depending on what connector was used.
        this prevents the soapy_connector from using the wrong device in scenarios where there's no same-type sdrs.

        parsed is a component list as produced by parseDeviceString(); a new list is returned and the
        input list is left unmodified. values is not used by this implementation.
        """
        # only key/value components can carry a driver selection. plain string components must be
        # checked by type first: "driver" in "<some string>" is a substring test and would drop
        # unrelated tokens that merely contain the text "driver".
        result = [c for c in parsed if isinstance(c, str) or "driver" not in c]
        result.append({"driver": self.getDriver()})
        return result

    def getSoapySettingsMappings(self):
        """
        return a dict that maps property names to soapy setting names. empty by default.
        """
        return {}

    def buildSoapySettings(self, values):
        """
        build a dict of soapy setting name -> converted value for every mapped property that is
        present in values and not None.
        """
        settings = {}
        for prop, setting in self.getSoapySettingsMappings().items():
            if prop in values and values[prop] is not None:
                settings[setting] = self.convertSoapySettingsValue(values[prop])
        return settings

    def convertSoapySettingsValue(self, value):
        """
        convert a property value for use in a settings string: booleans become "true" / "false",
        everything else is passed through unchanged.
        """
        if isinstance(value, bool):
            return "true" if value else "false"
        return value

    def getCommandValues(self):
        """
        extend the parent's command values: rewrite "device" so that it always carries this source's
        driver= selector, and add "soapy_settings" as a comma-separated "name=value" list if any
        mapped settings are present.
        """
        values = super().getCommandValues()

        device = values["device"] if "device" in values else None
        # an empty device string is treated like a missing one; parsing it would yield an empty
        # component and result in a leading comma in the encoded string.
        parsed = self.parseDeviceString(device) if device else []
        modified = self.buildSoapyDeviceParameters(parsed, values)
        values["device"] = self.encodeDeviceString(modified)

        settings = ",".join("{0}={1}".format(k, v) for k, v in self.buildSoapySettings(values).items())
        if settings:
            values["soapy_settings"] = settings
        return values

    def onPropertyChange(self, prop, value):
        """
        changes to a mapped soapy setting are forwarded to the parent as a change of the "settings"
        property with a "name=value" string; all other changes are forwarded unmodified.
        """
        mappings = self.getSoapySettingsMappings()
        if prop in mappings:
            value = "{0}={1}".format(mappings[prop], self.convertSoapySettingsValue(value))
            prop = "settings"
        super().onPropertyChange(prop, value)
|