stabilize dsp operation with a lock

This commit is contained in:
Jakob Ketterl 2019-05-08 16:31:52 +02:00
parent f5d9306c37
commit bd7cd01359
2 changed files with 49 additions and 75 deletions

108
csdr.py
View File

@ -23,9 +23,7 @@ OpenWebRX csdr plugin: do the signal processing with csdr
import subprocess import subprocess
import time import time
import os import os
import code
import signal import signal
import fcntl
import threading import threading
class dsp: class dsp:
@ -62,6 +60,7 @@ class dsp:
self.pipe_names=["bpf_pipe", "shift_pipe", "squelch_pipe", "smeter_pipe", "iqtee_pipe", "iqtee2_pipe"] self.pipe_names=["bpf_pipe", "shift_pipe", "squelch_pipe", "smeter_pipe", "iqtee_pipe", "iqtee2_pipe"]
self.secondary_pipe_names=["secondary_shift_pipe"] self.secondary_pipe_names=["secondary_shift_pipe"]
self.secondary_offset_freq = 1000 self.secondary_offset_freq = 1000
self.modification_lock = threading.Lock()
def chain(self,which): def chain(self,which):
any_chain_base="nc -v 127.0.0.1 {nc_port} | " any_chain_base="nc -v 127.0.0.1 {nc_port} | "
@ -127,7 +126,7 @@ class dsp:
return 31.25 return 31.25
def start_secondary_demodulator(self): def start_secondary_demodulator(self):
if(not self.secondary_demodulator): return if not self.secondary_demodulator: return
print("[openwebrx] starting secondary demodulator from IF input sampled at %d"%self.if_samp_rate()) print("[openwebrx] starting secondary demodulator from IF input sampled at %d"%self.if_samp_rate())
secondary_command_fft=self.secondary_chain("fft") secondary_command_fft=self.secondary_chain("fft")
secondary_command_demod=self.secondary_chain(self.secondary_demodulator) secondary_command_demod=self.secondary_chain(self.secondary_demodulator)
@ -151,7 +150,6 @@ class dsp:
print("[openwebrx-dsp-plugin:csdr] secondary command (fft) =", secondary_command_fft) print("[openwebrx-dsp-plugin:csdr] secondary command (fft) =", secondary_command_fft)
print("[openwebrx-dsp-plugin:csdr] secondary command (demod) =", secondary_command_demod) print("[openwebrx-dsp-plugin:csdr] secondary command (demod) =", secondary_command_demod)
#code.interact(local=locals())
my_env=os.environ.copy() my_env=os.environ.copy()
#if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1"; #if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1";
if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"]="1"; if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"]="1";
@ -179,8 +177,18 @@ class dsp:
def stop_secondary_demodulator(self): def stop_secondary_demodulator(self):
if self.secondary_processes_running == False: return if self.secondary_processes_running == False: return
self.try_delete_pipes(self.secondary_pipe_names) self.try_delete_pipes(self.secondary_pipe_names)
if self.secondary_process_fft: os.killpg(os.getpgid(self.secondary_process_fft.pid), signal.SIGTERM) if self.secondary_process_fft:
if self.secondary_process_demod: os.killpg(os.getpgid(self.secondary_process_demod.pid), signal.SIGTERM) try:
os.killpg(os.getpgid(self.secondary_process_fft.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
if self.secondary_process_demod:
try:
os.killpg(os.getpgid(self.secondary_process_demod.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
self.secondary_processes_running = False self.secondary_processes_running = False
def read_secondary_demod(self, size): def read_secondary_demod(self, size):
@ -235,8 +243,9 @@ class dsp:
self.calculate_decimation() self.calculate_decimation()
def set_demodulator(self,demodulator): def set_demodulator(self,demodulator):
#to change this, restart is required if (self.demodulator == demodulator): return
self.demodulator=demodulator self.demodulator=demodulator
self.restart()
def get_demodulator(self): def get_demodulator(self):
return self.demodulator return self.demodulator
@ -260,15 +269,19 @@ class dsp:
def set_offset_freq(self,offset_freq): def set_offset_freq(self,offset_freq):
self.offset_freq=offset_freq self.offset_freq=offset_freq
if self.running: if self.running:
self.modification_lock.acquire()
self.shift_pipe_file.write("%g\n"%(-float(self.offset_freq)/self.samp_rate)) self.shift_pipe_file.write("%g\n"%(-float(self.offset_freq)/self.samp_rate))
self.shift_pipe_file.flush() self.shift_pipe_file.flush()
self.modification_lock.release()
def set_bpf(self,low_cut,high_cut): def set_bpf(self,low_cut,high_cut):
self.low_cut=low_cut self.low_cut=low_cut
self.high_cut=high_cut self.high_cut=high_cut
if self.running: if self.running:
self.modification_lock.acquire()
self.bpf_pipe_file.write( "%g %g\n"%(float(self.low_cut)/self.if_samp_rate(), float(self.high_cut)/self.if_samp_rate()) ) self.bpf_pipe_file.write( "%g %g\n"%(float(self.low_cut)/self.if_samp_rate(), float(self.high_cut)/self.if_samp_rate()) )
self.bpf_pipe_file.flush() self.bpf_pipe_file.flush()
self.modification_lock.release()
def get_bpf(self): def get_bpf(self):
return [self.low_cut, self.high_cut] return [self.low_cut, self.high_cut]
@ -276,8 +289,10 @@ class dsp:
def set_squelch_level(self, squelch_level): def set_squelch_level(self, squelch_level):
self.squelch_level=squelch_level self.squelch_level=squelch_level
if self.running: if self.running:
self.modification_lock.acquire()
self.squelch_pipe_file.write( "%g\n"%(float(self.squelch_level)) ) self.squelch_pipe_file.write( "%g\n"%(float(self.squelch_level)) )
self.squelch_pipe_file.flush() self.squelch_pipe_file.flush()
self.modification_lock.release()
def get_smeter_level(self): def get_smeter_level(self):
if self.running: if self.running:
@ -317,36 +332,19 @@ class dsp:
except Exception as e: print("[openwebrx-dsp-plugin:csdr] try_delete_pipes() ::", e) except Exception as e: print("[openwebrx-dsp-plugin:csdr] try_delete_pipes() ::", e)
def start(self): def start(self):
if (self.running): return self.modification_lock.acquire()
if (self.running):
self.modification_lock.release()
return
self.running = True self.running = True
command_base=self.chain(self.demodulator) command_base=self.chain(self.demodulator)
#create control pipes for csdr #create control pipes for csdr
self.pipe_base_path="/tmp/openwebrx_pipe_{myid}_".format(myid=id(self)) self.pipe_base_path="/tmp/openwebrx_pipe_{myid}_".format(myid=id(self))
# self.bpf_pipe = self.shift_pipe = self.squelch_pipe = self.smeter_pipe = None
self.try_create_pipes(self.pipe_names, command_base) self.try_create_pipes(self.pipe_names, command_base)
# if "{bpf_pipe}" in command_base:
# self.bpf_pipe=pipe_base_path+"bpf"
# self.mkfifo(self.bpf_pipe)
# if "{shift_pipe}" in command_base:
# self.shift_pipe=pipe_base_path+"shift"
# self.mkfifo(self.shift_pipe)
# if "{squelch_pipe}" in command_base:
# self.squelch_pipe=pipe_base_path+"squelch"
# self.mkfifo(self.squelch_pipe)
# if "{smeter_pipe}" in command_base:
# self.smeter_pipe=pipe_base_path+"smeter"
# self.mkfifo(self.smeter_pipe)
# if "{iqtee_pipe}" in command_base:
# self.iqtee_pipe=pipe_base_path+"iqtee"
# self.mkfifo(self.iqtee_pipe)
# if "{iqtee2_pipe}" in command_base:
# self.iqtee2_pipe=pipe_base_path+"iqtee2"
# self.mkfifo(self.iqtee2_pipe)
#run the command #run the command
command=command_base.format( bpf_pipe=self.bpf_pipe, shift_pipe=self.shift_pipe, decimation=self.decimation, \ command=command_base.format( bpf_pipe=self.bpf_pipe, shift_pipe=self.shift_pipe, decimation=self.decimation, \
last_decimation=self.last_decimation, fft_size=self.fft_size, fft_block_size=self.fft_block_size(), fft_averages=self.fft_averages, \ last_decimation=self.last_decimation, fft_size=self.fft_size, fft_block_size=self.fft_block_size(), fft_averages=self.fft_averages, \
@ -355,7 +353,6 @@ class dsp:
squelch_pipe=self.squelch_pipe, smeter_pipe=self.smeter_pipe, iqtee_pipe=self.iqtee_pipe, iqtee2_pipe=self.iqtee2_pipe ) squelch_pipe=self.squelch_pipe, smeter_pipe=self.smeter_pipe, iqtee_pipe=self.iqtee_pipe, iqtee2_pipe=self.iqtee2_pipe )
print("[openwebrx-dsp-plugin:csdr] Command =",command) print("[openwebrx-dsp-plugin:csdr] Command =",command)
#code.interact(local=locals())
my_env=os.environ.copy() my_env=os.environ.copy()
if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1"; if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1";
if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"]="1"; if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"]="1";
@ -364,30 +361,39 @@ class dsp:
def watch_thread(): def watch_thread():
rc = self.process.wait() rc = self.process.wait()
print("dsp thread ended with rc={0}".format(rc)) print("dsp thread ended with rc={0}".format(rc))
if (self.running): if (rc == 0 and self.running and not self.modification_lock.locked()):
print("restarting since rc = 0, self.running = true, and no modification")
self.restart() self.restart()
threading.Thread(target = watch_thread).start() threading.Thread(target = watch_thread).start()
#open control pipes for csdr and send initialization data # open control pipes for csdr
if self.bpf_pipe != None: if self.bpf_pipe != None:
self.bpf_pipe_file=open(self.bpf_pipe,"w") self.bpf_pipe_file=open(self.bpf_pipe,"w")
self.set_bpf(self.low_cut,self.high_cut) if self.shift_pipe:
if self.shift_pipe != None:
self.shift_pipe_file=open(self.shift_pipe,"w") self.shift_pipe_file=open(self.shift_pipe,"w")
self.set_offset_freq(self.offset_freq) if self.squelch_pipe:
if self.squelch_pipe != None:
self.squelch_pipe_file=open(self.squelch_pipe,"w") self.squelch_pipe_file=open(self.squelch_pipe,"w")
self.set_squelch_level(self.squelch_level)
if self.smeter_pipe != None:
self.smeter_pipe_file=open(self.smeter_pipe,"r")
self.start_secondary_demodulator() self.start_secondary_demodulator()
self.modification_lock.release()
# send initial config through the pipes
if self.squelch_pipe:
self.set_squelch_level(self.squelch_level)
if self.shift_pipe:
self.set_offset_freq(self.offset_freq)
if self.bpf_pipe:
self.set_bpf(self.low_cut, self.high_cut)
if self.smeter_pipe:
self.smeter_pipe_file=open(self.smeter_pipe,"r")
def read(self,size): def read(self,size):
return self.process.stdout.read(size) return self.process.stdout.read(size)
def stop(self): def stop(self):
self.modification_lock.acquire()
self.running = False self.running = False
if hasattr(self, "process"): if hasattr(self, "process"):
try: try:
@ -396,34 +402,10 @@ class dsp:
# been killed by something else, ignore # been killed by something else, ignore
pass pass
self.stop_secondary_demodulator() self.stop_secondary_demodulator()
#if(self.process.poll()!=None):return # returns None while subprocess is running
#while(self.process.poll()==None):
# #self.process.kill()
# print "killproc",os.getpgid(self.process.pid),self.process.pid
# os.killpg(self.process.pid, signal.SIGTERM)
#
# time.sleep(0.1)
self.try_delete_pipes(self.pipe_names) self.try_delete_pipes(self.pipe_names)
# if self.bpf_pipe: self.modification_lock.release()
# try: os.unlink(self.bpf_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.bpf_pipe
# if self.shift_pipe:
# try: os.unlink(self.shift_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.shift_pipe
# if self.squelch_pipe:
# try: os.unlink(self.squelch_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.squelch_pipe
# if self.smeter_pipe:
# try: os.unlink(self.smeter_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.smeter_pipe
# if self.iqtee_pipe:
# try: os.unlink(self.iqtee_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.iqtee_pipe
# if self.iqtee2_pipe:
# try: os.unlink(self.iqtee2_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.iqtee2_pipe
def restart(self): def restart(self):
if not self.running: return if not self.running: return

View File

@ -29,7 +29,7 @@ class RtlNmuxSource(object):
) )
def restart(name, value): def restart(name, value):
print("would now restart rtl source due to property change: {0} changed to {1}".format(name, value)) print("restarting rtl source due to property change: {0} changed to {1}".format(name, value))
self.stop() self.stop()
self.start() self.start()
props.wire(restart) props.wire(restart)
@ -171,7 +171,6 @@ class DspManager(object):
self.dsp.csdr_through = self.localProps["csdr_through"] self.dsp.csdr_through = self.localProps["csdr_through"]
self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate) self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate)
#do_secondary_demod=False
self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate) self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate)
self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq) self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq)
@ -189,12 +188,7 @@ class DspManager(object):
self.dsp.set_bpf(*bpf) self.dsp.set_bpf(*bpf)
self.localProps.getProperty("high_cut").wire(set_high_cut) self.localProps.getProperty("high_cut").wire(set_high_cut)
def set_mod(mod): self.localProps.getProperty("mod").wire(self.dsp.set_demodulator)
if (self.dsp.get_demodulator() == mod): return
self.dsp.stop()
self.dsp.set_demodulator(mod)
self.dsp.start()
self.localProps.getProperty("mod").wire(set_mod)
if (self.localProps["digimodes_enable"]): if (self.localProps["digimodes_enable"]):
def set_secondary_mod(mod): def set_secondary_mod(mod):
@ -202,10 +196,8 @@ class DspManager(object):
if self.dsp.get_secondary_demodulator() == mod: return if self.dsp.get_secondary_demodulator() == mod: return
self.stopSecondaryThreads() self.stopSecondaryThreads()
self.dsp.stop() self.dsp.stop()
if mod is None:
self.dsp.set_secondary_demodulator(None)
else:
self.dsp.set_secondary_demodulator(mod) self.dsp.set_secondary_demodulator(mod)
if mod is not None:
self.handler.write_secondary_dsp_config({ self.handler.write_secondary_dsp_config({
"secondary_fft_size":self.localProps["digimodes_fft_size"], "secondary_fft_size":self.localProps["digimodes_fft_size"],
"if_samp_rate":self.dsp.if_samp_rate(), "if_samp_rate":self.dsp.if_samp_rate(),