use the fft chain directly without csdr dsp classes

This commit is contained in:
Jakob Ketterl 2021-01-23 19:27:01 +01:00
parent 4e429d047d
commit 4b94126dc3
2 changed files with 63 additions and 29 deletions

View File

@ -35,3 +35,23 @@ class Chain(Flow):
return return
self.output = buffer self.output = buffer
self.workers[-1].setOutput(buffer) self.workers[-1].setOutput(buffer)
def pump(self, write):
if self.output is None:
self.setOutput(Buffer())
def copy():
run = True
while run:
data = None
try:
data = self.output.read()
except ValueError:
pass
if data is None or (isinstance(data, bytes) and len(data) == 0):
run = False
else:
write(data)
return copy

View File

@ -1,5 +1,5 @@
from owrx.config import Config from owrx.config import Config
from csdr import csdr from csdr.chain.fft import FftChain
import threading import threading
from owrx.source import SdrSource, SdrSourceEventClient from owrx.source import SdrSource, SdrSourceEventClient
from owrx.property import PropertyStack from owrx.property import PropertyStack
@ -9,7 +9,7 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SpectrumThread(csdr.output, SdrSourceEventClient): class SpectrumThread(SdrSourceEventClient):
def __init__(self, sdrSource): def __init__(self, sdrSource):
self.sdrSource = sdrSource self.sdrSource = sdrSource
super().__init__() super().__init__()
@ -17,7 +17,7 @@ class SpectrumThread(csdr.output, SdrSourceEventClient):
stack = PropertyStack() stack = PropertyStack()
stack.addLayer(0, self.sdrSource.props) stack.addLayer(0, self.sdrSource.props)
stack.addLayer(1, Config.get()) stack.addLayer(1, Config.get())
self.props = props = stack.filter( self.props = stack.filter(
"samp_rate", "samp_rate",
"fft_size", "fft_size",
"fft_fps", "fft_fps",
@ -26,45 +26,56 @@ class SpectrumThread(csdr.output, SdrSourceEventClient):
"csdr_dynamic_bufsize", "csdr_dynamic_bufsize",
"csdr_print_bufsizes", "csdr_print_bufsizes",
"csdr_through", "csdr_through",
"temporary_directory",
) )
self.dsp = dsp = csdr.dsp(self) self.dsp = None
dsp.nc_port = self.sdrSource.getPort()
dsp.setBuffer(self.sdrSource.getBuffer())
dsp.set_demodulator("fft")
self.subscriptions = [ self.subscriptions = []
props.wireProperty("samp_rate", dsp.set_samp_rate), self.subscriptions += [
props.wireProperty("fft_size", dsp.set_fft_size), # these props require a restart
props.wireProperty("fft_fps", dsp.set_fft_fps), self.props.wireProperty("fft_size", self.restart),
props.wireProperty("fft_compression", dsp.set_fft_compression), self.props.wireProperty("fft_compression", self.restart),
props.wireProperty("temporary_directory", dsp.set_temporary_directory),
props.wireProperty("fft_voverlap_factor", dsp.set_fft_voverlap_factor),
] ]
dsp.csdr_dynamic_bufsize = props["csdr_dynamic_bufsize"]
dsp.csdr_print_bufsizes = props["csdr_print_bufsizes"]
dsp.csdr_through = props["csdr_through"]
logger.debug("Spectrum thread initialized successfully.") logger.debug("Spectrum thread initialized successfully.")
def start(self): def start(self):
if self.dsp is not None:
return
self.dsp = FftChain(
self.props['samp_rate'],
self.props['fft_size'],
self.props['fft_voverlap_factor'],
self.props['fft_fps'],
self.props['fft_compression']
)
self.sdrSource.addClient(self) self.sdrSource.addClient(self)
self.subscriptions += [
# these props can be set on the fly
self.props.wireProperty("samp_rate", self.dsp.setSampleRate),
self.props.wireProperty("fft_fps", self.dsp.setFps),
self.props.wireProperty("fft_voverlap_factor", self.dsp.setVOverlapFactor),
]
threading.Thread(target=self.dsp.pump(self.sdrSource.writeSpectrumData)).start()
if self.sdrSource.isAvailable(): if self.sdrSource.isAvailable():
self.dsp.start() self.dsp.setInput(self.sdrSource.getBuffer())
def supports_type(self, t):
return t == "audio"
def receive_output(self, type, read_fn):
threading.Thread(target=self.pump(read_fn, self.sdrSource.writeSpectrumData)).start()
def stop(self): def stop(self):
if self.dsp is None:
return
self.dsp.stop() self.dsp.stop()
self.dsp = None
self.sdrSource.removeClient(self) self.sdrSource.removeClient(self)
for c in self.subscriptions: while self.subscriptions:
c.cancel() self.subscriptions.pop().cancel()
self.subscriptions = []
def restart(self, *args, **kwargs):
self.stop()
self.start()
def getClientClass(self): def getClientClass(self):
return SdrSource.CLIENT_USER return SdrSource.CLIENT_USER
@ -73,7 +84,10 @@ class SpectrumThread(csdr.output, SdrSourceEventClient):
if state in [SdrSource.STATE_STOPPING, SdrSource.STATE_FAILED]: if state in [SdrSource.STATE_STOPPING, SdrSource.STATE_FAILED]:
self.dsp.stop() self.dsp.stop()
elif state == SdrSource.STATE_RUNNING: elif state == SdrSource.STATE_RUNNING:
self.dsp.start() if self.dsp is None:
self.start()
else:
self.dsp.setInput(self.sdrSource.getBuffer())
def onBusyStateChange(self, state): def onBusyStateChange(self, state):
pass pass