From d36be799d0b5e57394d8df8cd20b62c833785ba9 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Thu, 27 Feb 2020 19:48:22 +0100 Subject: [PATCH] improve lock handling --- csdr/csdr.py | 191 +++++++++++++++++++++++++-------------------------- 1 file changed, 92 insertions(+), 99 deletions(-) diff --git a/csdr/csdr.py b/csdr/csdr.py index eb948af..441a1fc 100644 --- a/csdr/csdr.py +++ b/csdr/csdr.py @@ -111,7 +111,11 @@ class dsp(object): self.unvoiced_quality = 1 self.modification_lock = threading.Lock() self.output = output - self.temporary_directory = "/tmp" + + self.temporary_directory = None + self.pipe_base_path = None + self.set_temporary_directory("/tmp") + self.is_service = False self.direwolf_config = None self.direwolf_port = None @@ -122,6 +126,7 @@ class dsp(object): def set_temporary_directory(self, what): self.temporary_directory = what + self.pipe_base_path = "{tmp_dir}/openwebrx_pipe_{myid}_".format(tmp_dir=self.temporary_directory, myid=id(self)) def chain(self, which): chain = ["nc -v 127.0.0.1 {nc_port}"] @@ -528,21 +533,19 @@ class dsp(object): def set_offset_freq(self, offset_freq): self.offset_freq = offset_freq if self.running: - self.modification_lock.acquire() - self.pipe_files["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate)) - self.pipe_files["shift_pipe"].flush() - self.modification_lock.release() + with self.modification_lock: + self.pipe_files["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate)) + self.pipe_files["shift_pipe"].flush() def set_bpf(self, low_cut, high_cut): self.low_cut = low_cut self.high_cut = high_cut if self.running: - self.modification_lock.acquire() - self.pipe_files["bpf_pipe"].write( - "%g %g\n" % (float(self.low_cut) / self.if_samp_rate(), float(self.high_cut) / self.if_samp_rate()) - ) - self.pipe_files["bpf_pipe"].flush() - self.modification_lock.release() + with self.modification_lock: + self.pipe_files["bpf_pipe"].write( + "%g %g\n" % (float(self.low_cut) / self.if_samp_rate(), float(self.high_cut) / self.if_samp_rate()) + ) + self.pipe_files["bpf_pipe"].flush() def get_bpf(self): return [self.low_cut, self.high_cut] @@ -555,10 +558,9 @@ class dsp(object): # no squelch required on digital voice modes actual_squelch = -150 if self.isDigitalVoice() or self.isPacket() or self.isPocsag() else self.squelch_level if self.running: - self.modification_lock.acquire() - self.pipe_files["squelch_pipe"].write("%g\n" % (self.convertToLinear(actual_squelch))) - self.pipe_files["squelch_pipe"].flush() - self.modification_lock.release() + with self.modification_lock: + self.pipe_files["squelch_pipe"].write("%g\n" % (self.convertToLinear(actual_squelch))) + self.pipe_files["squelch_pipe"].flush() def set_unvoiced_quality(self, q): self.unvoiced_quality = q @@ -633,80 +635,76 @@ class dsp(object): self.direwolf_config = None def start(self): - self.modification_lock.acquire() - if self.running: - self.modification_lock.release() - return - self.running = True + with self.modification_lock: + if self.running: + return + self.running = True - command_base = " | ".join(self.chain(self.demodulator)) + command_base = " | ".join(self.chain(self.demodulator)) - # create control pipes for csdr - self.pipe_base_path = "{tmp_dir}/openwebrx_pipe_{myid}_".format(tmp_dir=self.temporary_directory, myid=id(self)) + # create control pipes for csdr + self.try_create_pipes(self.pipe_names, command_base) - self.try_create_pipes(self.pipe_names, command_base) - - # run the command - command = command_base.format( - bpf_pipe=self.pipes["bpf_pipe"], - shift_pipe=self.pipes["shift_pipe"], - squelch_pipe=self.pipes["squelch_pipe"], - smeter_pipe=self.pipes["smeter_pipe"], - meta_pipe=self.pipes["meta_pipe"], - iqtee_pipe=self.pipes["iqtee_pipe"], - iqtee2_pipe=self.pipes["iqtee2_pipe"], - dmr_control_pipe=self.pipes["dmr_control_pipe"], - decimation=self.decimation, - last_decimation=self.last_decimation, - fft_size=self.fft_size, - fft_block_size=self.fft_block_size(), - fft_averages=self.fft_averages, - bpf_transition_bw=float(self.bpf_transition_bw) / self.if_samp_rate(), - ddc_transition_bw=self.ddc_transition_bw(), - flowcontrol=int(self.samp_rate * 2), - start_bufsize=self.base_bufsize * self.decimation, - nc_port=self.nc_port, - output_rate=self.get_output_rate(), - smeter_report_every=int(self.if_samp_rate() / 6000), - unvoiced_quality=self.get_unvoiced_quality(), - audio_rate=self.get_audio_rate(), - ) - - logger.debug("Command = %s", command) - my_env = os.environ.copy() - if self.csdr_dynamic_bufsize: - my_env["CSDR_DYNAMIC_BUFSIZE_ON"] = "1" - if self.csdr_print_bufsizes: - my_env["CSDR_PRINT_BUFSIZES"] = "1" - - out = subprocess.PIPE if self.output.supports_type("audio") else subprocess.DEVNULL - self.process = subprocess.Popen(command, stdout=out, shell=True, start_new_session=True, env=my_env) - - def watch_thread(): - rc = self.process.wait() - logger.debug("dsp thread ended with rc=%d", rc) - if rc == 0 and self.running and not self.modification_lock.locked(): - logger.debug("restarting since rc = 0, self.running = true, and no modification") - self.restart() - - threading.Thread(target=watch_thread).start() - - if self.output.supports_type("audio"): - self.output.send_output( - "audio", - partial( - self.process.stdout.read, - self.get_fft_bytes_to_read() if self.demodulator == "fft" else self.get_audio_bytes_to_read(), - ), + # run the command + command = command_base.format( + bpf_pipe=self.pipes["bpf_pipe"], + shift_pipe=self.pipes["shift_pipe"], + squelch_pipe=self.pipes["squelch_pipe"], + smeter_pipe=self.pipes["smeter_pipe"], + meta_pipe=self.pipes["meta_pipe"], + iqtee_pipe=self.pipes["iqtee_pipe"], + iqtee2_pipe=self.pipes["iqtee2_pipe"], + dmr_control_pipe=self.pipes["dmr_control_pipe"], + decimation=self.decimation, + last_decimation=self.last_decimation, + fft_size=self.fft_size, + fft_block_size=self.fft_block_size(), + fft_averages=self.fft_averages, + bpf_transition_bw=float(self.bpf_transition_bw) / self.if_samp_rate(), + ddc_transition_bw=self.ddc_transition_bw(), + flowcontrol=int(self.samp_rate * 2), + start_bufsize=self.base_bufsize * self.decimation, + nc_port=self.nc_port, + output_rate=self.get_output_rate(), + smeter_report_every=int(self.if_samp_rate() / 6000), + unvoiced_quality=self.get_unvoiced_quality(), + audio_rate=self.get_audio_rate(), ) - # open control pipes for csdr - for p in ["bpf_pipe", "shift_pipe", "squelch_pipe"]: - if self.has_pipe(p): - self.pipe_files[p] = open(self.pipes[p], "w") - self.start_secondary_demodulator() + logger.debug("Command = %s", command) + my_env = os.environ.copy() + if self.csdr_dynamic_bufsize: + my_env["CSDR_DYNAMIC_BUFSIZE_ON"] = "1" + if self.csdr_print_bufsizes: + my_env["CSDR_PRINT_BUFSIZES"] = "1" - self.modification_lock.release() + out = subprocess.PIPE if self.output.supports_type("audio") else subprocess.DEVNULL + self.process = subprocess.Popen(command, stdout=out, shell=True, start_new_session=True, env=my_env) + + def watch_thread(): + rc = self.process.wait() + logger.debug("dsp thread ended with rc=%d", rc) + if rc == 0 and self.running and not self.modification_lock.locked(): + logger.debug("restarting since rc = 0, self.running = true, and no modification") + self.restart() + + threading.Thread(target=watch_thread).start() + + if self.output.supports_type("audio"): + self.output.send_output( + "audio", + partial( + self.process.stdout.read, + self.get_fft_bytes_to_read() if self.demodulator == "fft" else self.get_audio_bytes_to_read(), + ), + ) + + self.start_secondary_demodulator() + + # open control pipes for csdr + for p in ["bpf_pipe", "shift_pipe", "squelch_pipe", "dmr_control_pipe"]: + if self.has_pipe(p): + self.pipe_files[p] = open(self.pipes[p], "w") # send initial config through the pipes if self.has_pipe("squelch_pipe"): @@ -739,24 +737,19 @@ class dsp(object): self.output.send_output("meta", read_meta) - if self.has_pipe("dmr_control_pipe"): - self.pipe_files["dmr_control_pipe"] = open(self.pipes["dmr_control_pipe"], "w") - def stop(self): - self.modification_lock.acquire() - self.running = False - if self.process is not None: - try: - os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) - self.process = None - except ProcessLookupError: - # been killed by something else, ignore - pass - self.stop_secondary_demodulator() + with self.modification_lock: + self.running = False + if self.process is not None: + try: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + self.process = None + except ProcessLookupError: + # been killed by something else, ignore + pass + self.stop_secondary_demodulator() - self.try_delete_pipes(self.pipe_names) - - self.modification_lock.release() + self.try_delete_pipes(self.pipe_names) def restart(self): if not self.running: