introduce subscription concept to simplify unsubscribing from events

This commit is contained in:
Jakob Ketterl 2019-05-18 21:38:15 +02:00
parent ff8f03c983
commit e6150e4aca
4 changed files with 97 additions and 54 deletions

View File

@ -137,7 +137,10 @@ class dsp(object):
"CSDR_FIXED_BUFSIZE=1 csdr psk31_varicode_decoder_u8_u8" "CSDR_FIXED_BUFSIZE=1 csdr psk31_varicode_decoder_u8_u8"
def set_secondary_demodulator(self, what): def set_secondary_demodulator(self, what):
if self.get_secondary_demodulator() == what:
return
self.secondary_demodulator = what self.secondary_demodulator = what
self.restart()
def secondary_fft_block_size(self): def secondary_fft_block_size(self):
return (self.samp_rate/self.decimation)/(self.fft_fps*2) #*2 is there because we do FFT on real signal here return (self.samp_rate/self.decimation)/(self.fft_fps*2) #*2 is there because we do FFT on real signal here

View File

@ -2,25 +2,49 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Subscription(object):
def __init__(self, subscriptee, subscriber):
self.subscriptee = subscriptee
self.subscriber = subscriber
def call(self, *args, **kwargs):
self.subscriber(*args, **kwargs)
def cancel(self):
self.subscriptee.unwire(self)
class Property(object): class Property(object):
def __init__(self, value = None): def __init__(self, value = None):
self.value = value self.value = value
self.callbacks = [] self.subscribers = []
def getValue(self): def getValue(self):
return self.value return self.value
def setValue(self, value): def setValue(self, value):
if (self.value == value): if (self.value == value):
return self return self
self.value = value self.value = value
for c in self.callbacks: for c in self.subscribers:
try: try:
c(self.value) c.call(self.value)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
return self return self
def wire(self, callback): def wire(self, callback):
self.callbacks.append(callback) sub = Subscription(self, callback)
if not self.value is None: callback(self.value) self.subscribers.append(sub)
if not self.value is None: sub.call(self.value)
return sub
def unwire(self, sub):
try:
self.subscribers.remove(sub)
except ValueError:
# happens when already removed before
pass
return self return self
class PropertyManager(object): class PropertyManager(object):
@ -36,7 +60,7 @@ class PropertyManager(object):
def __init__(self, properties = None): def __init__(self, properties = None):
self.properties = {} self.properties = {}
self.callbacks = [] self.subscribers = []
if properties is not None: if properties is not None:
for (name, prop) in properties.items(): for (name, prop) in properties.items():
self.add(name, prop) self.add(name, prop)
@ -44,9 +68,9 @@ class PropertyManager(object):
def add(self, name, prop): def add(self, name, prop):
self.properties[name] = prop self.properties[name] = prop
def fireCallbacks(value): def fireCallbacks(value):
for c in self.callbacks: for c in self.subscribers:
try: try:
c(name, value) c.call(name, value)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
prop.wire(fireCallbacks) prop.wire(fireCallbacks)
@ -78,11 +102,16 @@ class PropertyManager(object):
return self.getProperty(name).getValue() return self.getProperty(name).getValue()
def wire(self, callback): def wire(self, callback):
self.callbacks.append(callback) sub = Subscription(self, callback)
return self self.subscribers.append(sub)
return sub
def unwire(self, callback): def unwire(self, sub):
self.callbacks.remove(callback) try:
self.subscribers.remove(sub)
except ValueError:
# happens when already removed before
pass
return self return self
def defaults(self, other_pm): def defaults(self, other_pm):

View File

@ -19,7 +19,7 @@ class OpenWebRxClient(object):
self.dsp = None self.dsp = None
self.sdr = None self.sdr = None
self.configProps = None self.configSub = None
pm = PropertyManager.getSharedInstance() pm = PropertyManager.getSharedInstance()
@ -39,11 +39,6 @@ class OpenWebRxClient(object):
CpuUsageThread.getSharedInstance().add_client(self) CpuUsageThread.getSharedInstance().add_client(self)
def sendConfig(self, key, value):
config = dict((key, self.configProps[key]) for key in OpenWebRxClient.config_keys)
# TODO mathematical properties? hmmmm
config["start_offset_freq"] = self.configProps["start_freq"] - self.configProps["center_freq"]
self.write_config(config)
def setSdr(self, id = None): def setSdr(self, id = None):
next = SdrService.getSource(id) next = SdrService.getSource(id)
if (next == self.sdr): if (next == self.sdr):
@ -51,16 +46,23 @@ class OpenWebRxClient(object):
self.stopDsp() self.stopDsp()
if self.configProps is not None: if self.configSub is not None:
self.configProps.unwire(self.sendConfig) self.configSub.cancel()
self.configSub = None
self.sdr = next self.sdr = next
# send initial config # send initial config
self.configProps = self.sdr.getProps().collect(*OpenWebRxClient.config_keys).defaults(PropertyManager.getSharedInstance()) configProps = self.sdr.getProps().collect(*OpenWebRxClient.config_keys).defaults(PropertyManager.getSharedInstance())
self.configProps.wire(self.sendConfig) def sendConfig(key, value):
self.sendConfig(None, None) config = dict((key, configProps[key]) for key in OpenWebRxClient.config_keys)
# TODO mathematical properties? hmmmm
config["start_offset_freq"] = configProps["start_freq"] - configProps["center_freq"]
self.write_config(config)
self.configSub = configProps.wire(sendConfig)
sendConfig(None, None)
self.sdr.addSpectrumClient(self) self.sdr.addSpectrumClient(self)
@ -73,6 +75,9 @@ class OpenWebRxClient(object):
self.stopDsp() self.stopDsp()
CpuUsageThread.getSharedInstance().remove_client(self) CpuUsageThread.getSharedInstance().remove_client(self)
ClientRegistry.getSharedInstance().removeClient(self) ClientRegistry.getSharedInstance().removeClient(self)
if self.configSub is not None:
self.configSub.cancel()
self.configSub = None
self.conn.close() self.conn.close()
logger.debug("connection closed") logger.debug("connection closed")

View File

@ -261,10 +261,6 @@ class SpectrumThread(csdr.output):
self.dsp = dsp = csdr.dsp(self) self.dsp = dsp = csdr.dsp(self)
dsp.nc_port = self.sdrSource.getPort() dsp.nc_port = self.sdrSource.getPort()
dsp.set_demodulator("fft") dsp.set_demodulator("fft")
props.getProperty("samp_rate").wire(dsp.set_samp_rate)
props.getProperty("fft_size").wire(dsp.set_fft_size)
props.getProperty("fft_fps").wire(dsp.set_fft_fps)
props.getProperty("fft_compression").wire(dsp.set_fft_compression)
def set_fft_averages(key, value): def set_fft_averages(key, value):
samp_rate = props["samp_rate"] samp_rate = props["samp_rate"]
@ -273,7 +269,15 @@ class SpectrumThread(csdr.output):
fft_voverlap_factor = props["fft_voverlap_factor"] fft_voverlap_factor = props["fft_voverlap_factor"]
dsp.set_fft_averages(int(round(1.0 * samp_rate / fft_size / fft_fps / (1.0 - fft_voverlap_factor))) if fft_voverlap_factor>0 else 0) dsp.set_fft_averages(int(round(1.0 * samp_rate / fft_size / fft_fps / (1.0 - fft_voverlap_factor))) if fft_voverlap_factor>0 else 0)
props.collect("samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor").wire(set_fft_averages)
self.subscriptions = [
props.getProperty("samp_rate").wire(dsp.set_samp_rate),
props.getProperty("fft_size").wire(dsp.set_fft_size),
props.getProperty("fft_fps").wire(dsp.set_fft_fps),
props.getProperty("fft_compression").wire(dsp.set_fft_compression),
props.collect("samp_rate", "fft_size", "fft_fps", "fft_voverlap_factor").wire(set_fft_averages)
]
set_fft_averages(None, None) set_fft_averages(None, None)
dsp.csdr_dynamic_bufsize = props["csdr_dynamic_bufsize"] dsp.csdr_dynamic_bufsize = props["csdr_dynamic_bufsize"]
@ -309,6 +313,9 @@ class SpectrumThread(csdr.output):
def stop(self): def stop(self):
self.dsp.stop() self.dsp.stop()
self.sdrSource.removeClient(self) self.sdrSource.removeClient(self)
for c in self.subscriptions:
c.cancel()
self.subscriptions = []
def onSdrAvailable(self): def onSdrAvailable(self):
self.dsp.start() self.dsp.start()
@ -326,43 +333,40 @@ class DspManager(csdr.output):
).defaults(PropertyManager.getSharedInstance()) ).defaults(PropertyManager.getSharedInstance())
self.dsp = csdr.dsp(self) self.dsp = csdr.dsp(self)
#dsp_initialized=False
self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression)
self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression)
self.dsp.set_offset_freq(0)
self.dsp.set_bpf(-4000,4000)
self.localProps.getProperty("digimodes_fft_size").wire(self.dsp.set_secondary_fft_size)
self.dsp.nc_port = self.sdrSource.getPort() self.dsp.nc_port = self.sdrSource.getPort()
self.dsp.csdr_dynamic_bufsize = self.localProps["csdr_dynamic_bufsize"]
self.dsp.csdr_print_bufsizes = self.localProps["csdr_print_bufsizes"]
self.dsp.csdr_through = self.localProps["csdr_through"]
self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate)
self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate)
self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq)
self.localProps.getProperty("squelch_level").wire(self.dsp.set_squelch_level)
def set_low_cut(cut): def set_low_cut(cut):
bpf = self.dsp.get_bpf() bpf = self.dsp.get_bpf()
bpf[0] = cut bpf[0] = cut
self.dsp.set_bpf(*bpf) self.dsp.set_bpf(*bpf)
self.localProps.getProperty("low_cut").wire(set_low_cut)
def set_high_cut(cut): def set_high_cut(cut):
bpf = self.dsp.get_bpf() bpf = self.dsp.get_bpf()
bpf[1] = cut bpf[1] = cut
self.dsp.set_bpf(*bpf) self.dsp.set_bpf(*bpf)
self.localProps.getProperty("high_cut").wire(set_high_cut)
self.localProps.getProperty("mod").wire(self.dsp.set_demodulator) self.subscriptions = [
self.localProps.getProperty("audio_compression").wire(self.dsp.set_audio_compression),
self.localProps.getProperty("fft_compression").wire(self.dsp.set_fft_compression),
self.localProps.getProperty("digimodes_fft_size").wire(self.dsp.set_secondary_fft_size),
self.localProps.getProperty("samp_rate").wire(self.dsp.set_samp_rate),
self.localProps.getProperty("output_rate").wire(self.dsp.set_output_rate),
self.localProps.getProperty("offset_freq").wire(self.dsp.set_offset_freq),
self.localProps.getProperty("squelch_level").wire(self.dsp.set_squelch_level),
self.localProps.getProperty("low_cut").wire(set_low_cut),
self.localProps.getProperty("high_cut").wire(set_high_cut),
self.localProps.getProperty("mod").wire(self.dsp.set_demodulator)
]
self.dsp.set_offset_freq(0)
self.dsp.set_bpf(-4000,4000)
self.dsp.csdr_dynamic_bufsize = self.localProps["csdr_dynamic_bufsize"]
self.dsp.csdr_print_bufsizes = self.localProps["csdr_print_bufsizes"]
self.dsp.csdr_through = self.localProps["csdr_through"]
if (self.localProps["digimodes_enable"]): if (self.localProps["digimodes_enable"]):
def set_secondary_mod(mod): def set_secondary_mod(mod):
if mod == False: mod = None if mod == False: mod = None
if self.dsp.get_secondary_demodulator() == mod: return
self.dsp.stop()
self.dsp.set_secondary_demodulator(mod) self.dsp.set_secondary_demodulator(mod)
if mod is not None: if mod is not None:
self.handler.write_secondary_dsp_config({ self.handler.write_secondary_dsp_config({
@ -370,11 +374,10 @@ class DspManager(csdr.output):
"if_samp_rate":self.dsp.if_samp_rate(), "if_samp_rate":self.dsp.if_samp_rate(),
"secondary_bw":self.dsp.secondary_bw() "secondary_bw":self.dsp.secondary_bw()
}) })
self.dsp.start() self.subscriptions += [
self.localProps.getProperty("secondary_mod").wire(set_secondary_mod),
self.localProps.getProperty("secondary_mod").wire(set_secondary_mod) self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq)
]
self.localProps.getProperty("secondary_offset_freq").wire(self.dsp.set_secondary_offset_freq)
self.sdrSource.addClient(self) self.sdrSource.addClient(self)
@ -412,6 +415,9 @@ class DspManager(csdr.output):
def stop(self): def stop(self):
self.dsp.stop() self.dsp.stop()
self.sdrSource.removeClient(self) self.sdrSource.removeClient(self)
for sub in self.subscriptions:
sub.cancel()
self.subscriptions = []
def setProperty(self, prop, value): def setProperty(self, prop, value):
self.localProps.getProperty(prop).setValue(value) self.localProps.getProperty(prop).setValue(value)