# openwebrx-clone/owrx/source/fifi_sdr.py
#
# 72 lines
# 2.3 KiB
# Python

from owrx.command import Option
from owrx.source.direct import DirectSource, DirectSourceDeviceDescription
from owrx.log import LogPipe
from subprocess import Popen
from csdr.chain import Chain
from pycsdr.modules import Convert, Gain
from pycsdr.types import Format
from typing import List
from owrx.form.input import Input, TextInput
import logging
class FifiSdrSource(DirectSource):
    """SDR source that records samples with ``arecord`` and tunes via ``rockprog``.

    Sample data is read from the ``arecord`` command line built in
    :meth:`getCommandMapper`; the frequency is set by invoking the external
    ``rockprog`` tool (see :meth:`sendRockProgFrequency`).
    """

    def getCommandMapper(self):
        """Return the parent's command mapper configured to run ``arecord``.

        The ``device`` and ``samp_rate`` properties are mapped to the ``-D``
        and ``-r`` options; the remaining arguments are fixed.
        """
        return (
            super()
            .getCommandMapper()
            .setBase("arecord")
            .setMappings({"device": Option("-D"), "samp_rate": Option("-r")})
            # presumably: raw output, signed 16 bit little endian, 2 channels,
            # written to stdout -- confirm against the arecord man page
            .setStatic("-t raw -f S16_LE -c2 -")
        )

    def getEventNames(self):
        """Return the parent's event names extended by ``"device"``."""
        return super().getEventNames() + ["device"]

    def getFormatConversion(self) -> Chain:
        """Return the chain converting complex short samples to amplified complex float."""
        return Chain(
            [
                Convert(Format.COMPLEX_SHORT, Format.COMPLEX_FLOAT),
                Gain(Format.COMPLEX_FLOAT, 5.0),
            ]
        )

    def sendRockProgFrequency(self, frequency):
        """Run ``rockprog`` to set the device frequency and wait for it to finish.

        The output of the tool is forwarded to this source's logger at DEBUG
        level. A non-zero exit code is logged as a warning and not raised.

        :param frequency: frequency to set; divided by 1e6 before being passed
            to ``rockprog`` (presumably Hz converted to MHz -- TODO confirm)
        :raises OSError: propagated from ``Popen`` if ``rockprog`` cannot be started
        """
        stdoutPipe = LogPipe(logging.DEBUG, self.logger, "STDOUT")
        stderrPipe = LogPipe(logging.DEBUG, self.logger, "STDERR")
        try:
            process = Popen(
                ["rockprog", "--vco", "-w", "--freq={}".format(frequency / 1e6)],
                stdout=stdoutPipe,
                stderr=stderrPipe,
            )
            # no stream uses subprocess.PIPE, so a plain wait() is sufficient
            rc = process.wait()
            if rc != 0:
                self.logger.warning("rockprog failed to set frequency; rc=%i", rc)
        finally:
            # close the log pipes even if rockprog could not be started or the
            # wait was interrupted; otherwise they would be leaked
            stdoutPipe.close()
            stderrPipe.close()

    def preStart(self):
        """Send the ``tuner_freq`` command value to the device via ``rockprog``."""
        values = self.getCommandValues()
        self.sendRockProgFrequency(values["tuner_freq"])

    def onPropertyChange(self, changes):
        """Re-tune the device when ``center_freq`` is among the changed properties.

        :param changes: mapping of changed property names to their new values
        """
        # NOTE(review): preStart() tunes to "tuner_freq" while this reacts to
        # "center_freq" -- confirm that this difference is intended
        if "center_freq" in changes:
            self.sendRockProgFrequency(changes["center_freq"])
class FifiSdrDeviceDescription(DirectSourceDeviceDescription):
    """Device description (name, inputs, optional keys) for the FiFi SDR."""

    def getName(self):
        """Return the display name of this device type."""
        return "FiFi SDR"

    def supportsPpm(self):
        """Return False, since ppm is not supported for this device."""
        # not currently mapped, and it's unclear how this should be sent to the device
        return False

    def getInputs(self) -> List[Input]:
        """Return the parent's inputs followed by a text input for ``device``."""
        inheritedInputs = super().getInputs()
        deviceInput = TextInput(
            "device",
            "Device identifier",
            infotext="Alsa audio device identifier",
        )
        return inheritedInputs + [deviceInput]

    def getDeviceOptionalKeys(self):
        """Return the parent's optional device keys extended by ``"device"``."""
        optionalKeys = super().getDeviceOptionalKeys()
        return optionalKeys + ["device"]