refactor dsp outputs
add digimode metadata
This commit is contained in:
		
							
								
								
									
										54
									
								
								csdr.py
									
									
									
									
									
								
							
							
						
						
									
										54
									
								
								csdr.py
									
									
									
									
									
								
							| @@ -25,13 +25,20 @@ import time | ||||
| import os | ||||
| import signal | ||||
| import threading | ||||
| from functools import partial | ||||
|  | ||||
| import logging | ||||
| logger = logging.getLogger(__name__) | ||||
|  | ||||
| class dsp: | ||||
| class output(object): | ||||
|     def add_output(self, type, read_fn): | ||||
|         pass | ||||
|     def reset(self): | ||||
|         pass | ||||
|  | ||||
|     def __init__(self): | ||||
| class dsp(object): | ||||
|  | ||||
|     def __init__(self, output): | ||||
|         self.samp_rate = 250000 | ||||
|         self.output_rate = 11025 #this is default, and cannot be set at the moment | ||||
|         self.fft_size = 1024 | ||||
| @@ -64,6 +71,7 @@ class dsp: | ||||
|         self.secondary_pipe_names=["secondary_shift_pipe"] | ||||
|         self.secondary_offset_freq = 1000 | ||||
|         self.modification_lock = threading.Lock() | ||||
|         self.output = output | ||||
|  | ||||
|     def chain(self,which): | ||||
|         if which in [ "dmr", "dstar", "nxdn", "ysf" ]: | ||||
| @@ -191,6 +199,9 @@ class dsp: | ||||
|         logger.debug("[openwebrx-dsp-plugin:csdr] Popen on secondary command (demod)") #TODO digimodes | ||||
|         self.secondary_processes_running = True | ||||
|  | ||||
|         self.output.add_output("secondary_fft", partial(self.secondary_process_fft.stdout.read, int(self.get_secondary_fft_bytes_to_read()))) | ||||
|         self.output.add_output("secondary_demod", partial(self.secondary_process_demod.stdout.read, 1)) | ||||
|  | ||||
|         #open control pipes for csdr and send initialization data | ||||
|         if self.secondary_shift_pipe != None: #TODO digimodes | ||||
|             self.secondary_shift_pipe_file=open(self.secondary_shift_pipe,"w") #TODO digimodes | ||||
| @@ -219,12 +230,6 @@ class dsp: | ||||
|                 pass | ||||
|         self.secondary_processes_running = False | ||||
|  | ||||
|     def read_secondary_demod(self, size): | ||||
|         return self.secondary_process_demod.stdout.read(size) | ||||
|  | ||||
|     def read_secondary_fft(self, size): | ||||
|         return self.secondary_process_fft.stdout.read(size) | ||||
|  | ||||
|     def get_secondary_demodulator(self): | ||||
|         return self.secondary_demodulator | ||||
|  | ||||
| @@ -322,20 +327,6 @@ class dsp: | ||||
|             self.squelch_pipe_file.flush() | ||||
|             self.modification_lock.release() | ||||
|  | ||||
|     def get_smeter_level(self): | ||||
|         if self.running: | ||||
|             line=self.smeter_pipe_file.readline() | ||||
|             try: | ||||
|                 return float(line[:-1]) | ||||
|             except ValueError: | ||||
|                 return 0 | ||||
|         else: | ||||
|             time.sleep(1) | ||||
|  | ||||
|     def get_metadata(self): | ||||
|         if self.running and self.meta_pipe: | ||||
|             return self.meta_pipe_file.readline() | ||||
|  | ||||
|     def mkfifo(self,path): | ||||
|         try: | ||||
|             os.unlink(path) | ||||
| @@ -398,6 +389,8 @@ class dsp: | ||||
|  | ||||
|         threading.Thread(target = watch_thread).start() | ||||
|  | ||||
|         self.output.add_output("audio", partial(self.process.stdout.read, int(self.get_fft_bytes_to_read()) if self.demodulator == "fft" else 256)) | ||||
|  | ||||
|         # open control pipes for csdr | ||||
|         if self.bpf_pipe != None: | ||||
|             self.bpf_pipe_file=open(self.bpf_pipe,"w") | ||||
| @@ -419,11 +412,22 @@ class dsp: | ||||
|             self.set_bpf(self.low_cut, self.high_cut) | ||||
|         if self.smeter_pipe: | ||||
|             self.smeter_pipe_file=open(self.smeter_pipe,"r") | ||||
|             def read_smeter(): | ||||
|                 raw = self.smeter_pipe_file.readline() | ||||
|                 if len(raw) == 0: | ||||
|                     return None | ||||
|                 else: | ||||
|                     return float(raw.rstrip("\n")) | ||||
|             self.output.add_output("smeter", read_smeter) | ||||
|         if self.meta_pipe != None: | ||||
|             self.meta_pipe_file=open(self.meta_pipe,"r") | ||||
|  | ||||
|     def read(self,size): | ||||
|         return self.process.stdout.read(size) | ||||
|             def read_meta(): | ||||
|                 raw = self.meta_pipe_file.readline() | ||||
|                 if len(raw) == 0: | ||||
|                     return None | ||||
|                 else: | ||||
|                     return raw.rstrip("\n") | ||||
|             self.output.add_output("meta", read_meta) | ||||
|  | ||||
|     def stop(self): | ||||
|         self.modification_lock.acquire() | ||||
|   | ||||
| @@ -1238,6 +1238,9 @@ function on_ws_recv(evt) | ||||
| 							$('[data-feature="' + feature + '"')[json.value[feature] ? "show" : "hide"](); | ||||
| 						} | ||||
| 					break; | ||||
| 					case "metadata": | ||||
| 						update_metadata(json.value); | ||||
| 					break; | ||||
|                     default: | ||||
|                         console.warn('received message of unknown type: ' + json.type); | ||||
| 		        } | ||||
| @@ -1303,6 +1306,55 @@ function on_ws_recv(evt) | ||||
|     } | ||||
| } | ||||
|  | ||||
| function update_metadata(stringData) { | ||||
|     var metaPanels = Array.prototype.filter.call(document.getElementsByClassName('openwebrx-panel'), function(el) { | ||||
|         return el.dataset.panelName === 'metadata'; | ||||
|     }); | ||||
|  | ||||
|     var meta = {}; | ||||
|     stringData.split(";").forEach(function(s) { | ||||
|         var item = s.split(":"); | ||||
|         meta[item[0]] = item[1]; | ||||
|     }); | ||||
|  | ||||
|     var update = function(el) { | ||||
|         el.innerHTML = ""; | ||||
|     }; | ||||
|     if (meta.protocol) switch (meta.protocol) { | ||||
|         case 'DMR': | ||||
|             if (meta.slot) { | ||||
|                 var html = 'Timeslot: ' + meta.slot; | ||||
|                 if (meta.type) html += ' Typ: ' + meta.type; | ||||
|                 if (meta.source && meta.target) html += ' Source: ' + meta.source + ' Target: ' + meta.target; | ||||
|                 update = function(el) { | ||||
|                     var slotEl = el.getElementsByClassName('slot-' + meta.slot); | ||||
|                     if (!slotEl.length) { | ||||
|                         slotEl = document.createElement('div'); | ||||
|                         slotEl.className = 'slot-' + meta.slot; | ||||
|                         el.appendChild(slotEl); | ||||
|                     } else { | ||||
|                         slotEl = slotEl[0]; | ||||
|                     } | ||||
|                     slotEl.innerHTML = html; | ||||
|                 }; | ||||
|             } | ||||
|             break; | ||||
|         case 'YSF': | ||||
|             var strings = []; | ||||
|             if (meta.source) strings.push("Source: " + meta.source); | ||||
|             if (meta.target) strings.push("Destination: " + meta.target); | ||||
|             if (meta.up) strings.push("Up: " + meta.up); | ||||
|             if (meta.down) strings.push("Down: " + meta.down); | ||||
|             var html = strings.join(' '); | ||||
|             update = function(el) { | ||||
|                 el.innerHTML = html; | ||||
|             } | ||||
|             break; | ||||
|     } | ||||
|  | ||||
|     metaPanels.forEach(update); | ||||
| } | ||||
|  | ||||
| function add_problem(what) | ||||
| { | ||||
| 	problems_span=e("openwebrx-problems"); | ||||
|   | ||||
| @@ -127,6 +127,8 @@ class OpenWebRxClient(object): | ||||
|         self.protected_send({"type":"profiles","value":profiles}) | ||||
|     def write_features(self, features): | ||||
|         self.protected_send({"type":"features","value":features}) | ||||
|     def write_metadata(self, metadata): | ||||
|         self.protected_send({"type":"metadata","value":metadata}) | ||||
|  | ||||
| class WebSocketMessageHandler(object): | ||||
|     def __init__(self): | ||||
|   | ||||
							
								
								
									
										114
									
								
								owrx/source.py
									
									
									
									
									
								
							
							
						
						
									
										114
									
								
								owrx/source.py
									
									
									
									
									
								
							| @@ -169,9 +169,6 @@ class SdrSource(object): | ||||
|         self.monitor = threading.Thread(target = wait_for_process_to_end) | ||||
|         self.monitor.start() | ||||
|  | ||||
|         self.spectrumThread = SpectrumThread(self) | ||||
|         self.spectrumThread.start() | ||||
|  | ||||
|         self.modificationLock.release() | ||||
|  | ||||
|         for c in self.clients: | ||||
| @@ -186,9 +183,6 @@ class SdrSource(object): | ||||
|  | ||||
|         self.modificationLock.acquire() | ||||
|  | ||||
|         if self.spectrumThread is not None: | ||||
|             self.spectrumThread.stop() | ||||
|  | ||||
|         if self.process is not None: | ||||
|             try: | ||||
|                 os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) | ||||
| @@ -216,12 +210,18 @@ class SdrSource(object): | ||||
|  | ||||
|     def addSpectrumClient(self, c): | ||||
|         self.spectrumClients.append(c) | ||||
|         if self.spectrumThread is None: | ||||
|             self.spectrumThread = SpectrumThread(self) | ||||
|             self.spectrumThread.start() | ||||
|  | ||||
|     def removeSpectrumClient(self, c): | ||||
|         try: | ||||
|             self.spectrumClients.remove(c) | ||||
|         except ValueError: | ||||
|             pass | ||||
|         if not self.spectrumClients and self.spectrumThread is not None: | ||||
|             self.spectrumThread.stop() | ||||
|             self.spectrumThread = None | ||||
|  | ||||
|     def writeSpectrumData(self, data): | ||||
|         for c in self.spectrumClients: | ||||
| @@ -249,19 +249,18 @@ class SdrplaySource(SdrSource): | ||||
|     def sleepOnRestart(self): | ||||
|         time.sleep(1) | ||||
|  | ||||
| class SpectrumThread(threading.Thread): | ||||
| class SpectrumThread(csdr.output): | ||||
|     def __init__(self, sdrSource): | ||||
|         self.doRun = True | ||||
|         self.sdrSource = sdrSource | ||||
|         super().__init__() | ||||
|  | ||||
|     def run(self): | ||||
|     def start(self): | ||||
|         props = self.sdrSource.props.collect( | ||||
|             "samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor", "fft_compression", | ||||
|             "csdr_dynamic_bufsize", "csdr_print_bufsizes", "csdr_through" | ||||
|         ).defaults(PropertyManager.getSharedInstance()) | ||||
|  | ||||
|         self.dsp = dsp = csdr.dsp() | ||||
|         self.dsp = dsp = csdr.dsp(self) | ||||
|         dsp.nc_port = self.sdrSource.getPort() | ||||
|         dsp.set_demodulator("fft") | ||||
|         props.getProperty("samp_rate").wire(dsp.set_samp_rate) | ||||
| @@ -288,25 +287,27 @@ class SpectrumThread(threading.Thread): | ||||
|             dsp.read(8) #dummy read to skip bufsize & preamble | ||||
|             logger.debug("Note: CSDR_DYNAMIC_BUFSIZE_ON = 1") | ||||
|         logger.debug("Spectrum thread started.") | ||||
|         bytes_to_read=int(dsp.get_fft_bytes_to_read()) | ||||
|         while self.doRun: | ||||
|             data=dsp.read(bytes_to_read) | ||||
|             if len(data) == 0: | ||||
|                 time.sleep(1) | ||||
|             else: | ||||
|                 self.sdrSource.writeSpectrumData(data) | ||||
|  | ||||
|         dsp.stop() | ||||
|         logger.debug("spectrum thread shut down") | ||||
|     def add_output(self, type, read_fn): | ||||
|         if type != "audio": | ||||
|             logger.error("unsupported output type received by FFT: %s", type) | ||||
|             return | ||||
|  | ||||
|         self.thread = None | ||||
|         self.sdrSource.removeClient(self) | ||||
|         def pipe(): | ||||
|             run = True | ||||
|             while run: | ||||
|                 data = read_fn() | ||||
|                 if len(data) == 0: | ||||
|                     run = False | ||||
|                 else: | ||||
|                     self.sdrSource.writeSpectrumData(data) | ||||
|  | ||||
|         threading.Thread(target = pipe).start() | ||||
|  | ||||
|     def stop(self): | ||||
|         logger.debug("stopping spectrum thread") | ||||
|         self.doRun = False | ||||
|         self.dsp.stop() | ||||
|  | ||||
| class DspManager(object): | ||||
| class DspManager(csdr.output): | ||||
|     def __init__(self, handler, sdrSource): | ||||
|         self.doRun = False | ||||
|         self.handler = handler | ||||
| @@ -319,7 +320,7 @@ class DspManager(object): | ||||
|             "csdr_print_bufsizes", "csdr_through", "digimodes_enable", "samp_rate" | ||||
|         ).defaults(PropertyManager.getSharedInstance()) | ||||
|  | ||||
|         self.dsp = csdr.dsp() | ||||
|         self.dsp = csdr.dsp(self) | ||||
|         #dsp_initialized=False | ||||
|         self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression) | ||||
|         self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression) | ||||
| @@ -356,7 +357,6 @@ class DspManager(object): | ||||
|             def set_secondary_mod(mod): | ||||
|                 if mod == False: mod = None | ||||
|                 if self.dsp.get_secondary_demodulator() == mod: return | ||||
|                 self.stopSecondaryThreads() | ||||
|                 self.dsp.stop() | ||||
|                 self.dsp.set_secondary_demodulator(mod) | ||||
|                 if mod is not None: | ||||
| @@ -367,9 +367,6 @@ class DspManager(object): | ||||
|                     }) | ||||
|                 self.dsp.start() | ||||
|  | ||||
|                 if mod: | ||||
|                     self.startSecondaryThreads() | ||||
|  | ||||
|             self.localProps.getProperty("secondary_mod").wire(set_secondary_mod) | ||||
|  | ||||
|             self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq) | ||||
| @@ -380,47 +377,34 @@ class DspManager(object): | ||||
|         self.doRun = self.sdrSource.isAvailable() | ||||
|         if self.doRun: | ||||
|             self.dsp.start() | ||||
|             threading.Thread(target = self.readDspOutput).start() | ||||
|             threading.Thread(target = self.readSMeterOutput).start() | ||||
|  | ||||
|     def startSecondaryThreads(self): | ||||
|         self.runSecondary = True | ||||
|         self.secondaryDemodThread = threading.Thread(target = self.readSecondaryDemod) | ||||
|         self.secondaryDemodThread.start() | ||||
|         self.secondaryFftThread = threading.Thread(target = self.readSecondaryFft) | ||||
|         self.secondaryFftThread.start() | ||||
|     def add_output(self, t, read_fn): | ||||
|         logger.debug("adding new output of type %s", t) | ||||
|         writers = { | ||||
|             "audio": self.handler.write_dsp_data, | ||||
|             "smeter": self.handler.write_s_meter_level, | ||||
|             "secondary_fft": self.handler.write_secondary_fft, | ||||
|             "secondary_demod": self.handler.write_secondary_demod, | ||||
|             "meta": self.handler.write_metadata | ||||
|         } | ||||
|         write = writers[t] | ||||
|  | ||||
|     def stopSecondaryThreads(self): | ||||
|         self.runSecondary = False | ||||
|         self.secondaryDemodThread = None | ||||
|         self.secondaryFftThread = None | ||||
|         def pump(read, write): | ||||
|             def copy(): | ||||
|                 run = True | ||||
|                 while run: | ||||
|                     data = read() | ||||
|                     if data is None or (isinstance(data, bytes) and len(data) == 0): | ||||
|                         logger.warning("zero read on {0}".format(t)) | ||||
|                         run = False | ||||
|                     else: | ||||
|                         write(data) | ||||
|             return copy | ||||
|  | ||||
|     def readDspOutput(self): | ||||
|         while (self.doRun): | ||||
|             data = self.dsp.read(256) | ||||
|             if len(data) != 256: | ||||
|                 time.sleep(1) | ||||
|             else: | ||||
|                 self.handler.write_dsp_data(data) | ||||
|  | ||||
|     def readSMeterOutput(self): | ||||
|         while (self.doRun): | ||||
|             level = self.dsp.get_smeter_level() | ||||
|             self.handler.write_s_meter_level(level) | ||||
|  | ||||
|     def readSecondaryDemod(self): | ||||
|         while (self.runSecondary): | ||||
|             data = self.dsp.read_secondary_demod(1) | ||||
|             self.handler.write_secondary_demod(data) | ||||
|  | ||||
|     def readSecondaryFft(self): | ||||
|         while (self.runSecondary): | ||||
|             data = self.dsp.read_secondary_fft(int(self.dsp.get_secondary_fft_bytes_to_read())) | ||||
|             self.handler.write_secondary_fft(data) | ||||
|         threading.Thread(target=pump(read_fn, write)).start() | ||||
|  | ||||
|     def stop(self): | ||||
|         self.doRun = False | ||||
|         self.runSecondary = False | ||||
|         self.dsp.stop() | ||||
|         self.sdrSource.removeClient(self) | ||||
|  | ||||
| @@ -433,8 +417,6 @@ class DspManager(object): | ||||
|             self.doRun = True | ||||
|         if self.dsp is not None: | ||||
|             self.dsp.start() | ||||
|             threading.Thread(target = self.readDspOutput).start() | ||||
|             threading.Thread(target = self.readSMeterOutput).start() | ||||
|  | ||||
|     def onSdrUnavailable(self): | ||||
|         logger.debug("received onSdrUnavailable, shutting down DspSource") | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Jakob Ketterl
					Jakob Ketterl