From 42aae4c03a042b37d6a61d172ac03fac32c004ee Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Sun, 4 Aug 2019 14:55:56 +0200 Subject: [PATCH] save some cpu cycles by only running necessary stuff for services --- csdr.py | 83 +++++++++++++++++++++++++++++++------------------ owrx/service.py | 16 +++++----- owrx/source.py | 4 +-- 3 files changed, 62 insertions(+), 41 deletions(-) diff --git a/csdr.py b/csdr.py index a67631d..08a0c45 100644 --- a/csdr.py +++ b/csdr.py @@ -33,10 +33,15 @@ logger = logging.getLogger(__name__) class output(object): - def add_output(self, type, read_fn): - pass + def send_output(self, t, read_fn): + if not self.supports_type(t): + # TODO rewrite the output mechanism in a way that avoids producing unnecessary data + logger.warning('dumping output of type %s since it is not supported.', t) + threading.Thread(target=self.pump(read_fn, lambda x: None)).start() + return + self.receive_output(t, read_fn) - def reset(self): + def receive_output(self, t, read_fn): pass def pump(self, read, write): @@ -51,6 +56,9 @@ class output(object): return copy + def supports_type(self, t): + return True + class dsp(object): def __init__(self, output): @@ -123,10 +131,19 @@ class dsp(object): "csdr shift_addition_cc --fifo {shift_pipe}", "csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING", "csdr bandpass_fir_fft_cc --fifo {bpf_pipe} {bpf_transition_bw} HAMMING", - "csdr squelch_and_smeter_cc --fifo {squelch_pipe} --outfifo {smeter_pipe} 5 {smeter_report_every}", + ] + if self.output.supports_type('smeter'): + chain += [ + "csdr squelch_and_smeter_cc --fifo {squelch_pipe} --outfifo {smeter_pipe} 5 {smeter_report_every}", + ] if self.secondary_demodulator: - chain += ["csdr tee {iqtee_pipe}", "csdr tee {iqtee2_pipe}"] + if self.output.supports_type('secondary_fft'): + chain += ["csdr tee {iqtee_pipe}"] + chain += ["csdr tee {iqtee2_pipe}"] + # early exit if we don't want audio + if not self.output.supports_type('audio'): + return chain # safe some cpu cycles... no need to decimate if decimation factor is 1 last_decimation_block = ( ["csdr fractional_decimator_ff {last_decimation}"] if self.last_decimation != 1.0 else [] @@ -246,16 +263,9 @@ class dsp(object): if not self.secondary_demodulator: return logger.debug("starting secondary demodulator from IF input sampled at %d" % self.if_samp_rate()) - secondary_command_fft = self.secondary_chain("fft") secondary_command_demod = self.secondary_chain(self.secondary_demodulator) - self.try_create_pipes(self.secondary_pipe_names, secondary_command_demod + secondary_command_fft) + self.try_create_pipes(self.secondary_pipe_names, secondary_command_demod) - secondary_command_fft = secondary_command_fft.format( - input_pipe=self.iqtee_pipe, - secondary_fft_input_size=self.secondary_fft_size, - secondary_fft_size=self.secondary_fft_size, - secondary_fft_block_size=self.secondary_fft_block_size(), - ) secondary_command_demod = secondary_command_demod.format( input_pipe=self.iqtee2_pipe, secondary_shift_pipe=self.secondary_shift_pipe, @@ -267,24 +277,34 @@ class dsp(object): last_decimation=self.last_decimation, ) - logger.debug("secondary command (fft) = %s", secondary_command_fft) logger.debug("secondary command (demod) = %s", secondary_command_demod) my_env = os.environ.copy() # if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1"; if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"] = "1" - self.secondary_process_fft = subprocess.Popen( - secondary_command_fft, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env - ) + if self.output.supports_type('secondary_fft'): + secondary_command_fft = self.secondary_chain("fft") + secondary_command_fft = secondary_command_fft.format( + input_pipe=self.iqtee_pipe, + secondary_fft_input_size=self.secondary_fft_size, + secondary_fft_size=self.secondary_fft_size, + secondary_fft_block_size=self.secondary_fft_block_size(), + ) + logger.debug("secondary command (fft) = %s", secondary_command_fft) + + self.secondary_process_fft = subprocess.Popen( + secondary_command_fft, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env + ) + self.output.send_output( + "secondary_fft", + partial(self.secondary_process_fft.stdout.read, int(self.get_secondary_fft_bytes_to_read())), + ) + self.secondary_process_demod = subprocess.Popen( secondary_command_demod, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env ) self.secondary_processes_running = True - self.output.add_output( - "secondary_fft", - partial(self.secondary_process_fft.stdout.read, int(self.get_secondary_fft_bytes_to_read())), - ) if self.isWsjtMode(): smd = self.get_secondary_demodulator() if smd == "ft8": @@ -298,9 +318,9 @@ class dsp(object): elif smd == "ft4": chopper = Ft4Chopper(self.secondary_process_demod.stdout) chopper.start() - self.output.add_output("wsjt_demod", chopper.read) + self.output.send_output("wsjt_demod", chopper.read) else: - self.output.add_output("secondary_demod", partial(self.secondary_process_demod.stdout.read, 1)) + self.output.send_output("secondary_demod", partial(self.secondary_process_demod.stdout.read, 1)) # open control pipes for csdr and send initialization data if self.secondary_shift_pipe != None: # TODO digimodes @@ -551,7 +571,9 @@ class dsp(object): my_env["CSDR_DYNAMIC_BUFSIZE_ON"] = "1" if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"] = "1" - self.process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env) + + out = subprocess.PIPE if self.output.supports_type('audio') else subprocess.DEVNULL + self.process = subprocess.Popen(command, stdout=out, shell=True, preexec_fn=os.setpgrp, env=my_env) def watch_thread(): rc = self.process.wait() @@ -562,10 +584,11 @@ class dsp(object): threading.Thread(target=watch_thread).start() - self.output.add_output( - "audio", - partial(self.process.stdout.read, int(self.get_fft_bytes_to_read()) if self.demodulator == "fft" else 256), - ) + if self.output.supports_type('audio'): + self.output.send_output( + "audio", + partial(self.process.stdout.read, int(self.get_fft_bytes_to_read()) if self.demodulator == "fft" else 256), + ) # open control pipes for csdr if self.bpf_pipe: @@ -595,7 +618,7 @@ class dsp(object): else: return float(raw.rstrip("\n")) - self.output.add_output("smeter", read_smeter) + self.output.send_output("smeter", read_smeter) if self.meta_pipe != None: # TODO make digiham output unicode and then change this here self.meta_pipe_file = open(self.meta_pipe, "r", encoding="cp437") @@ -607,7 +630,7 @@ class dsp(object): else: return raw.rstrip("\n") - self.output.add_output("meta", read_meta) + self.output.send_output("meta", read_meta) if self.dmr_control_pipe: self.dmr_control_pipe_file = open(self.dmr_control_pipe, "w") diff --git a/owrx/service.py b/owrx/service.py index d04ee0b..f90b2bb 100644 --- a/owrx/service.py +++ b/owrx/service.py @@ -14,17 +14,15 @@ class ServiceOutput(output): def __init__(self, frequency): self.frequency = frequency - def add_output(self, t, read_fn): - if t == "wsjt_demod": - parser = WsjtParser(WsjtHandler()) - parser.setDialFrequency(self.frequency) - target = self.pump(read_fn, parser.parse) - else: - # dump everything else - # TODO rewrite the output mechanism in a way that avoids producing unnecessary data - target = self.pump(read_fn, lambda x: None) + def receive_output(self, t, read_fn): + parser = WsjtParser(WsjtHandler()) + parser.setDialFrequency(self.frequency) + target = self.pump(read_fn, parser.parse) threading.Thread(target=target).start() + def supports_type(self, t): + return t == 'wsjt_demod' + class ServiceHandler(object): def __init__(self, source): diff --git a/owrx/source.py b/owrx/source.py index 3191cc2..ca82270 100644 --- a/owrx/source.py +++ b/owrx/source.py @@ -376,7 +376,7 @@ class SpectrumThread(csdr.output): if self.sdrSource.isAvailable(): self.dsp.start() - def add_output(self, type, read_fn): + def receive_output(self, type, read_fn): if type != "audio": logger.error("unsupported output type received by FFT: %s", type) return @@ -503,7 +503,7 @@ class DspManager(csdr.output): if self.sdrSource.isAvailable(): self.dsp.start() - def add_output(self, t, read_fn): + def receive_output(self, t, read_fn): logger.debug("adding new output of type %s", t) writers = { "audio": self.handler.write_dsp_data,