introduce new pipe classes to improve sequencing
This commit is contained in:
parent
0e47f2d92a
commit
9cc850e578
163
csdr/csdr.py
163
csdr/csdr.py
@ -64,6 +64,77 @@ class output(object):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class Pipe(object):
|
||||||
|
READ = "r"
|
||||||
|
WRITE = "w"
|
||||||
|
NONE = None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def create(path, t, encoding=None):
|
||||||
|
if t == Pipe.READ:
|
||||||
|
return ReadingPipe(path, encoding=encoding)
|
||||||
|
elif t == Pipe.WRITE:
|
||||||
|
return WritingPipe(path, encoding=encoding)
|
||||||
|
elif t == Pipe.NONE:
|
||||||
|
return Pipe(path, None, encoding=encoding)
|
||||||
|
|
||||||
|
def __init__(self, path, direction, encoding=None):
|
||||||
|
self.path = path
|
||||||
|
self.direction = direction
|
||||||
|
self.encoding = encoding
|
||||||
|
self.file = None
|
||||||
|
try:
|
||||||
|
os.unlink(path)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
os.mkfifo(path)
|
||||||
|
|
||||||
|
def open(self):
|
||||||
|
self.file = open(self.path, self.direction, encoding=self.encoding)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if self.file is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self.file.close()
|
||||||
|
os.unlink(self.path)
|
||||||
|
except FileNotFoundError:
|
||||||
|
# it seems like we keep calling this twice. no idea why, but we don't need the resulting error.
|
||||||
|
pass
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Pipe.close()")
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.path
|
||||||
|
|
||||||
|
|
||||||
|
class WritingPipe(Pipe):
|
||||||
|
def __init__(self, path, encoding=None):
|
||||||
|
super().__init__(path, "w", encoding=encoding)
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
if self.file is None:
|
||||||
|
self.open()
|
||||||
|
r =self.file.write(data)
|
||||||
|
self.file.flush()
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
class ReadingPipe(Pipe):
|
||||||
|
def __init__(self, path, encoding=None):
|
||||||
|
super().__init__(path, "r", encoding=encoding)
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
if self.file is None:
|
||||||
|
self.open()
|
||||||
|
return self.file.read()
|
||||||
|
|
||||||
|
def readline(self):
|
||||||
|
if self.file is None:
|
||||||
|
self.open()
|
||||||
|
return self.file.readline()
|
||||||
|
|
||||||
|
|
||||||
class dsp(object):
|
class dsp(object):
|
||||||
def __init__(self, output):
|
def __init__(self, output):
|
||||||
self.samp_rate = 250000
|
self.samp_rate = 250000
|
||||||
@ -94,19 +165,18 @@ class dsp(object):
|
|||||||
self.secondary_fft_size = 1024
|
self.secondary_fft_size = 1024
|
||||||
self.secondary_process_fft = None
|
self.secondary_process_fft = None
|
||||||
self.secondary_process_demod = None
|
self.secondary_process_demod = None
|
||||||
self.pipe_names = [
|
self.pipe_names = {
|
||||||
"bpf_pipe",
|
"bpf_pipe": Pipe.WRITE,
|
||||||
"shift_pipe",
|
"shift_pipe": Pipe.WRITE,
|
||||||
"squelch_pipe",
|
"squelch_pipe": Pipe.WRITE,
|
||||||
"smeter_pipe",
|
"smeter_pipe": Pipe.READ,
|
||||||
"meta_pipe",
|
"meta_pipe": Pipe.READ,
|
||||||
"iqtee_pipe",
|
"iqtee_pipe": Pipe.NONE,
|
||||||
"iqtee2_pipe",
|
"iqtee2_pipe": Pipe.NONE,
|
||||||
"dmr_control_pipe",
|
"dmr_control_pipe": Pipe.WRITE,
|
||||||
]
|
}
|
||||||
self.pipes = {}
|
self.pipes = {}
|
||||||
self.pipe_files = {}
|
self.secondary_pipe_names = {"secondary_shift_pipe": Pipe.WRITE}
|
||||||
self.secondary_pipe_names = ["secondary_shift_pipe"]
|
|
||||||
self.secondary_offset_freq = 1000
|
self.secondary_offset_freq = 1000
|
||||||
self.unvoiced_quality = 1
|
self.unvoiced_quality = 1
|
||||||
self.modification_lock = threading.Lock()
|
self.modification_lock = threading.Lock()
|
||||||
@ -380,14 +450,12 @@ class dsp(object):
|
|||||||
|
|
||||||
# open control pipes for csdr and send initialization data
|
# open control pipes for csdr and send initialization data
|
||||||
if self.has_pipe("secondary_shift_pipe"): # TODO digimodes
|
if self.has_pipe("secondary_shift_pipe"): # TODO digimodes
|
||||||
self.pipe_files["secondary_shift_pipe"] = open(self.pipes["secondary_shift_pipe"], "w") # TODO digimodes
|
|
||||||
self.set_secondary_offset_freq(self.secondary_offset_freq) # TODO digimodes
|
self.set_secondary_offset_freq(self.secondary_offset_freq) # TODO digimodes
|
||||||
|
|
||||||
def set_secondary_offset_freq(self, value):
|
def set_secondary_offset_freq(self, value):
|
||||||
self.secondary_offset_freq = value
|
self.secondary_offset_freq = value
|
||||||
if self.secondary_processes_running and self.has_pipe("secondary_shift_pipe"):
|
if self.secondary_processes_running and self.has_pipe("secondary_shift_pipe"):
|
||||||
self.pipe_files["secondary_shift_pipe"].write("%g\n" % (-float(self.secondary_offset_freq) / self.if_samp_rate()))
|
self.pipes["secondary_shift_pipe"].write("%g\n" % (-float(self.secondary_offset_freq) / self.if_samp_rate()))
|
||||||
self.pipe_files["secondary_shift_pipe"].flush()
|
|
||||||
|
|
||||||
def stop_secondary_demodulator(self):
|
def stop_secondary_demodulator(self):
|
||||||
if not self.secondary_processes_running:
|
if not self.secondary_processes_running:
|
||||||
@ -534,18 +602,16 @@ class dsp(object):
|
|||||||
self.offset_freq = offset_freq
|
self.offset_freq = offset_freq
|
||||||
if self.running:
|
if self.running:
|
||||||
with self.modification_lock:
|
with self.modification_lock:
|
||||||
self.pipe_files["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate))
|
self.pipes["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate))
|
||||||
self.pipe_files["shift_pipe"].flush()
|
|
||||||
|
|
||||||
def set_bpf(self, low_cut, high_cut):
|
def set_bpf(self, low_cut, high_cut):
|
||||||
self.low_cut = low_cut
|
self.low_cut = low_cut
|
||||||
self.high_cut = high_cut
|
self.high_cut = high_cut
|
||||||
if self.running:
|
if self.running:
|
||||||
with self.modification_lock:
|
with self.modification_lock:
|
||||||
self.pipe_files["bpf_pipe"].write(
|
self.pipes["bpf_pipe"].write(
|
||||||
"%g %g\n" % (float(self.low_cut) / self.if_samp_rate(), float(self.high_cut) / self.if_samp_rate())
|
"%g %g\n" % (float(self.low_cut) / self.if_samp_rate(), float(self.high_cut) / self.if_samp_rate())
|
||||||
)
|
)
|
||||||
self.pipe_files["bpf_pipe"].flush()
|
|
||||||
|
|
||||||
def get_bpf(self):
|
def get_bpf(self):
|
||||||
return [self.low_cut, self.high_cut]
|
return [self.low_cut, self.high_cut]
|
||||||
@ -559,8 +625,7 @@ class dsp(object):
|
|||||||
actual_squelch = -150 if self.isDigitalVoice() or self.isPacket() or self.isPocsag() else self.squelch_level
|
actual_squelch = -150 if self.isDigitalVoice() or self.isPacket() or self.isPocsag() else self.squelch_level
|
||||||
if self.running:
|
if self.running:
|
||||||
with self.modification_lock:
|
with self.modification_lock:
|
||||||
self.pipe_files["squelch_pipe"].write("%g\n" % (self.convertToLinear(actual_squelch)))
|
self.pipes["squelch_pipe"].write("%g\n" % (self.convertToLinear(actual_squelch)))
|
||||||
self.pipe_files["squelch_pipe"].flush()
|
|
||||||
|
|
||||||
def set_unvoiced_quality(self, q):
|
def set_unvoiced_quality(self, q):
|
||||||
self.unvoiced_quality = q
|
self.unvoiced_quality = q
|
||||||
@ -571,25 +636,21 @@ class dsp(object):
|
|||||||
|
|
||||||
def set_dmr_filter(self, filter):
|
def set_dmr_filter(self, filter):
|
||||||
if self.has_pipe("dmr_control_pipe"):
|
if self.has_pipe("dmr_control_pipe"):
|
||||||
self.pipe_files["dmr_control_pipe"].write("{0}\n".format(filter))
|
self.pipes["dmr_control_pipe"].write("{0}\n".format(filter))
|
||||||
self.pipe_files["dmr_control_pipe"].flush()
|
|
||||||
|
|
||||||
def mkfifo(self, path):
|
|
||||||
try:
|
|
||||||
os.unlink(path)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
os.mkfifo(path)
|
|
||||||
|
|
||||||
def ddc_transition_bw(self):
|
def ddc_transition_bw(self):
|
||||||
return self.ddc_transition_bw_rate * (self.if_samp_rate() / float(self.samp_rate))
|
return self.ddc_transition_bw_rate * (self.if_samp_rate() / float(self.samp_rate))
|
||||||
|
|
||||||
def try_create_pipes(self, pipe_names, command_base):
|
def try_create_pipes(self, pipe_names, command_base):
|
||||||
for pipe_name in pipe_names:
|
for pipe_name, pipe_type in pipe_names.items():
|
||||||
if "{" + pipe_name + "}" in command_base:
|
if "{" + pipe_name + "}" in command_base:
|
||||||
p = self.pipe_base_path + pipe_name
|
p = self.pipe_base_path + pipe_name
|
||||||
self.pipes[pipe_name] = p
|
encoding = None
|
||||||
self.mkfifo(p)
|
# TODO make digiham output unicode and then change this here
|
||||||
|
# the whole pipe enoding feature onlye exists because of this
|
||||||
|
if pipe_name == "meta_pipe":
|
||||||
|
encoding = "cp437"
|
||||||
|
self.pipes[pipe_name] = Pipe.create(p, pipe_type, encoding=encoding)
|
||||||
else:
|
else:
|
||||||
self.pipes[pipe_name] = None
|
self.pipes[pipe_name] = None
|
||||||
|
|
||||||
@ -599,15 +660,7 @@ class dsp(object):
|
|||||||
def try_delete_pipes(self, pipe_names):
|
def try_delete_pipes(self, pipe_names):
|
||||||
for pipe_name in pipe_names:
|
for pipe_name in pipe_names:
|
||||||
if self.has_pipe(pipe_name):
|
if self.has_pipe(pipe_name):
|
||||||
pipe_path = self.pipes[pipe_name]
|
self.pipes[pipe_name].close()
|
||||||
try:
|
|
||||||
os.unlink(pipe_path)
|
|
||||||
except FileNotFoundError:
|
|
||||||
# it seems like we keep calling this twice. no idea why, but we don't need the resulting error.
|
|
||||||
pass
|
|
||||||
except Exception:
|
|
||||||
logger.exception("try_delete_pipes()")
|
|
||||||
finally:
|
|
||||||
self.pipes[pipe_name] = None
|
self.pipes[pipe_name] = None
|
||||||
|
|
||||||
def try_create_configs(self, command):
|
def try_create_configs(self, command):
|
||||||
@ -701,23 +754,16 @@ class dsp(object):
|
|||||||
|
|
||||||
self.start_secondary_demodulator()
|
self.start_secondary_demodulator()
|
||||||
|
|
||||||
# open control pipes for csdr
|
|
||||||
for p in ["bpf_pipe", "shift_pipe", "squelch_pipe", "dmr_control_pipe"]:
|
|
||||||
if self.has_pipe(p):
|
|
||||||
self.pipe_files[p] = open(self.pipes[p], "w")
|
|
||||||
|
|
||||||
# send initial config through the pipes
|
# send initial config through the pipes
|
||||||
if self.has_pipe("squelch_pipe"):
|
|
||||||
self.set_squelch_level(self.squelch_level)
|
|
||||||
if self.has_pipe("shift_pipe"):
|
|
||||||
self.set_offset_freq(self.offset_freq)
|
|
||||||
if self.has_pipe("bpf_pipe"):
|
if self.has_pipe("bpf_pipe"):
|
||||||
self.set_bpf(self.low_cut, self.high_cut)
|
self.set_bpf(self.low_cut, self.high_cut)
|
||||||
|
if self.has_pipe("shift_pipe"):
|
||||||
|
self.set_offset_freq(self.offset_freq)
|
||||||
|
if self.has_pipe("squelch_pipe"):
|
||||||
|
self.set_squelch_level(self.squelch_level)
|
||||||
if self.has_pipe("smeter_pipe"):
|
if self.has_pipe("smeter_pipe"):
|
||||||
self.pipe_files["smeter_pipe"] = open(self.pipes["smeter_pipe"], "r")
|
|
||||||
|
|
||||||
def read_smeter():
|
def read_smeter():
|
||||||
raw = self.pipe_files["smeter_pipe"].readline()
|
raw = self.pipes["smeter_pipe"].readline()
|
||||||
if len(raw) == 0:
|
if len(raw) == 0:
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
@ -725,11 +771,8 @@ class dsp(object):
|
|||||||
|
|
||||||
self.output.send_output("smeter", read_smeter)
|
self.output.send_output("smeter", read_smeter)
|
||||||
if self.has_pipe("meta_pipe"):
|
if self.has_pipe("meta_pipe"):
|
||||||
# TODO make digiham output unicode and then change this here
|
|
||||||
self.pipe_files["meta_pipe"] = open(self.pipes["meta_pipe"], "r", encoding="cp437")
|
|
||||||
|
|
||||||
def read_meta():
|
def read_meta():
|
||||||
raw = self.pipe_files["meta_pipe"].readline()
|
raw = self.pipes["meta_pipe"].readline()
|
||||||
if len(raw) == 0:
|
if len(raw) == 0:
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
@ -737,6 +780,10 @@ class dsp(object):
|
|||||||
|
|
||||||
self.output.send_output("meta", read_meta)
|
self.output.send_output("meta", read_meta)
|
||||||
|
|
||||||
|
if self.csdr_dynamic_bufsize:
|
||||||
|
self.process.stdout.read(8) # dummy read to skip bufsize & preamble
|
||||||
|
logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1")
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
with self.modification_lock:
|
with self.modification_lock:
|
||||||
self.running = False
|
self.running = False
|
||||||
|
@ -66,10 +66,6 @@ class SpectrumThread(csdr.output):
|
|||||||
return t == "audio"
|
return t == "audio"
|
||||||
|
|
||||||
def receive_output(self, type, read_fn):
|
def receive_output(self, type, read_fn):
|
||||||
if self.props["csdr_dynamic_bufsize"]:
|
|
||||||
read_fn(8) # dummy read to skip bufsize & preamble
|
|
||||||
logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1")
|
|
||||||
|
|
||||||
threading.Thread(target=self.pump(read_fn, self.sdrSource.writeSpectrumData)).start()
|
threading.Thread(target=self.pump(read_fn, self.sdrSource.writeSpectrumData)).start()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
|
Loading…
Reference in New Issue
Block a user