From 9cc850e57875929c6754ddc18d3b785fbc186842 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Fri, 28 Feb 2020 00:20:37 +0100 Subject: [PATCH] introduce new pipe classes to improve sequencing --- csdr/csdr.py | 165 +++++++++++++++++++++++++++++++++------------------ owrx/fft.py | 4 -- 2 files changed, 106 insertions(+), 63 deletions(-) diff --git a/csdr/csdr.py b/csdr/csdr.py index 441a1fc..9dc52b5 100644 --- a/csdr/csdr.py +++ b/csdr/csdr.py @@ -64,6 +64,77 @@ class output(object): return True +class Pipe(object): + READ = "r" + WRITE = "w" + NONE = None + + @staticmethod + def create(path, t, encoding=None): + if t == Pipe.READ: + return ReadingPipe(path, encoding=encoding) + elif t == Pipe.WRITE: + return WritingPipe(path, encoding=encoding) + elif t == Pipe.NONE: + return Pipe(path, None, encoding=encoding) + + def __init__(self, path, direction, encoding=None): + self.path = path + self.direction = direction + self.encoding = encoding + self.file = None + try: + os.unlink(path) + except Exception: + pass + os.mkfifo(path) + + def open(self): + self.file = open(self.path, self.direction, encoding=self.encoding) + + def close(self): + if self.file is None: + return + try: + self.file.close() + os.unlink(self.path) + except FileNotFoundError: + # it seems like we keep calling this twice. no idea why, but we don't need the resulting error. + pass + except Exception: + logger.exception("Pipe.close()") + + def __str__(self): + return self.path + + +class WritingPipe(Pipe): + def __init__(self, path, encoding=None): + super().__init__(path, "w", encoding=encoding) + + def write(self, data): + if self.file is None: + self.open() + r =self.file.write(data) + self.file.flush() + return r + + +class ReadingPipe(Pipe): + def __init__(self, path, encoding=None): + super().__init__(path, "r", encoding=encoding) + + def read(self): + if self.file is None: + self.open() + return self.file.read() + + def readline(self): + if self.file is None: + self.open() + return self.file.readline() + + class dsp(object): def __init__(self, output): self.samp_rate = 250000 @@ -94,19 +165,18 @@ class dsp(object): self.secondary_fft_size = 1024 self.secondary_process_fft = None self.secondary_process_demod = None - self.pipe_names = [ - "bpf_pipe", - "shift_pipe", - "squelch_pipe", - "smeter_pipe", - "meta_pipe", - "iqtee_pipe", - "iqtee2_pipe", - "dmr_control_pipe", - ] + self.pipe_names = { + "bpf_pipe": Pipe.WRITE, + "shift_pipe": Pipe.WRITE, + "squelch_pipe": Pipe.WRITE, + "smeter_pipe": Pipe.READ, + "meta_pipe": Pipe.READ, + "iqtee_pipe": Pipe.NONE, + "iqtee2_pipe": Pipe.NONE, + "dmr_control_pipe": Pipe.WRITE, + } self.pipes = {} - self.pipe_files = {} - self.secondary_pipe_names = ["secondary_shift_pipe"] + self.secondary_pipe_names = {"secondary_shift_pipe": Pipe.WRITE} self.secondary_offset_freq = 1000 self.unvoiced_quality = 1 self.modification_lock = threading.Lock() @@ -380,14 +450,12 @@ class dsp(object): # open control pipes for csdr and send initialization data if self.has_pipe("secondary_shift_pipe"): # TODO digimodes - self.pipe_files["secondary_shift_pipe"] = open(self.pipes["secondary_shift_pipe"], "w") # TODO digimodes self.set_secondary_offset_freq(self.secondary_offset_freq) # TODO digimodes def set_secondary_offset_freq(self, value): self.secondary_offset_freq = value if self.secondary_processes_running and self.has_pipe("secondary_shift_pipe"): - self.pipe_files["secondary_shift_pipe"].write("%g\n" % (-float(self.secondary_offset_freq) / self.if_samp_rate())) - self.pipe_files["secondary_shift_pipe"].flush() + self.pipes["secondary_shift_pipe"].write("%g\n" % (-float(self.secondary_offset_freq) / self.if_samp_rate())) def stop_secondary_demodulator(self): if not self.secondary_processes_running: @@ -534,18 +602,16 @@ class dsp(object): self.offset_freq = offset_freq if self.running: with self.modification_lock: - self.pipe_files["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate)) - self.pipe_files["shift_pipe"].flush() + self.pipes["shift_pipe"].write("%g\n" % (-float(self.offset_freq) / self.samp_rate)) def set_bpf(self, low_cut, high_cut): self.low_cut = low_cut self.high_cut = high_cut if self.running: with self.modification_lock: - self.pipe_files["bpf_pipe"].write( + self.pipes["bpf_pipe"].write( "%g %g\n" % (float(self.low_cut) / self.if_samp_rate(), float(self.high_cut) / self.if_samp_rate()) ) - self.pipe_files["bpf_pipe"].flush() def get_bpf(self): return [self.low_cut, self.high_cut] @@ -559,8 +625,7 @@ class dsp(object): actual_squelch = -150 if self.isDigitalVoice() or self.isPacket() or self.isPocsag() else self.squelch_level if self.running: with self.modification_lock: - self.pipe_files["squelch_pipe"].write("%g\n" % (self.convertToLinear(actual_squelch))) - self.pipe_files["squelch_pipe"].flush() + self.pipes["squelch_pipe"].write("%g\n" % (self.convertToLinear(actual_squelch))) def set_unvoiced_quality(self, q): self.unvoiced_quality = q @@ -571,25 +636,21 @@ class dsp(object): def set_dmr_filter(self, filter): if self.has_pipe("dmr_control_pipe"): - self.pipe_files["dmr_control_pipe"].write("{0}\n".format(filter)) - self.pipe_files["dmr_control_pipe"].flush() - - def mkfifo(self, path): - try: - os.unlink(path) - except: - pass - os.mkfifo(path) + self.pipes["dmr_control_pipe"].write("{0}\n".format(filter)) def ddc_transition_bw(self): return self.ddc_transition_bw_rate * (self.if_samp_rate() / float(self.samp_rate)) def try_create_pipes(self, pipe_names, command_base): - for pipe_name in pipe_names: + for pipe_name, pipe_type in pipe_names.items(): if "{" + pipe_name + "}" in command_base: p = self.pipe_base_path + pipe_name - self.pipes[pipe_name] = p - self.mkfifo(p) + encoding = None + # TODO make digiham output unicode and then change this here + # the whole pipe enoding feature onlye exists because of this + if pipe_name == "meta_pipe": + encoding = "cp437" + self.pipes[pipe_name] = Pipe.create(p, pipe_type, encoding=encoding) else: self.pipes[pipe_name] = None @@ -599,16 +660,8 @@ class dsp(object): def try_delete_pipes(self, pipe_names): for pipe_name in pipe_names: if self.has_pipe(pipe_name): - pipe_path = self.pipes[pipe_name] - try: - os.unlink(pipe_path) - except FileNotFoundError: - # it seems like we keep calling this twice. no idea why, but we don't need the resulting error. - pass - except Exception: - logger.exception("try_delete_pipes()") - finally: - self.pipes[pipe_name] = None + self.pipes[pipe_name].close() + self.pipes[pipe_name] = None def try_create_configs(self, command): if "{direwolf_config}" in command: @@ -701,23 +754,16 @@ class dsp(object): self.start_secondary_demodulator() - # open control pipes for csdr - for p in ["bpf_pipe", "shift_pipe", "squelch_pipe", "dmr_control_pipe"]: - if self.has_pipe(p): - self.pipe_files[p] = open(self.pipes[p], "w") - # send initial config through the pipes - if self.has_pipe("squelch_pipe"): - self.set_squelch_level(self.squelch_level) - if self.has_pipe("shift_pipe"): - self.set_offset_freq(self.offset_freq) if self.has_pipe("bpf_pipe"): self.set_bpf(self.low_cut, self.high_cut) + if self.has_pipe("shift_pipe"): + self.set_offset_freq(self.offset_freq) + if self.has_pipe("squelch_pipe"): + self.set_squelch_level(self.squelch_level) if self.has_pipe("smeter_pipe"): - self.pipe_files["smeter_pipe"] = open(self.pipes["smeter_pipe"], "r") - def read_smeter(): - raw = self.pipe_files["smeter_pipe"].readline() + raw = self.pipes["smeter_pipe"].readline() if len(raw) == 0: return None else: @@ -725,11 +771,8 @@ class dsp(object): self.output.send_output("smeter", read_smeter) if self.has_pipe("meta_pipe"): - # TODO make digiham output unicode and then change this here - self.pipe_files["meta_pipe"] = open(self.pipes["meta_pipe"], "r", encoding="cp437") - def read_meta(): - raw = self.pipe_files["meta_pipe"].readline() + raw = self.pipes["meta_pipe"].readline() if len(raw) == 0: return None else: @@ -737,6 +780,10 @@ class dsp(object): self.output.send_output("meta", read_meta) + if self.csdr_dynamic_bufsize: + self.process.stdout.read(8) # dummy read to skip bufsize & preamble + logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1") + def stop(self): with self.modification_lock: self.running = False diff --git a/owrx/fft.py b/owrx/fft.py index 246f110..b1c61de 100644 --- a/owrx/fft.py +++ b/owrx/fft.py @@ -66,10 +66,6 @@ class SpectrumThread(csdr.output): return t == "audio" def receive_output(self, type, read_fn): - if self.props["csdr_dynamic_bufsize"]: - read_fn(8) # dummy read to skip bufsize & preamble - logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1") - threading.Thread(target=self.pump(read_fn, self.sdrSource.writeSpectrumData)).start() def stop(self):