diff --git a/csdr/chain/__init__.py b/csdr/chain/__init__.py index 33ec232..bab0c0b 100644 --- a/csdr/chain/__init__.py +++ b/csdr/chain/__init__.py @@ -35,3 +35,23 @@ class Chain(Flow): return self.output = buffer self.workers[-1].setOutput(buffer) + + def pump(self, write): + if self.output is None: + self.setOutput(Buffer()) + + def copy(): + run = True + while run: + data = None + try: + data = self.output.read() + except ValueError: + pass + if data is None or (isinstance(data, bytes) and len(data) == 0): + run = False + else: + write(data) + + return copy + diff --git a/owrx/fft.py b/owrx/fft.py index 141ca45..3346f0f 100644 --- a/owrx/fft.py +++ b/owrx/fft.py @@ -1,5 +1,5 @@ from owrx.config import Config -from csdr import csdr +from csdr.chain.fft import FftChain import threading from owrx.source import SdrSource, SdrSourceEventClient from owrx.property import PropertyStack @@ -9,7 +9,7 @@ import logging logger = logging.getLogger(__name__) -class SpectrumThread(csdr.output, SdrSourceEventClient): +class SpectrumThread(SdrSourceEventClient): def __init__(self, sdrSource): self.sdrSource = sdrSource super().__init__() @@ -17,7 +17,7 @@ class SpectrumThread(csdr.output, SdrSourceEventClient): stack = PropertyStack() stack.addLayer(0, self.sdrSource.props) stack.addLayer(1, Config.get()) - self.props = props = stack.filter( + self.props = stack.filter( "samp_rate", "fft_size", "fft_fps", @@ -26,45 +26,56 @@ class SpectrumThread(csdr.output, SdrSourceEventClient): "csdr_dynamic_bufsize", "csdr_print_bufsizes", "csdr_through", - "temporary_directory", ) - self.dsp = dsp = csdr.dsp(self) - dsp.nc_port = self.sdrSource.getPort() - dsp.setBuffer(self.sdrSource.getBuffer()) - dsp.set_demodulator("fft") + self.dsp = None - self.subscriptions = [ - props.wireProperty("samp_rate", dsp.set_samp_rate), - props.wireProperty("fft_size", dsp.set_fft_size), - props.wireProperty("fft_fps", dsp.set_fft_fps), - props.wireProperty("fft_compression", dsp.set_fft_compression), - props.wireProperty("temporary_directory", dsp.set_temporary_directory), - props.wireProperty("fft_voverlap_factor", dsp.set_fft_voverlap_factor), + self.subscriptions = [] + self.subscriptions += [ + # these props require a restart + self.props.wireProperty("fft_size", self.restart), + self.props.wireProperty("fft_compression", self.restart), ] - dsp.csdr_dynamic_bufsize = props["csdr_dynamic_bufsize"] - dsp.csdr_print_bufsizes = props["csdr_print_bufsizes"] - dsp.csdr_through = props["csdr_through"] logger.debug("Spectrum thread initialized successfully.") def start(self): + if self.dsp is not None: + return + + self.dsp = FftChain( + self.props['samp_rate'], + self.props['fft_size'], + self.props['fft_voverlap_factor'], + self.props['fft_fps'], + self.props['fft_compression'] + ) self.sdrSource.addClient(self) + + self.subscriptions += [ + # these props can be set on the fly + self.props.wireProperty("samp_rate", self.dsp.setSampleRate), + self.props.wireProperty("fft_fps", self.dsp.setFps), + self.props.wireProperty("fft_voverlap_factor", self.dsp.setVOverlapFactor), + ] + + threading.Thread(target=self.dsp.pump(self.sdrSource.writeSpectrumData)).start() + if self.sdrSource.isAvailable(): - self.dsp.start() - - def supports_type(self, t): - return t == "audio" - - def receive_output(self, type, read_fn): - threading.Thread(target=self.pump(read_fn, self.sdrSource.writeSpectrumData)).start() + self.dsp.setInput(self.sdrSource.getBuffer()) def stop(self): + if self.dsp is None: + return self.dsp.stop() + self.dsp = None self.sdrSource.removeClient(self) - for c in self.subscriptions: - c.cancel() - self.subscriptions = [] + while self.subscriptions: + self.subscriptions.pop().cancel() + + def restart(self, *args, **kwargs): + self.stop() + self.start() def getClientClass(self): return SdrSource.CLIENT_USER @@ -73,7 +84,10 @@ class SpectrumThread(csdr.output, SdrSourceEventClient): if state in [SdrSource.STATE_STOPPING, SdrSource.STATE_FAILED]: self.dsp.stop() elif state == SdrSource.STATE_RUNNING: - self.dsp.start() + if self.dsp is None: + self.start() + else: + self.dsp.setInput(self.sdrSource.getBuffer()) def onBusyStateChange(self, state): pass