2019-12-27 23:26:45 +00:00
|
|
|
from abc import ABCMeta, abstractmethod
|
|
|
|
from owrx.command import Option
|
|
|
|
|
2019-12-21 19:58:28 +00:00
|
|
|
from .connector import ConnectorSource
|
|
|
|
|
|
|
|
|
2019-12-27 23:26:45 +00:00
|
|
|
class SoapyConnectorSource(ConnectorSource, metaclass=ABCMeta):
|
2019-12-31 18:14:05 +00:00
|
|
|
def getCommandMapper(self):
|
|
|
|
return super().getCommandMapper().setBase("soapy_connector").setMappings({"antenna": Option("-a")})
|
2019-12-27 23:26:45 +00:00
|
|
|
|
2019-12-21 19:58:28 +00:00
|
|
|
"""
|
|
|
|
must be implemented by child classes to be able to build a driver-based device selector by default.
|
|
|
|
return value must be the corresponding soapy driver identifier.
|
|
|
|
"""
|
2019-12-28 00:24:07 +00:00
|
|
|
|
2019-12-27 23:26:45 +00:00
|
|
|
@abstractmethod
|
2019-12-21 19:58:28 +00:00
|
|
|
def getDriver(self):
|
|
|
|
pass
|
|
|
|
|
2019-12-27 23:26:45 +00:00
|
|
|
def getEventNames(self):
|
|
|
|
return super().getEventNames() + [
|
|
|
|
"antenna",
|
|
|
|
]
|
|
|
|
|
2019-12-21 19:58:28 +00:00
|
|
|
def parseDeviceString(self, dstr):
|
|
|
|
def decodeComponent(c):
|
|
|
|
kv = c.split("=", 1)
|
|
|
|
if len(kv) < 2:
|
|
|
|
return c
|
|
|
|
else:
|
|
|
|
return {kv[0]: kv[1]}
|
|
|
|
|
|
|
|
return [decodeComponent(c) for c in dstr.split(",")]
|
|
|
|
|
|
|
|
def encodeDeviceString(self, dobj):
|
|
|
|
def encodeComponent(c):
|
|
|
|
if isinstance(c, str):
|
|
|
|
return c
|
|
|
|
else:
|
|
|
|
return ",".join(["{0}={1}".format(key, value) for key, value in c.items()])
|
|
|
|
|
|
|
|
return ",".join([encodeComponent(c) for c in dobj])
|
|
|
|
|
2020-02-09 12:59:37 +00:00
|
|
|
def buildSoapyDeviceParameters(self, parsed, values):
|
|
|
|
"""
|
|
|
|
this method always attempts to inject a driver= part into the soapysdr query, depending on what connector was used.
|
|
|
|
this prevents the soapy_connector from using the wrong device in scenarios where there's no same-type sdrs.
|
|
|
|
"""
|
|
|
|
parsed = [v for v in parsed if "driver" not in v]
|
|
|
|
parsed += [{"driver": self.getDriver()}]
|
|
|
|
return parsed
|
2019-12-28 00:24:07 +00:00
|
|
|
|
2019-12-21 19:58:28 +00:00
|
|
|
def getCommandValues(self):
|
|
|
|
values = super().getCommandValues()
|
|
|
|
if "device" in values and values["device"] is not None:
|
|
|
|
parsed = self.parseDeviceString(values["device"])
|
|
|
|
else:
|
2020-02-09 12:59:37 +00:00
|
|
|
parsed = []
|
|
|
|
modified = self.buildSoapyDeviceParameters(parsed, values)
|
|
|
|
values["device"] = self.encodeDeviceString(modified)
|
2019-12-21 19:58:28 +00:00
|
|
|
return values
|