from owrx.source import SdrSource, SdrDeviceDescription
from owrx.socket import getAvailablePort
from owrx.property import PropertyDeleted
import socket
from owrx.command import Flag, Option
from typing import List
from owrx.form.input import Input, NumberInput, CheckboxInput
import logging
logger = logging.getLogger(__name__)
class ConnectorSource(SdrSource):
def __init__(self, id, props):
self.controlSocket = None
self.controlPort = getAvailablePort()
super().__init__(id, props)
def getCommandMapper(self):
return (
super()
.getCommandMapper()
.setMappings(
{
"samp_rate": Option("-s"),
"tuner_freq": Option("-f"),
"port": Option("-p"),
"controlPort": Option("-c"),
"device": Option("-d"),
"iqswap": Flag("-i"),
"rtltcp_compat": Option("-r"),
"ppm": Option("-P"),
"rf_gain": Option("-g"),
}
)
)
def sendControlMessage(self, changes):
for prop, value in changes.items():
if value is PropertyDeleted:
value = None
logger.debug("sending property change over control socket: {0} changed to {1}".format(prop, value))
self.controlSocket.sendall("{prop}:{value}\n".format(prop=prop, value=value).encode())
def onPropertyChange(self, changes):
if self.monitor is None:
return
if (
("center_freq" in changes or "lfo_offset" in changes)
and "lfo_offset" in self.sdrProps
and self.sdrProps["lfo_offset"] is not None
):
changes["center_freq"] = self.sdrProps["center_freq"] + self.sdrProps["lfo_offset"]
changes.pop("lfo_offset", None)
self.sendControlMessage(changes)
def postStart(self):
logger.debug("opening control socket...")
self.controlSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.controlSocket.connect(("localhost", self.controlPort))
def stop(self):
super().stop()
if self.controlSocket:
self.controlSocket.close()
self.controlSocket = None
def getControlPort(self):
return self.controlPort
def getCommandValues(self):
values = super().getCommandValues()
values["port"] = self.getPort()
values["controlPort"] = self.getControlPort()
return values
class ConnectorDeviceDescription(SdrDeviceDescription):
def getInputs(self) -> List[Input]:
return super().getInputs() + [
NumberInput(
"rtltcp_compat",
"Port for rtl_tcp compatible data",
infotext="Activate an rtl_tcp compatible interface on the port number specified.
"
+ "Note: Port is only available on the local machine, not on the network.
"
+ "Note: IQ data may be degraded by the downsampling process to 8 bits.",
),
CheckboxInput(
"iqswap",
"Swap I and Q channels",
infotext="Swapping inverts the spectrum, so this is useful in combination with an inverting mixer",
),
]
def getDeviceOptionalKeys(self):
return super().getDeviceOptionalKeys() + ["rtltcp_compat", "iqswap"]
def getProfileOptionalKeys(self):
return super().getProfileOptionalKeys() + ["iqswap"]