openwebrx-clone/owrx/source/direct.py

76 lines
2.4 KiB
Python

from abc import ABCMeta
from owrx.source import SdrSource, SdrDeviceDescription
from csdr.chain import Chain
from typing import Optional
from pycsdr.modules import Buffer
from pycsdr.types import Format
class DirectSource(SdrSource, metaclass=ABCMeta):
def __init__(self, id, props):
self._conversion = None
super().__init__(id, props)
def onPropertyChange(self, changes):
self.logger.debug("restarting sdr source due to property changes: {0}".format(changes))
self.stop()
self.sleepOnRestart()
self.start()
def nmux_memory(self):
# in megabytes. This sets the approximate size of the circular buffer used by nmux.
return 50
def getNmuxCommand(self):
props = self.sdrProps
nmux_bufcnt = nmux_bufsize = 0
while nmux_bufsize < props["samp_rate"] / 4:
nmux_bufsize += 4096
while nmux_bufsize * nmux_bufcnt < self.nmux_memory() * 1e6:
nmux_bufcnt += 1
if nmux_bufcnt == 0 or nmux_bufsize == 0:
raise ValueError("Error: unable to calculate nmux buffer parameters.")
return [
"nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1"
% (
nmux_bufsize,
nmux_bufcnt,
self.port,
)
]
def getCommand(self):
return super().getCommand() + self.getNmuxCommand()
# override this in subclasses, if necessary
def getFormatConversion(self) -> Optional[Chain]:
return None
def _getTcpSourceFormat(self):
conversion = self.getFormatConversion()
return Format.COMPLEX_FLOAT if conversion is None else conversion.getInputFormat()
# override this in subclasses, if necessary
def sleepOnRestart(self):
pass
def getBuffer(self):
if self.buffer is None:
source = self._getTcpSource()
buffer = Buffer(source.getOutputFormat())
source.setWriter(buffer)
self._conversion = self.getFormatConversion()
if self._conversion is not None:
self._conversion.setReader(buffer.getReader())
# this one must be COMPLEX_FLOAT
buffer = Buffer(Format.COMPLEX_FLOAT)
self._conversion.setWriter(buffer)
self.buffer = buffer
return self.buffer
class DirectSourceDeviceDescription(SdrDeviceDescription):
pass