openwebrx-clone/owrx/source/connector.py

101 lines
3.5 KiB
Python
Raw Permalink Normal View History

2021-02-19 14:29:17 +00:00
from owrx.source import SdrSource, SdrDeviceDescription
from owrx.socket import getAvailablePort
2021-05-03 17:28:03 +00:00
from owrx.property import PropertyDeleted
import socket
2021-02-19 14:29:17 +00:00
from owrx.command import Flag, Option
2021-02-19 15:29:30 +00:00
from typing import List
2021-04-29 13:17:21 +00:00
from owrx.form.input import Input, NumberInput, CheckboxInput
import logging
logger = logging.getLogger(__name__)
class ConnectorSource(SdrSource):
    """SDR source that forwards runtime property changes over a TCP control socket.

    A control port is obtained from ``getAvailablePort()`` when the source is
    constructed and is mapped to the ``-c`` command option. ``postStart()``
    connects to that port on localhost; afterwards property changes are sent
    as ``name:value`` lines by ``sendControlMessage()``.
    """

    def __init__(self, id, props):
        # Both attributes are assigned before the base constructor runs, as in
        # the original ordering.
        # NOTE(review): presumably the base constructor can already call
        # methods that read them -- confirm against SdrSource.
        self.controlSocket = None
        self.controlPort = getAvailablePort()
        super().__init__(id, props)

    def getCommandMapper(self):
        """Return the inherited command mapper after adding the connector option mappings."""
        return (
            super()
            .getCommandMapper()
            .setMappings(
                {
                    "samp_rate": Option("-s"),
                    "tuner_freq": Option("-f"),
                    "port": Option("-p"),
                    "controlPort": Option("-c"),
                    "device": Option("-d"),
                    "iqswap": Flag("-i"),
                    "rtltcp_compat": Option("-r"),
                    "ppm": Option("-P"),
                    "rf_gain": Option("-g"),
                }
            )
        )

    def sendControlMessage(self, changes):
        """Send every entry of ``changes`` over the control socket.

        Each entry is written as one ``prop:value`` line, encoded with the
        default string encoding. A value that is the ``PropertyDeleted`` marker
        is replaced by ``None`` and therefore transmitted as the text ``None``.

        If the control socket is not open (before ``postStart()`` or after
        ``stop()``), the changes are dropped with a warning instead of raising
        ``AttributeError`` on the missing socket.
        """
        # Take one reference so the None check and the sends use the same object.
        sock = self.controlSocket
        if sock is None:
            logger.warning(
                "control socket is not open, dropping property changes: {0}".format(list(changes))
            )
            return
        for prop, value in changes.items():
            if value is PropertyDeleted:
                value = None
            logger.debug("sending property change over control socket: {0} changed to {1}".format(prop, value))
            sock.sendall("{prop}:{value}\n".format(prop=prop, value=value).encode())

    def onPropertyChange(self, changes):
        """Translate property changes and forward them to the control socket.

        Nothing is sent while ``self.monitor`` is ``None``. When ``center_freq``
        or ``lfo_offset`` changed and ``sdrProps`` holds a non-``None``
        ``lfo_offset``, the outgoing ``center_freq`` is set to
        ``sdrProps["center_freq"] + sdrProps["lfo_offset"]`` and ``lfo_offset``
        itself is not forwarded.
        """
        if self.monitor is None:
            return
        # Work on a copy: the dict belongs to the caller and must not be
        # rewritten underneath it.
        outgoing = dict(changes)
        if (
            ("center_freq" in outgoing or "lfo_offset" in outgoing)
            and "lfo_offset" in self.sdrProps
            and self.sdrProps["lfo_offset"] is not None
        ):
            outgoing["center_freq"] = self.sdrProps["center_freq"] + self.sdrProps["lfo_offset"]
            outgoing.pop("lfo_offset", None)
        self.sendControlMessage(outgoing)

    def postStart(self):
        """Open the TCP control socket and connect it to ``controlPort`` on localhost."""
        logger.debug("opening control socket...")
        self.controlSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.controlSocket.connect(("localhost", self.controlPort))

    def stop(self):
        """Stop the source via the base class, then close the control socket.

        The socket is released in a ``finally`` clause so that it is closed
        even when the base class ``stop()`` raises.
        """
        try:
            super().stop()
        finally:
            # Clear the attribute first so no other code sees a closed socket.
            sock, self.controlSocket = self.controlSocket, None
            if sock is not None:
                sock.close()

    def getControlPort(self):
        """Return the port number reserved for the control socket."""
        return self.controlPort

    def getCommandValues(self):
        """Return the inherited command values with ``port`` and ``controlPort`` added."""
        values = super().getCommandValues()
        values["port"] = self.getPort()
        values["controlPort"] = self.getControlPort()
        return values
2021-02-19 14:29:17 +00:00
class ConnectorDeviceDescription(SdrDeviceDescription):
    """Device description that adds the ``rtltcp_compat`` and ``iqswap`` settings."""

    def getInputs(self) -> List[Input]:
        """Return the inherited inputs followed by the rtl_tcp port and IQ swap inputs."""
        inherited = super().getInputs()

        rtltcp_info = "".join(
            [
                "Activate an rtl_tcp compatible interface on the port number specified.<br />",
                "Note: Port is only available on the local machine, not on the network.<br />",
                "Note: IQ data may be degraded by the downsampling process to 8 bits.",
            ]
        )
        rtltcp_input = NumberInput(
            "rtltcp_compat",
            "Port for rtl_tcp compatible data",
            infotext=rtltcp_info,
        )

        iqswap_input = CheckboxInput(
            "iqswap",
            "Swap I and Q channels",
            infotext="Swapping inverts the spectrum, so this is useful in combination with an inverting mixer",
        )

        return inherited + [rtltcp_input, iqswap_input]

    def getDeviceOptionalKeys(self):
        """Return the inherited optional device keys plus ``rtltcp_compat`` and ``iqswap``."""
        inherited_keys = super().getDeviceOptionalKeys()
        return inherited_keys + ["rtltcp_compat", "iqswap"]

    def getProfileOptionalKeys(self):
        """Return the inherited optional profile keys plus ``iqswap``."""
        inherited_keys = super().getProfileOptionalKeys()
        return inherited_keys + ["iqswap"]