From 42b315ef8630626c3e262e3ac43fa4b82192e156 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Fri, 27 Aug 2021 16:11:03 +0200 Subject: [PATCH] handle empty converter chain --- csdr/chain/__init__.py | 20 +++++++++++++++++++- csdr/chain/clientaudio.py | 23 +++++++++++++++++++---- owrx/dsp.py | 25 +++++++++++++++++++------ 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/csdr/chain/__init__.py b/csdr/chain/__init__.py index 1a39d3d..0055b20 100644 --- a/csdr/chain/__init__.py +++ b/csdr/chain/__init__.py @@ -11,6 +11,9 @@ class Chain: for i in range(1, len(self.workers)): self._connect(self.workers[i - 1], self.workers[i]) + def empty(self): + return not self.workers + def _connect(self, w1, w2): writer = Buffer(w1.getOutputFormat()) w1.setWriter(writer) @@ -73,7 +76,7 @@ class Chain: error = e if error is not None: - raise e + raise error def append(self, newWorker): previousWorker = None @@ -90,6 +93,21 @@ class Chain: if self.writer is not None: newWorker.setWriter(self.writer) + def insert(self, newWorker): + nextWorker = None + if self.workers: + nextWorker = self.workers[0] + + self.workers.insert(0, newWorker) + + if nextWorker: + self._connect(newWorker, nextWorker) + elif self.writer is not None: + newWorker.setWriter(self.writer) + + if self.reader is not None: + newWorker.setReader(self.reader) + def remove(self, index): removedWorker = self.workers[index] self.workers.remove(removedWorker) diff --git a/csdr/chain/clientaudio.py b/csdr/chain/clientaudio.py index aab887a..febfd15 100644 --- a/csdr/chain/clientaudio.py +++ b/csdr/chain/clientaudio.py @@ -21,7 +21,10 @@ class ClientAudioChain(Chain): self.format = format self.inputRate = inputRate self.clientRate = clientRate - workers = [self._buildConverter()] + workers = [] + converter = self._buildConverter() + if not converter.empty(): + workers += [converter] if compression == "adpcm": workers += [AdpcmEncoder(sync=True)] super().__init__(workers) @@ -29,23 +32,35 @@ class ClientAudioChain(Chain): def _buildConverter(self): return Converter(self.format, self.inputRate, self.clientRate) + def _updateConverter(self): + converter = self._buildConverter() + index = self.indexOf(lambda x: isinstance(x, Converter)) + if converter.empty(): + if index >= 0: + self.remove(index) + else: + if index >= 0: + self.replace(index, converter) + else: + self.insert(converter) + def setFormat(self, format: Format) -> None: if format == self.format: return self.format = format - self.replace(0, self._buildConverter()) + self._updateConverter() def setInputRate(self, inputRate: int) -> None: if inputRate == self.inputRate: return self.inputRate = inputRate - self.replace(0, self._buildConverter()) + self._updateConverter() def setClientRate(self, clientRate: int) -> None: if clientRate == self.clientRate: return self.clientRate = clientRate - self.replace(0, self._buildConverter()) + self._updateConverter() def setAudioCompression(self, compression: str) -> None: index = self.indexOf(lambda x: isinstance(x, AdpcmEncoder)) diff --git a/owrx/dsp.py b/owrx/dsp.py index aa79338..03f0b38 100644 --- a/owrx/dsp.py +++ b/owrx/dsp.py @@ -33,7 +33,7 @@ class ClientDemodulatorChain(Chain): self.selector = Selector(sampleRate, outputRate, 0.0) self.selector.setBandpass(-4000, 4000) self.demodulator = demod - self.clientAudioChain = ClientAudioChain(Format.FLOAT, outputRate, outputRate, audioCompression) + self.clientAudioChain = ClientAudioChain(demod.getOutputFormat(), outputRate, outputRate, audioCompression) self.metaWriter = None self.squelchLevel = -150 super().__init__([self.selector, self.demodulator, self.clientAudioChain]) @@ -192,7 +192,7 @@ class DspManager(Output, SdrSourceEventClient): self.props["audio_compression"] ) - self.readers = [] + self.readers = {} # wire audio output buffer = Buffer(self.chain.getOutputFormat()) @@ -235,7 +235,7 @@ class DspManager(Output, SdrSourceEventClient): self.chain.setFrequencyOffset(0) self.subscriptions = [ - self.props.wireProperty("audio_compression", self.chain.setAudioCompression), + self.props.wireProperty("audio_compression", self.setAudioCompression), # probably unused: # self.props.wireProperty("fft_compression", self.dsp.set_fft_compression), # TODO @@ -322,6 +322,15 @@ class DspManager(Output, SdrSourceEventClient): raise ValueError("unsupported demodulator: {}".format(mod)) self.chain.setDemodulator(demodulator) + def setAudioCompression(self, comp): + try: + self.chain.setAudioCompression(comp) + except ValueError: + # wrong output format... need to re-wire + buffer = Buffer(self.chain.getOutputFormat()) + self.chain.setWriter(buffer) + self.wireOutput("audio", buffer) + def start(self): if self.sdrSource.isAvailable(): self.chain.setReader(self.sdrSource.getBuffer().getReader()) @@ -342,15 +351,19 @@ class DspManager(Output, SdrSourceEventClient): write = writers[t] + if t in self.readers: + self.readers[t].stop() + reader = buffer.getReader() - self.readers.append(reader) + self.readers[t] = reader threading.Thread(target=self.pump(reader.read, write), name="dsp_pump_{}".format(t)).start() def stop(self): self.chain.stop() self.chain = None - while self.readers: - self.readers.pop().stop() + for reader in self.readers.values(): + reader.stop() + self.readers = {} self.startOnAvailable = False self.sdrSource.removeClient(self)