remove old csdr code

This commit is contained in:
Jakob Ketterl 2021-09-27 18:53:49 +02:00
parent e77b0f4a67
commit a07480fd9a
2 changed files with 0 additions and 1063 deletions

View File

@ -1,907 +0,0 @@
"""
OpenWebRX csdr plugin: do the signal processing with csdr
This file is part of OpenWebRX,
an open-source SDR receiver software with a web UI.
Copyright (c) 2013-2015 by Andras Retzler <randras@sdr.hu>
Copyright (c) 2019-2021 by Jakob Ketterl <dd5jfk@darc.de>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import subprocess
import os
import signal
import threading
import math
from functools import partial
from owrx.aprs.direwolf import DirewolfConfig, DirewolfConfigSubscriber
from owrx.audio.chopper import AudioChopper
from csdr.pipe import Pipe
from pycsdr.modules import Buffer
from pycsdr.types import Format
from csdr.chain.selector import Selector
from csdr.chain.clientaudio import ClientAudioChain
import logging
logger = logging.getLogger(__name__)
class Dsp(DirewolfConfigSubscriber):
def __init__(self, output):
self.pycsdr_enabled = True
self.pycsdr_chain = None
self.pycsdr_client_chain = None
self.pycsdr_reader = None
self.pycsdr_power_reader = None
self.pycsdr_meta_reader = None
self.buffer = None
self.samp_rate = 250000
self.output_rate = 11025
self.hd_output_rate = 44100
self.fft_fps = 5
self.center_freq = 0
self.offset_freq = 0
self.low_cut = -4000
self.high_cut = 4000
self.bpf_transition_bw = 320 # Hz, and this is a constant
self.ddc_transition_bw_rate = 0.15 # of the IF sample rate
self.running = False
self.secondary_processes_running = False
self.audio_compression = "none"
self.fft_compression = "none"
self.demodulator = "nfm"
self.name = "csdr"
self.base_bufsize = 512
self.decimation = None
self.last_decimation = None
self.nc_port = None
self.squelch_level = -150
self.fft_averages = 50
self.wfm_deemphasis_tau = 50e-6
self.iqtee = False
self.iqtee2 = False
self.secondary_demodulator = None
self.secondary_fft_size = 1024
self.secondary_process_fft = None
self.secondary_process_demod = None
self.pipe_names = {
"bpf_pipe": Pipe.WRITE,
"shift_pipe": Pipe.WRITE,
"squelch_pipe": Pipe.WRITE,
"smeter_pipe": Pipe.READ,
"meta_pipe": Pipe.READ,
"iqtee_pipe": Pipe.NONE,
"iqtee2_pipe": Pipe.NONE,
"dmr_control_pipe": Pipe.WRITE,
}
self.pipes = {}
self.secondary_pipe_names = {"secondary_shift_pipe": Pipe.WRITE}
self.secondary_offset_freq = 1000
self.codecserver = None
self.modification_lock = threading.Lock()
self.output = output
self.temporary_directory = None
self.pipe_base_path = None
self.set_temporary_directory("/tmp")
self.is_service = False
self.direwolf_config = None
self.direwolf_config_path = None
self.process = None
def setBuffer(self, buffer):
self.buffer = buffer
if self.pycsdr_chain is not None:
self.pycsdr_chain.setReader(buffer.getReader())
def set_service(self, flag=True):
self.is_service = flag
def set_temporary_directory(self, what):
self.temporary_directory = what
self.pipe_base_path = "{tmp_dir}/openwebrx_pipe_".format(tmp_dir=self.temporary_directory)
def chain(self, which):
if self.pycsdr_enabled:
if which == "nfm":
self.pycsdr_chain = DemodulatorChain(self.samp_rate, self.get_audio_rate(), 0.0, NFm(self.get_audio_rate()))
return self.pycsdr_chain
elif which == "am":
self.pycsdr_chain = DemodulatorChain(self.samp_rate, self.get_audio_rate(), 0.0, Am())
return self.pycsdr_chain
elif which == "ssb":
self.pycsdr_chain = DemodulatorChain(self.samp_rate, self.get_audio_rate(), 0.0, Ssb())
return self.pycsdr_chain
elif which == "wfm":
self.pycsdr_chain = DemodulatorChain(self.samp_rate, 200000, 0.0, WFm(self.get_audio_rate(), self.wfm_deemphasis_tau))
return self.pycsdr_chain
elif which == "dstar":
self.pycsdr_chain = DemodulatorChain(self.samp_rate, 48000, 0.0, Dstar(self.codecserver))
return self.pycsdr_chain
elif which == "nxdn":
self.pycsdr_chain = DemodulatorChain(self.samp_rate, 48000, 0.0, Nxdn(self.codecserver))
return self.pycsdr_chain
elif which == "dmr":
self.pycsdr_chain = DemodulatorChain(self.samp_rate, 48000, 0.0, Dmr(self.codecserver))
return self.pycsdr_chain
elif which == "ysf":
self.pycsdr_chain = DemodulatorChain(self.samp_rate, 48000, 0.0, Ysf(self.codecserver))
return self.pycsdr_chain
chain = ["nc -v 127.0.0.1 {nc_port}"]
chain += ["csdr shift_addfast_cc --fifo {shift_pipe}"]
if self.decimation > 1:
chain += ["csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING"]
chain += ["csdr bandpass_fir_fft_cc --fifo {bpf_pipe} {bpf_transition_bw} HAMMING"]
if self.output.supports_type("smeter"):
chain += [
"csdr squelch_and_smeter_cc --fifo {squelch_pipe} --outfifo {smeter_pipe} 5 {smeter_report_every}"
]
if self.secondary_demodulator:
if self.output.supports_type("secondary_fft"):
chain += ["csdr tee {iqtee_pipe}"]
chain += ["csdr tee {iqtee2_pipe}"]
# early exit if we don't want audio
if not self.output.supports_type("audio"):
return chain
# safe some cpu cycles... no need to decimate if decimation factor is 1
last_decimation_block = []
if self.last_decimation >= 2.0:
# activate prefilter if signal has been oversampled, e.g. WFM
last_decimation_block = ["csdr fractional_decimator_ff {last_decimation} 12 --prefilter"]
elif self.last_decimation != 1.0:
last_decimation_block = ["csdr fractional_decimator_ff {last_decimation}"]
if which == "nfm":
chain += ["csdr fmdemod_quadri_cf", "csdr limit_ff"]
chain += last_decimation_block
chain += [
"csdr deemphasis_nfm_ff {audio_rate}",
"csdr agc_ff --profile slow --max 3",
]
if self.get_audio_rate() != self.get_output_rate():
chain += [
"sox -t raw -r {audio_rate} -e floating-point -b 32 -c 1 --buffer 32 - -t raw -r {output_rate} -e signed-integer -b 16 -c 1 - "
]
else:
chain += ["csdr convert_f_s16"]
elif which == "wfm":
chain += [
"csdr fmdemod_quadri_cf",
"csdr limit_ff",
]
chain += last_decimation_block
chain += ["csdr deemphasis_wfm_ff {audio_rate} {wfm_deemphasis_tau}", "csdr convert_f_s16"]
elif self.isDigitalVoice(which):
chain += ["csdr fmdemod_quadri_cf"]
chain += last_decimation_block
chain += ["dc_block"]
# m17
if which == "m17":
chain += [
"csdr limit_ff",
"csdr convert_f_s16",
"m17-demod",
]
else:
# digiham modes
if which == "dstar":
chain += [
"fsk_demodulator -s 10",
"dstar_decoder --fifo {meta_pipe}",
"mbe_synthesizer -d {codecserver_arg}",
]
elif which == "nxdn":
chain += [
"rrc_filter --narrow",
"gfsk_demodulator --samples 20",
"nxdn_decoder --fifo {meta_pipe}",
"mbe_synthesizer {codecserver_arg}",
]
else:
chain += ["rrc_filter", "gfsk_demodulator"]
if which == "dmr":
chain += [
"dmr_decoder --fifo {meta_pipe} --control-fifo {dmr_control_pipe}",
"mbe_synthesizer {codecserver_arg}",
]
elif which == "ysf":
chain += ["ysf_decoder --fifo {meta_pipe}", "mbe_synthesizer -y {codecserver_arg}"]
chain += ["digitalvoice_filter"]
chain += [
"CSDR_FIXED_BUFSIZE=32 csdr agc_s16 --max 30 --initial 3",
"sox -t raw -r 8000 -e signed-integer -b 16 -c 1 --buffer 32 - -t raw -r {output_rate} -e signed-integer -b 16 -c 1 - ",
]
elif which == "am":
chain += ["csdr amdemod_cf", "csdr fastdcblock_ff"]
chain += last_decimation_block
chain += [
"csdr agc_ff --profile slow --initial 200",
"csdr convert_f_s16",
]
elif self.isFreeDV(which):
chain += ["csdr realpart_cf"]
chain += last_decimation_block
chain += [
"csdr agc_ff",
"csdr convert_f_s16",
"freedv_rx 1600 - -",
"csdr agc_s16 --max 30 --initial 3",
"sox -t raw -r 8000 -e signed-integer -b 16 -c 1 --buffer 32 - -t raw -r {output_rate} -e signed-integer -b 16 -c 1 - ",
]
elif self.isDrm(which):
if self.last_decimation != 1.0:
# we are still dealing with complex samples here, so the regular last_decimation_block doesn't fit
chain += ["csdr fractional_decimator_cc {last_decimation}"]
chain += [
"csdr convert_f_s16",
"dream -c 6 --sigsrate 48000 --audsrate 48000 -I - -O -",
"sox -t raw -r 48000 -e signed-integer -b 16 -c 2 - -t raw -r {output_rate} -e signed-integer -b 16 -c 1 - ",
]
elif which == "ssb":
chain += ["csdr realpart_cf"]
chain += last_decimation_block
chain += ["csdr agc_ff"]
# fixed sample rate necessary for the wsjt-x tools. fix with sox...
if self.get_audio_rate() != self.get_output_rate():
chain += [
"sox -t raw -r {audio_rate} -e floating-point -b 32 -c 1 --buffer 32 - -t raw -r {output_rate} -e signed-integer -b 16 -c 1 - "
]
else:
chain += ["csdr convert_f_s16"]
if self.audio_compression == "adpcm":
chain += ["csdr++ adpcm -e --sync"]
return chain
def secondary_chain(self, which):
chain = ["cat {input_pipe}"]
if which == "fft":
chain += [
"csdr fft_cc {secondary_fft_input_size} {secondary_fft_block_size}",
"csdr logpower_cf -70"
if self.fft_averages == 0
else "csdr logaveragepower_cf -70 {secondary_fft_size} {fft_averages}",
"csdr fft_exchange_sides_ff {secondary_fft_input_size}",
]
if self.fft_compression == "adpcm":
chain += ["csdr compress_fft_adpcm_f_u8 {secondary_fft_size}"]
return chain
elif which == "bpsk31" or which == "bpsk63":
return chain + [
"csdr shift_addfast_cc --fifo {secondary_shift_pipe}",
"csdr bandpass_fir_fft_cc -{secondary_bpf_cutoff} {secondary_bpf_cutoff} {secondary_bpf_cutoff}",
"csdr simple_agc_cc 0.001 0.5",
"csdr timing_recovery_cc GARDNER {secondary_samples_per_bits} 0.5 2 --add_q",
"CSDR_FIXED_BUFSIZE=1 csdr dbpsk_decoder_c_u8",
"CSDR_FIXED_BUFSIZE=1 csdr psk31_varicode_decoder_u8_u8",
]
elif self.isWsjtMode(which) or self.isJs8(which):
chain += ["csdr realpart_cf"]
if self.last_decimation != 1.0:
chain += ["csdr fractional_decimator_ff {last_decimation}"]
return chain + ["csdr agc_ff", "csdr convert_f_s16"]
elif which == "packet":
chain += ["csdr fmdemod_quadri_cf"]
if self.last_decimation != 1.0:
chain += ["csdr fractional_decimator_ff {last_decimation}"]
return chain + ["csdr convert_f_s16", "direwolf -c {direwolf_config} -r {audio_rate} -t 0 -q d -q h 1>&2"]
elif which == "pocsag":
chain += ["csdr fmdemod_quadri_cf"]
if self.last_decimation != 1.0:
chain += ["csdr fractional_decimator_ff {last_decimation}"]
return chain + ["fsk_demodulator -i", "pocsag_decoder"]
def set_secondary_demodulator(self, what):
if self.get_secondary_demodulator() == what:
return
self.secondary_demodulator = what
self.calculate_decimation()
self.restart()
def secondary_fft_block_size(self):
base = (self.samp_rate / self.decimation) / (self.fft_fps * 2)
if self.fft_averages == 0:
return base
return base / self.fft_averages
def secondary_decimation(self):
return 1 # currently unused
def secondary_bpf_cutoff(self):
if self.secondary_demodulator == "bpsk31":
return 31.25 / self.if_samp_rate()
elif self.secondary_demodulator == "bpsk63":
return 62.5 / self.if_samp_rate()
return 0
def secondary_bpf_transition_bw(self):
if self.secondary_demodulator == "bpsk31":
return 31.25 / self.if_samp_rate()
elif self.secondary_demodulator == "bpsk63":
return 62.5 / self.if_samp_rate()
return 0
def secondary_samples_per_bits(self):
if self.secondary_demodulator == "bpsk31":
return int(round(self.if_samp_rate() / 31.25)) & ~3
elif self.secondary_demodulator == "bpsk63":
return int(round(self.if_samp_rate() / 62.5)) & ~3
return 0
def secondary_bw(self):
if self.secondary_demodulator == "bpsk31":
return 31.25
elif self.secondary_demodulator == "bpsk63":
return 62.5
def start_secondary_demodulator(self):
if not self.secondary_demodulator:
return
logger.debug("starting secondary demodulator from IF input sampled at %d" % self.if_samp_rate())
secondary_command_demod = " | ".join(self.secondary_chain(self.secondary_demodulator))
self.try_create_pipes(self.secondary_pipe_names, secondary_command_demod)
self.try_create_configs(secondary_command_demod)
secondary_command_demod = secondary_command_demod.format(
input_pipe=self.pipes["iqtee2_pipe"],
secondary_shift_pipe=self.pipes["secondary_shift_pipe"],
secondary_decimation=self.secondary_decimation(),
secondary_samples_per_bits=self.secondary_samples_per_bits(),
secondary_bpf_cutoff=self.secondary_bpf_cutoff(),
secondary_bpf_transition_bw=self.secondary_bpf_transition_bw(),
if_samp_rate=self.if_samp_rate(),
last_decimation=self.last_decimation,
audio_rate=self.get_audio_rate(),
direwolf_config=self.direwolf_config_path,
)
logger.debug("secondary command (demod) = %s", secondary_command_demod)
if self.output.supports_type("secondary_fft"):
secondary_command_fft = " | ".join(self.secondary_chain("fft"))
secondary_command_fft = secondary_command_fft.format(
input_pipe=self.pipes["iqtee_pipe"],
secondary_fft_input_size=self.secondary_fft_size,
secondary_fft_size=self.secondary_fft_size,
secondary_fft_block_size=self.secondary_fft_block_size(),
fft_averages=self.fft_averages,
)
logger.debug("secondary command (fft) = %s", secondary_command_fft)
self.secondary_process_fft = subprocess.Popen(
secondary_command_fft, stdout=subprocess.PIPE, shell=True, start_new_session=True
)
self.output.send_output(
"secondary_fft",
partial(self.secondary_process_fft.stdout.read, int(self.get_secondary_fft_bytes_to_read())),
)
# direwolf does not provide any meaningful data on stdout
# more specifically, it doesn't provide any data. if however, for any strange reason, it would start to do so,
# it would block if not read. by piping it to devnull, we avoid a potential pitfall here.
secondary_output = subprocess.DEVNULL if self.isPacket() else subprocess.PIPE
self.secondary_process_demod = subprocess.Popen(
secondary_command_demod, stdout=secondary_output, shell=True, start_new_session=True
)
self.secondary_processes_running = True
if self.isWsjtMode() or self.isJs8():
chopper = AudioChopper(self, self.get_secondary_demodulator())
chopper.send_output("audio", self.secondary_process_demod.stdout.read)
output_type = "js8_demod" if self.isJs8() else "wsjt_demod"
self.output.send_output(output_type, chopper.read)
elif self.isPacket():
# we best get the ax25 packets from the kiss socket
kiss = KissClient(self.direwolf_config.getPort())
self.output.send_output("packet_demod", kiss.read)
elif self.isPocsag():
self.output.send_output("pocsag_demod", self.secondary_process_demod.stdout.readline)
else:
self.output.send_output("secondary_demod", partial(self.secondary_process_demod.stdout.read, 1))
# open control pipes for csdr and send initialization data
if self.has_pipe("secondary_shift_pipe"): # TODO digimodes
self.set_secondary_offset_freq(self.secondary_offset_freq) # TODO digimodes
def set_secondary_offset_freq(self, value):
self.secondary_offset_freq = value
if self.secondary_processes_running and self.has_pipe("secondary_shift_pipe"):
self.pipes["secondary_shift_pipe"].write(
"%g\n" % (-float(self.secondary_offset_freq) / self.if_samp_rate())
)
def stop_secondary_demodulator(self):
if not self.secondary_processes_running:
return
self.try_delete_pipes(self.secondary_pipe_names)
self.try_delete_configs()
if self.secondary_process_fft:
try:
os.killpg(os.getpgid(self.secondary_process_fft.pid), signal.SIGTERM)
# drain any leftover data to free file descriptors
self.secondary_process_fft.communicate()
self.secondary_process_fft = None
except ProcessLookupError:
# been killed by something else, ignore
pass
if self.secondary_process_demod:
try:
os.killpg(os.getpgid(self.secondary_process_demod.pid), signal.SIGTERM)
# drain any leftover data to free file descriptors
self.secondary_process_demod.communicate()
self.secondary_process_demod = None
except ProcessLookupError:
# been killed by something else, ignore
pass
self.secondary_processes_running = False
def get_secondary_demodulator(self):
return self.secondary_demodulator
def set_secondary_fft_size(self, secondary_fft_size):
if self.secondary_fft_size == secondary_fft_size:
return
self.secondary_fft_size = secondary_fft_size
self.restart()
def set_audio_compression(self, what):
if self.audio_compression == what:
return
self.audio_compression = what
self.restart()
def get_audio_bytes_to_read(self):
# desired latency: 5ms
# uncompressed audio has 16 bits = 2 bytes per sample
base = self.output_rate * 0.005 * 2
# adpcm compresses the bitstream by 4
if self.audio_compression == "adpcm":
base = base / 4
return int(base)
def set_fft_compression(self, what):
if self.fft_compression == what:
return
self.fft_compression = what
self.restart()
def get_secondary_fft_bytes_to_read(self):
if self.fft_compression == "none":
return self.secondary_fft_size * 4
if self.fft_compression == "adpcm":
return (self.secondary_fft_size / 2) + (10 / 2)
def set_samp_rate(self, samp_rate):
if self.samp_rate == samp_rate:
return
self.samp_rate = samp_rate
self.calculate_decimation()
if self.running:
self.restart()
def calculate_decimation(self):
(self.decimation, self.last_decimation) = self.get_decimation(self.samp_rate, self.get_audio_rate())
def get_decimation(self, input_rate, output_rate):
if output_rate <= 0:
raise ValueError("invalid output rate: {rate}".format(rate=output_rate))
decimation = 1
target_rate = output_rate
# wideband fm has a much higher frequency deviation (75kHz).
# we cannot cover this if we immediately decimate to the sample rate the audio will have later on, so we need
# to compensate here.
if self.get_demodulator() == "wfm" and output_rate < 200000:
target_rate = 200000
while input_rate / (decimation + 1) >= target_rate:
decimation += 1
fraction = float(input_rate / decimation) / output_rate
return decimation, fraction
def if_samp_rate(self):
return self.samp_rate / self.decimation
def get_name(self):
return self.name
def get_output_rate(self):
return self.output_rate
def get_hd_output_rate(self):
return self.hd_output_rate
def get_audio_rate(self):
if self.isDigitalVoice() or self.isPacket() or self.isPocsag() or self.isDrm():
return 48000
elif self.isWsjtMode() or self.isJs8():
return 12000
elif self.isFreeDV():
return 8000
elif self.isHdAudio():
return self.get_hd_output_rate()
return self.get_output_rate()
def isDigitalVoice(self, demodulator=None):
if demodulator is None:
demodulator = self.get_demodulator()
return demodulator in ["dmr", "dstar", "nxdn", "ysf", "m17"]
def isWsjtMode(self, demodulator=None):
if demodulator is None:
demodulator = self.get_secondary_demodulator()
return demodulator in ["ft8", "wspr", "jt65", "jt9", "ft4", "fst4", "fst4w", "q65"]
def isJs8(self, demodulator=None):
if demodulator is None:
demodulator = self.get_secondary_demodulator()
return demodulator == "js8"
def isPacket(self, demodulator=None):
if demodulator is None:
demodulator = self.get_secondary_demodulator()
return demodulator == "packet"
def isPocsag(self, demodulator=None):
if demodulator is None:
demodulator = self.get_secondary_demodulator()
return demodulator == "pocsag"
def isFreeDV(self, demodulator=None):
if demodulator is None:
demodulator = self.get_demodulator()
return demodulator == "freedv"
def isHdAudio(self, demodulator=None):
if demodulator is None:
demodulator = self.get_demodulator()
return demodulator == "wfm"
def isDrm(self, demodulator=None):
if demodulator is None:
demodulator = self.get_demodulator()
return demodulator == "drm"
def set_output_rate(self, output_rate):
if self.output_rate == output_rate:
return
self.output_rate = output_rate
self.calculate_decimation()
self.restart()
def set_hd_output_rate(self, hd_output_rate):
if self.hd_output_rate == hd_output_rate:
return
self.hd_output_rate = hd_output_rate
self.calculate_decimation()
self.restart()
def set_demodulator(self, demodulator):
if demodulator in ["usb", "lsb", "cw"]:
demodulator = "ssb"
if self.demodulator == demodulator:
return
self.demodulator = demodulator
self.calculate_decimation()
self.restart()
def get_demodulator(self):
return self.demodulator
def set_offset_freq(self, offset_freq):
if offset_freq is None:
return
self.offset_freq = offset_freq
if self.running:
if self.pycsdr_chain is not None and isinstance(self.pycsdr_chain, DemodulatorChain):
self.pycsdr_chain.setShiftRate(-float(self.offset_freq) / self.samp_rate)
else:
self.pipes["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate))
def set_center_freq(self, center_freq):
# dsp only needs to know this to be able to pass it to decoders in the form of get_operating_freq()
self.center_freq = center_freq
def get_operating_freq(self):
return self.center_freq + self.offset_freq
def set_bandpass(self, bandpass):
self.set_bpf(bandpass.low_cut, bandpass.high_cut)
def set_bpf(self, low_cut, high_cut):
self.low_cut = low_cut
self.high_cut = high_cut
if self.running:
if self.pycsdr_chain is not None and isinstance(self.pycsdr_chain, DemodulatorChain):
self.pycsdr_chain.setBandpass(float(self.low_cut) / self.if_samp_rate(), float(self.high_cut) / self.if_samp_rate())
else:
self.pipes["bpf_pipe"].write(
"%g %g\n" % (float(self.low_cut) / self.if_samp_rate(), float(self.high_cut) / self.if_samp_rate())
)
def get_bpf(self):
return [self.low_cut, self.high_cut]
def convertToLinear(self, db):
return float(math.pow(10, db / 10))
def set_squelch_level(self, squelch_level):
self.squelch_level = squelch_level
# no squelch required on digital voice modes
actual_squelch = (
-150
if self.isDigitalVoice() or self.isPacket() or self.isPocsag() or self.isFreeDV() or self.isDrm()
else self.squelch_level
)
if self.running:
if self.pycsdr_chain is not None and isinstance(self.pycsdr_chain, DemodulatorChain):
self.pycsdr_chain.setSquelchLevel(self.convertToLinear(actual_squelch))
else:
self.pipes["squelch_pipe"].write("%g\n" % (self.convertToLinear(actual_squelch)))
def set_codecserver(self, s):
if self.codecserver == s:
return
self.codecserver = s
self.restart()
def get_codecserver_arg(self):
return "-s {}".format(self.codecserver) if self.codecserver else ""
def set_dmr_filter(self, filter):
if self.has_pipe("dmr_control_pipe"):
self.pipes["dmr_control_pipe"].write("{0}\n".format(filter))
if self.pycsdr_enabled and self.pycsdr_chain is not None and isinstance(self.pycsdr_chain, DemodulatorChain):
self.pycsdr_chain.setSlotFilter(filter)
def set_wfm_deemphasis_tau(self, tau):
if self.wfm_deemphasis_tau == tau:
return
self.wfm_deemphasis_tau = tau
self.restart()
def ddc_transition_bw(self):
return self.ddc_transition_bw_rate * (self.if_samp_rate() / float(self.samp_rate))
def try_create_pipes(self, pipe_names, command_base):
for pipe_name, pipe_type in pipe_names.items():
if self.has_pipe(pipe_name):
logger.warning("{pipe_name} is still in use", pipe_name=pipe_name)
self.pipes[pipe_name].close()
if "{" + pipe_name + "}" in command_base:
p = self.pipe_base_path + pipe_name
encoding = None
# TODO make digiham output unicode and then change this here
# the whole pipe enoding feature onlye exists because of this
if pipe_name == "meta_pipe":
encoding = "cp437"
self.pipes[pipe_name] = Pipe.create(p, pipe_type, encoding=encoding)
else:
self.pipes[pipe_name] = None
def has_pipe(self, name):
return name in self.pipes and self.pipes[name] is not None
def try_delete_pipes(self, pipe_names):
for pipe_name in pipe_names:
if self.has_pipe(pipe_name):
self.pipes[pipe_name].close()
self.pipes[pipe_name] = None
def try_create_configs(self, command):
if "{direwolf_config}" in command:
self.direwolf_config_path = "{tmp_dir}/openwebrx_direwolf_{myid}.conf".format(
tmp_dir=self.temporary_directory, myid=id(self)
)
self.direwolf_config = DirewolfConfig()
self.direwolf_config.wire(self)
file = open(self.direwolf_config_path, "w")
file.write(self.direwolf_config.getConfig(self.is_service))
file.close()
else:
self.direwolf_config = None
self.direwolf_config_path = None
def try_delete_configs(self):
if self.direwolf_config is not None:
self.direwolf_config.unwire(self)
self.direwolf_config = None
if self.direwolf_config_path is not None:
try:
os.unlink(self.direwolf_config_path)
except FileNotFoundError:
# result suits our expectations. fine :)
pass
except Exception:
logger.exception("try_delete_configs()")
self.direwolf_config_path = None
def onConfigChanged(self):
self.restart()
def start(self):
with self.modification_lock:
if self.running:
return
self.running = True
chain = self.chain(self.demodulator)
if self.pycsdr_enabled and isinstance(chain, DemodulatorChain):
self.set_squelch_level(self.squelch_level)
self.set_bpf(self.low_cut, self.high_cut)
self.set_offset_freq(self.offset_freq)
chain.setReader(self.buffer.getReader())
output_rate = self.get_hd_output_rate() if self.isHdAudio() else self.get_output_rate()
audio_rate = 8000 if self.isDigitalVoice() else self.get_audio_rate()
self.pycsdr_client_chain = ClientAudioChain(chain.getOutputFormat(), audio_rate, output_rate, self.audio_compression)
buffer = Buffer(chain.getOutputFormat())
chain.setWriter(buffer)
self.pycsdr_client_chain.setReader(buffer.getReader())
outputBuffer = Buffer(self.pycsdr_client_chain.getOutputFormat())
self.pycsdr_client_chain.setWriter(outputBuffer)
self.pycsdr_reader = outputBuffer.getReader()
audio_type = "hd_audio" if self.isHdAudio() else "audio"
self.output.send_output(audio_type, self.pycsdr_reader.read)
powerBuffer = Buffer(Format.FLOAT)
chain.setPowerWriter(powerBuffer)
self.pycsdr_power_reader = powerBuffer.getReader()
self.output.send_output("smeter", self.pycsdr_power_reader.read)
if self.isDigitalVoice():
metaBuffer = Buffer(Format.CHAR)
chain.setMetaWriter(metaBuffer)
self.pycsdr_meta_reader = metaBuffer.getReader()
def read_meta():
raw = self.pycsdr_meta_reader.read()
if raw is None or len(raw) == 0:
return None
else:
raw = raw.tobytes().decode("utf-8")
return raw.rstrip("\n")
self.output.send_output("meta", read_meta)
return
command_base = " | ".join(chain)
# create control pipes for csdr
self.try_create_pipes(self.pipe_names, command_base)
# send initial config through the pipes
if self.has_pipe("bpf_pipe"):
self.set_bpf(self.low_cut, self.high_cut)
if self.has_pipe("shift_pipe"):
self.set_offset_freq(self.offset_freq)
if self.has_pipe("squelch_pipe"):
self.set_squelch_level(self.squelch_level)
if self.has_pipe("dmr_control_pipe"):
self.set_dmr_filter(3)
# run the command
command = command_base.format(
bpf_pipe=self.pipes["bpf_pipe"],
shift_pipe=self.pipes["shift_pipe"],
squelch_pipe=self.pipes["squelch_pipe"],
smeter_pipe=self.pipes["smeter_pipe"],
meta_pipe=self.pipes["meta_pipe"],
iqtee_pipe=self.pipes["iqtee_pipe"],
iqtee2_pipe=self.pipes["iqtee2_pipe"],
dmr_control_pipe=self.pipes["dmr_control_pipe"],
decimation=self.decimation,
last_decimation=self.last_decimation,
bpf_transition_bw=float(self.bpf_transition_bw) / self.if_samp_rate(),
ddc_transition_bw=self.ddc_transition_bw(),
flowcontrol=int(self.samp_rate * 2),
start_bufsize=self.base_bufsize * self.decimation,
nc_port=self.nc_port,
output_rate=self.get_output_rate(),
smeter_report_every=int(self.if_samp_rate() / 6000),
codecserver_arg=self.get_codecserver_arg(),
audio_rate=self.get_audio_rate(),
wfm_deemphasis_tau=self.wfm_deemphasis_tau,
)
logger.debug("Command = %s", command)
out = subprocess.PIPE if self.output.supports_type("audio") else subprocess.DEVNULL
self.process = subprocess.Popen(command, stdout=out, shell=True, start_new_session=True)
def watch_thread():
rc = self.process.wait()
logger.debug("dsp thread ended with rc=%d", rc)
if rc == 0 and self.running and not self.modification_lock.locked():
logger.debug("restarting since rc = 0, self.running = true, and no modification")
self.restart()
threading.Thread(target=watch_thread, name="csdr_watch_thread").start()
audio_type = "hd_audio" if self.isHdAudio() else "audio"
if self.output.supports_type(audio_type):
self.output.send_output(
audio_type,
partial(
self.process.stdout.read,
self.get_audio_bytes_to_read(),
),
)
self.start_secondary_demodulator()
if self.has_pipe("smeter_pipe"):
def read_smeter():
raw = self.pipes["smeter_pipe"].readline()
if len(raw) == 0:
return None
else:
return float(raw.rstrip("\n"))
self.output.send_output("smeter", read_smeter)
if self.has_pipe("meta_pipe"):
def read_meta():
raw = self.pipes["meta_pipe"].readline()
if len(raw) == 0:
return None
else:
return raw.rstrip("\n")
self.output.send_output("meta", read_meta)
def stop(self):
with self.modification_lock:
self.running = False
if self.pycsdr_enabled and self.pycsdr_chain is not None:
self.pycsdr_chain.stop()
self.pycsdr_chain = None
self.pycsdr_reader.stop()
self.pycsdr_reader = None
self.pycsdr_power_reader.stop()
self.pycsdr_power_reader = None
self.pycsdr_client_chain.stop()
self.pycsdr_client_chain = None
if self.pycsdr_meta_reader is not None:
self.pycsdr_meta_reader.stop()
self.pycsdr_meta_reader = None
if self.process is not None:
try:
os.killpg(os.getpgid(self.process.pid), signal.SIGTERM)
# drain any leftover data to free file descriptors
self.process.communicate()
self.process = None
except ProcessLookupError:
# been killed by something else, ignore
pass
self.stop_secondary_demodulator()
self.try_delete_pipes(self.pipe_names)
self.try_delete_configs()
def restart(self):
if not self.running:
return
self.stop()
self.start()

View File

@ -1,156 +0,0 @@
import os
import select
import time
import threading
import logging
logger = logging.getLogger(__name__)
class Pipe(object):
READ = "r"
WRITE = "w"
NONE = None
@staticmethod
def create(path, t, encoding=None):
if t == Pipe.READ:
return ReadingPipe(path, encoding=encoding)
elif t == Pipe.WRITE:
return WritingPipe(path, encoding=encoding)
elif t == Pipe.NONE:
return Pipe(path, None, encoding=encoding)
def __init__(self, path, direction, encoding=None):
self.doOpen = True
self.path = "{base}_{myid}".format(base=path, myid=id(self))
self.direction = direction
self.encoding = encoding
self.file = None
os.mkfifo(self.path)
def open(self):
"""
this method opens the file descriptor with an added O_NONBLOCK flag. This gives us a special behaviour for
FIFOS, when they are not opened by the opposing side:
- opening a pipe for writing will throw an OSError with errno = 6 (ENXIO). This is handled specially in the
WritingPipe class.
- opening a pipe for reading will pass through this method instantly, even if the opposing end has not been
opened yet, but the resulting file descriptor will behave as if O_NONBLOCK is set (even if we remove it
immediately here), resulting in empty reads until data is available. This is handled specially in the
ReadingPipe class.
"""
def opener(path, flags):
fd = os.open(path, flags | os.O_NONBLOCK)
os.set_blocking(fd, True)
return fd
self.file = open(self.path, self.direction, encoding=self.encoding, opener=opener)
def close(self):
self.doOpen = False
try:
if self.file is not None:
self.file.close()
os.unlink(self.path)
except FileNotFoundError:
# it seems like we keep calling this twice. no idea why, but we don't need the resulting error.
pass
except Exception:
logger.exception("Pipe.close()")
def __str__(self):
return self.path
class WritingPipe(Pipe):
def __init__(self, path, encoding=None):
self.queue = []
self.queueLock = threading.Lock()
super().__init__(path, "w", encoding=encoding)
self.open()
def open_and_dequeue(self):
"""
This method implements a retry loop that can be interrupted in case the Pipe gets shutdown before actually
being connected.
After the pipe is opened successfully, all data that has been queued is sent in the order it was passed into
write().
"""
retries = 0
while self.file is None and self.doOpen and retries < 10:
try:
super().open()
except OSError as error:
# ENXIO = FIFO has not been opened for reading
if error.errno == 6:
time.sleep(0.1)
retries += 1
else:
raise
# if doOpen is false, opening has been canceled, so no warning in that case.
if self.file is None:
if self.doOpen:
logger.warning("could not open FIFO %s", self.path)
return
with self.queueLock:
for i in self.queue:
self.file.write(i)
self.file.flush()
self.queue = None
def open(self):
"""
This sends the opening operation off to a background thread. If we were to block the thread here, another pipe
may be waiting in the queue to be opened on the opposing side, resulting in a deadlock
"""
threading.Thread(target=self.open_and_dequeue, name="csdr_pipe_thread").start()
def write(self, data):
"""
This method queues all data to be written until the file is actually opened. As soon as a file is available,
it becomes a passthrough.
"""
if self.file is None:
with self.queueLock:
self.queue.append(data)
return
r = self.file.write(data)
self.file.flush()
return r
class ReadingPipe(Pipe):
def __init__(self, path, encoding=None):
super().__init__(path, "r", encoding=encoding)
def open(self):
"""
This method implements an interruptible loop that waits for the file descriptor to be opened and the first
batch of data coming in using repeated select() calls.
:return:
"""
if not self.doOpen:
return
super().open()
while self.doOpen:
(read, _, _) = select.select([self.file], [], [], 1)
if self.file in read:
break
def read(self):
if self.file is None:
self.open()
return self.file.read()
def readline(self):
if self.file is None:
self.open()
return self.file.readline()