From e6150e4aca5a5a3fecf416820d6e70c2e6443ed4 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Sat, 18 May 2019 21:38:15 +0200 Subject: [PATCH] introduce subscription concept to simplify unsubscribing from events --- csdr.py | 3 ++ owrx/config.py | 53 ++++++++++++++++++++++++++++-------- owrx/connection.py | 27 ++++++++++-------- owrx/source.py | 68 +++++++++++++++++++++++++--------------------- 4 files changed, 97 insertions(+), 54 deletions(-) diff --git a/csdr.py b/csdr.py index 77b017b..6ce0ea8 100755 --- a/csdr.py +++ b/csdr.py @@ -137,7 +137,10 @@ class dsp(object): "CSDR_FIXED_BUFSIZE=1 csdr psk31_varicode_decoder_u8_u8" def set_secondary_demodulator(self, what): + if self.get_secondary_demodulator() == what: + return self.secondary_demodulator = what + self.restart() def secondary_fft_block_size(self): return (self.samp_rate/self.decimation)/(self.fft_fps*2) #*2 is there because we do FFT on real signal here diff --git a/owrx/config.py b/owrx/config.py index 8fb6513..f3608ce 100644 --- a/owrx/config.py +++ b/owrx/config.py @@ -2,25 +2,49 @@ import logging logger = logging.getLogger(__name__) +class Subscription(object): + def __init__(self, subscriptee, subscriber): + self.subscriptee = subscriptee + self.subscriber = subscriber + + def call(self, *args, **kwargs): + self.subscriber(*args, **kwargs) + + def cancel(self): + self.subscriptee.unwire(self) + + class Property(object): def __init__(self, value = None): self.value = value - self.callbacks = [] + self.subscribers = [] + def getValue(self): return self.value + def setValue(self, value): if (self.value == value): return self self.value = value - for c in self.callbacks: + for c in self.subscribers: try: - c(self.value) + c.call(self.value) except Exception as e: logger.exception(e) return self + def wire(self, callback): - self.callbacks.append(callback) - if not self.value is None: callback(self.value) + sub = Subscription(self, callback) + self.subscribers.append(sub) + if not self.value is None: sub.call(self.value) + return sub + + def unwire(self, sub): + try: + self.subscribers.remove(sub) + except ValueError: + # happens when already removed before + pass return self class PropertyManager(object): @@ -36,7 +60,7 @@ class PropertyManager(object): def __init__(self, properties = None): self.properties = {} - self.callbacks = [] + self.subscribers = [] if properties is not None: for (name, prop) in properties.items(): self.add(name, prop) @@ -44,9 +68,9 @@ class PropertyManager(object): def add(self, name, prop): self.properties[name] = prop def fireCallbacks(value): - for c in self.callbacks: + for c in self.subscribers: try: - c(name, value) + c.call(name, value) except Exception as e: logger.exception(e) prop.wire(fireCallbacks) @@ -78,11 +102,16 @@ class PropertyManager(object): return self.getProperty(name).getValue() def wire(self, callback): - self.callbacks.append(callback) - return self + sub = Subscription(self, callback) + self.subscribers.append(sub) + return sub - def unwire(self, callback): - self.callbacks.remove(callback) + def unwire(self, sub): + try: + self.subscribers.remove(sub) + except ValueError: + # happens when already removed before + pass return self def defaults(self, other_pm): diff --git a/owrx/connection.py b/owrx/connection.py index 76a93a4..2a301d7 100644 --- a/owrx/connection.py +++ b/owrx/connection.py @@ -19,7 +19,7 @@ class OpenWebRxClient(object): self.dsp = None self.sdr = None - self.configProps = None + self.configSub = None pm = PropertyManager.getSharedInstance() @@ -39,11 +39,6 @@ class OpenWebRxClient(object): CpuUsageThread.getSharedInstance().add_client(self) - def sendConfig(self, key, value): - config = dict((key, self.configProps[key]) for key in OpenWebRxClient.config_keys) - # TODO mathematical properties? hmmmm - config["start_offset_freq"] = self.configProps["start_freq"] - self.configProps["center_freq"] - self.write_config(config) def setSdr(self, id = None): next = SdrService.getSource(id) if (next == self.sdr): @@ -51,16 +46,23 @@ class OpenWebRxClient(object): self.stopDsp() - if self.configProps is not None: - self.configProps.unwire(self.sendConfig) + if self.configSub is not None: + self.configSub.cancel() + self.configSub = None self.sdr = next # send initial config - self.configProps = self.sdr.getProps().collect(*OpenWebRxClient.config_keys).defaults(PropertyManager.getSharedInstance()) + configProps = self.sdr.getProps().collect(*OpenWebRxClient.config_keys).defaults(PropertyManager.getSharedInstance()) - self.configProps.wire(self.sendConfig) - self.sendConfig(None, None) + def sendConfig(key, value): + config = dict((key, configProps[key]) for key in OpenWebRxClient.config_keys) + # TODO mathematical properties? hmmmm + config["start_offset_freq"] = configProps["start_freq"] - configProps["center_freq"] + self.write_config(config) + + self.configSub = configProps.wire(sendConfig) + sendConfig(None, None) self.sdr.addSpectrumClient(self) @@ -73,6 +75,9 @@ class OpenWebRxClient(object): self.stopDsp() CpuUsageThread.getSharedInstance().remove_client(self) ClientRegistry.getSharedInstance().removeClient(self) + if self.configSub is not None: + self.configSub.cancel() + self.configSub = None self.conn.close() logger.debug("connection closed") diff --git a/owrx/source.py b/owrx/source.py index c5003d4..b5264b0 100644 --- a/owrx/source.py +++ b/owrx/source.py @@ -261,10 +261,6 @@ class SpectrumThread(csdr.output): self.dsp = dsp = csdr.dsp(self) dsp.nc_port = self.sdrSource.getPort() dsp.set_demodulator("fft") - props.getProperty("samp_rate").wire(dsp.set_samp_rate) - props.getProperty("fft_size").wire(dsp.set_fft_size) - props.getProperty("fft_fps").wire(dsp.set_fft_fps) - props.getProperty("fft_compression").wire(dsp.set_fft_compression) def set_fft_averages(key, value): samp_rate = props["samp_rate"] @@ -273,7 +269,15 @@ class SpectrumThread(csdr.output): fft_voverlap_factor = props["fft_voverlap_factor"] dsp.set_fft_averages(int(round(1.0 * samp_rate / fft_size / fft_fps / (1.0 - fft_voverlap_factor))) if fft_voverlap_factor>0 else 0) - props.collect("samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor").wire(set_fft_averages) + + self.subscriptions = [ + props.getProperty("samp_rate").wire(dsp.set_samp_rate), + props.getProperty("fft_size").wire(dsp.set_fft_size), + props.getProperty("fft_fps").wire(dsp.set_fft_fps), + props.getProperty("fft_compression").wire(dsp.set_fft_compression), + props.collect("samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor").wire(set_fft_averages) + ] + set_fft_averages(None, None) dsp.csdr_dynamic_bufsize = props["csdr_dynamic_bufsize"] @@ -309,6 +313,9 @@ class SpectrumThread(csdr.output): def stop(self): self.dsp.stop() self.sdrSource.removeClient(self) + for c in self.subscriptions: + c.cancel() + self.subscriptions = [] def onSdrAvailable(self): self.dsp.start() @@ -326,43 +333,40 @@ class DspManager(csdr.output): ).defaults(PropertyManager.getSharedInstance()) self.dsp = csdr.dsp(self) - #dsp_initialized=False - self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression) - self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression) - self.dsp.set_offset_freq(0) - self.dsp.set_bpf(-4000,4000) - self.localProps.getProperty("digimodes_fft_size").wire(self.dsp.set_secondary_fft_size) - self.dsp.nc_port = self.sdrSource.getPort() - self.dsp.csdr_dynamic_bufsize = self.localProps["csdr_dynamic_bufsize"] - self.dsp.csdr_print_bufsizes = self.localProps["csdr_print_bufsizes"] - self.dsp.csdr_through = self.localProps["csdr_through"] - - self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate) - - self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate) - self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq) - self.localProps.getProperty("squelch_level").wire(self.dsp.set_squelch_level) def set_low_cut(cut): bpf = self.dsp.get_bpf() bpf[0] = cut self.dsp.set_bpf(*bpf) - self.localProps.getProperty("low_cut").wire(set_low_cut) def set_high_cut(cut): bpf = self.dsp.get_bpf() bpf[1] = cut self.dsp.set_bpf(*bpf) - self.localProps.getProperty("high_cut").wire(set_high_cut) - self.localProps.getProperty("mod").wire(self.dsp.set_demodulator) + self.subscriptions = [ + self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression), + self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression), + self.localProps.getProperty("digimodes_fft_size").wire(self.dsp.set_secondary_fft_size), + self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate), + self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate), + self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq), + self.localProps.getProperty("squelch_level").wire(self.dsp.set_squelch_level), + self.localProps.getProperty("low_cut").wire(set_low_cut), + self.localProps.getProperty("high_cut").wire(set_high_cut), + self.localProps.getProperty("mod").wire(self.dsp.set_demodulator) + ] + + self.dsp.set_offset_freq(0) + self.dsp.set_bpf(-4000,4000) + self.dsp.csdr_dynamic_bufsize = self.localProps["csdr_dynamic_bufsize"] + self.dsp.csdr_print_bufsizes = self.localProps["csdr_print_bufsizes"] + self.dsp.csdr_through = self.localProps["csdr_through"] if (self.localProps["digimodes_enable"]): def set_secondary_mod(mod): if mod == False: mod = None - if self.dsp.get_secondary_demodulator() == mod: return - self.dsp.stop() self.dsp.set_secondary_demodulator(mod) if mod is not None: self.handler.write_secondary_dsp_config({ @@ -370,11 +374,10 @@ class DspManager(csdr.output): "if_samp_rate":self.dsp.if_samp_rate(), "secondary_bw":self.dsp.secondary_bw() }) - self.dsp.start() - - self.localProps.getProperty("secondary_mod").wire(set_secondary_mod) - - self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq) + self.subscriptions += [ + self.localProps.getProperty("secondary_mod").wire(set_secondary_mod), + self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq) + ] self.sdrSource.addClient(self) @@ -412,6 +415,9 @@ class DspManager(csdr.output): def stop(self): self.dsp.stop() self.sdrSource.removeClient(self) + for sub in self.subscriptions: + sub.cancel() + self.subscriptions = [] def setProperty(self, prop, value): self.localProps.getProperty(prop).setValue(value)