save some cpu cycles by only running necessary stuff for services

This commit is contained in:
Jakob Ketterl 2019-08-04 14:55:56 +02:00
parent 441738e569
commit 42aae4c03a
3 changed files with 62 additions and 41 deletions

83
csdr.py
View File

@ -33,10 +33,15 @@ logger = logging.getLogger(__name__)
class output(object):
def add_output(self, type, read_fn):
pass
def send_output(self, t, read_fn):
if not self.supports_type(t):
# TODO rewrite the output mechanism in a way that avoids producing unnecessary data
logger.warning('dumping output of type %s since it is not supported.', t)
threading.Thread(target=self.pump(read_fn, lambda x: None)).start()
return
self.receive_output(t, read_fn)
def reset(self):
def receive_output(self, t, read_fn):
pass
def pump(self, read, write):
@ -51,6 +56,9 @@ class output(object):
return copy
def supports_type(self, t):
return True
class dsp(object):
def __init__(self, output):
@ -123,10 +131,19 @@ class dsp(object):
"csdr shift_addition_cc --fifo {shift_pipe}",
"csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING",
"csdr bandpass_fir_fft_cc --fifo {bpf_pipe} {bpf_transition_bw} HAMMING",
"csdr squelch_and_smeter_cc --fifo {squelch_pipe} --outfifo {smeter_pipe} 5 {smeter_report_every}",
]
if self.output.supports_type('smeter'):
chain += [
"csdr squelch_and_smeter_cc --fifo {squelch_pipe} --outfifo {smeter_pipe} 5 {smeter_report_every}",
]
if self.secondary_demodulator:
chain += ["csdr tee {iqtee_pipe}", "csdr tee {iqtee2_pipe}"]
if self.output.supports_type('secondary_fft'):
chain += ["csdr tee {iqtee_pipe}"]
chain += ["csdr tee {iqtee2_pipe}"]
# early exit if we don't want audio
if not self.output.supports_type('audio'):
return chain
# safe some cpu cycles... no need to decimate if decimation factor is 1
last_decimation_block = (
["csdr fractional_decimator_ff {last_decimation}"] if self.last_decimation != 1.0 else []
@ -246,16 +263,9 @@ class dsp(object):
if not self.secondary_demodulator:
return
logger.debug("starting secondary demodulator from IF input sampled at %d" % self.if_samp_rate())
secondary_command_fft = self.secondary_chain("fft")
secondary_command_demod = self.secondary_chain(self.secondary_demodulator)
self.try_create_pipes(self.secondary_pipe_names, secondary_command_demod + secondary_command_fft)
self.try_create_pipes(self.secondary_pipe_names, secondary_command_demod)
secondary_command_fft = secondary_command_fft.format(
input_pipe=self.iqtee_pipe,
secondary_fft_input_size=self.secondary_fft_size,
secondary_fft_size=self.secondary_fft_size,
secondary_fft_block_size=self.secondary_fft_block_size(),
)
secondary_command_demod = secondary_command_demod.format(
input_pipe=self.iqtee2_pipe,
secondary_shift_pipe=self.secondary_shift_pipe,
@ -267,24 +277,34 @@ class dsp(object):
last_decimation=self.last_decimation,
)
logger.debug("secondary command (fft) = %s", secondary_command_fft)
logger.debug("secondary command (demod) = %s", secondary_command_demod)
my_env = os.environ.copy()
# if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1";
if self.csdr_print_bufsizes:
my_env["CSDR_PRINT_BUFSIZES"] = "1"
self.secondary_process_fft = subprocess.Popen(
secondary_command_fft, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env
)
if self.output.supports_type('secondary_fft'):
secondary_command_fft = self.secondary_chain("fft")
secondary_command_fft = secondary_command_fft.format(
input_pipe=self.iqtee_pipe,
secondary_fft_input_size=self.secondary_fft_size,
secondary_fft_size=self.secondary_fft_size,
secondary_fft_block_size=self.secondary_fft_block_size(),
)
logger.debug("secondary command (fft) = %s", secondary_command_fft)
self.secondary_process_fft = subprocess.Popen(
secondary_command_fft, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env
)
self.output.send_output(
"secondary_fft",
partial(self.secondary_process_fft.stdout.read, int(self.get_secondary_fft_bytes_to_read())),
)
self.secondary_process_demod = subprocess.Popen(
secondary_command_demod, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env
)
self.secondary_processes_running = True
self.output.add_output(
"secondary_fft",
partial(self.secondary_process_fft.stdout.read, int(self.get_secondary_fft_bytes_to_read())),
)
if self.isWsjtMode():
smd = self.get_secondary_demodulator()
if smd == "ft8":
@ -298,9 +318,9 @@ class dsp(object):
elif smd == "ft4":
chopper = Ft4Chopper(self.secondary_process_demod.stdout)
chopper.start()
self.output.add_output("wsjt_demod", chopper.read)
self.output.send_output("wsjt_demod", chopper.read)
else:
self.output.add_output("secondary_demod", partial(self.secondary_process_demod.stdout.read, 1))
self.output.send_output("secondary_demod", partial(self.secondary_process_demod.stdout.read, 1))
# open control pipes for csdr and send initialization data
if self.secondary_shift_pipe != None: # TODO digimodes
@ -551,7 +571,9 @@ class dsp(object):
my_env["CSDR_DYNAMIC_BUFSIZE_ON"] = "1"
if self.csdr_print_bufsizes:
my_env["CSDR_PRINT_BUFSIZES"] = "1"
self.process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env)
out = subprocess.PIPE if self.output.supports_type('audio') else subprocess.DEVNULL
self.process = subprocess.Popen(command, stdout=out, shell=True, preexec_fn=os.setpgrp, env=my_env)
def watch_thread():
rc = self.process.wait()
@ -562,10 +584,11 @@ class dsp(object):
threading.Thread(target=watch_thread).start()
self.output.add_output(
"audio",
partial(self.process.stdout.read, int(self.get_fft_bytes_to_read()) if self.demodulator == "fft" else 256),
)
if self.output.supports_type('audio'):
self.output.send_output(
"audio",
partial(self.process.stdout.read, int(self.get_fft_bytes_to_read()) if self.demodulator == "fft" else 256),
)
# open control pipes for csdr
if self.bpf_pipe:
@ -595,7 +618,7 @@ class dsp(object):
else:
return float(raw.rstrip("\n"))
self.output.add_output("smeter", read_smeter)
self.output.send_output("smeter", read_smeter)
if self.meta_pipe != None:
# TODO make digiham output unicode and then change this here
self.meta_pipe_file = open(self.meta_pipe, "r", encoding="cp437")
@ -607,7 +630,7 @@ class dsp(object):
else:
return raw.rstrip("\n")
self.output.add_output("meta", read_meta)
self.output.send_output("meta", read_meta)
if self.dmr_control_pipe:
self.dmr_control_pipe_file = open(self.dmr_control_pipe, "w")

View File

@ -14,17 +14,15 @@ class ServiceOutput(output):
def __init__(self, frequency):
self.frequency = frequency
def add_output(self, t, read_fn):
if t == "wsjt_demod":
parser = WsjtParser(WsjtHandler())
parser.setDialFrequency(self.frequency)
target = self.pump(read_fn, parser.parse)
else:
# dump everything else
# TODO rewrite the output mechanism in a way that avoids producing unnecessary data
target = self.pump(read_fn, lambda x: None)
def receive_output(self, t, read_fn):
parser = WsjtParser(WsjtHandler())
parser.setDialFrequency(self.frequency)
target = self.pump(read_fn, parser.parse)
threading.Thread(target=target).start()
def supports_type(self, t):
return t == 'wsjt_demod'
class ServiceHandler(object):
def __init__(self, source):

View File

@ -376,7 +376,7 @@ class SpectrumThread(csdr.output):
if self.sdrSource.isAvailable():
self.dsp.start()
def add_output(self, type, read_fn):
def receive_output(self, type, read_fn):
if type != "audio":
logger.error("unsupported output type received by FFT: %s", type)
return
@ -503,7 +503,7 @@ class DspManager(csdr.output):
if self.sdrSource.isAvailable():
self.dsp.start()
def add_output(self, t, read_fn):
def receive_output(self, t, read_fn):
logger.debug("adding new output of type %s", t)
writers = {
"audio": self.handler.write_dsp_data,