diff --git a/csdr/__init__.py b/csdr/__init__.py index c1c649b..162e5ea 100644 --- a/csdr/__init__.py +++ b/csdr/__init__.py @@ -35,6 +35,7 @@ from owrx.audio.chopper import AudioChopper from csdr.pipe import Pipe +from pycsdr.modules import Buffer from csdr.chain.demodulator import DemodulatorChain from csdr.chain.fm import Fm from csdr.chain.am import Am @@ -49,6 +50,7 @@ class Dsp(DirewolfConfigSubscriber): def __init__(self, output: Output): self.pycsdr_enabled = True self.pycsdr_chain = None + self.pycsdr_reader = None self.buffer = None self.samp_rate = 250000 @@ -729,7 +731,10 @@ class Dsp(DirewolfConfigSubscriber): self.set_bpf(self.low_cut, self.high_cut) self.set_offset_freq(self.offset_freq) chain.setInput(self.buffer) - self.output.send_output("audio", chain.getOutput().read) + outputBuffer = Buffer(chain.getOutputFormat()) + chain.setWriter(outputBuffer) + self.pycsdr_reader = outputBuffer.getReader() + self.output.send_output("audio", self.pycsdr_reader.read) return command_base = " | ".join(chain) @@ -824,6 +829,8 @@ class Dsp(DirewolfConfigSubscriber): if self.pycsdr_enabled and self.pycsdr_chain is not None: self.pycsdr_chain.stop() self.pycsdr_chain = None + self.pycsdr_reader.stop() + self.pycsdr_reader = None if self.process is not None: try: os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) diff --git a/csdr/chain/__init__.py b/csdr/chain/__init__.py index 6be0ce8..798ed50 100644 --- a/csdr/chain/__init__.py +++ b/csdr/chain/__init__.py @@ -1,56 +1,65 @@ -from pycsdr.modules import Buffer +from pycsdr.modules import Buffer, Writer class Chain: def __init__(self, *workers): self.input = None self.output = None + self.reader = None self.workers = list(workers) - for i in range(1, len(self.workers)): - self._connect(self.workers[i - 1], self.workers[i]) + filtered = self._filterWorkers() + for i in range(1, len(filtered)): + self._connect(filtered[i - 1], filtered[i]) + + def _filterWorkers(self): + return [w for w in self.workers if not isinstance(w, Chain) or not w.empty()] + + def empty(self): + return len(self.workers) <= 0 def _connect(self, w1, w2): - if isinstance(w1, Chain): - buffer = w1.getOutput() + writer = Buffer(w1.getOutputFormat()) + w1.setWriter(writer) + if isinstance(w2, Chain): + w2.setInput(writer) else: - buffer = Buffer(w1.getOutputFormat()) - w1.setOutput(buffer) - w2.setInput(buffer) + w2.setReader(writer.getReader()) def stop(self): for w in self.workers: w.stop() - self.setInput(None) - if self.output is not None: - self.output.stop() + if self.reader is not None: + self.reader.stop() + self.reader = None - def setInput(self, buffer): + def setInput(self, buffer: Buffer): if self.input == buffer: return self.input = buffer if self.workers: - self.workers[0].setInput(buffer) + firstWorker = self.workers[0] + if isinstance(firstWorker, Chain): + firstWorker.setInput(buffer) + else: + firstWorker.setReader(buffer.getReader()) else: self.output = self.input - def getOutput(self): - if self.output is None: - if self.workers: - lastWorker = self.workers[-1] - if isinstance(lastWorker, Chain): - self.output = lastWorker.getOutput() - else: - self.output = Buffer(self.getOutputFormat()) - self.workers[-1].setOutput(self.output) - else: - self.output = self.input - return self.output + def setWriter(self, writer: Writer): + if self.output == writer: + return + self.output = writer + if self.workers: + lastWorker = self.workers[-1] + lastWorker.setWriter(self.output) + else: + raise BufferError("setOutput on empty chain") def getOutputFormat(self): if self.workers: return self.workers[-1].getOutputFormat() else: - return self.input.getOutputFormat() + raise BufferError("getOutputFormat on empty chain") def replace(self, index, newWorker): if index >= len(self.workers): @@ -60,36 +69,41 @@ class Chain: self.workers[index] = newWorker if index == 0: - newWorker.setInput(self.input) + writer = self.input else: previousWorker = self.workers[index - 1] - if isinstance(previousWorker, Chain): - newWorker.setInput(previousWorker.getOutput()) + writer = Buffer(previousWorker.getOutputFormat()) + previousWorker.setWriter(writer) + + if writer is not None: + if isinstance(newWorker, Chain): + newWorker.setInput(writer) else: - buffer = Buffer(previousWorker.getOutputFormat()) - previousWorker.setOutput(buffer) - newWorker.setInput(buffer) + newWorker.setReader(writer.getReader()) if index < len(self.workers) - 1: nextWorker = self.workers[index + 1] - if isinstance(newWorker, Chain): - nextWorker.setInput(newWorker.getOutput()) + writer = Buffer(newWorker.getOutputFormat()) + newWorker.setWriter(writer) + if isinstance(nextWorker, Chain): + nextWorker.setInput(writer) else: - buffer = Buffer(newWorker.getOutputFormat()) - newWorker.setOutput(buffer) - nextWorker.setInput(buffer) + nextWorker.setReader(writer.getReader()) else: - newWorker.setOutput(self.output) + if self.output is not None: + newWorker.setWriter(self.output) def pump(self, write): - output = self.getOutput() + if self.output is None: + self.setWriter(Buffer(self.getOutputFormat())) + self.reader = self.output.getReader() def copy(): run = True while run: data = None try: - data = output.read() + data = self.reader.read() except ValueError: pass if data is None or (isinstance(data, bytes) and len(data) == 0): diff --git a/owrx/source/__init__.py b/owrx/source/__init__.py index ddd4f36..a932d38 100644 --- a/owrx/source/__init__.py +++ b/owrx/source/__init__.py @@ -252,13 +252,13 @@ class SdrSource(ABC): def _getTcpSource(self): with self.modificationLock: if self.tcpSource is None: - self.tcpSource = TcpSource(self.port) + self.tcpSource = TcpSource(self.port, Format.COMPLEX_FLOAT) return self.tcpSource def getBuffer(self): if self.buffer is None: self.buffer = Buffer(Format.COMPLEX_FLOAT) - self._getTcpSource().setOutput(self.buffer) + self._getTcpSource().setWriter(self.buffer) return self.buffer def getCommandValues(self):