From bd7cd013596eb3e770da16b9dbd302bce9729a64 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Wed, 8 May 2019 16:31:52 +0200 Subject: [PATCH] stabilize dsp operation with a lock --- csdr.py | 108 +++++++++++++++++++++---------------------------- owrx/source.py | 16 ++------ 2 files changed, 49 insertions(+), 75 deletions(-) diff --git a/csdr.py b/csdr.py index 2a27228..d4d52d7 100755 --- a/csdr.py +++ b/csdr.py @@ -23,9 +23,7 @@ OpenWebRX csdr plugin: do the signal processing with csdr import subprocess import time import os -import code import signal -import fcntl import threading class dsp: @@ -62,6 +60,7 @@ class dsp: self.pipe_names=["bpf_pipe", "shift_pipe", "squelch_pipe", "smeter_pipe", "iqtee_pipe", "iqtee2_pipe"] self.secondary_pipe_names=["secondary_shift_pipe"] self.secondary_offset_freq = 1000 + self.modification_lock = threading.Lock() def chain(self,which): any_chain_base="nc -v 127.0.0.1 {nc_port} | " @@ -127,7 +126,7 @@ class dsp: return 31.25 def start_secondary_demodulator(self): - if(not self.secondary_demodulator): return + if not self.secondary_demodulator: return print("[openwebrx] starting secondary demodulator from IF input sampled at %d"%self.if_samp_rate()) secondary_command_fft=self.secondary_chain("fft") secondary_command_demod=self.secondary_chain(self.secondary_demodulator) @@ -151,7 +150,6 @@ class dsp: print("[openwebrx-dsp-plugin:csdr] secondary command (fft) =", secondary_command_fft) print("[openwebrx-dsp-plugin:csdr] secondary command (demod) =", secondary_command_demod) - #code.interact(local=locals()) my_env=os.environ.copy() #if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1"; if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"]="1"; @@ -179,8 +177,18 @@ class dsp: def stop_secondary_demodulator(self): if self.secondary_processes_running == False: return self.try_delete_pipes(self.secondary_pipe_names) - if self.secondary_process_fft: os.killpg(os.getpgid(self.secondary_process_fft.pid), signal.SIGTERM) - if self.secondary_process_demod: os.killpg(os.getpgid(self.secondary_process_demod.pid), signal.SIGTERM) + if self.secondary_process_fft: + try: + os.killpg(os.getpgid(self.secondary_process_fft.pid), signal.SIGTERM) + except ProcessLookupError: + # been killed by something else, ignore + pass + if self.secondary_process_demod: + try: + os.killpg(os.getpgid(self.secondary_process_demod.pid), signal.SIGTERM) + except ProcessLookupError: + # been killed by something else, ignore + pass self.secondary_processes_running = False def read_secondary_demod(self, size): @@ -235,8 +243,9 @@ class dsp: self.calculate_decimation() def set_demodulator(self,demodulator): - #to change this, restart is required + if (self.demodulator == demodulator): return self.demodulator=demodulator + self.restart() def get_demodulator(self): return self.demodulator @@ -260,15 +269,19 @@ class dsp: def set_offset_freq(self,offset_freq): self.offset_freq=offset_freq if self.running: + self.modification_lock.acquire() self.shift_pipe_file.write("%g\n"%(-float(self.offset_freq)/self.samp_rate)) self.shift_pipe_file.flush() + self.modification_lock.release() def set_bpf(self,low_cut,high_cut): self.low_cut=low_cut self.high_cut=high_cut if self.running: + self.modification_lock.acquire() self.bpf_pipe_file.write( "%g %g\n"%(float(self.low_cut)/self.if_samp_rate(), float(self.high_cut)/self.if_samp_rate()) ) self.bpf_pipe_file.flush() + self.modification_lock.release() def get_bpf(self): return [self.low_cut, self.high_cut] @@ -276,8 +289,10 @@ class dsp: def set_squelch_level(self, squelch_level): self.squelch_level=squelch_level if self.running: + self.modification_lock.acquire() self.squelch_pipe_file.write( "%g\n"%(float(self.squelch_level)) ) self.squelch_pipe_file.flush() + self.modification_lock.release() def get_smeter_level(self): if self.running: @@ -317,36 +332,19 @@ class dsp: except Exception as e: print("[openwebrx-dsp-plugin:csdr] try_delete_pipes() ::", e) def start(self): - if (self.running): return + self.modification_lock.acquire() + if (self.running): + self.modification_lock.release() + return self.running = True command_base=self.chain(self.demodulator) #create control pipes for csdr self.pipe_base_path="/tmp/openwebrx_pipe_{myid}_".format(myid=id(self)) - # self.bpf_pipe = self.shift_pipe = self.squelch_pipe = self.smeter_pipe = None self.try_create_pipes(self.pipe_names, command_base) - # if "{bpf_pipe}" in command_base: - # self.bpf_pipe=pipe_base_path+"bpf" - # self.mkfifo(self.bpf_pipe) - # if "{shift_pipe}" in command_base: - # self.shift_pipe=pipe_base_path+"shift" - # self.mkfifo(self.shift_pipe) - # if "{squelch_pipe}" in command_base: - # self.squelch_pipe=pipe_base_path+"squelch" - # self.mkfifo(self.squelch_pipe) - # if "{smeter_pipe}" in command_base: - # self.smeter_pipe=pipe_base_path+"smeter" - # self.mkfifo(self.smeter_pipe) - # if "{iqtee_pipe}" in command_base: - # self.iqtee_pipe=pipe_base_path+"iqtee" - # self.mkfifo(self.iqtee_pipe) - # if "{iqtee2_pipe}" in command_base: - # self.iqtee2_pipe=pipe_base_path+"iqtee2" - # self.mkfifo(self.iqtee2_pipe) - #run the command command=command_base.format( bpf_pipe=self.bpf_pipe, shift_pipe=self.shift_pipe, decimation=self.decimation, \ last_decimation=self.last_decimation, fft_size=self.fft_size, fft_block_size=self.fft_block_size(), fft_averages=self.fft_averages, \ @@ -355,7 +353,6 @@ class dsp: squelch_pipe=self.squelch_pipe, smeter_pipe=self.smeter_pipe, iqtee_pipe=self.iqtee_pipe, iqtee2_pipe=self.iqtee2_pipe ) print("[openwebrx-dsp-plugin:csdr] Command =",command) - #code.interact(local=locals()) my_env=os.environ.copy() if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1"; if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"]="1"; @@ -364,30 +361,39 @@ class dsp: def watch_thread(): rc = self.process.wait() print("dsp thread ended with rc={0}".format(rc)) - if (self.running): + if (rc == 0 and self.running and not self.modification_lock.locked()): + print("restarting since rc = 0, self.running = true, and no modification") self.restart() threading.Thread(target = watch_thread).start() - #open control pipes for csdr and send initialization data + # open control pipes for csdr if self.bpf_pipe != None: self.bpf_pipe_file=open(self.bpf_pipe,"w") - self.set_bpf(self.low_cut,self.high_cut) - if self.shift_pipe != None: + if self.shift_pipe: self.shift_pipe_file=open(self.shift_pipe,"w") - self.set_offset_freq(self.offset_freq) - if self.squelch_pipe != None: + if self.squelch_pipe: self.squelch_pipe_file=open(self.squelch_pipe,"w") - self.set_squelch_level(self.squelch_level) - if self.smeter_pipe != None: - self.smeter_pipe_file=open(self.smeter_pipe,"r") self.start_secondary_demodulator() + self.modification_lock.release() + + # send initial config through the pipes + if self.squelch_pipe: + self.set_squelch_level(self.squelch_level) + if self.shift_pipe: + self.set_offset_freq(self.offset_freq) + if self.bpf_pipe: + self.set_bpf(self.low_cut, self.high_cut) + if self.smeter_pipe: + self.smeter_pipe_file=open(self.smeter_pipe,"r") + def read(self,size): return self.process.stdout.read(size) def stop(self): + self.modification_lock.acquire() self.running = False if hasattr(self, "process"): try: @@ -396,34 +402,10 @@ class dsp: # been killed by something else, ignore pass self.stop_secondary_demodulator() - #if(self.process.poll()!=None):return # returns None while subprocess is running - #while(self.process.poll()==None): - # #self.process.kill() - # print "killproc",os.getpgid(self.process.pid),self.process.pid - # os.killpg(self.process.pid, signal.SIGTERM) - # - # time.sleep(0.1) self.try_delete_pipes(self.pipe_names) - # if self.bpf_pipe: - # try: os.unlink(self.bpf_pipe) - # except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.bpf_pipe - # if self.shift_pipe: - # try: os.unlink(self.shift_pipe) - # except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.shift_pipe - # if self.squelch_pipe: - # try: os.unlink(self.squelch_pipe) - # except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.squelch_pipe - # if self.smeter_pipe: - # try: os.unlink(self.smeter_pipe) - # except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.smeter_pipe - # if self.iqtee_pipe: - # try: os.unlink(self.iqtee_pipe) - # except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.iqtee_pipe - # if self.iqtee2_pipe: - # try: os.unlink(self.iqtee2_pipe) - # except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.iqtee2_pipe + self.modification_lock.release() def restart(self): if not self.running: return diff --git a/owrx/source.py b/owrx/source.py index d889700..b05387f 100644 --- a/owrx/source.py +++ b/owrx/source.py @@ -29,7 +29,7 @@ class RtlNmuxSource(object): ) def restart(name, value): - print("would now restart rtl source due to property change: {0} changed to {1}".format(name, value)) + print("restarting rtl source due to property change: {0} changed to {1}".format(name, value)) self.stop() self.start() props.wire(restart) @@ -171,7 +171,6 @@ class DspManager(object): self.dsp.csdr_through = self.localProps["csdr_through"] self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate) - #do_secondary_demod=False self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate) self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq) @@ -189,12 +188,7 @@ class DspManager(object): self.dsp.set_bpf(*bpf) self.localProps.getProperty("high_cut").wire(set_high_cut) - def set_mod(mod): - if (self.dsp.get_demodulator() == mod): return - self.dsp.stop() - self.dsp.set_demodulator(mod) - self.dsp.start() - self.localProps.getProperty("mod").wire(set_mod) + self.localProps.getProperty("mod").wire(self.dsp.set_demodulator) if (self.localProps["digimodes_enable"]): def set_secondary_mod(mod): @@ -202,10 +196,8 @@ class DspManager(object): if self.dsp.get_secondary_demodulator() == mod: return self.stopSecondaryThreads() self.dsp.stop() - if mod is None: - self.dsp.set_secondary_demodulator(None) - else: - self.dsp.set_secondary_demodulator(mod) + self.dsp.set_secondary_demodulator(mod) + if mod is not None: self.handler.write_secondary_dsp_config({ "secondary_fft_size":self.localProps["digimodes_fft_size"], "if_samp_rate":self.dsp.if_samp_rate(),