from csdr.chain import Chain from pycsdr.modules import Shift, FirDecimate, Bandpass, Squelch, FractionalDecimator, Writer from pycsdr.types import Format import math class Decimator(Chain): def __init__(self, inputRate: int, outputRate: int): if outputRate > inputRate: raise ValueError("impossible decimation: cannot upsample {} to {}".format(inputRate, outputRate)) self.inputRate = inputRate self.outputRate = outputRate decimation, fraction = self._getDecimation(outputRate) transition = 0.15 * (outputRate / float(self.inputRate)) # set the cutoff on the fist decimation stage lower so that the resulting output # is already prepared for the second (fractional) decimation stage. # this spares us a second filter. cutoff = 0.5 * decimation / (self.inputRate / outputRate) workers = [ FirDecimate(decimation, transition, cutoff), ] if fraction != 1.0: workers += [FractionalDecimator(Format.COMPLEX_FLOAT, fraction)] super().__init__(workers) def _getDecimation(self, outputRate: int) -> (int, float): d = self.inputRate / outputRate dInt = int(d) dFloat = float(self.inputRate / dInt) / outputRate return dInt, dFloat def _reconfigure(self): decimation, fraction = self._getDecimation(self.outputRate) transition = 0.15 * (self.outputRate / float(self.inputRate)) cutoff = 0.5 * decimation / (self.inputRate / self.outputRate) self.replace(0, FirDecimate(decimation, transition, cutoff)) index = self.indexOf(lambda x: isinstance(x, FractionalDecimator)) if fraction != 1.0: decimator = FractionalDecimator(Format.COMPLEX_FLOAT, fraction) if index >= 0: self.replace(index, decimator) else: self.append(decimator) elif index >= 0: self.remove(index) def setOutputRate(self, outputRate: int) -> None: if outputRate == self.outputRate: return self.outputRate = outputRate self._reconfigure() def setInputRate(self, inputRate: int) -> None: if inputRate == self.inputRate: return self.inputRate = inputRate self._reconfigure() class Selector(Chain): def __init__(self, inputRate: int, outputRate: int, withSquelch: bool = True): self.inputRate = inputRate self.outputRate = outputRate self.frequencyOffset = 0 self.shift = Shift(0.0) self.decimation = Decimator(inputRate, outputRate) self.bandpass = self._buildBandpass() self.bandpassCutoffs = None self.setBandpass(-4000, 4000) workers = [self.shift, self.decimation, self.bandpass] if withSquelch: self.readings_per_second = 4 # s-meter readings are available every 1024 samples # the reporting interval is measured in those 1024-sample blocks self.squelch = Squelch(5, int(outputRate / (self.readings_per_second * 1024))) workers += [self.squelch] super().__init__(workers) def _buildBandpass(self) -> Bandpass: bp_transition = 320.0 / self.outputRate return Bandpass(transition=bp_transition, use_fft=True) def setFrequencyOffset(self, offset: int) -> None: if offset == self.frequencyOffset: return self.frequencyOffset = offset self._updateShift() def _updateShift(self): shift = -self.frequencyOffset / self.inputRate self.shift.setRate(shift) def _convertToLinear(self, db: float) -> float: return float(math.pow(10, db / 10)) def setSquelchLevel(self, level: float) -> None: self.squelch.setSquelchLevel(self._convertToLinear(level)) def setBandpass(self, lowCut: float, highCut: float) -> None: self.bandpassCutoffs = [lowCut, highCut] scaled = [x / self.outputRate for x in self.bandpassCutoffs] self.bandpass.setBandpass(*scaled) def setLowCut(self, lowCut: float) -> None: self.bandpassCutoffs[0] = lowCut self.setBandpass(*self.bandpassCutoffs) def setHighCut(self, highCut: float) -> None: self.bandpassCutoffs[1] = highCut self.setBandpass(*self.bandpassCutoffs) def setPowerWriter(self, writer: Writer) -> None: self.squelch.setPowerWriter(writer) def setOutputRate(self, outputRate: int) -> None: if outputRate == self.outputRate: return self.outputRate = outputRate self.decimation.setOutputRate(outputRate) self.squelch.setReportInterval(int(outputRate / (self.readings_per_second * 1024))) self.bandpass = self._buildBandpass() self.setBandpass(*self.bandpassCutoffs) self.replace(2, self.bandpass) def setInputRate(self, inputRate: int) -> None: if inputRate == self.inputRate: return self.inputRate = inputRate self.decimation.setInputRate(inputRate) self._updateShift() class SecondarySelector(Chain): def __init__(self, sampleRate: int, bandwidth: float): self.sampleRate = sampleRate self.frequencyOffset = 0 self.shift = Shift(0.0) cutoffRate = bandwidth / sampleRate self.bandpass = Bandpass(-cutoffRate, cutoffRate, cutoffRate, use_fft=True) workers = [self.shift, self.bandpass] super().__init__(workers) def setFrequencyOffset(self, offset: int) -> None: if offset == self.frequencyOffset: return self.frequencyOffset = offset if self.frequencyOffset is None: return self.shift.setRate(-offset / self.sampleRate)