openwebrx-clone/csdr/chain/fft.py
Jakob Ketterl a2d731503f update api
2021-01-02 03:12:21 +01:00

106 lines
3.6 KiB
Python

from csdr.chain import Chain
from pycsdr.modules import Fft, LogPower, LogAveragePower, FftExchangeSides, CompressFftAdpcm
import logging
logger = logging.getLogger(__name__)
class FftAverager(Chain):
    """Single-worker chain that wraps either ``LogPower`` or ``LogAveragePower``.

    With ``fft_averages == 0`` the wrapped worker is a ``LogPower``; with any
    other value it is a ``LogAveragePower`` configured for that many averages.
    The worker is swapped out whenever the setting moves between zero and
    non-zero, and reconfigured in place otherwise.
    """

    # Passed as ``add_db`` to BOTH worker types. The two must agree, otherwise
    # the output level shifts whenever averaging is switched on or off.
    # NOTE(review): presumably a dB offset applied by pycsdr -- confirm there.
    ADD_DB = -70

    def __init__(self, fft_size, fft_averages):
        """Create the averager.

        :param fft_size: FFT size handed to ``LogAveragePower``.
        :param fft_averages: number of averages; 0 selects plain ``LogPower``.
        """
        self.fftSize = fft_size
        # None marks "nothing configured yet" so that the setFftAverages() call
        # below always builds an initial worker, even for fft_averages == 0.
        self.fftAverages = None
        self.worker = None
        self.input = None
        self.output = None
        self.setFftAverages(fft_averages)
        super().__init__(self.worker)

    def setFftAverages(self, fft_averages):
        """Apply a new averaging count, replacing the worker if its type must change.

        Does nothing when the value is unchanged.
        """
        if self.fftAverages == fft_averages:
            return

        # A new worker is required when the worker type changes (switching
        # to or from zero averages) or when no worker exists yet.
        needs_new_worker = (
            fft_averages == 0
            or self.fftAverages == 0
            or self.worker is None
        )
        if needs_new_worker:
            self._replaceWorker(fft_averages)
        else:
            # same worker type, only the averaging count changes
            self.worker.setFftAverages(avg_number=fft_averages)

        self.workers = [self.worker]
        self.fftAverages = fft_averages

    def _createWorker(self, fft_averages):
        """Return a fresh worker matching the given averaging count."""
        if fft_averages == 0:
            return LogPower(add_db=self.ADD_DB)
        return LogAveragePower(
            add_db=self.ADD_DB,
            fft_size=self.fftSize,
            avg_number=fft_averages,
        )

    def _replaceWorker(self, fft_averages):
        """Stop the current worker (if any) and wire up a newly created one."""
        if self.worker is not None:
            self.worker.stop()
        self.worker = self._createWorker(fft_averages)
        # re-attach whatever output / input is currently stored on this chain
        if self.output is not None:
            self.worker.setOutput(self.output)
        if self.input is not None:
            self.worker.setInput(self.input)
class FftChain(Chain):
    """Chain of ``Fft`` -> ``FftAverager`` -> ``FftExchangeSides``, optionally
    followed by ``CompressFftAdpcm``.

    The FFT block size and the averaging count are derived from the sample
    rate, FFT size, target frame rate and vertical overlap factor, and are
    recomputed whenever one of the setters changes a value.
    """

    def __init__(self, samp_rate, fft_size, fft_v_overlap_factor, fft_fps, fft_compression):
        """Build the workers and compute the initial parameters.

        :param samp_rate: input sample rate.
        :param fft_size: FFT size, passed to every worker that takes one.
        :param fft_v_overlap_factor: overlap factor; values > 0 enable averaging.
        :param fft_fps: target number of FFT lines per second.
        :param fft_compression: ``"adpcm"`` appends a ``CompressFftAdpcm`` worker;
            any other value leaves the output uncompressed.
        """
        self.sampleRate = samp_rate
        self.vOverlapFactor = fft_v_overlap_factor
        self.fps = fft_fps
        self.size = fft_size

        # Placeholder values: the block size (0) and the averaging count (10)
        # are both overwritten by the _updateParameters() call further down.
        self.blockSize = 0
        self.fft = Fft(size=self.size, every_n_samples=self.blockSize)
        self.averager = FftAverager(fft_size=self.size, fft_averages=10)
        self.fftExchangeSides = FftExchangeSides(fft_size=self.size)

        stages = [self.fft, self.averager, self.fftExchangeSides]

        use_adpcm = fft_compression == "adpcm"
        self.compressFftAdpcm = CompressFftAdpcm(fft_size=self.size) if use_adpcm else None
        if self.compressFftAdpcm is not None:
            stages.append(self.compressFftAdpcm)

        self._updateParameters()
        super().__init__(*stages)

    def _setBlockSize(self, fft_block_size):
        """Truncate the block size to an int and push it to the FFT if it changed."""
        new_block_size = int(fft_block_size)
        if new_block_size != self.blockSize:
            self.blockSize = new_block_size
            self.fft.setEveryNSamples(new_block_size)

    def setVOverlapFactor(self, fft_v_overlap_factor):
        """Store a new overlap factor and recompute parameters if it differs."""
        if self.vOverlapFactor != fft_v_overlap_factor:
            self.vOverlapFactor = fft_v_overlap_factor
            self._updateParameters()

    def setFps(self, fft_fps):
        """Store a new frame rate and recompute parameters if it differs."""
        if self.fps != fft_fps:
            self.fps = fft_fps
            self._updateParameters()

    def setSampleRate(self, samp_rate):
        """Store a new sample rate and recompute parameters if it differs."""
        if self.sampleRate != samp_rate:
            self.sampleRate = samp_rate
            self._updateParameters()

    def _updateParameters(self):
        """Recompute the averaging count and FFT block size from current settings."""
        averages = 0
        if self.vOverlapFactor > 0:
            ratio = 1.0 * self.sampleRate / self.size / self.fps / (1.0 - self.vOverlapFactor)
            averages = int(round(ratio))
        self.averager.setFftAverages(averages)

        # samples consumed per output line; split further when averaging
        samples_per_line = self.sampleRate / self.fps
        if averages != 0:
            samples_per_line = samples_per_line / averages
        self._setBlockSize(samples_per_line)