openwebrx-clone/owrx/source/soapy.py

111 lines
3.8 KiB
Python
Raw Normal View History

2019-12-27 23:26:45 +00:00
from abc import ABCMeta, abstractmethod
from owrx.command import Option
2021-02-19 14:29:17 +00:00
from owrx.source.connector import ConnectorSource, ConnectorDeviceDescription
from typing import List
from owrx.form import Input, TextInput
2021-02-19 17:45:29 +00:00
from owrx.form.converter import OptionalConverter
from owrx.form.device import GainInput
from owrx.soapy import SoapySettings
2019-12-27 23:26:45 +00:00
class SoapyConnectorSource(ConnectorSource, metaclass=ABCMeta):
2019-12-31 18:14:05 +00:00
def getCommandMapper(self):
2021-01-20 16:01:46 +00:00
return (
super()
.getCommandMapper()
.setBase("soapy_connector")
.setMappings(
{
"antenna": Option("-a"),
"soapy_settings": Option("-t"),
}
)
)
2019-12-27 23:26:45 +00:00
"""
must be implemented by child classes to be able to build a driver-based device selector by default.
return value must be the corresponding soapy driver identifier.
"""
2019-12-28 00:24:07 +00:00
2019-12-27 23:26:45 +00:00
@abstractmethod
def getDriver(self):
pass
2019-12-27 23:26:45 +00:00
def getEventNames(self):
return super().getEventNames() + list(self.getSoapySettingsMappings().keys())
2019-12-27 23:26:45 +00:00
2020-02-09 12:59:37 +00:00
def buildSoapyDeviceParameters(self, parsed, values):
"""
this method always attempts to inject a driver= part into the soapysdr query, depending on what connector was used.
this prevents the soapy_connector from using the wrong device in scenarios where there's no same-type sdrs.
"""
parsed = [v for v in parsed if "driver" not in v]
parsed += [{"driver": self.getDriver()}]
return parsed
2019-12-28 00:24:07 +00:00
def getSoapySettingsMappings(self):
return {}
def buildSoapySettings(self, values):
settings = {}
for k, v in self.getSoapySettingsMappings().items():
2020-03-14 00:21:30 +00:00
if k in values and values[k] is not None:
settings[v] = self.convertSoapySettingsValue(values[k])
return settings
def convertSoapySettingsValue(self, value):
if isinstance(value, bool):
return "true" if value else "false"
return value
def getCommandValues(self):
values = super().getCommandValues()
if "device" in values and values["device"] is not None:
parsed = SoapySettings.parse(values["device"])
else:
2020-02-09 12:59:37 +00:00
parsed = []
modified = self.buildSoapyDeviceParameters(parsed, values)
values["device"] = SoapySettings.encode(modified)
settings = ",".join(["{0}={1}".format(k, v) for k, v in self.buildSoapySettings(values).items()])
if len(settings):
values["soapy_settings"] = settings
return values
2020-12-30 16:18:46 +00:00
def onPropertyChange(self, changes):
mappings = self.getSoapySettingsMappings()
2020-12-30 16:18:46 +00:00
settings = {}
for prop, value in changes.items():
if prop in mappings.keys():
settings[mappings[prop]] = self.convertSoapySettingsValue(value)
if settings:
changes["settings"] = ",".join("{0}={1}".format(k, v) for k, v in settings.items())
super().onPropertyChange(changes)
2021-02-19 14:29:17 +00:00
class SoapyConnectorDeviceDescription(ConnectorDeviceDescription):
def getInputs(self) -> List[Input]:
return super().getInputs() + [
TextInput(
"device",
"Device Identifier",
infotext='SoapySDR device identifier string (example: "serial=123456789")',
converter=OptionalConverter()
),
GainInput(
"rf_gain",
"Device Gain",
gain_stages=self.getGainStages(),
),
TextInput(
"antenna",
"Antenna",
converter=OptionalConverter(),
),
]
def getOptionalKeys(self):
return super().getOptionalKeys() + ["device", "rf_gain", "antenna"]
2021-02-19 17:18:25 +00:00
def getGainStages(self):
return None