From 9efe41a2b1da71d700bbe78e0aa172796c9945f4 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Mon, 20 Sep 2021 15:09:26 +0200 Subject: [PATCH] move the pump mechanism, allowing the old output code to be removed --- csdr/__init__.py | 4 +--- csdr/chain/__init__.py | 20 -------------------- csdr/module.py | 28 ++++++++++++++-------------- csdr/output.py | 36 ------------------------------------ owrx/dsp.py | 10 +++++----- owrx/fft.py | 12 +++++++++--- 6 files changed, 29 insertions(+), 81 deletions(-) delete mode 100644 csdr/output.py diff --git a/csdr/__init__.py b/csdr/__init__.py index 5e05ad9..561997a 100644 --- a/csdr/__init__.py +++ b/csdr/__init__.py @@ -28,8 +28,6 @@ import threading import math from functools import partial -from csdr.output import Output - from owrx.aprs.direwolf import DirewolfConfig, DirewolfConfigSubscriber from owrx.audio.chopper import AudioChopper @@ -48,7 +46,7 @@ logger = logging.getLogger(__name__) class Dsp(DirewolfConfigSubscriber): - def __init__(self, output: Output): + def __init__(self, output): self.pycsdr_enabled = True self.pycsdr_chain = None self.pycsdr_client_chain = None diff --git a/csdr/chain/__init__.py b/csdr/chain/__init__.py index f4c14e8..6e9b210 100644 --- a/csdr/chain/__init__.py +++ b/csdr/chain/__init__.py @@ -145,23 +145,3 @@ class Chain(Module): return self.workers[-1].getOutputFormat() else: raise BufferError("getOutputFormat on empty chain") - - def pump(self, write): - if self.writer is None: - self.setWriter(Buffer(self.getOutputFormat())) - self.clientReader = self.writer.getReader() - - def copy(): - run = True - while run: - data = None - try: - data = self.clientReader.read() - except ValueError: - pass - if data is None: - run = False - else: - write(data) - - return copy diff --git a/csdr/module.py b/csdr/module.py index 7ade431..906d98f 100644 --- a/csdr/module.py +++ b/csdr/module.py @@ -29,6 +29,20 @@ class Module(BaseModule, metaclass=ABCMeta): def getOutputFormat(self) -> Format: pass + def pump(self, read, write): + def copy(): + while True: + data = None + try: + data = read() + except ValueError: + pass + if data is None or isinstance(data, bytes) and len(data) == 0: + break + write(data) + + return copy + class AutoStartModule(Module, metaclass=ABCMeta): def _checkStart(self) -> None: @@ -47,20 +61,6 @@ class AutoStartModule(Module, metaclass=ABCMeta): def start(self): pass - def pump(self, read, write): - def copy(): - while True: - data = None - try: - data = read() - except ValueError: - pass - if data is None or isinstance(data, bytes) and len(data) == 0: - break - write(data) - - return copy - class ThreadModule(AutoStartModule, Thread, metaclass=ABCMeta): def __init__(self): diff --git a/csdr/output.py b/csdr/output.py deleted file mode 100644 index 5fef242..0000000 --- a/csdr/output.py +++ /dev/null @@ -1,36 +0,0 @@ -import threading -import logging - -logger = logging.getLogger(__name__) - - -class Output(object): - def send_output(self, t, read_fn): - if not self.supports_type(t): - # TODO rewrite the output mechanism in a way that avoids producing unnecessary data - logger.warning("dumping output of type %s since it is not supported.", t) - threading.Thread(target=self.pump(read_fn, lambda x: None), name="csdr_pump_thread").start() - return - self.receive_output(t, read_fn) - - def receive_output(self, t, read_fn): - pass - - def pump(self, read, write): - def copy(): - run = True - while run: - data = None - try: - data = read() - except ValueError: - pass - if data is None or (isinstance(data, bytes) and len(data) == 0): - run = False - else: - write(data) - - return copy - - def supports_type(self, t): - return True diff --git a/owrx/dsp.py b/owrx/dsp.py index af4ccd7..b27b6ea 100644 --- a/owrx/dsp.py +++ b/owrx/dsp.py @@ -4,7 +4,6 @@ from owrx.source import SdrSourceEventClient, SdrSourceState, SdrClientClass from owrx.property import PropertyStack, PropertyLayer, PropertyValidator from owrx.property.validators import OrValidator, RegexValidator, BoolValidator from owrx.modes import Modes -from csdr.output import Output from csdr.chain import Chain from csdr.chain.demodulator import BaseDemodulatorChain, FixedIfSampleRateChain, FixedAudioRateChain, HdAudio, SecondaryDemodulator, DialFrequencyReceiver from csdr.chain.selector import Selector @@ -287,7 +286,7 @@ class ModulationValidator(OrValidator): super().__init__(BoolValidator(), RegexValidator(re.compile("^[a-z0-9]+$"))) -class DspManager(Output, SdrSourceEventClient): +class DspManager(SdrSourceEventClient): def __init__(self, handler, sdrSource): self.handler = handler self.sdrSource = sdrSource @@ -540,7 +539,7 @@ class DspManager(Output, SdrSourceEventClient): reader = buffer.getReader() self.readers[t] = reader - threading.Thread(target=self.pump(reader.read, write), name="dsp_pump_{}".format(t)).start() + threading.Thread(target=self.chain.pump(reader.read, write), name="dsp_pump_{}".format(t)).start() def _unpickle(self, callback): def unpickler(data): @@ -554,8 +553,9 @@ class DspManager(Output, SdrSourceEventClient): return unpickler def stop(self): - self.chain.stop() - self.chain = None + if self.chain: + self.chain.stop() + self.chain = None for reader in self.readers.values(): reader.stop() self.readers = {} diff --git a/owrx/fft.py b/owrx/fft.py index e1a8004..e7d216f 100644 --- a/owrx/fft.py +++ b/owrx/fft.py @@ -1,9 +1,9 @@ -from owrx.config.core import CoreConfig from owrx.config import Config from csdr.chain.fft import FftChain -import threading from owrx.source import SdrSourceEventClient, SdrSourceState, SdrClientClass from owrx.property import PropertyStack +from pycsdr.modules import Buffer +import threading import logging @@ -27,6 +27,7 @@ class SpectrumThread(SdrSourceEventClient): ) self.dsp = None + self.reader = None self.subscriptions = [] @@ -53,7 +54,10 @@ class SpectrumThread(SdrSourceEventClient): self.props.wireProperty("fft_voverlap_factor", self.dsp.setVOverlapFactor), ] - threading.Thread(target=self.dsp.pump(self.sdrSource.writeSpectrumData)).start() + buffer = Buffer(self.dsp.getOutputFormat()) + self.dsp.setWriter(buffer) + self.reader = buffer.getReader() + threading.Thread(target=self.dsp.pump(self.reader.read, self.sdrSource.writeSpectrumData)).start() if self.sdrSource.isAvailable(): self.dsp.setReader(self.sdrSource.getBuffer().getReader()) @@ -63,6 +67,8 @@ class SpectrumThread(SdrSourceEventClient): return self.dsp.stop() self.dsp = None + self.reader.stop() + self.reader = None self.sdrSource.removeClient(self) while self.subscriptions: self.subscriptions.pop().cancel()