Merge branch 'server_rework' into server_rework_dsd

This commit is contained in:
Jakob Ketterl
2019-05-13 17:46:02 +02:00
17 changed files with 1652 additions and 1483 deletions

234
csdr.py
View File

@ -23,9 +23,11 @@ OpenWebRX csdr plugin: do the signal processing with csdr
import subprocess
import time
import os
import code
import signal
import fcntl
import threading
import logging
logger = logging.getLogger(__name__)
class dsp:
@ -45,7 +47,6 @@ class dsp:
self.fft_compression = "none"
self.demodulator = "nfm"
self.name = "csdr"
self.format_conversion = "csdr convert_u8_f"
self.base_bufsize = 512
self.nc_port = 4951
self.csdr_dynamic_bufsize = False
@ -62,6 +63,7 @@ class dsp:
self.pipe_names=["bpf_pipe", "shift_pipe", "squelch_pipe", "smeter_pipe", "meta_pipe", "iqtee_pipe", "iqtee2_pipe"]
self.secondary_pipe_names=["secondary_shift_pipe"]
self.secondary_offset_freq = 1000
self.modification_lock = threading.Lock()
def chain(self,which):
if which in [ "dmr", "dstar", "nxdn", "ysf" ]:
@ -71,7 +73,6 @@ class dsp:
any_chain_base="nc -v 127.0.0.1 {nc_port} | "
if self.csdr_dynamic_bufsize: any_chain_base+="csdr setbuf {start_bufsize} | "
if self.csdr_through: any_chain_base+="csdr through | "
any_chain_base+=self.format_conversion+(" | " if self.format_conversion!="" else "") ##"csdr flowcontrol {flowcontrol} auto 1.5 10 | "
if which == "fft":
fft_chain_base = any_chain_base+"csdr fft_cc {fft_size} {fft_block_size} | " + \
("csdr logpower_cf -70 | " if self.fft_averages == 0 else "csdr logaveragepower_cf -70 {fft_size} {fft_averages} | ") + \
@ -122,7 +123,7 @@ class dsp:
return secondary_chain_base+"csdr realpart_cf | csdr fft_fc {secondary_fft_input_size} {secondary_fft_block_size} | csdr logpower_cf -70 " + (" | csdr compress_fft_adpcm_f_u8 {secondary_fft_size}" if self.fft_compression=="adpcm" else "")
elif which == "bpsk31":
return secondary_chain_base + "csdr shift_addition_cc --fifo {secondary_shift_pipe} | " + \
"csdr bandpass_fir_fft_cc $(csdr '=-(31.25)/{if_samp_rate}') $(csdr '=(31.25)/{if_samp_rate}') $(csdr '=31.25/{if_samp_rate}') | " + \
"csdr bandpass_fir_fft_cc -{secondary_bpf_cutoff} {secondary_bpf_cutoff} {secondary_bpf_cutoff} | " + \
"csdr simple_agc_cc 0.001 0.5 | " + \
"csdr timing_recovery_cc GARDNER {secondary_samples_per_bits} 0.5 2 --add_q | " + \
"CSDR_FIXED_BUFSIZE=1 csdr dbpsk_decoder_c_u8 | " + \
@ -139,12 +140,12 @@ class dsp:
def secondary_bpf_cutoff(self):
if self.secondary_demodulator == "bpsk31":
return (31.25/2) / self.if_samp_rate()
return 31.25 / self.if_samp_rate()
return 0
def secondary_bpf_transition_bw(self):
if self.secondary_demodulator == "bpsk31":
return (31.25/2) / self.if_samp_rate()
return 31.25 / self.if_samp_rate()
return 0
def secondary_samples_per_bits(self):
@ -157,51 +158,43 @@ class dsp:
return 31.25
def start_secondary_demodulator(self):
if(not self.secondary_demodulator): return
print "[openwebrx] starting secondary demodulator from IF input sampled at %d"%self.if_samp_rate()
if not self.secondary_demodulator: return
logger.debug("[openwebrx] starting secondary demodulator from IF input sampled at %d"%self.if_samp_rate())
secondary_command_fft=self.secondary_chain("fft")
secondary_command_demod=self.secondary_chain(self.secondary_demodulator)
self.try_create_pipes(self.secondary_pipe_names, secondary_command_demod + secondary_command_fft)
secondary_command_fft=secondary_command_fft.format( \
input_pipe=self.iqtee_pipe, \
secondary_fft_input_size=self.secondary_fft_size, \
secondary_fft_size=self.secondary_fft_size, \
secondary_fft_block_size=self.secondary_fft_block_size(), \
secondary_command_fft=secondary_command_fft.format(
input_pipe=self.iqtee_pipe,
secondary_fft_input_size=self.secondary_fft_size,
secondary_fft_size=self.secondary_fft_size,
secondary_fft_block_size=self.secondary_fft_block_size(),
)
secondary_command_demod=secondary_command_demod.format( \
input_pipe=self.iqtee2_pipe, \
secondary_shift_pipe=self.secondary_shift_pipe, \
secondary_decimation=self.secondary_decimation(), \
secondary_samples_per_bits=self.secondary_samples_per_bits(), \
secondary_bpf_cutoff=self.secondary_bpf_cutoff(), \
secondary_bpf_transition_bw=self.secondary_bpf_transition_bw(), \
secondary_command_demod=secondary_command_demod.format(
input_pipe=self.iqtee2_pipe,
secondary_shift_pipe=self.secondary_shift_pipe,
secondary_decimation=self.secondary_decimation(),
secondary_samples_per_bits=self.secondary_samples_per_bits(),
secondary_bpf_cutoff=self.secondary_bpf_cutoff(),
secondary_bpf_transition_bw=self.secondary_bpf_transition_bw(),
if_samp_rate=self.if_samp_rate()
)
print "[openwebrx-dsp-plugin:csdr] secondary command (fft) =", secondary_command_fft
print "[openwebrx-dsp-plugin:csdr] secondary command (demod) =", secondary_command_demod
#code.interact(local=locals())
logger.debug("[openwebrx-dsp-plugin:csdr] secondary command (fft) = %s", secondary_command_fft)
logger.debug("[openwebrx-dsp-plugin:csdr] secondary command (demod) = %s", secondary_command_demod)
my_env=os.environ.copy()
#if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1";
if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"]="1";
self.secondary_process_fft = subprocess.Popen(secondary_command_fft, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env)
print "[openwebrx-dsp-plugin:csdr] Popen on secondary command (fft)"
logger.debug("[openwebrx-dsp-plugin:csdr] Popen on secondary command (fft)")
self.secondary_process_demod = subprocess.Popen(secondary_command_demod, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env) #TODO digimodes
print "[openwebrx-dsp-plugin:csdr] Popen on secondary command (demod)" #TODO digimodes
logger.debug("[openwebrx-dsp-plugin:csdr] Popen on secondary command (demod)") #TODO digimodes
self.secondary_processes_running = True
#open control pipes for csdr and send initialization data
# print "==========> 1"
if self.secondary_shift_pipe != None: #TODO digimodes
# print "==========> 2", self.secondary_shift_pipe
self.secondary_shift_pipe_file=open(self.secondary_shift_pipe,"w") #TODO digimodes
# print "==========> 3"
self.set_secondary_offset_freq(self.secondary_offset_freq) #TODO digimodes
# print "==========> 4"
self.set_pipe_nonblocking(self.secondary_process_demod.stdout)
self.set_pipe_nonblocking(self.secondary_process_fft.stdout)
def set_secondary_offset_freq(self, value):
self.secondary_offset_freq=value
@ -212,8 +205,18 @@ class dsp:
def stop_secondary_demodulator(self):
if self.secondary_processes_running == False: return
self.try_delete_pipes(self.secondary_pipe_names)
if self.secondary_process_fft: os.killpg(os.getpgid(self.secondary_process_fft.pid), signal.SIGTERM)
if self.secondary_process_demod: os.killpg(os.getpgid(self.secondary_process_demod.pid), signal.SIGTERM)
if self.secondary_process_fft:
try:
os.killpg(os.getpgid(self.secondary_process_fft.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
if self.secondary_process_demod:
try:
os.killpg(os.getpgid(self.secondary_process_demod.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
self.secondary_processes_running = False
def read_secondary_demod(self, size):
@ -244,8 +247,11 @@ class dsp:
if self.fft_compression=="adpcm": return (self.secondary_fft_size/2)+(10/2)
def set_samp_rate(self,samp_rate):
#to change this, restart is required
self.samp_rate=samp_rate
self.calculate_decimation()
if self.running: self.restart()
def calculate_decimation(self):
self.decimation=1
while self.samp_rate/(self.decimation+1)>=self.output_rate:
self.decimation+=1
@ -262,46 +268,48 @@ class dsp:
def set_output_rate(self,output_rate):
self.output_rate=output_rate
self.set_samp_rate(self.samp_rate) #as it depends on output_rate
self.calculate_decimation()
def set_demodulator(self,demodulator):
#to change this, restart is required
if (self.demodulator == demodulator): return
self.demodulator=demodulator
self.restart()
def get_demodulator(self):
return self.demodulator
def set_fft_size(self,fft_size):
#to change this, restart is required
self.fft_size=fft_size
self.restart()
def set_fft_fps(self,fft_fps):
#to change this, restart is required
self.fft_fps=fft_fps
self.restart()
def set_fft_averages(self,fft_averages):
#to change this, restart is required
self.fft_averages=fft_averages
self.restart()
def fft_block_size(self):
if self.fft_averages == 0: return self.samp_rate/self.fft_fps
else: return self.samp_rate/self.fft_fps/self.fft_averages
def set_format_conversion(self,format_conversion):
self.format_conversion=format_conversion
def set_offset_freq(self,offset_freq):
self.offset_freq=offset_freq
if self.running:
self.modification_lock.acquire()
self.shift_pipe_file.write("%g\n"%(-float(self.offset_freq)/self.samp_rate))
self.shift_pipe_file.flush()
self.modification_lock.release()
def set_bpf(self,low_cut,high_cut):
self.low_cut=low_cut
self.high_cut=high_cut
if self.running:
self.modification_lock.acquire()
self.bpf_pipe_file.write( "%g %g\n"%(float(self.low_cut)/self.if_samp_rate(), float(self.high_cut)/self.if_samp_rate()) )
self.bpf_pipe_file.flush()
self.modification_lock.release()
def get_bpf(self):
return [self.low_cut, self.high_cut]
@ -309,13 +317,20 @@ class dsp:
def set_squelch_level(self, squelch_level):
self.squelch_level=squelch_level
if self.running:
self.modification_lock.acquire()
self.squelch_pipe_file.write( "%g\n"%(float(self.squelch_level)) )
self.squelch_pipe_file.flush()
self.modification_lock.release()
def get_smeter_level(self):
if self.running:
line=self.smeter_pipe_file.readline()
return float(line[:-1])
try:
return float(line[:-1])
except ValueError:
return 0
else:
time.sleep(1)
def get_metadata(self):
if self.running and self.meta_pipe:
@ -332,9 +347,7 @@ class dsp:
return self.ddc_transition_bw_rate*(self.if_samp_rate()/float(self.samp_rate))
def try_create_pipes(self, pipe_names, command_base):
# print "try_create_pipes"
for pipe_name in pipe_names:
# print "\t"+pipe_name
if "{"+pipe_name+"}" in command_base:
setattr(self, pipe_name, self.pipe_base_path+pipe_name)
self.mkfifo(getattr(self, pipe_name))
@ -346,122 +359,89 @@ class dsp:
pipe_path = getattr(self,pipe_name,None)
if pipe_path:
try: os.unlink(pipe_path)
except Exception as e: print "[openwebrx-dsp-plugin:csdr] try_delete_pipes() ::", e
def set_pipe_nonblocking(self, pipe):
flags = fcntl.fcntl(pipe, fcntl.F_GETFL)
fcntl.fcntl(pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK)
except Exception:
logger.exception("try_delete_pipes()")
def start(self):
self.modification_lock.acquire()
if (self.running):
self.modification_lock.release()
return
self.running = True
command_base=self.chain(self.demodulator)
#create control pipes for csdr
self.pipe_base_path="/tmp/openwebrx_pipe_{myid}_".format(myid=id(self))
# self.bpf_pipe = self.shift_pipe = self.squelch_pipe = self.smeter_pipe = None
self.try_create_pipes(self.pipe_names, command_base)
# if "{bpf_pipe}" in command_base:
# self.bpf_pipe=pipe_base_path+"bpf"
# self.mkfifo(self.bpf_pipe)
# if "{shift_pipe}" in command_base:
# self.shift_pipe=pipe_base_path+"shift"
# self.mkfifo(self.shift_pipe)
# if "{squelch_pipe}" in command_base:
# self.squelch_pipe=pipe_base_path+"squelch"
# self.mkfifo(self.squelch_pipe)
# if "{smeter_pipe}" in command_base:
# self.smeter_pipe=pipe_base_path+"smeter"
# self.mkfifo(self.smeter_pipe)
# if "{iqtee_pipe}" in command_base:
# self.iqtee_pipe=pipe_base_path+"iqtee"
# self.mkfifo(self.iqtee_pipe)
# if "{iqtee2_pipe}" in command_base:
# self.iqtee2_pipe=pipe_base_path+"iqtee2"
# self.mkfifo(self.iqtee2_pipe)
#run the command
command=command_base.format( bpf_pipe=self.bpf_pipe, shift_pipe=self.shift_pipe, decimation=self.decimation, \
last_decimation=self.last_decimation, fft_size=self.fft_size, fft_block_size=self.fft_block_size(), fft_averages=self.fft_averages, \
bpf_transition_bw=float(self.bpf_transition_bw)/self.if_samp_rate(), ddc_transition_bw=self.ddc_transition_bw(), \
flowcontrol=int(self.samp_rate*2), start_bufsize=self.base_bufsize*self.decimation, nc_port=self.nc_port, \
command=command_base.format( bpf_pipe=self.bpf_pipe, shift_pipe=self.shift_pipe, decimation=self.decimation,
last_decimation=self.last_decimation, fft_size=self.fft_size, fft_block_size=self.fft_block_size(), fft_averages=self.fft_averages,
bpf_transition_bw=float(self.bpf_transition_bw)/self.if_samp_rate(), ddc_transition_bw=self.ddc_transition_bw(),
flowcontrol=int(self.samp_rate*2), start_bufsize=self.base_bufsize*self.decimation, nc_port=self.nc_port,
squelch_pipe=self.squelch_pipe, smeter_pipe=self.smeter_pipe, meta_pipe=self.meta_pipe, iqtee_pipe=self.iqtee_pipe, iqtee2_pipe=self.iqtee2_pipe )
print "[openwebrx-dsp-plugin:csdr] Command =",command
#code.interact(local=locals())
logger.debug("[openwebrx-dsp-plugin:csdr] Command = %s", command)
my_env=os.environ.copy()
if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1";
if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"]="1";
self.process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env)
#set stdout to non-blocking to avoid blocking the main loop when no audio was decoded in digital modes
self.set_pipe_nonblocking(self.process.stdout)
def watch_thread():
rc = self.process.wait()
logger.debug("dsp thread ended with rc=%d", rc)
if (rc == 0 and self.running and not self.modification_lock.locked()):
logger.debug("restarting since rc = 0, self.running = true, and no modification")
self.restart()
self.running = True
threading.Thread(target = watch_thread).start()
#open control pipes for csdr and send initialization data
# open control pipes for csdr
if self.bpf_pipe != None:
self.bpf_pipe_file=open(self.bpf_pipe,"w")
self.set_bpf(self.low_cut,self.high_cut)
if self.shift_pipe != None:
if self.shift_pipe:
self.shift_pipe_file=open(self.shift_pipe,"w")
self.set_offset_freq(self.offset_freq)
if self.squelch_pipe != None:
if self.squelch_pipe:
self.squelch_pipe_file=open(self.squelch_pipe,"w")
self.set_squelch_level(self.squelch_level)
if self.smeter_pipe != None:
self.smeter_pipe_file=open(self.smeter_pipe,"r")
self.set_pipe_nonblocking(self.smeter_pipe_file)
if self.meta_pipe != None:
self.meta_pipe_file=open(self.meta_pipe,"r")
self.set_pipe_nonblocking(self.meta_pipe_file)
self.start_secondary_demodulator()
self.modification_lock.release()
# send initial config through the pipes
if self.squelch_pipe:
self.set_squelch_level(self.squelch_level)
if self.shift_pipe:
self.set_offset_freq(self.offset_freq)
if self.bpf_pipe:
self.set_bpf(self.low_cut, self.high_cut)
if self.smeter_pipe:
self.smeter_pipe_file=open(self.smeter_pipe,"r")
if self.meta_pipe != None:
self.meta_pipe_file=open(self.meta_pipe,"r")
def read(self,size):
return self.process.stdout.read(size)
def read_async(self, size):
try:
return self.process.stdout.read(size)
except IOError:
return None
def stop(self):
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
self.modification_lock.acquire()
self.running = False
if hasattr(self, "process"):
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
except ProcessLookupError:
# been killed by something else, ignore
pass
self.stop_secondary_demodulator()
#if(self.process.poll()!=None):return # returns None while subprocess is running
#while(self.process.poll()==None):
# #self.process.kill()
# print "killproc",os.getpgid(self.process.pid),self.process.pid
# os.killpg(self.process.pid, signal.SIGTERM)
#
# time.sleep(0.1)
self.try_delete_pipes(self.pipe_names)
# if self.bpf_pipe:
# try: os.unlink(self.bpf_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.bpf_pipe
# if self.shift_pipe:
# try: os.unlink(self.shift_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.shift_pipe
# if self.squelch_pipe:
# try: os.unlink(self.squelch_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.squelch_pipe
# if self.smeter_pipe:
# try: os.unlink(self.smeter_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.smeter_pipe
# if self.iqtee_pipe:
# try: os.unlink(self.iqtee_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.iqtee_pipe
# if self.iqtee2_pipe:
# try: os.unlink(self.iqtee2_pipe)
# except: print "[openwebrx-dsp-plugin:csdr] stop() :: unlink failed: " + self.iqtee2_pipe
self.running = False
self.modification_lock.release()
def restart(self):
if not self.running: return
self.stop()
self.start()