Merge branch 'develop' into pycsdr

This commit is contained in:
Jakob Ketterl
2021-07-15 18:09:39 +02:00
273 changed files with 16169 additions and 3734 deletions

View File

@ -28,10 +28,10 @@ import threading
import math
from functools import partial
from owrx.kiss import KissClient, DirewolfConfig
from owrx.wsjt import Ft8Profile, WsprProfile, Jt9Profile, Jt65Profile, Ft4Profile, Fst4Profile, Fst4wProfile
from owrx.js8 import Js8Profiles
from owrx.audio import AudioChopper
from csdr.output import Output
from owrx.kiss import KissClient, DirewolfConfig, DirewolfConfigSubscriber
from owrx.audio.chopper import AudioChopper
from csdr.pipe import Pipe
@ -40,40 +40,8 @@ import logging
logger = logging.getLogger(__name__)
class output(object):
def send_output(self, t, read_fn):
if not self.supports_type(t):
# TODO rewrite the output mechanism in a way that avoids producing unnecessary data
logger.warning("dumping output of type %s since it is not supported.", t)
threading.Thread(target=self.pump(read_fn, lambda x: None), name="csdr_pump_thread").start()
return
self.receive_output(t, read_fn)
def receive_output(self, t, read_fn):
pass
def pump(self, read, write):
def copy():
run = True
while run:
data = None
try:
data = read()
except ValueError:
pass
if data is None or (isinstance(data, bytes) and len(data) == 0):
run = False
else:
write(data)
return copy
def supports_type(self, t):
return True
class dsp(object):
def __init__(self, output):
class Dsp(DirewolfConfigSubscriber):
def __init__(self, output: Output):
self.pycsdr_enabled = True
self.pycsdr_chain = None
self.buffer = None
@ -98,9 +66,6 @@ class dsp(object):
self.decimation = None
self.last_decimation = None
self.nc_port = None
self.csdr_dynamic_bufsize = False
self.csdr_print_bufsizes = False
self.csdr_through = False
self.squelch_level = -150
self.fft_averages = 50
self.wfm_deemphasis_tau = 50e-6
@ -124,6 +89,7 @@ class dsp(object):
self.secondary_pipe_names = {"secondary_shift_pipe": Pipe.WRITE}
self.secondary_offset_freq = 1000
self.unvoiced_quality = 1
self.codecserver = None
self.modification_lock = threading.Lock()
self.output = output
@ -133,7 +99,7 @@ class dsp(object):
self.is_service = False
self.direwolf_config = None
self.direwolf_port = None
self.direwolf_config_path = None
self.process = None
def setBuffer(self, buffer):
@ -150,10 +116,6 @@ class dsp(object):
def chain(self, which):
chain = ["nc -v 127.0.0.1 {nc_port}"]
if self.csdr_dynamic_bufsize:
chain += ["csdr setbuf {start_bufsize}"]
if self.csdr_through:
chain += ["csdr through"]
chain += ["csdr shift_addfast_cc --fifo {shift_pipe}"]
if self.decimation > 1:
chain += ["csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING"]
@ -199,43 +161,43 @@ class dsp(object):
elif self.isDigitalVoice(which):
chain += ["csdr fmdemod_quadri_cf"]
chain += last_decimation_block
# dsd modes
if which in ["dstar", "nxdn"]:
chain += ["dc_block", "csdr limit_ff", "csdr convert_f_s16"]
if which == "dstar":
chain += ["dsd -fd -i - -o - -u {unvoiced_quality} -g -1 "]
elif which == "nxdn":
chain += ["dsd -fi -i - -o - -u {unvoiced_quality} -g -1 "]
chain += [
"digitalvoice_filter",
"CSDR_FIXED_BUFSIZE=32 csdr agc_s16 --max 30 --initial 3",
"sox -t raw -r 8000 -e signed-integer -b 16 -c 1 --buffer 32 - -t raw -r {output_rate} -e signed-integer -b 16 -c 1 - ",
]
chain += ["dc_block"]
# m17
elif which == "m17":
if which == "m17":
chain += [
"csdr limit_ff",
"csdr convert_f_s16",
"m17-demod",
"CSDR_FIXED_BUFSIZE=32 csdr agc_s16 --max 30 --initial 3",
"sox -t raw -r 8000 -e signed-integer -b 16 -c 1 --buffer 32 - -t raw -r {output_rate} -e signed-integer -b 16 -c 1 - ",
]
# digiham modes
else:
chain += ["dc_block", "rrc_filter", "gfsk_demodulator"]
if which == "dmr":
# digiham modes
if which == "dstar":
chain += [
"dmr_decoder --fifo {meta_pipe} --control-fifo {dmr_control_pipe}",
"mbe_synthesizer -f -u {unvoiced_quality}",
"fsk_demodulator -s 10",
"dstar_decoder --fifo {meta_pipe}",
"mbe_synthesizer -d {codecserver_arg}",
]
elif which == "ysf":
chain += ["ysf_decoder --fifo {meta_pipe}", "mbe_synthesizer -y -f -u {unvoiced_quality}"]
max_gain = 0.005
chain += [
"digitalvoice_filter -f",
"CSDR_FIXED_BUFSIZE=32 csdr agc_ff --max 0.005 --initial 0.0005",
"sox -t raw -r 8000 -e floating-point -b 32 -c 1 --buffer 32 - -t raw -r {output_rate} -e signed-integer -b 16 -c 1 - ",
]
elif which == "nxdn":
chain += [
"rrc_filter --narrow",
"gfsk_demodulator --samples 20",
"nxdn_decoder --fifo {meta_pipe}",
"mbe_synthesizer {codecserver_arg}",
]
else:
chain += ["rrc_filter", "gfsk_demodulator"]
if which == "dmr":
chain += [
"dmr_decoder --fifo {meta_pipe} --control-fifo {dmr_control_pipe}",
"mbe_synthesizer {codecserver_arg}",
]
elif which == "ysf":
chain += ["ysf_decoder --fifo {meta_pipe}", "mbe_synthesizer -y {codecserver_arg}"]
chain += ["digitalvoice_filter"]
chain += [
"CSDR_FIXED_BUFSIZE=32 csdr agc_s16 --max 30 --initial 3",
"sox -t raw -r 8000 -e signed-integer -b 16 -c 1 --buffer 32 - -t raw -r {output_rate} -e signed-integer -b 16 -c 1 - ",
]
elif which == "am":
chain += ["csdr amdemod_cf", "csdr fastdcblock_ff"]
chain += last_decimation_block
@ -377,14 +339,10 @@ class dsp(object):
if_samp_rate=self.if_samp_rate(),
last_decimation=self.last_decimation,
audio_rate=self.get_audio_rate(),
direwolf_config=self.direwolf_config,
direwolf_config=self.direwolf_config_path,
)
logger.debug("secondary command (demod) = %s", secondary_command_demod)
my_env = os.environ.copy()
# if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1";
if self.csdr_print_bufsizes:
my_env["CSDR_PRINT_BUFSIZES"] = "1"
if self.output.supports_type("secondary_fft"):
secondary_command_fft = " | ".join(self.secondary_chain("fft"))
secondary_command_fft = secondary_command_fft.format(
@ -397,7 +355,7 @@ class dsp(object):
logger.debug("secondary command (fft) = %s", secondary_command_fft)
self.secondary_process_fft = subprocess.Popen(
secondary_command_fft, stdout=subprocess.PIPE, shell=True, start_new_session=True, env=my_env
secondary_command_fft, stdout=subprocess.PIPE, shell=True, start_new_session=True
)
self.output.send_output(
"secondary_fft",
@ -409,38 +367,18 @@ class dsp(object):
# it would block if not read. by piping it to devnull, we avoid a potential pitfall here.
secondary_output = subprocess.DEVNULL if self.isPacket() else subprocess.PIPE
self.secondary_process_demod = subprocess.Popen(
secondary_command_demod, stdout=secondary_output, shell=True, start_new_session=True, env=my_env
secondary_command_demod, stdout=secondary_output, shell=True, start_new_session=True
)
self.secondary_processes_running = True
if self.isWsjtMode():
smd = self.get_secondary_demodulator()
chopper_profiles = None
if smd == "ft8":
chopper_profiles = [Ft8Profile()]
elif smd == "wspr":
chopper_profiles = [WsprProfile()]
elif smd == "jt65":
chopper_profiles = [Jt65Profile()]
elif smd == "jt9":
chopper_profiles = [Jt9Profile()]
elif smd == "ft4":
chopper_profiles = [Ft4Profile()]
elif smd == "fst4":
chopper_profiles = Fst4Profile.getEnabledProfiles()
elif smd == "fst4w":
chopper_profiles = Fst4wProfile.getEnabledProfiles()
if chopper_profiles is not None and len(chopper_profiles):
chopper = AudioChopper(self, self.secondary_process_demod.stdout, *chopper_profiles)
chopper.start()
self.output.send_output("wsjt_demod", chopper.read)
elif self.isJs8():
chopper = AudioChopper(self, self.secondary_process_demod.stdout, *Js8Profiles.getEnabledProfiles())
chopper.start()
self.output.send_output("js8_demod", chopper.read)
if self.isWsjtMode() or self.isJs8():
chopper = AudioChopper(self, self.get_secondary_demodulator())
chopper.send_output("audio", self.secondary_process_demod.stdout.read)
output_type = "js8_demod" if self.isJs8() else "wsjt_demod"
self.output.send_output(output_type, chopper.read)
elif self.isPacket():
# we best get the ax25 packets from the kiss socket
kiss = KissClient(self.direwolf_port)
kiss = KissClient(self.direwolf_config.getPort())
self.output.send_output("packet_demod", kiss.read)
elif self.isPocsag():
self.output.send_output("pocsag_demod", self.secondary_process_demod.stdout.readline)
@ -487,11 +425,16 @@ class dsp(object):
return self.secondary_demodulator
def set_secondary_fft_size(self, secondary_fft_size):
# to change this, restart is required
if self.secondary_fft_size == secondary_fft_size:
return
self.secondary_fft_size = secondary_fft_size
self.restart()
def set_audio_compression(self, what):
if self.audio_compression == what:
return
self.audio_compression = what
self.restart()
def get_audio_bytes_to_read(self):
# desired latency: 5ms
@ -506,6 +449,7 @@ class dsp(object):
if self.fft_compression == what:
return
self.fft_compression = what
self.restart()
def get_secondary_fft_bytes_to_read(self):
if self.fft_compression == "none":
@ -528,6 +472,8 @@ class dsp(object):
(self.decimation, self.last_decimation) = self.get_decimation(self.samp_rate, self.get_audio_rate())
def get_decimation(self, input_rate, output_rate):
if output_rate <= 0:
raise ValueError("invalid output rate: {rate}".format(rate=output_rate))
decimation = 1
target_rate = output_rate
# wideband fm has a much higher frequency deviation (75kHz).
@ -571,7 +517,7 @@ class dsp(object):
def isWsjtMode(self, demodulator=None):
if demodulator is None:
demodulator = self.get_secondary_demodulator()
return demodulator in ["ft8", "wspr", "jt65", "jt9", "ft4", "fst4", "fst4w"]
return demodulator in ["ft8", "wspr", "jt65", "jt9", "ft4", "fst4", "fst4w", "q65"]
def isJs8(self, demodulator=None):
if demodulator is None:
@ -665,7 +611,7 @@ class dsp(object):
# no squelch required on digital voice modes
actual_squelch = (
-150
if self.isDigitalVoice() or self.isPacket() or self.isPocsag() or self.isFreeDV()
if self.isDigitalVoice() or self.isPacket() or self.isPocsag() or self.isFreeDV() or self.isDrm()
else self.squelch_level
)
if self.running:
@ -678,6 +624,15 @@ class dsp(object):
def get_unvoiced_quality(self):
return self.unvoiced_quality
def set_codecserver(self, s):
if self.codecserver == s:
return
self.codecserver = s
self.restart()
def get_codecserver_arg(self):
return "-s {}".format(self.codecserver) if self.codecserver else ""
def set_dmr_filter(self, filter):
if self.has_pipe("dmr_control_pipe"):
self.pipes["dmr_control_pipe"].write("{0}\n".format(filter))
@ -718,27 +673,34 @@ class dsp(object):
def try_create_configs(self, command):
if "{direwolf_config}" in command:
self.direwolf_config = "{tmp_dir}/openwebrx_direwolf_{myid}.conf".format(
self.direwolf_config_path = "{tmp_dir}/openwebrx_direwolf_{myid}.conf".format(
tmp_dir=self.temporary_directory, myid=id(self)
)
self.direwolf_port = KissClient.getFreePort()
file = open(self.direwolf_config, "w")
file.write(DirewolfConfig().getConfig(self.direwolf_port, self.is_service))
self.direwolf_config = DirewolfConfig()
self.direwolf_config.wire(self)
file = open(self.direwolf_config_path, "w")
file.write(self.direwolf_config.getConfig(self.is_service))
file.close()
else:
self.direwolf_config = None
self.direwolf_port = None
self.direwolf_config_path = None
def try_delete_configs(self):
if self.direwolf_config:
if self.direwolf_config is not None:
self.direwolf_config.unwire(self)
self.direwolf_config = None
if self.direwolf_config_path is not None:
try:
os.unlink(self.direwolf_config)
os.unlink(self.direwolf_config_path)
except FileNotFoundError:
# result suits our expectations. fine :)
pass
except Exception:
logger.exception("try_delete_configs()")
self.direwolf_config = None
self.direwolf_config_path = None
def onConfigChanged(self):
self.restart()
def start(self):
with self.modification_lock:
@ -781,19 +743,15 @@ class dsp(object):
output_rate=self.get_output_rate(),
smeter_report_every=int(self.if_samp_rate() / 6000),
unvoiced_quality=self.get_unvoiced_quality(),
codecserver_arg=self.get_codecserver_arg(),
audio_rate=self.get_audio_rate(),
wfm_deemphasis_tau=self.wfm_deemphasis_tau,
)
logger.debug("Command = %s", command)
my_env = os.environ.copy()
if self.csdr_dynamic_bufsize:
my_env["CSDR_DYNAMIC_BUFSIZE_ON"] = "1"
if self.csdr_print_bufsizes:
my_env["CSDR_PRINT_BUFSIZES"] = "1"
out = subprocess.PIPE if self.output.supports_type("audio") else subprocess.DEVNULL
self.process = subprocess.Popen(command, stdout=out, shell=True, start_new_session=True, env=my_env)
self.process = subprocess.Popen(command, stdout=out, shell=True, start_new_session=True)
def watch_thread():
rc = self.process.wait()
@ -837,10 +795,6 @@ class dsp(object):
self.output.send_output("meta", read_meta)
if self.csdr_dynamic_bufsize:
self.process.stdout.read(8) # dummy read to skip bufsize & preamble
logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1")
def stop(self):
with self.modification_lock:
self.running = False
@ -859,6 +813,7 @@ class dsp(object):
self.stop_secondary_demodulator()
self.try_delete_pipes(self.pipe_names)
self.try_delete_configs()
def restart(self):
if not self.running:

36
csdr/output.py Normal file
View File

@ -0,0 +1,36 @@
import threading
import logging
logger = logging.getLogger(__name__)
class Output(object):
def send_output(self, t, read_fn):
if not self.supports_type(t):
# TODO rewrite the output mechanism in a way that avoids producing unnecessary data
logger.warning("dumping output of type %s since it is not supported.", t)
threading.Thread(target=self.pump(read_fn, lambda x: None), name="csdr_pump_thread").start()
return
self.receive_output(t, read_fn)
def receive_output(self, t, read_fn):
pass
def pump(self, read, write):
def copy():
run = True
while run:
data = None
try:
data = read()
except ValueError:
pass
if data is None or (isinstance(data, bytes) and len(data) == 0):
run = False
else:
write(data)
return copy
def supports_type(self, t):
return True