From 0f1feb9d473241ce193921d274b170dfd942fa2f Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Mon, 16 Aug 2021 16:41:18 +0200 Subject: [PATCH] return to the simpler API --- csdr/__init__.py | 6 +-- csdr/chain/__init__.py | 101 ++++++++++++++++------------------------- csdr/chain/ssb.py | 5 -- owrx/fft.py | 4 +- 4 files changed, 43 insertions(+), 73 deletions(-) diff --git a/csdr/__init__.py b/csdr/__init__.py index 0c5a0bd..e5c70ab 100644 --- a/csdr/__init__.py +++ b/csdr/__init__.py @@ -117,7 +117,7 @@ class Dsp(DirewolfConfigSubscriber): def setBuffer(self, buffer): self.buffer = buffer if self.pycsdr_chain is not None: - self.pycsdr_chain.setInput(buffer) + self.pycsdr_chain.setReader(buffer.getReader()) def set_service(self, flag=True): self.is_service = flag @@ -754,14 +754,14 @@ class Dsp(DirewolfConfigSubscriber): self.set_bpf(self.low_cut, self.high_cut) self.set_offset_freq(self.offset_freq) - chain.setInput(self.buffer) + chain.setReader(self.buffer.getReader()) output_rate = self.get_hd_output_rate() if self.isHdAudio() else self.get_output_rate() audio_rate = 8000 if self.isDigitalVoice() else self.get_audio_rate() self.pycsdr_client_chain = ClientAudioChain(chain.getOutputFormat(), audio_rate, output_rate, self.audio_compression) buffer = Buffer(chain.getOutputFormat()) chain.setWriter(buffer) - self.pycsdr_client_chain.setInput(buffer) + self.pycsdr_client_chain.setReader(buffer.getReader()) outputBuffer = Buffer(self.pycsdr_client_chain.getOutputFormat()) self.pycsdr_client_chain.setWriter(outputBuffer) diff --git a/csdr/chain/__init__.py b/csdr/chain/__init__.py index 138a179..0c3b085 100644 --- a/csdr/chain/__init__.py +++ b/csdr/chain/__init__.py @@ -3,57 +3,39 @@ from pycsdr.modules import Buffer, Writer class Chain: def __init__(self, *workers): - self.input = None - self.output = None self.reader = None + self.writer = None + self.clientReader = None self.workers = list(workers) - filtered = self._filterWorkers() - for i in range(1, len(filtered)): - self._connect(filtered[i - 1], filtered[i]) - - def _filterWorkers(self): - return [w for w in self.workers if not isinstance(w, Chain) or not w.empty()] - - def empty(self): - return len(self.workers) <= 0 + for i in range(1, len(self.workers)): + self._connect(self.workers[i - 1], self.workers[i]) def _connect(self, w1, w2): writer = Buffer(w1.getOutputFormat()) w1.setWriter(writer) - if isinstance(w2, Chain): - w2.setInput(writer) - else: - w2.setReader(writer.getReader()) + w2.setReader(writer.getReader()) + + def setReader(self, reader): + if self.reader is reader: + return + self.reader = reader + if self.workers: + self.workers[0].setReader(reader) + + def setWriter(self, writer): + if self.writer is writer: + return + self.writer = writer + if self.workers: + self.workers[-1].setWriter(writer) def stop(self): for w in self.workers: w.stop() - if self.reader is not None: - self.reader.stop() - self.reader = None - - def setInput(self, buffer: Buffer): - if self.input == buffer: - return - self.input = buffer - if self.workers: - firstWorker = self.workers[0] - if isinstance(firstWorker, Chain): - firstWorker.setInput(buffer) - else: - firstWorker.setReader(buffer.getReader()) - else: - self.output = self.input - - def setWriter(self, writer: Writer): - if self.output == writer: - return - self.output = writer - if self.workers: - lastWorker = self.workers[-1] - lastWorker.setWriter(self.output) - else: - raise BufferError("setOutput on empty chain") + if self.clientReader is not None: + # TODO should be covered by finalize + self.clientReader.stop() + self.clientReader = None def getOutputFormat(self): if self.workers: @@ -69,41 +51,34 @@ class Chain: self.workers[index] = newWorker if index == 0: - writer = self.input + if self.reader is not None: + newWorker.setReader(self.reader) else: previousWorker = self.workers[index - 1] - writer = Buffer(previousWorker.getOutputFormat()) - previousWorker.setWriter(writer) + buffer = Buffer(previousWorker.getOutputFormat()) + previousWorker.setWriter(buffer) + newWorker.setReader(buffer.getReader()) - if writer is not None: - if isinstance(newWorker, Chain): - newWorker.setInput(writer) - else: - newWorker.setReader(writer.getReader()) - - if index < len(self.workers) - 1: - nextWorker = self.workers[index + 1] - writer = Buffer(newWorker.getOutputFormat()) - newWorker.setWriter(writer) - if isinstance(nextWorker, Chain): - nextWorker.setInput(writer) - else: - nextWorker.setReader(writer.getReader()) + if index == len(self.workers) - 1: + if self.writer is not None: + newWorker.setWriter(self.writer) else: - if self.output is not None: - newWorker.setWriter(self.output) + nextWorker = self.workers[index + 1] + buffer = Buffer(newWorker.getOutputFormat()) + newWorker.setWriter(buffer) + nextWorker.setReader(buffer.getReader()) def pump(self, write): - if self.output is None: + if self.writer is None: self.setWriter(Buffer(self.getOutputFormat())) - self.reader = self.output.getReader() + self.clientReader = self.writer.getReader() def copy(): run = True while run: data = None try: - data = self.reader.read() + data = self.clientReader.read() except ValueError: pass if data is None: diff --git a/csdr/chain/ssb.py b/csdr/chain/ssb.py index 4034281..9f20368 100644 --- a/csdr/chain/ssb.py +++ b/csdr/chain/ssb.py @@ -7,11 +7,6 @@ class Ssb(Chain): def __init__(self): workers = [ RealPart(), - # empty chain as placeholder for the "last decimation" - Chain(), Agc(Format.FLOAT), ] super().__init__(*workers) - - def setLastDecimation(self, decimation: Chain): - self.replace(1, decimation) diff --git a/owrx/fft.py b/owrx/fft.py index 5149f4c..e97c5a8 100644 --- a/owrx/fft.py +++ b/owrx/fft.py @@ -56,7 +56,7 @@ class SpectrumThread(SdrSourceEventClient): threading.Thread(target=self.dsp.pump(self.sdrSource.writeSpectrumData)).start() if self.sdrSource.isAvailable(): - self.dsp.setInput(self.sdrSource.getBuffer()) + self.dsp.setReader(self.sdrSource.getBuffer().getReader()) def stop(self): if self.dsp is None: @@ -81,7 +81,7 @@ class SpectrumThread(SdrSourceEventClient): if self.dsp is None: self.start() else: - self.dsp.setInput(self.sdrSource.getBuffer()) + self.dsp.setReader(self.sdrSource.getBuffer().getReader()) def onFail(self): self.dsp.stop()