diff --git a/csdr.py b/csdr.py index 51d5034..c0ea54e 100755 --- a/csdr.py +++ b/csdr.py @@ -25,13 +25,20 @@ import time import os import signal import threading +from functools import partial import logging logger = logging.getLogger(__name__) -class dsp: +class output(object): + def add_output(self, type, read_fn): + pass + def reset(self): + pass - def __init__(self): +class dsp(object): + + def __init__(self, output): self.samp_rate = 250000 self.output_rate = 11025 #this is default, and cannot be set at the moment self.fft_size = 1024 @@ -64,6 +71,7 @@ class dsp: self.secondary_pipe_names=["secondary_shift_pipe"] self.secondary_offset_freq = 1000 self.modification_lock = threading.Lock() + self.output = output def chain(self,which): if which in [ "dmr", "dstar", "nxdn", "ysf" ]: @@ -191,6 +199,9 @@ class dsp: logger.debug("[openwebrx-dsp-plugin:csdr] Popen on secondary command (demod)") #TODO digimodes self.secondary_processes_running = True + self.output.add_output("secondary_fft", partial(self.secondary_process_fft.stdout.read, int(self.get_secondary_fft_bytes_to_read()))) + self.output.add_output("secondary_demod", partial(self.secondary_process_demod.stdout.read, 1)) + #open control pipes for csdr and send initialization data if self.secondary_shift_pipe != None: #TODO digimodes self.secondary_shift_pipe_file=open(self.secondary_shift_pipe,"w") #TODO digimodes @@ -219,12 +230,6 @@ class dsp: pass self.secondary_processes_running = False - def read_secondary_demod(self, size): - return self.secondary_process_demod.stdout.read(size) - - def read_secondary_fft(self, size): - return self.secondary_process_fft.stdout.read(size) - def get_secondary_demodulator(self): return self.secondary_demodulator @@ -322,20 +327,6 @@ class dsp: self.squelch_pipe_file.flush() self.modification_lock.release() - def get_smeter_level(self): - if self.running: - line=self.smeter_pipe_file.readline() - try: - return float(line[:-1]) - except ValueError: - return 0 - else: - time.sleep(1) - - def get_metadata(self): - if self.running and self.meta_pipe: - return self.meta_pipe_file.readline() - def mkfifo(self,path): try: os.unlink(path) @@ -398,6 +389,8 @@ class dsp: threading.Thread(target = watch_thread).start() + self.output.add_output("audio", partial(self.process.stdout.read, int(self.get_fft_bytes_to_read()) if self.demodulator == "fft" else 256)) + # open control pipes for csdr if self.bpf_pipe != None: self.bpf_pipe_file=open(self.bpf_pipe,"w") @@ -419,11 +412,22 @@ class dsp: self.set_bpf(self.low_cut, self.high_cut) if self.smeter_pipe: self.smeter_pipe_file=open(self.smeter_pipe,"r") + def read_smeter(): + raw = self.smeter_pipe_file.readline() + if len(raw) == 0: + return None + else: + return float(raw.rstrip("\n")) + self.output.add_output("smeter", read_smeter) if self.meta_pipe != None: self.meta_pipe_file=open(self.meta_pipe,"r") - - def read(self,size): - return self.process.stdout.read(size) + def read_meta(): + raw = self.meta_pipe_file.readline() + if len(raw) == 0: + return None + else: + return raw.rstrip("\n") + self.output.add_output("meta", read_meta) def stop(self): self.modification_lock.acquire() diff --git a/htdocs/openwebrx.js b/htdocs/openwebrx.js index 8610778..0ef67bb 100644 --- a/htdocs/openwebrx.js +++ b/htdocs/openwebrx.js @@ -1238,6 +1238,9 @@ function on_ws_recv(evt) $('[data-feature="' + feature + '"')[json.value[feature] ? "show" : "hide"](); } break; + case "metadata": + update_metadata(json.value); + break; default: console.warn('received message of unknown type: ' + json.type); } @@ -1303,6 +1306,55 @@ function on_ws_recv(evt) } } +function update_metadata(stringData) { + var metaPanels = Array.prototype.filter.call(document.getElementsByClassName('openwebrx-panel'), function(el) { + return el.dataset.panelName === 'metadata'; + }); + + var meta = {}; + stringData.split(";").forEach(function(s) { + var item = s.split(":"); + meta[item[0]] = item[1]; + }); + + var update = function(el) { + el.innerHTML = ""; + }; + if (meta.protocol) switch (meta.protocol) { + case 'DMR': + if (meta.slot) { + var html = 'Timeslot: ' + meta.slot; + if (meta.type) html += ' Typ: ' + meta.type; + if (meta.source && meta.target) html += ' Source: ' + meta.source + ' Target: ' + meta.target; + update = function(el) { + var slotEl = el.getElementsByClassName('slot-' + meta.slot); + if (!slotEl.length) { + slotEl = document.createElement('div'); + slotEl.className = 'slot-' + meta.slot; + el.appendChild(slotEl); + } else { + slotEl = slotEl[0]; + } + slotEl.innerHTML = html; + }; + } + break; + case 'YSF': + var strings = []; + if (meta.source) strings.push("Source: " + meta.source); + if (meta.target) strings.push("Destination: " + meta.target); + if (meta.up) strings.push("Up: " + meta.up); + if (meta.down) strings.push("Down: " + meta.down); + var html = strings.join(' '); + update = function(el) { + el.innerHTML = html; + } + break; + } + + metaPanels.forEach(update); +} + function add_problem(what) { problems_span=e("openwebrx-problems"); diff --git a/owrx/connection.py b/owrx/connection.py index 346f56d..76a93a4 100644 --- a/owrx/connection.py +++ b/owrx/connection.py @@ -127,6 +127,8 @@ class OpenWebRxClient(object): self.protected_send({"type":"profiles","value":profiles}) def write_features(self, features): self.protected_send({"type":"features","value":features}) + def write_metadata(self, metadata): + self.protected_send({"type":"metadata","value":metadata}) class WebSocketMessageHandler(object): def __init__(self): diff --git a/owrx/source.py b/owrx/source.py index 3efc7d4..f2be0af 100644 --- a/owrx/source.py +++ b/owrx/source.py @@ -169,9 +169,6 @@ class SdrSource(object): self.monitor = threading.Thread(target = wait_for_process_to_end) self.monitor.start() - self.spectrumThread = SpectrumThread(self) - self.spectrumThread.start() - self.modificationLock.release() for c in self.clients: @@ -186,9 +183,6 @@ class SdrSource(object): self.modificationLock.acquire() - if self.spectrumThread is not None: - self.spectrumThread.stop() - if self.process is not None: try: os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) @@ -216,12 +210,18 @@ class SdrSource(object): def addSpectrumClient(self, c): self.spectrumClients.append(c) + if self.spectrumThread is None: + self.spectrumThread = SpectrumThread(self) + self.spectrumThread.start() def removeSpectrumClient(self, c): try: self.spectrumClients.remove(c) except ValueError: pass + if not self.spectrumClients and self.spectrumThread is not None: + self.spectrumThread.stop() + self.spectrumThread = None def writeSpectrumData(self, data): for c in self.spectrumClients: @@ -249,19 +249,18 @@ class SdrplaySource(SdrSource): def sleepOnRestart(self): time.sleep(1) -class SpectrumThread(threading.Thread): +class SpectrumThread(csdr.output): def __init__(self, sdrSource): - self.doRun = True self.sdrSource = sdrSource super().__init__() - def run(self): + def start(self): props = self.sdrSource.props.collect( "samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor", "fft_compression", "csdr_dynamic_bufsize", "csdr_print_bufsizes", "csdr_through" ).defaults(PropertyManager.getSharedInstance()) - self.dsp = dsp = csdr.dsp() + self.dsp = dsp = csdr.dsp(self) dsp.nc_port = self.sdrSource.getPort() dsp.set_demodulator("fft") props.getProperty("samp_rate").wire(dsp.set_samp_rate) @@ -288,25 +287,27 @@ class SpectrumThread(threading.Thread): dsp.read(8) #dummy read to skip bufsize & preamble logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1") logger.debug("Spectrum thread started.") - bytes_to_read=int(dsp.get_fft_bytes_to_read()) - while self.doRun: - data=dsp.read(bytes_to_read) - if len(data) == 0: - time.sleep(1) - else: - self.sdrSource.writeSpectrumData(data) - dsp.stop() - logger.debug("spectrum thread shut down") + def add_output(self, type, read_fn): + if type != "audio": + logger.error("unsupported output type received by FFT: %s", type) + return - self.thread = None - self.sdrSource.removeClient(self) + def pipe(): + run = True + while run: + data = read_fn() + if len(data) == 0: + run = False + else: + self.sdrSource.writeSpectrumData(data) + + threading.Thread(target = pipe).start() def stop(self): - logger.debug("stopping spectrum thread") - self.doRun = False + self.dsp.stop() -class DspManager(object): +class DspManager(csdr.output): def __init__(self, handler, sdrSource): self.doRun = False self.handler = handler @@ -319,7 +320,7 @@ class DspManager(object): "csdr_print_bufsizes", "csdr_through", "digimodes_enable", "samp_rate" ).defaults(PropertyManager.getSharedInstance()) - self.dsp = csdr.dsp() + self.dsp = csdr.dsp(self) #dsp_initialized=False self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression) self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression) @@ -356,7 +357,6 @@ class DspManager(object): def set_secondary_mod(mod): if mod == False: mod = None if self.dsp.get_secondary_demodulator() == mod: return - self.stopSecondaryThreads() self.dsp.stop() self.dsp.set_secondary_demodulator(mod) if mod is not None: @@ -367,9 +367,6 @@ class DspManager(object): }) self.dsp.start() - if mod: - self.startSecondaryThreads() - self.localProps.getProperty("secondary_mod").wire(set_secondary_mod) self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq) @@ -380,47 +377,34 @@ class DspManager(object): self.doRun = self.sdrSource.isAvailable() if self.doRun: self.dsp.start() - threading.Thread(target = self.readDspOutput).start() - threading.Thread(target = self.readSMeterOutput).start() - def startSecondaryThreads(self): - self.runSecondary = True - self.secondaryDemodThread = threading.Thread(target = self.readSecondaryDemod) - self.secondaryDemodThread.start() - self.secondaryFftThread = threading.Thread(target = self.readSecondaryFft) - self.secondaryFftThread.start() + def add_output(self, t, read_fn): + logger.debug("adding new output of type %s", t) + writers = { + "audio": self.handler.write_dsp_data, + "smeter": self.handler.write_s_meter_level, + "secondary_fft": self.handler.write_secondary_fft, + "secondary_demod": self.handler.write_secondary_demod, + "meta": self.handler.write_metadata + } + write = writers[t] - def stopSecondaryThreads(self): - self.runSecondary = False - self.secondaryDemodThread = None - self.secondaryFftThread = None + def pump(read, write): + def copy(): + run = True + while run: + data = read() + if data is None or (isinstance(data, bytes) and len(data) == 0): + logger.warning("zero read on {0}".format(t)) + run = False + else: + write(data) + return copy - def readDspOutput(self): - while (self.doRun): - data = self.dsp.read(256) - if len(data) != 256: - time.sleep(1) - else: - self.handler.write_dsp_data(data) - - def readSMeterOutput(self): - while (self.doRun): - level = self.dsp.get_smeter_level() - self.handler.write_s_meter_level(level) - - def readSecondaryDemod(self): - while (self.runSecondary): - data = self.dsp.read_secondary_demod(1) - self.handler.write_secondary_demod(data) - - def readSecondaryFft(self): - while (self.runSecondary): - data = self.dsp.read_secondary_fft(int(self.dsp.get_secondary_fft_bytes_to_read())) - self.handler.write_secondary_fft(data) + threading.Thread(target=pump(read_fn, write)).start() def stop(self): self.doRun = False - self.runSecondary = False self.dsp.stop() self.sdrSource.removeClient(self) @@ -433,8 +417,6 @@ class DspManager(object): self.doRun = True if self.dsp is not None: self.dsp.start() - threading.Thread(target = self.readDspOutput).start() - threading.Thread(target = self.readSMeterOutput).start() def onSdrUnavailable(self): logger.debug("received onSdrUnavailable, shutting down DspSource")