openwebrx-clone/owrx/source/soapy.py

86 lines
2.9 KiB
Python
Raw Normal View History

2019-12-27 23:26:45 +00:00
from abc import ABCMeta, abstractmethod
from owrx.command import Option
from .connector import ConnectorSource
2019-12-27 23:26:45 +00:00
class SoapyConnectorSource(ConnectorSource, metaclass=ABCMeta):
2019-12-31 18:14:05 +00:00
def getCommandMapper(self):
return super().getCommandMapper().setBase("soapy_connector").setMappings(
{
"antenna": Option("-a"),
"soapy_settings": Option("-t"),
}
)
2019-12-27 23:26:45 +00:00
"""
must be implemented by child classes to be able to build a driver-based device selector by default.
return value must be the corresponding soapy driver identifier.
"""
2019-12-28 00:24:07 +00:00
2019-12-27 23:26:45 +00:00
@abstractmethod
def getDriver(self):
pass
2019-12-27 23:26:45 +00:00
def getEventNames(self):
return super().getEventNames() + [
"antenna",
] + list(self.getSoapySettingsMappings().keys())
2019-12-27 23:26:45 +00:00
def parseDeviceString(self, dstr):
def decodeComponent(c):
kv = c.split("=", 1)
if len(kv) < 2:
return c
else:
return {kv[0]: kv[1]}
return [decodeComponent(c) for c in dstr.split(",")]
def encodeDeviceString(self, dobj):
def encodeComponent(c):
if isinstance(c, str):
return c
else:
return ",".join(["{0}={1}".format(key, value) for key, value in c.items()])
return ",".join([encodeComponent(c) for c in dobj])
2020-02-09 12:59:37 +00:00
def buildSoapyDeviceParameters(self, parsed, values):
"""
this method always attempts to inject a driver= part into the soapysdr query, depending on what connector was used.
this prevents the soapy_connector from using the wrong device in scenarios where there's no same-type sdrs.
"""
parsed = [v for v in parsed if "driver" not in v]
parsed += [{"driver": self.getDriver()}]
return parsed
2019-12-28 00:24:07 +00:00
def getSoapySettingsMappings(self):
return {}
def buildSoapySettings(self, values):
settings = {}
for k, v in self.getSoapySettingsMappings().items():
2020-03-14 00:21:30 +00:00
if k in values and values[k] is not None:
settings[v] = values[k]
return settings
def getCommandValues(self):
values = super().getCommandValues()
if "device" in values and values["device"] is not None:
parsed = self.parseDeviceString(values["device"])
else:
2020-02-09 12:59:37 +00:00
parsed = []
modified = self.buildSoapyDeviceParameters(parsed, values)
values["device"] = self.encodeDeviceString(modified)
settings = ",".join(["{0}={1}".format(k, v) for k, v in self.buildSoapySettings(values).items()])
if len(settings):
values["soapy_settings"] = settings
return values
def onPropertyChange(self, prop, value):
mappings = self.getSoapySettingsMappings()
if prop in mappings.keys():
value = "{0}={1}".format(mappings[prop], value)
prop = "settings"
super().onPropertyChange(prop, value)