Merge branch 'develop' into agc_work
This commit is contained in:
428
csdr/csdr.py
428
csdr/csdr.py
@ -29,7 +29,11 @@ import math
|
||||
from functools import partial
|
||||
|
||||
from owrx.kiss import KissClient, DirewolfConfig
|
||||
from owrx.wsjt import Ft8Chopper, WsprChopper, Jt9Chopper, Jt65Chopper, Ft4Chopper
|
||||
from owrx.wsjt import Ft8Profile, WsprProfile, Jt9Profile, Jt65Profile, Ft4Profile
|
||||
from owrx.js8 import Js8Profiles
|
||||
from owrx.audio import AudioChopper
|
||||
|
||||
from csdr.pipe import Pipe
|
||||
|
||||
import logging
|
||||
|
||||
@ -41,7 +45,7 @@ class output(object):
|
||||
if not self.supports_type(t):
|
||||
# TODO rewrite the output mechanism in a way that avoids producing unnecessary data
|
||||
logger.warning("dumping output of type %s since it is not supported.", t)
|
||||
threading.Thread(target=self.pump(read_fn, lambda x: None)).start()
|
||||
threading.Thread(target=self.pump(read_fn, lambda x: None), name="csdr_pump_thread").start()
|
||||
return
|
||||
self.receive_output(t, read_fn)
|
||||
|
||||
@ -52,7 +56,11 @@ class output(object):
|
||||
def copy():
|
||||
run = True
|
||||
while run:
|
||||
data = read()
|
||||
data = None
|
||||
try:
|
||||
data = read()
|
||||
except ValueError:
|
||||
pass
|
||||
if data is None or (isinstance(data, bytes) and len(data) == 0):
|
||||
run = False
|
||||
else:
|
||||
@ -68,8 +76,10 @@ class dsp(object):
|
||||
def __init__(self, output):
|
||||
self.samp_rate = 250000
|
||||
self.output_rate = 11025
|
||||
self.hd_output_rate = 44100
|
||||
self.fft_size = 1024
|
||||
self.fft_fps = 5
|
||||
self.center_freq = 0
|
||||
self.offset_freq = 0
|
||||
self.low_cut = -4000
|
||||
self.high_cut = 4000
|
||||
@ -82,6 +92,8 @@ class dsp(object):
|
||||
self.demodulator = "nfm"
|
||||
self.name = "csdr"
|
||||
self.base_bufsize = 512
|
||||
self.decimation = None
|
||||
self.last_decimation = None
|
||||
self.nc_port = None
|
||||
self.csdr_dynamic_bufsize = False
|
||||
self.csdr_print_bufsizes = False
|
||||
@ -94,31 +106,38 @@ class dsp(object):
|
||||
self.secondary_fft_size = 1024
|
||||
self.secondary_process_fft = None
|
||||
self.secondary_process_demod = None
|
||||
self.pipe_names = [
|
||||
"bpf_pipe",
|
||||
"shift_pipe",
|
||||
"squelch_pipe",
|
||||
"smeter_pipe",
|
||||
"meta_pipe",
|
||||
"iqtee_pipe",
|
||||
"iqtee2_pipe",
|
||||
"dmr_control_pipe",
|
||||
]
|
||||
self.secondary_pipe_names = ["secondary_shift_pipe"]
|
||||
self.pipe_names = {
|
||||
"bpf_pipe": Pipe.WRITE,
|
||||
"shift_pipe": Pipe.WRITE,
|
||||
"squelch_pipe": Pipe.WRITE,
|
||||
"smeter_pipe": Pipe.READ,
|
||||
"meta_pipe": Pipe.READ,
|
||||
"iqtee_pipe": Pipe.NONE,
|
||||
"iqtee2_pipe": Pipe.NONE,
|
||||
"dmr_control_pipe": Pipe.WRITE,
|
||||
}
|
||||
self.pipes = {}
|
||||
self.secondary_pipe_names = {"secondary_shift_pipe": Pipe.WRITE}
|
||||
self.secondary_offset_freq = 1000
|
||||
self.unvoiced_quality = 1
|
||||
self.modification_lock = threading.Lock()
|
||||
self.output = output
|
||||
self.temporary_directory = "/tmp"
|
||||
|
||||
self.temporary_directory = None
|
||||
self.pipe_base_path = None
|
||||
self.set_temporary_directory("/tmp")
|
||||
|
||||
self.is_service = False
|
||||
self.direwolf_config = None
|
||||
self.direwolf_port = None
|
||||
self.process = None
|
||||
|
||||
def set_service(self, flag=True):
|
||||
self.is_service = flag
|
||||
|
||||
def set_temporary_directory(self, what):
|
||||
self.temporary_directory = what
|
||||
self.pipe_base_path = "{tmp_dir}/openwebrx_pipe_".format(tmp_dir=self.temporary_directory)
|
||||
|
||||
def chain(self, which):
|
||||
chain = ["nc -v 127.0.0.1 {nc_port}"]
|
||||
@ -137,7 +156,7 @@ class dsp(object):
|
||||
if self.fft_compression == "adpcm":
|
||||
chain += ["csdr compress_fft_adpcm_f_u8 {fft_size}"]
|
||||
return chain
|
||||
chain += ["csdr shift_addition_cc --fifo {shift_pipe}"]
|
||||
chain += ["csdr shift_addfast_cc --fifo {shift_pipe}"]
|
||||
if self.decimation > 1:
|
||||
chain += ["csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING"]
|
||||
chain += ["csdr bandpass_fir_fft_cc --fifo {bpf_pipe} {bpf_transition_bw} HAMMING"]
|
||||
@ -153,9 +172,12 @@ class dsp(object):
|
||||
if not self.output.supports_type("audio"):
|
||||
return chain
|
||||
# safe some cpu cycles... no need to decimate if decimation factor is 1
|
||||
last_decimation_block = (
|
||||
["csdr fractional_decimator_ff {last_decimation}"] if self.last_decimation != 1.0 else []
|
||||
)
|
||||
last_decimation_block = []
|
||||
if self.last_decimation >= 2.0:
|
||||
# activate prefilter if signal has been oversampled, e.g. WFM
|
||||
last_decimation_block = ["csdr fractional_decimator_ff {last_decimation} 12 --prefilter"]
|
||||
elif self.last_decimation >= 1.0:
|
||||
last_decimation_block = ["csdr fractional_decimator_ff {last_decimation}"]
|
||||
if which == "nfm":
|
||||
chain += ["csdr fmdemod_quadri_cf", "csdr limit_ff"]
|
||||
chain += last_decimation_block
|
||||
@ -166,6 +188,16 @@ class dsp(object):
|
||||
]
|
||||
else:
|
||||
chain += ["csdr convert_f_s16"]
|
||||
elif which == "wfm":
|
||||
chain += [
|
||||
"csdr fmdemod_quadri_cf",
|
||||
"csdr limit_ff",
|
||||
]
|
||||
chain += last_decimation_block
|
||||
chain += [
|
||||
"csdr deemphasis_wfm_ff {audio_rate} 50e-6",
|
||||
"csdr convert_f_s16"
|
||||
]
|
||||
elif self.isDigitalVoice(which):
|
||||
chain += ["csdr fmdemod_quadri_cf", "dc_block "]
|
||||
chain += last_decimation_block
|
||||
@ -202,6 +234,15 @@ class dsp(object):
|
||||
"csdr limit_ff",
|
||||
"csdr convert_f_s16",
|
||||
]
|
||||
elif self.isFreeDV(which):
|
||||
chain += ["csdr realpart_cf"]
|
||||
chain += last_decimation_block
|
||||
chain += [
|
||||
"csdr agc_ff",
|
||||
"csdr convert_f_s16",
|
||||
"freedv_rx 1600 - -",
|
||||
"sox -t raw -r 8000 -e signed-integer -b 16 -c 1 --buffer 32 - -t raw -r {output_rate} -e signed-integer -b 16 -c 1 - ",
|
||||
]
|
||||
elif which == "ssb":
|
||||
chain += ["csdr realpart_cf"]
|
||||
chain += last_decimation_block
|
||||
@ -231,14 +272,14 @@ class dsp(object):
|
||||
return chain
|
||||
elif which == "bpsk31" or which == "bpsk63":
|
||||
return chain + [
|
||||
"csdr shift_addition_cc --fifo {secondary_shift_pipe}",
|
||||
"csdr shift_addfast_cc --fifo {secondary_shift_pipe}",
|
||||
"csdr bandpass_fir_fft_cc -{secondary_bpf_cutoff} {secondary_bpf_cutoff} {secondary_bpf_cutoff}",
|
||||
"csdr simple_agc_cc 0.001 0.5",
|
||||
"csdr timing_recovery_cc GARDNER {secondary_samples_per_bits} 0.5 2 --add_q",
|
||||
"CSDR_FIXED_BUFSIZE=1 csdr dbpsk_decoder_c_u8",
|
||||
"CSDR_FIXED_BUFSIZE=1 csdr psk31_varicode_decoder_u8_u8",
|
||||
]
|
||||
elif self.isWsjtMode(which):
|
||||
elif self.isWsjtMode(which) or self.isJs8(which):
|
||||
chain += ["csdr realpart_cf"]
|
||||
if self.last_decimation != 1.0:
|
||||
chain += ["csdr fractional_decimator_ff {last_decimation}"]
|
||||
@ -247,7 +288,7 @@ class dsp(object):
|
||||
chain += ["csdr fmdemod_quadri_cf"]
|
||||
if self.last_decimation != 1.0:
|
||||
chain += ["csdr fractional_decimator_ff {last_decimation}"]
|
||||
return chain + ["csdr convert_f_s16", "direwolf -c {direwolf_config} -r {audio_rate} -t 0 -q d -q h - 1>&2"]
|
||||
return chain + ["csdr convert_f_s16", "direwolf -c {direwolf_config} -r {audio_rate} -t 0 -q d -q h 1>&2"]
|
||||
elif which == "pocsag":
|
||||
chain += ["csdr fmdemod_quadri_cf"]
|
||||
if self.last_decimation != 1.0:
|
||||
@ -305,8 +346,8 @@ class dsp(object):
|
||||
self.try_create_configs(secondary_command_demod)
|
||||
|
||||
secondary_command_demod = secondary_command_demod.format(
|
||||
input_pipe=self.iqtee2_pipe,
|
||||
secondary_shift_pipe=self.secondary_shift_pipe,
|
||||
input_pipe=self.pipes["iqtee2_pipe"],
|
||||
secondary_shift_pipe=self.pipes["secondary_shift_pipe"],
|
||||
secondary_decimation=self.secondary_decimation(),
|
||||
secondary_samples_per_bits=self.secondary_samples_per_bits(),
|
||||
secondary_bpf_cutoff=self.secondary_bpf_cutoff(),
|
||||
@ -325,7 +366,7 @@ class dsp(object):
|
||||
if self.output.supports_type("secondary_fft"):
|
||||
secondary_command_fft = " | ".join(self.secondary_chain("fft"))
|
||||
secondary_command_fft = secondary_command_fft.format(
|
||||
input_pipe=self.iqtee_pipe,
|
||||
input_pipe=self.pipes["iqtee_pipe"],
|
||||
secondary_fft_input_size=self.secondary_fft_size,
|
||||
secondary_fft_size=self.secondary_fft_size,
|
||||
secondary_fft_block_size=self.secondary_fft_block_size(),
|
||||
@ -351,18 +392,25 @@ class dsp(object):
|
||||
|
||||
if self.isWsjtMode():
|
||||
smd = self.get_secondary_demodulator()
|
||||
chopper_profile = None
|
||||
if smd == "ft8":
|
||||
chopper = Ft8Chopper(self.secondary_process_demod.stdout)
|
||||
chopper_profile = Ft8Profile()
|
||||
elif smd == "wspr":
|
||||
chopper = WsprChopper(self.secondary_process_demod.stdout)
|
||||
chopper_profile = WsprProfile()
|
||||
elif smd == "jt65":
|
||||
chopper = Jt65Chopper(self.secondary_process_demod.stdout)
|
||||
chopper_profile = Jt65Profile()
|
||||
elif smd == "jt9":
|
||||
chopper = Jt9Chopper(self.secondary_process_demod.stdout)
|
||||
chopper_profile = Jt9Profile()
|
||||
elif smd == "ft4":
|
||||
chopper = Ft4Chopper(self.secondary_process_demod.stdout)
|
||||
chopper_profile = Ft4Profile()
|
||||
if chopper_profile is not None:
|
||||
chopper = AudioChopper(self, self.secondary_process_demod.stdout, chopper_profile)
|
||||
chopper.start()
|
||||
self.output.send_output("wsjt_demod", chopper.read)
|
||||
elif self.isJs8():
|
||||
chopper = AudioChopper(self, self.secondary_process_demod.stdout, *Js8Profiles.getEnabledProfiles())
|
||||
chopper.start()
|
||||
self.output.send_output("wsjt_demod", chopper.read)
|
||||
self.output.send_output("js8_demod", chopper.read)
|
||||
elif self.isPacket():
|
||||
# we best get the ax25 packets from the kiss socket
|
||||
kiss = KissClient(self.direwolf_port)
|
||||
@ -373,30 +421,34 @@ class dsp(object):
|
||||
self.output.send_output("secondary_demod", partial(self.secondary_process_demod.stdout.read, 1))
|
||||
|
||||
# open control pipes for csdr and send initialization data
|
||||
if self.secondary_shift_pipe != None: # TODO digimodes
|
||||
self.secondary_shift_pipe_file = open(self.secondary_shift_pipe, "w") # TODO digimodes
|
||||
if self.has_pipe("secondary_shift_pipe"): # TODO digimodes
|
||||
self.set_secondary_offset_freq(self.secondary_offset_freq) # TODO digimodes
|
||||
|
||||
def set_secondary_offset_freq(self, value):
|
||||
self.secondary_offset_freq = value
|
||||
if self.secondary_processes_running and hasattr(self, "secondary_shift_pipe_file"):
|
||||
self.secondary_shift_pipe_file.write("%g\n" % (-float(self.secondary_offset_freq) / self.if_samp_rate()))
|
||||
self.secondary_shift_pipe_file.flush()
|
||||
if self.secondary_processes_running and self.has_pipe("secondary_shift_pipe"):
|
||||
self.pipes["secondary_shift_pipe"].write("%g\n" % (-float(self.secondary_offset_freq) / self.if_samp_rate()))
|
||||
|
||||
def stop_secondary_demodulator(self):
|
||||
if self.secondary_processes_running == False:
|
||||
if not self.secondary_processes_running:
|
||||
return
|
||||
self.try_delete_pipes(self.secondary_pipe_names)
|
||||
self.try_delete_configs()
|
||||
if self.secondary_process_fft:
|
||||
try:
|
||||
os.killpg(os.getpgid(self.secondary_process_fft.pid), signal.SIGTERM)
|
||||
# drain any leftover data to free file descriptors
|
||||
self.secondary_process_fft.communicate()
|
||||
self.secondary_process_fft = None
|
||||
except ProcessLookupError:
|
||||
# been killed by something else, ignore
|
||||
pass
|
||||
if self.secondary_process_demod:
|
||||
try:
|
||||
os.killpg(os.getpgid(self.secondary_process_demod.pid), signal.SIGTERM)
|
||||
# drain any leftover data to free file descriptors
|
||||
self.secondary_process_demod.communicate()
|
||||
self.secondary_process_demod = None
|
||||
except ProcessLookupError:
|
||||
# been killed by something else, ignore
|
||||
pass
|
||||
@ -447,11 +499,21 @@ class dsp(object):
|
||||
|
||||
def get_decimation(self, input_rate, output_rate):
|
||||
decimation = 1
|
||||
while input_rate / (decimation + 1) >= output_rate:
|
||||
correction = 1
|
||||
# wideband fm has a much higher frequency deviation (75kHz).
|
||||
# we cannot cover this if we immediately decimate to the sample rate the audio will have later on, so we need
|
||||
# to compensate here.
|
||||
# the factor of 5 is by experimentation only, with a minimum audio rate of 36kHz (enforced by the client)
|
||||
# this allows us to cover at least +/- 80kHz of frequency spectrum (may be higher, but that's the worst case).
|
||||
# the correction factor is automatically compensated for by the secondary decimation stage, which comes
|
||||
# after the demodulator.
|
||||
if self.get_demodulator() == "wfm":
|
||||
correction = 5
|
||||
while input_rate / (decimation + 1) >= output_rate * correction:
|
||||
decimation += 1
|
||||
fraction = float(input_rate / decimation) / output_rate
|
||||
intermediate_rate = input_rate / decimation
|
||||
return (decimation, fraction, intermediate_rate)
|
||||
return decimation, fraction, intermediate_rate
|
||||
|
||||
def if_samp_rate(self):
|
||||
return self.samp_rate / self.decimation
|
||||
@ -462,11 +524,18 @@ class dsp(object):
|
||||
def get_output_rate(self):
|
||||
return self.output_rate
|
||||
|
||||
def get_hd_output_rate(self):
|
||||
return self.hd_output_rate
|
||||
|
||||
def get_audio_rate(self):
|
||||
if self.isDigitalVoice() or self.isPacket() or self.isPocsag():
|
||||
return 48000
|
||||
elif self.isWsjtMode():
|
||||
elif self.isWsjtMode() or self.isJs8():
|
||||
return 12000
|
||||
elif self.isFreeDV():
|
||||
return 8000
|
||||
elif self.isHdAudio():
|
||||
return self.get_hd_output_rate()
|
||||
return self.get_output_rate()
|
||||
|
||||
def isDigitalVoice(self, demodulator=None):
|
||||
@ -479,6 +548,11 @@ class dsp(object):
|
||||
demodulator = self.get_secondary_demodulator()
|
||||
return demodulator in ["ft8", "wspr", "jt65", "jt9", "ft4"]
|
||||
|
||||
def isJs8(self, demodulator = None):
|
||||
if demodulator is None:
|
||||
demodulator = self.get_secondary_demodulator()
|
||||
return demodulator == "js8"
|
||||
|
||||
def isPacket(self, demodulator=None):
|
||||
if demodulator is None:
|
||||
demodulator = self.get_secondary_demodulator()
|
||||
@ -489,6 +563,16 @@ class dsp(object):
|
||||
demodulator = self.get_secondary_demodulator()
|
||||
return demodulator == "pocsag"
|
||||
|
||||
def isFreeDV(self, demodulator=None):
|
||||
if demodulator is None:
|
||||
demodulator = self.get_demodulator()
|
||||
return demodulator == "freedv"
|
||||
|
||||
def isHdAudio(self, demodulator=None):
|
||||
if demodulator is None:
|
||||
demodulator = self.get_demodulator()
|
||||
return demodulator == "wfm"
|
||||
|
||||
def set_output_rate(self, output_rate):
|
||||
if self.output_rate == output_rate:
|
||||
return
|
||||
@ -496,7 +580,16 @@ class dsp(object):
|
||||
self.calculate_decimation()
|
||||
self.restart()
|
||||
|
||||
def set_hd_output_rate(self, hd_output_rate):
|
||||
if self.hd_output_rate == hd_output_rate:
|
||||
return
|
||||
self.hd_output_rate = hd_output_rate
|
||||
self.calculate_decimation()
|
||||
self.restart()
|
||||
|
||||
def set_demodulator(self, demodulator):
|
||||
if demodulator in ["usb", "lsb", "cw"]:
|
||||
demodulator = "ssb"
|
||||
if self.demodulator == demodulator:
|
||||
return
|
||||
self.demodulator = demodulator
|
||||
@ -527,21 +620,22 @@ class dsp(object):
|
||||
def set_offset_freq(self, offset_freq):
|
||||
self.offset_freq = offset_freq
|
||||
if self.running:
|
||||
self.modification_lock.acquire()
|
||||
self.shift_pipe_file.write("%g\n" % (-float(self.offset_freq) / self.samp_rate))
|
||||
self.shift_pipe_file.flush()
|
||||
self.modification_lock.release()
|
||||
self.pipes["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate))
|
||||
|
||||
def set_center_freq(self, center_freq):
|
||||
# dsp only needs to know this to be able to pass it to decoders in the form of get_operating_freq()
|
||||
self.center_freq = center_freq
|
||||
|
||||
def get_operating_freq(self):
|
||||
return self.center_freq + self.offset_freq
|
||||
|
||||
def set_bpf(self, low_cut, high_cut):
|
||||
self.low_cut = low_cut
|
||||
self.high_cut = high_cut
|
||||
if self.running:
|
||||
self.modification_lock.acquire()
|
||||
self.bpf_pipe_file.write(
|
||||
self.pipes["bpf_pipe"].write(
|
||||
"%g %g\n" % (float(self.low_cut) / self.if_samp_rate(), float(self.high_cut) / self.if_samp_rate())
|
||||
)
|
||||
self.bpf_pipe_file.flush()
|
||||
self.modification_lock.release()
|
||||
|
||||
def get_bpf(self):
|
||||
return [self.low_cut, self.high_cut]
|
||||
@ -552,12 +646,9 @@ class dsp(object):
|
||||
def set_squelch_level(self, squelch_level):
|
||||
self.squelch_level = squelch_level
|
||||
# no squelch required on digital voice modes
|
||||
actual_squelch = -150 if self.isDigitalVoice() or self.isPacket() or self.isPocsag() else self.squelch_level
|
||||
actual_squelch = -150 if self.isDigitalVoice() or self.isPacket() or self.isPocsag() or self.isFreeDV() else self.squelch_level
|
||||
if self.running:
|
||||
self.modification_lock.acquire()
|
||||
self.squelch_pipe_file.write("%g\n" % (self.convertToLinear(actual_squelch)))
|
||||
self.squelch_pipe_file.flush()
|
||||
self.modification_lock.release()
|
||||
self.pipes["squelch_pipe"].write("%g\n" % (self.convertToLinear(actual_squelch)))
|
||||
|
||||
def set_unvoiced_quality(self, q):
|
||||
self.unvoiced_quality = q
|
||||
@ -567,39 +658,36 @@ class dsp(object):
|
||||
return self.unvoiced_quality
|
||||
|
||||
def set_dmr_filter(self, filter):
|
||||
if self.dmr_control_pipe_file:
|
||||
self.dmr_control_pipe_file.write("{0}\n".format(filter))
|
||||
self.dmr_control_pipe_file.flush()
|
||||
|
||||
def mkfifo(self, path):
|
||||
try:
|
||||
os.unlink(path)
|
||||
except:
|
||||
pass
|
||||
os.mkfifo(path)
|
||||
if self.has_pipe("dmr_control_pipe"):
|
||||
self.pipes["dmr_control_pipe"].write("{0}\n".format(filter))
|
||||
|
||||
def ddc_transition_bw(self):
|
||||
return self.ddc_transition_bw_rate * (self.if_samp_rate() / float(self.samp_rate))
|
||||
|
||||
def try_create_pipes(self, pipe_names, command_base):
|
||||
for pipe_name in pipe_names:
|
||||
for pipe_name, pipe_type in pipe_names.items():
|
||||
if self.has_pipe(pipe_name):
|
||||
logger.warning("{pipe_name} is still in use", pipe_name=pipe_name)
|
||||
self.pipes[pipe_name].close()
|
||||
if "{" + pipe_name + "}" in command_base:
|
||||
setattr(self, pipe_name, self.pipe_base_path + pipe_name)
|
||||
self.mkfifo(getattr(self, pipe_name))
|
||||
p = self.pipe_base_path + pipe_name
|
||||
encoding = None
|
||||
# TODO make digiham output unicode and then change this here
|
||||
# the whole pipe enoding feature onlye exists because of this
|
||||
if pipe_name == "meta_pipe":
|
||||
encoding = "cp437"
|
||||
self.pipes[pipe_name] = Pipe.create(p, pipe_type, encoding=encoding)
|
||||
else:
|
||||
setattr(self, pipe_name, None)
|
||||
self.pipes[pipe_name] = None
|
||||
|
||||
def has_pipe(self, name):
|
||||
return name in self.pipes and self.pipes[name] is not None
|
||||
|
||||
def try_delete_pipes(self, pipe_names):
|
||||
for pipe_name in pipe_names:
|
||||
pipe_path = getattr(self, pipe_name, None)
|
||||
if pipe_path:
|
||||
try:
|
||||
os.unlink(pipe_path)
|
||||
except FileNotFoundError:
|
||||
# it seems like we keep calling this twice. no idea why, but we don't need the resulting error.
|
||||
pass
|
||||
except Exception:
|
||||
logger.exception("try_delete_pipes()")
|
||||
if self.has_pipe(pipe_name):
|
||||
self.pipes[pipe_name].close()
|
||||
self.pipes[pipe_name] = None
|
||||
|
||||
def try_create_configs(self, command):
|
||||
if "{direwolf_config}" in command:
|
||||
@ -626,108 +714,95 @@ class dsp(object):
|
||||
self.direwolf_config = None
|
||||
|
||||
def start(self):
|
||||
self.modification_lock.acquire()
|
||||
if self.running:
|
||||
self.modification_lock.release()
|
||||
return
|
||||
self.running = True
|
||||
with self.modification_lock:
|
||||
if self.running:
|
||||
return
|
||||
self.running = True
|
||||
|
||||
command_base = " | ".join(self.chain(self.demodulator))
|
||||
command_base = " | ".join(self.chain(self.demodulator))
|
||||
|
||||
# create control pipes for csdr
|
||||
self.pipe_base_path = "{tmp_dir}/openwebrx_pipe_{myid}_".format(tmp_dir=self.temporary_directory, myid=id(self))
|
||||
# create control pipes for csdr
|
||||
self.try_create_pipes(self.pipe_names, command_base)
|
||||
|
||||
self.try_create_pipes(self.pipe_names, command_base)
|
||||
# send initial config through the pipes
|
||||
if self.has_pipe("bpf_pipe"):
|
||||
self.set_bpf(self.low_cut, self.high_cut)
|
||||
if self.has_pipe("shift_pipe"):
|
||||
self.set_offset_freq(self.offset_freq)
|
||||
if self.has_pipe("squelch_pipe"):
|
||||
self.set_squelch_level(self.squelch_level)
|
||||
if self.has_pipe("dmr_control_pipe"):
|
||||
self.set_dmr_filter(3)
|
||||
|
||||
# run the command
|
||||
command = command_base.format(
|
||||
bpf_pipe=self.bpf_pipe,
|
||||
shift_pipe=self.shift_pipe,
|
||||
decimation=self.decimation,
|
||||
last_decimation=self.last_decimation,
|
||||
fft_size=self.fft_size,
|
||||
fft_block_size=self.fft_block_size(),
|
||||
fft_averages=self.fft_averages,
|
||||
bpf_transition_bw=float(self.bpf_transition_bw) / self.if_samp_rate(),
|
||||
ddc_transition_bw=self.ddc_transition_bw(),
|
||||
flowcontrol=int(self.samp_rate * 2),
|
||||
start_bufsize=self.base_bufsize * self.decimation,
|
||||
nc_port=self.nc_port,
|
||||
squelch_pipe=self.squelch_pipe,
|
||||
smeter_pipe=self.smeter_pipe,
|
||||
meta_pipe=self.meta_pipe,
|
||||
iqtee_pipe=self.iqtee_pipe,
|
||||
iqtee2_pipe=self.iqtee2_pipe,
|
||||
output_rate=self.get_output_rate(),
|
||||
smeter_report_every=int(self.if_samp_rate() / 6000),
|
||||
unvoiced_quality=self.get_unvoiced_quality(),
|
||||
dmr_control_pipe=self.dmr_control_pipe,
|
||||
audio_rate=self.get_audio_rate(),
|
||||
)
|
||||
|
||||
logger.debug("Command = %s", command)
|
||||
my_env = os.environ.copy()
|
||||
if self.csdr_dynamic_bufsize:
|
||||
my_env["CSDR_DYNAMIC_BUFSIZE_ON"] = "1"
|
||||
if self.csdr_print_bufsizes:
|
||||
my_env["CSDR_PRINT_BUFSIZES"] = "1"
|
||||
|
||||
out = subprocess.PIPE if self.output.supports_type("audio") else subprocess.DEVNULL
|
||||
self.process = subprocess.Popen(command, stdout=out, shell=True, start_new_session=True, env=my_env)
|
||||
|
||||
def watch_thread():
|
||||
rc = self.process.wait()
|
||||
logger.debug("dsp thread ended with rc=%d", rc)
|
||||
if rc == 0 and self.running and not self.modification_lock.locked():
|
||||
logger.debug("restarting since rc = 0, self.running = true, and no modification")
|
||||
self.restart()
|
||||
|
||||
threading.Thread(target=watch_thread).start()
|
||||
|
||||
if self.output.supports_type("audio"):
|
||||
self.output.send_output(
|
||||
"audio",
|
||||
partial(
|
||||
self.process.stdout.read,
|
||||
self.get_fft_bytes_to_read() if self.demodulator == "fft" else self.get_audio_bytes_to_read(),
|
||||
),
|
||||
# run the command
|
||||
command = command_base.format(
|
||||
bpf_pipe=self.pipes["bpf_pipe"],
|
||||
shift_pipe=self.pipes["shift_pipe"],
|
||||
squelch_pipe=self.pipes["squelch_pipe"],
|
||||
smeter_pipe=self.pipes["smeter_pipe"],
|
||||
meta_pipe=self.pipes["meta_pipe"],
|
||||
iqtee_pipe=self.pipes["iqtee_pipe"],
|
||||
iqtee2_pipe=self.pipes["iqtee2_pipe"],
|
||||
dmr_control_pipe=self.pipes["dmr_control_pipe"],
|
||||
decimation=self.decimation,
|
||||
last_decimation=self.last_decimation,
|
||||
fft_size=self.fft_size,
|
||||
fft_block_size=self.fft_block_size(),
|
||||
fft_averages=self.fft_averages,
|
||||
bpf_transition_bw=float(self.bpf_transition_bw) / self.if_samp_rate(),
|
||||
ddc_transition_bw=self.ddc_transition_bw(),
|
||||
flowcontrol=int(self.samp_rate * 2),
|
||||
start_bufsize=self.base_bufsize * self.decimation,
|
||||
nc_port=self.nc_port,
|
||||
output_rate=self.get_output_rate(),
|
||||
smeter_report_every=int(self.if_samp_rate() / 6000),
|
||||
unvoiced_quality=self.get_unvoiced_quality(),
|
||||
audio_rate=self.get_audio_rate(),
|
||||
)
|
||||
|
||||
# open control pipes for csdr
|
||||
if self.bpf_pipe:
|
||||
self.bpf_pipe_file = open(self.bpf_pipe, "w")
|
||||
if self.shift_pipe:
|
||||
self.shift_pipe_file = open(self.shift_pipe, "w")
|
||||
if self.squelch_pipe:
|
||||
self.squelch_pipe_file = open(self.squelch_pipe, "w")
|
||||
self.start_secondary_demodulator()
|
||||
logger.debug("Command = %s", command)
|
||||
my_env = os.environ.copy()
|
||||
if self.csdr_dynamic_bufsize:
|
||||
my_env["CSDR_DYNAMIC_BUFSIZE_ON"] = "1"
|
||||
if self.csdr_print_bufsizes:
|
||||
my_env["CSDR_PRINT_BUFSIZES"] = "1"
|
||||
|
||||
self.modification_lock.release()
|
||||
out = subprocess.PIPE if self.output.supports_type("audio") else subprocess.DEVNULL
|
||||
self.process = subprocess.Popen(command, stdout=out, shell=True, start_new_session=True, env=my_env)
|
||||
|
||||
# send initial config through the pipes
|
||||
if self.squelch_pipe:
|
||||
self.set_squelch_level(self.squelch_level)
|
||||
if self.shift_pipe:
|
||||
self.set_offset_freq(self.offset_freq)
|
||||
if self.bpf_pipe:
|
||||
self.set_bpf(self.low_cut, self.high_cut)
|
||||
if self.smeter_pipe:
|
||||
self.smeter_pipe_file = open(self.smeter_pipe, "r")
|
||||
def watch_thread():
|
||||
rc = self.process.wait()
|
||||
logger.debug("dsp thread ended with rc=%d", rc)
|
||||
if rc == 0 and self.running and not self.modification_lock.locked():
|
||||
logger.debug("restarting since rc = 0, self.running = true, and no modification")
|
||||
self.restart()
|
||||
|
||||
threading.Thread(target=watch_thread, name="csdr_watch_thread").start()
|
||||
|
||||
audio_type = "hd_audio" if self.isHdAudio() else "audio"
|
||||
if self.output.supports_type(audio_type):
|
||||
self.output.send_output(
|
||||
audio_type,
|
||||
partial(
|
||||
self.process.stdout.read,
|
||||
self.get_fft_bytes_to_read() if self.demodulator == "fft" else self.get_audio_bytes_to_read(),
|
||||
),
|
||||
)
|
||||
|
||||
self.start_secondary_demodulator()
|
||||
|
||||
if self.has_pipe("smeter_pipe"):
|
||||
def read_smeter():
|
||||
raw = self.smeter_pipe_file.readline()
|
||||
raw = self.pipes["smeter_pipe"].readline()
|
||||
if len(raw) == 0:
|
||||
return None
|
||||
else:
|
||||
return float(raw.rstrip("\n"))
|
||||
|
||||
self.output.send_output("smeter", read_smeter)
|
||||
if self.meta_pipe != None:
|
||||
# TODO make digiham output unicode and then change this here
|
||||
self.meta_pipe_file = open(self.meta_pipe, "r", encoding="cp437")
|
||||
|
||||
if self.has_pipe("meta_pipe"):
|
||||
def read_meta():
|
||||
raw = self.meta_pipe_file.readline()
|
||||
raw = self.pipes["meta_pipe"].readline()
|
||||
if len(raw) == 0:
|
||||
return None
|
||||
else:
|
||||
@ -735,23 +810,25 @@ class dsp(object):
|
||||
|
||||
self.output.send_output("meta", read_meta)
|
||||
|
||||
if self.dmr_control_pipe:
|
||||
self.dmr_control_pipe_file = open(self.dmr_control_pipe, "w")
|
||||
if self.csdr_dynamic_bufsize:
|
||||
self.process.stdout.read(8) # dummy read to skip bufsize & preamble
|
||||
logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1")
|
||||
|
||||
def stop(self):
|
||||
self.modification_lock.acquire()
|
||||
self.running = False
|
||||
if hasattr(self, "process"):
|
||||
try:
|
||||
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
|
||||
except ProcessLookupError:
|
||||
# been killed by something else, ignore
|
||||
pass
|
||||
self.stop_secondary_demodulator()
|
||||
with self.modification_lock:
|
||||
self.running = False
|
||||
if self.process is not None:
|
||||
try:
|
||||
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
|
||||
# drain any leftover data to free file descriptors
|
||||
self.process.communicate()
|
||||
self.process = None
|
||||
except ProcessLookupError:
|
||||
# been killed by something else, ignore
|
||||
pass
|
||||
self.stop_secondary_demodulator()
|
||||
|
||||
self.try_delete_pipes(self.pipe_names)
|
||||
|
||||
self.modification_lock.release()
|
||||
self.try_delete_pipes(self.pipe_names)
|
||||
|
||||
def restart(self):
|
||||
if not self.running:
|
||||
@ -761,4 +838,3 @@ class dsp(object):
|
||||
|
||||
def __del__(self):
|
||||
self.stop()
|
||||
del self.process
|
||||
|
155
csdr/pipe.py
Normal file
155
csdr/pipe.py
Normal file
@ -0,0 +1,155 @@
|
||||
import os
|
||||
import select
|
||||
import time
|
||||
import threading
|
||||
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Pipe(object):
|
||||
READ = "r"
|
||||
WRITE = "w"
|
||||
NONE = None
|
||||
|
||||
@staticmethod
|
||||
def create(path, t, encoding=None):
|
||||
if t == Pipe.READ:
|
||||
return ReadingPipe(path, encoding=encoding)
|
||||
elif t == Pipe.WRITE:
|
||||
return WritingPipe(path, encoding=encoding)
|
||||
elif t == Pipe.NONE:
|
||||
return Pipe(path, None, encoding=encoding)
|
||||
|
||||
def __init__(self, path, direction, encoding=None):
|
||||
self.doOpen = True
|
||||
self.path = "{base}_{myid}".format(base=path, myid=id(self))
|
||||
self.direction = direction
|
||||
self.encoding = encoding
|
||||
self.file = None
|
||||
os.mkfifo(self.path)
|
||||
|
||||
def open(self):
|
||||
"""
|
||||
this method opens the file descriptor with an added O_NONBLOCK flag. This gives us a special behaviour for
|
||||
FIFOS, when they are not opened by the opposing side:
|
||||
|
||||
- opening a pipe for writing will throw an OSError with errno = 6 (ENXIO). This is handled specially in the
|
||||
WritingPipe class.
|
||||
- opening a pipe for reading will pass through this method instantly, even if the opposing end has not been
|
||||
opened yet, but the resulting file descriptor will behave as if O_NONBLOCK is set (even if we remove it
|
||||
immediately here), resulting in empty reads until data is available. This is handled specially in the
|
||||
ReadingPipe class.
|
||||
"""
|
||||
def opener(path, flags):
|
||||
fd = os.open(path, flags | os.O_NONBLOCK)
|
||||
os.set_blocking(fd, True)
|
||||
return fd
|
||||
|
||||
self.file = open(self.path, self.direction, encoding=self.encoding, opener=opener)
|
||||
|
||||
def close(self):
|
||||
self.doOpen = False
|
||||
try:
|
||||
if self.file is not None:
|
||||
self.file.close()
|
||||
os.unlink(self.path)
|
||||
except FileNotFoundError:
|
||||
# it seems like we keep calling this twice. no idea why, but we don't need the resulting error.
|
||||
pass
|
||||
except Exception:
|
||||
logger.exception("Pipe.close()")
|
||||
|
||||
def __str__(self):
|
||||
return self.path
|
||||
|
||||
|
||||
class WritingPipe(Pipe):
|
||||
def __init__(self, path, encoding=None):
|
||||
self.queue = []
|
||||
self.queueLock = threading.Lock()
|
||||
super().__init__(path, "w", encoding=encoding)
|
||||
self.open()
|
||||
|
||||
def open_and_dequeue(self):
|
||||
"""
|
||||
This method implements a retry loop that can be interrupted in case the Pipe gets shutdown before actually
|
||||
being connected.
|
||||
|
||||
After the pipe is opened successfully, all data that has been queued is sent in the order it was passed into
|
||||
write().
|
||||
"""
|
||||
retries = 0
|
||||
|
||||
while self.file is None and self.doOpen and retries < 10:
|
||||
try:
|
||||
super().open()
|
||||
except OSError as error:
|
||||
# ENXIO = FIFO has not been opened for reading
|
||||
if error.errno == 6:
|
||||
time.sleep(.1)
|
||||
retries += 1
|
||||
else:
|
||||
raise
|
||||
|
||||
# if doOpen is false, opening has been canceled, so no warning in that case.
|
||||
if self.file is None:
|
||||
if self.doOpen:
|
||||
logger.warning("could not open FIFO %s", self.path)
|
||||
return
|
||||
|
||||
with self.queueLock:
|
||||
for i in self.queue:
|
||||
self.file.write(i)
|
||||
self.file.flush()
|
||||
self.queue = None
|
||||
|
||||
def open(self):
|
||||
"""
|
||||
This sends the opening operation off to a background thread. If we were to block the thread here, another pipe
|
||||
may be waiting in the queue to be opened on the opposing side, resulting in a deadlock
|
||||
"""
|
||||
threading.Thread(target=self.open_and_dequeue, name="csdr_pipe_thread").start()
|
||||
|
||||
def write(self, data):
|
||||
"""
|
||||
This method queues all data to be written until the file is actually opened. As soon as a file is available,
|
||||
it becomes a passthrough.
|
||||
"""
|
||||
if self.file is None:
|
||||
with self.queueLock:
|
||||
self.queue.append(data)
|
||||
return
|
||||
r = self.file.write(data)
|
||||
self.file.flush()
|
||||
return r
|
||||
|
||||
|
||||
class ReadingPipe(Pipe):
|
||||
def __init__(self, path, encoding=None):
|
||||
super().__init__(path, "r", encoding=encoding)
|
||||
|
||||
def open(self):
|
||||
"""
|
||||
This method implements an interruptible loop that waits for the file descriptor to be opened and the first
|
||||
batch of data coming in using repeated select() calls.
|
||||
:return:
|
||||
"""
|
||||
if not self.doOpen:
|
||||
return
|
||||
super().open()
|
||||
while self.doOpen:
|
||||
(read, _, _) = select.select([self.file], [], [], 1)
|
||||
if self.file in read:
|
||||
break
|
||||
|
||||
def read(self):
|
||||
if self.file is None:
|
||||
self.open()
|
||||
return self.file.read()
|
||||
|
||||
def readline(self):
|
||||
if self.file is None:
|
||||
self.open()
|
||||
return self.file.readline()
|
Reference in New Issue
Block a user