openwebrx-clone/csdr/chain/selector.py

161 lines
5.7 KiB
Python
Raw Normal View History

2021-08-23 14:25:28 +02:00
from csdr.chain import Chain
from pycsdr.modules import Shift, FirDecimate, Bandpass, Squelch, FractionalDecimator, Writer
from pycsdr.types import Format
import math
class Decimator(Chain):
def __init__(self, inputRate: int, outputRate: int):
if outputRate > inputRate:
raise ValueError("impossible decimation: cannot upsample {} to {}".format(inputRate, outputRate))
self.inputRate = inputRate
self.outputRate = outputRate
decimation, fraction = self._getDecimation(outputRate)
transition = 0.15 * (outputRate / float(self.inputRate))
# set the cutoff on the fist decimation stage lower so that the resulting output
# is already prepared for the second (fractional) decimation stage.
# this spares us a second filter.
cutoff = 0.5 * decimation / (self.inputRate / outputRate)
workers = [
FirDecimate(decimation, transition, cutoff),
]
if fraction != 1.0:
workers += [FractionalDecimator(Format.COMPLEX_FLOAT, fraction)]
super().__init__(workers)
def _getDecimation(self, outputRate: int) -> (int, float):
d = self.inputRate / outputRate
dInt = int(d)
dFloat = float(self.inputRate / dInt) / outputRate
return dInt, dFloat
def _reconfigure(self):
decimation, fraction = self._getDecimation(self.outputRate)
transition = 0.15 * (self.outputRate / float(self.inputRate))
cutoff = 0.5 * decimation / (self.inputRate / self.outputRate)
self.replace(0, FirDecimate(decimation, transition, cutoff))
index = self.indexOf(lambda x: isinstance(x, FractionalDecimator))
if fraction != 1.0:
decimator = FractionalDecimator(Format.COMPLEX_FLOAT, fraction)
if index >= 0:
self.replace(index, decimator)
else:
self.append(decimator)
elif index >= 0:
self.remove(index)
def setOutputRate(self, outputRate: int) -> None:
if outputRate == self.outputRate:
return
self.outputRate = outputRate
self._reconfigure()
def setInputRate(self, inputRate: int) -> None:
if inputRate == self.inputRate:
return
self.inputRate = inputRate
self._reconfigure()
class Selector(Chain):
2021-09-23 18:43:41 +02:00
def __init__(self, inputRate: int, outputRate: int, withSquelch: bool = True):
self.inputRate = inputRate
2021-08-23 14:25:28 +02:00
self.outputRate = outputRate
2021-09-23 18:43:41 +02:00
self.frequencyOffset = 0
2021-08-23 14:25:28 +02:00
2021-09-23 18:43:41 +02:00
self.shift = Shift(0.0)
2021-08-23 14:25:28 +02:00
self.decimation = Decimator(inputRate, outputRate)
self.bandpass = self._buildBandpass()
self.bandpassCutoffs = None
self.setBandpass(-4000, 4000)
2021-09-06 15:05:33 +02:00
workers = [self.shift, self.decimation, self.bandpass]
2021-08-23 14:25:28 +02:00
2021-09-06 15:05:33 +02:00
if withSquelch:
self.readings_per_second = 4
# s-meter readings are available every 1024 samples
# the reporting interval is measured in those 1024-sample blocks
self.squelch = Squelch(5, int(outputRate / (self.readings_per_second * 1024)))
workers += [self.squelch]
2021-08-23 14:25:28 +02:00
super().__init__(workers)
def _buildBandpass(self) -> Bandpass:
bp_transition = 320.0 / self.outputRate
return Bandpass(transition=bp_transition, use_fft=True)
2021-09-23 18:43:41 +02:00
def setFrequencyOffset(self, offset: int) -> None:
if offset == self.frequencyOffset:
return
self.frequencyOffset = offset
self._updateShift()
def _updateShift(self):
shift = -self.frequencyOffset / self.inputRate
self.shift.setRate(shift)
2021-08-23 14:25:28 +02:00
def _convertToLinear(self, db: float) -> float:
return float(math.pow(10, db / 10))
def setSquelchLevel(self, level: float) -> None:
self.squelch.setSquelchLevel(self._convertToLinear(level))
def setBandpass(self, lowCut: float, highCut: float) -> None:
self.bandpassCutoffs = [lowCut, highCut]
scaled = [x / self.outputRate for x in self.bandpassCutoffs]
self.bandpass.setBandpass(*scaled)
def setLowCut(self, lowCut: float) -> None:
self.bandpassCutoffs[0] = lowCut
self.setBandpass(*self.bandpassCutoffs)
def setHighCut(self, highCut: float) -> None:
self.bandpassCutoffs[1] = highCut
self.setBandpass(*self.bandpassCutoffs)
def setPowerWriter(self, writer: Writer) -> None:
self.squelch.setPowerWriter(writer)
def setOutputRate(self, outputRate: int) -> None:
if outputRate == self.outputRate:
return
self.outputRate = outputRate
self.decimation.setOutputRate(outputRate)
self.squelch.setReportInterval(int(outputRate / (self.readings_per_second * 1024)))
self.bandpass = self._buildBandpass()
self.setBandpass(*self.bandpassCutoffs)
self.replace(2, self.bandpass)
def setInputRate(self, inputRate: int) -> None:
2021-09-23 18:43:41 +02:00
if inputRate == self.inputRate:
return
self.inputRate = inputRate
2021-08-23 14:25:28 +02:00
self.decimation.setInputRate(inputRate)
2021-09-23 18:43:41 +02:00
self._updateShift()
class SecondarySelector(Chain):
def __init__(self, sampleRate: int, bandwidth: float):
self.sampleRate = sampleRate
self.frequencyOffset = 0
self.shift = Shift(0.0)
cutoffRate = bandwidth / sampleRate
self.bandpass = Bandpass(-cutoffRate, cutoffRate, cutoffRate, use_fft=True)
workers = [self.shift, self.bandpass]
super().__init__(workers)
def setFrequencyOffset(self, offset: int) -> None:
if offset == self.frequencyOffset:
return
self.frequencyOffset = offset
if self.frequencyOffset is None:
return
self.shift.setRate(-offset / self.sampleRate)