improve lock handling

This commit is contained in:
Jakob Ketterl 2020-02-27 19:48:22 +01:00
parent c325368be8
commit d36be799d0

View File

@ -111,7 +111,11 @@ class dsp(object):
self.unvoiced_quality = 1
self.modification_lock = threading.Lock()
self.output = output
self.temporary_directory = "/tmp"
self.temporary_directory = None
self.pipe_base_path = None
self.set_temporary_directory("/tmp")
self.is_service = False
self.direwolf_config = None
self.direwolf_port = None
@ -122,6 +126,7 @@ class dsp(object):
def set_temporary_directory(self, what):
self.temporary_directory = what
self.pipe_base_path = "{tmp_dir}/openwebrx_pipe_{myid}_".format(tmp_dir=self.temporary_directory, myid=id(self))
def chain(self, which):
chain = ["nc -v 127.0.0.1 {nc_port}"]
@ -528,21 +533,19 @@ class dsp(object):
def set_offset_freq(self, offset_freq):
self.offset_freq = offset_freq
if self.running:
self.modification_lock.acquire()
self.pipe_files["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate))
self.pipe_files["shift_pipe"].flush()
self.modification_lock.release()
with self.modification_lock:
self.pipe_files["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate))
self.pipe_files["shift_pipe"].flush()
def set_bpf(self, low_cut, high_cut):
self.low_cut = low_cut
self.high_cut = high_cut
if self.running:
self.modification_lock.acquire()
self.pipe_files["bpf_pipe"].write(
"%g %g\n" % (float(self.low_cut) / self.if_samp_rate(), float(self.high_cut) / self.if_samp_rate())
)
self.pipe_files["bpf_pipe"].flush()
self.modification_lock.release()
with self.modification_lock:
self.pipe_files["bpf_pipe"].write(
"%g %g\n" % (float(self.low_cut) / self.if_samp_rate(), float(self.high_cut) / self.if_samp_rate())
)
self.pipe_files["bpf_pipe"].flush()
def get_bpf(self):
return [self.low_cut, self.high_cut]
@ -555,10 +558,9 @@ class dsp(object):
# no squelch required on digital voice modes
actual_squelch = -150 if self.isDigitalVoice() or self.isPacket() or self.isPocsag() else self.squelch_level
if self.running:
self.modification_lock.acquire()
self.pipe_files["squelch_pipe"].write("%g\n" % (self.convertToLinear(actual_squelch)))
self.pipe_files["squelch_pipe"].flush()
self.modification_lock.release()
with self.modification_lock:
self.pipe_files["squelch_pipe"].write("%g\n" % (self.convertToLinear(actual_squelch)))
self.pipe_files["squelch_pipe"].flush()
def set_unvoiced_quality(self, q):
self.unvoiced_quality = q
@ -633,80 +635,76 @@ class dsp(object):
self.direwolf_config = None
def start(self):
self.modification_lock.acquire()
if self.running:
self.modification_lock.release()
return
self.running = True
with self.modification_lock:
if self.running:
return
self.running = True
command_base = " | ".join(self.chain(self.demodulator))
command_base = " | ".join(self.chain(self.demodulator))
# create control pipes for csdr
self.pipe_base_path = "{tmp_dir}/openwebrx_pipe_{myid}_".format(tmp_dir=self.temporary_directory, myid=id(self))
# create control pipes for csdr
self.try_create_pipes(self.pipe_names, command_base)
self.try_create_pipes(self.pipe_names, command_base)
# run the command
command = command_base.format(
bpf_pipe=self.pipes["bpf_pipe"],
shift_pipe=self.pipes["shift_pipe"],
squelch_pipe=self.pipes["squelch_pipe"],
smeter_pipe=self.pipes["smeter_pipe"],
meta_pipe=self.pipes["meta_pipe"],
iqtee_pipe=self.pipes["iqtee_pipe"],
iqtee2_pipe=self.pipes["iqtee2_pipe"],
dmr_control_pipe=self.pipes["dmr_control_pipe"],
decimation=self.decimation,
last_decimation=self.last_decimation,
fft_size=self.fft_size,
fft_block_size=self.fft_block_size(),
fft_averages=self.fft_averages,
bpf_transition_bw=float(self.bpf_transition_bw) / self.if_samp_rate(),
ddc_transition_bw=self.ddc_transition_bw(),
flowcontrol=int(self.samp_rate * 2),
start_bufsize=self.base_bufsize * self.decimation,
nc_port=self.nc_port,
output_rate=self.get_output_rate(),
smeter_report_every=int(self.if_samp_rate() / 6000),
unvoiced_quality=self.get_unvoiced_quality(),
audio_rate=self.get_audio_rate(),
)
logger.debug("Command = %s", command)
my_env = os.environ.copy()
if self.csdr_dynamic_bufsize:
my_env["CSDR_DYNAMIC_BUFSIZE_ON"] = "1"
if self.csdr_print_bufsizes:
my_env["CSDR_PRINT_BUFSIZES"] = "1"
out = subprocess.PIPE if self.output.supports_type("audio") else subprocess.DEVNULL
self.process = subprocess.Popen(command, stdout=out, shell=True, start_new_session=True, env=my_env)
def watch_thread():
rc = self.process.wait()
logger.debug("dsp thread ended with rc=%d", rc)
if rc == 0 and self.running and not self.modification_lock.locked():
logger.debug("restarting since rc = 0, self.running = true, and no modification")
self.restart()
threading.Thread(target=watch_thread).start()
if self.output.supports_type("audio"):
self.output.send_output(
"audio",
partial(
self.process.stdout.read,
self.get_fft_bytes_to_read() if self.demodulator == "fft" else self.get_audio_bytes_to_read(),
),
# run the command
command = command_base.format(
bpf_pipe=self.pipes["bpf_pipe"],
shift_pipe=self.pipes["shift_pipe"],
squelch_pipe=self.pipes["squelch_pipe"],
smeter_pipe=self.pipes["smeter_pipe"],
meta_pipe=self.pipes["meta_pipe"],
iqtee_pipe=self.pipes["iqtee_pipe"],
iqtee2_pipe=self.pipes["iqtee2_pipe"],
dmr_control_pipe=self.pipes["dmr_control_pipe"],
decimation=self.decimation,
last_decimation=self.last_decimation,
fft_size=self.fft_size,
fft_block_size=self.fft_block_size(),
fft_averages=self.fft_averages,
bpf_transition_bw=float(self.bpf_transition_bw) / self.if_samp_rate(),
ddc_transition_bw=self.ddc_transition_bw(),
flowcontrol=int(self.samp_rate * 2),
start_bufsize=self.base_bufsize * self.decimation,
nc_port=self.nc_port,
output_rate=self.get_output_rate(),
smeter_report_every=int(self.if_samp_rate() / 6000),
unvoiced_quality=self.get_unvoiced_quality(),
audio_rate=self.get_audio_rate(),
)
# open control pipes for csdr
for p in ["bpf_pipe", "shift_pipe", "squelch_pipe"]:
if self.has_pipe(p):
self.pipe_files[p] = open(self.pipes[p], "w")
self.start_secondary_demodulator()
logger.debug("Command = %s", command)
my_env = os.environ.copy()
if self.csdr_dynamic_bufsize:
my_env["CSDR_DYNAMIC_BUFSIZE_ON"] = "1"
if self.csdr_print_bufsizes:
my_env["CSDR_PRINT_BUFSIZES"] = "1"
self.modification_lock.release()
out = subprocess.PIPE if self.output.supports_type("audio") else subprocess.DEVNULL
self.process = subprocess.Popen(command, stdout=out, shell=True, start_new_session=True, env=my_env)
def watch_thread():
rc = self.process.wait()
logger.debug("dsp thread ended with rc=%d", rc)
if rc == 0 and self.running and not self.modification_lock.locked():
logger.debug("restarting since rc = 0, self.running = true, and no modification")
self.restart()
threading.Thread(target=watch_thread).start()
if self.output.supports_type("audio"):
self.output.send_output(
"audio",
partial(
self.process.stdout.read,
self.get_fft_bytes_to_read() if self.demodulator == "fft" else self.get_audio_bytes_to_read(),
),
)
self.start_secondary_demodulator()
# open control pipes for csdr
for p in ["bpf_pipe", "shift_pipe", "squelch_pipe", "dmr_control_pipe"]:
if self.has_pipe(p):
self.pipe_files[p] = open(self.pipes[p], "w")
# send initial config through the pipes
if self.has_pipe("squelch_pipe"):
@ -739,24 +737,19 @@ class dsp(object):
self.output.send_output("meta", read_meta)
if self.has_pipe("dmr_control_pipe"):
self.pipe_files["dmr_control_pipe"] = open(self.pipes["dmr_control_pipe"], "w")
def stop(self):
self.modification_lock.acquire()
self.running = False
if self.process is not None:
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
self.process = None
except ProcessLookupError:
# been killed by something else, ignore
pass
self.stop_secondary_demodulator()
with self.modification_lock:
self.running = False
if self.process is not None:
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
self.process = None
except ProcessLookupError:
# been killed by something else, ignore
pass
self.stop_secondary_demodulator()
self.try_delete_pipes(self.pipe_names)
self.modification_lock.release()
self.try_delete_pipes(self.pipe_names)
def restart(self):
if not self.running: