move the pump mechanism, allowing the old output code to be removed

This commit is contained in:
Jakob Ketterl 2021-09-20 15:09:26 +02:00
parent 4b36aca6fc
commit 9efe41a2b1
6 changed files with 29 additions and 81 deletions

View File

@ -28,8 +28,6 @@ import threading
import math import math
from functools import partial from functools import partial
from csdr.output import Output
from owrx.aprs.direwolf import DirewolfConfig, DirewolfConfigSubscriber from owrx.aprs.direwolf import DirewolfConfig, DirewolfConfigSubscriber
from owrx.audio.chopper import AudioChopper from owrx.audio.chopper import AudioChopper
@ -48,7 +46,7 @@ logger = logging.getLogger(__name__)
class Dsp(DirewolfConfigSubscriber): class Dsp(DirewolfConfigSubscriber):
def __init__(self, output: Output): def __init__(self, output):
self.pycsdr_enabled = True self.pycsdr_enabled = True
self.pycsdr_chain = None self.pycsdr_chain = None
self.pycsdr_client_chain = None self.pycsdr_client_chain = None

View File

@ -145,23 +145,3 @@ class Chain(Module):
return self.workers[-1].getOutputFormat() return self.workers[-1].getOutputFormat()
else: else:
raise BufferError("getOutputFormat on empty chain") raise BufferError("getOutputFormat on empty chain")
def pump(self, write):
if self.writer is None:
self.setWriter(Buffer(self.getOutputFormat()))
self.clientReader = self.writer.getReader()
def copy():
run = True
while run:
data = None
try:
data = self.clientReader.read()
except ValueError:
pass
if data is None:
run = False
else:
write(data)
return copy

View File

@ -29,6 +29,20 @@ class Module(BaseModule, metaclass=ABCMeta):
def getOutputFormat(self) -> Format: def getOutputFormat(self) -> Format:
pass pass
def pump(self, read, write):
def copy():
while True:
data = None
try:
data = read()
except ValueError:
pass
if data is None or isinstance(data, bytes) and len(data) == 0:
break
write(data)
return copy
class AutoStartModule(Module, metaclass=ABCMeta): class AutoStartModule(Module, metaclass=ABCMeta):
def _checkStart(self) -> None: def _checkStart(self) -> None:
@ -47,20 +61,6 @@ class AutoStartModule(Module, metaclass=ABCMeta):
def start(self): def start(self):
pass pass
def pump(self, read, write):
def copy():
while True:
data = None
try:
data = read()
except ValueError:
pass
if data is None or isinstance(data, bytes) and len(data) == 0:
break
write(data)
return copy
class ThreadModule(AutoStartModule, Thread, metaclass=ABCMeta): class ThreadModule(AutoStartModule, Thread, metaclass=ABCMeta):
def __init__(self): def __init__(self):

View File

@ -1,36 +0,0 @@
import threading
import logging
logger = logging.getLogger(__name__)
class Output(object):
def send_output(self, t, read_fn):
if not self.supports_type(t):
# TODO rewrite the output mechanism in a way that avoids producing unnecessary data
logger.warning("dumping output of type %s since it is not supported.", t)
threading.Thread(target=self.pump(read_fn, lambda x: None), name="csdr_pump_thread").start()
return
self.receive_output(t, read_fn)
def receive_output(self, t, read_fn):
pass
def pump(self, read, write):
def copy():
run = True
while run:
data = None
try:
data = read()
except ValueError:
pass
if data is None or (isinstance(data, bytes) and len(data) == 0):
run = False
else:
write(data)
return copy
def supports_type(self, t):
return True

View File

@ -4,7 +4,6 @@ from owrx.source import SdrSourceEventClient, SdrSourceState, SdrClientClass
from owrx.property import PropertyStack, PropertyLayer, PropertyValidator from owrx.property import PropertyStack, PropertyLayer, PropertyValidator
from owrx.property.validators import OrValidator, RegexValidator, BoolValidator from owrx.property.validators import OrValidator, RegexValidator, BoolValidator
from owrx.modes import Modes from owrx.modes import Modes
from csdr.output import Output
from csdr.chain import Chain from csdr.chain import Chain
from csdr.chain.demodulator import BaseDemodulatorChain, FixedIfSampleRateChain, FixedAudioRateChain, HdAudio, SecondaryDemodulator, DialFrequencyReceiver from csdr.chain.demodulator import BaseDemodulatorChain, FixedIfSampleRateChain, FixedAudioRateChain, HdAudio, SecondaryDemodulator, DialFrequencyReceiver
from csdr.chain.selector import Selector from csdr.chain.selector import Selector
@ -287,7 +286,7 @@ class ModulationValidator(OrValidator):
super().__init__(BoolValidator(), RegexValidator(re.compile("^[a-z0-9]+$"))) super().__init__(BoolValidator(), RegexValidator(re.compile("^[a-z0-9]+$")))
class DspManager(Output, SdrSourceEventClient): class DspManager(SdrSourceEventClient):
def __init__(self, handler, sdrSource): def __init__(self, handler, sdrSource):
self.handler = handler self.handler = handler
self.sdrSource = sdrSource self.sdrSource = sdrSource
@ -540,7 +539,7 @@ class DspManager(Output, SdrSourceEventClient):
reader = buffer.getReader() reader = buffer.getReader()
self.readers[t] = reader self.readers[t] = reader
threading.Thread(target=self.pump(reader.read, write), name="dsp_pump_{}".format(t)).start() threading.Thread(target=self.chain.pump(reader.read, write), name="dsp_pump_{}".format(t)).start()
def _unpickle(self, callback): def _unpickle(self, callback):
def unpickler(data): def unpickler(data):
@ -554,6 +553,7 @@ class DspManager(Output, SdrSourceEventClient):
return unpickler return unpickler
def stop(self): def stop(self):
if self.chain:
self.chain.stop() self.chain.stop()
self.chain = None self.chain = None
for reader in self.readers.values(): for reader in self.readers.values():

View File

@ -1,9 +1,9 @@
from owrx.config.core import CoreConfig
from owrx.config import Config from owrx.config import Config
from csdr.chain.fft import FftChain from csdr.chain.fft import FftChain
import threading
from owrx.source import SdrSourceEventClient, SdrSourceState, SdrClientClass from owrx.source import SdrSourceEventClient, SdrSourceState, SdrClientClass
from owrx.property import PropertyStack from owrx.property import PropertyStack
from pycsdr.modules import Buffer
import threading
import logging import logging
@ -27,6 +27,7 @@ class SpectrumThread(SdrSourceEventClient):
) )
self.dsp = None self.dsp = None
self.reader = None
self.subscriptions = [] self.subscriptions = []
@ -53,7 +54,10 @@ class SpectrumThread(SdrSourceEventClient):
self.props.wireProperty("fft_voverlap_factor", self.dsp.setVOverlapFactor), self.props.wireProperty("fft_voverlap_factor", self.dsp.setVOverlapFactor),
] ]
threading.Thread(target=self.dsp.pump(self.sdrSource.writeSpectrumData)).start() buffer = Buffer(self.dsp.getOutputFormat())
self.dsp.setWriter(buffer)
self.reader = buffer.getReader()
threading.Thread(target=self.dsp.pump(self.reader.read, self.sdrSource.writeSpectrumData)).start()
if self.sdrSource.isAvailable(): if self.sdrSource.isAvailable():
self.dsp.setReader(self.sdrSource.getBuffer().getReader()) self.dsp.setReader(self.sdrSource.getBuffer().getReader())
@ -63,6 +67,8 @@ class SpectrumThread(SdrSourceEventClient):
return return
self.dsp.stop() self.dsp.stop()
self.dsp = None self.dsp = None
self.reader.stop()
self.reader = None
self.sdrSource.removeClient(self) self.sdrSource.removeClient(self)
while self.subscriptions: while self.subscriptions:
self.subscriptions.pop().cancel() self.subscriptions.pop().cancel()