diff --git a/csdr.py b/csdr.py index f921239..c542197 100644 --- a/csdr.py +++ b/csdr.py @@ -131,11 +131,10 @@ class dsp(object): if self.fft_compression == "adpcm": chain += ["csdr compress_fft_adpcm_f_u8 {fft_size}"] return chain - chain += [ - "csdr shift_addition_cc --fifo {shift_pipe}", - "csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING", - "csdr bandpass_fir_fft_cc --fifo {bpf_pipe} {bpf_transition_bw} HAMMING", - ] + chain += ["csdr shift_addition_cc --fifo {shift_pipe}"] + if self.decimation > 1: + chain += ["csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING"] + chain += ["csdr bandpass_fir_fft_cc --fifo {bpf_pipe} {bpf_transition_bw} HAMMING"] if self.output.supports_type("smeter"): chain += [ "csdr squelch_and_smeter_cc --fifo {squelch_pipe} --outfifo {smeter_pipe} 5 {smeter_report_every}" diff --git a/owrx/service.py b/owrx/service.py index c2d41fa..213d335 100644 --- a/owrx/service.py +++ b/owrx/service.py @@ -1,10 +1,12 @@ import threading +import socket from owrx.source import SdrService from owrx.bands import Bandplan from csdr import dsp, output from owrx.wsjt import WsjtParser from owrx.aprs import AprsParser from owrx.config import PropertyManager +from owrx.source import Resampler import logging @@ -81,19 +83,101 @@ class ServiceHandler(object): self.startupTimer = threading.Timer(10, self.updateServices) self.startupTimer.start() + def getAvailablePort(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("", 0)) + s.listen(1) + port = s.getsockname()[1] + s.close() + return port + def updateServices(self): logger.debug("re-scheduling services due to sdr changes") self.stopServices() cf = self.source.getProps()["center_freq"] - srh = self.source.getProps()["samp_rate"] / 2 + sr = self.source.getProps()["samp_rate"] + srh = sr / 2 frequency_range = (cf - srh, cf + srh) - self.services = [ - self.setupService(dial["mode"], dial["frequency"]) + + dials = [ + dial for dial in Bandplan.getSharedInstance().collectDialFrequencies(frequency_range) if self.isSupported(dial["mode"]) ] - def setupService(self, mode, frequency): + if not dials: + logger.debug("no services available") + return + + self.services = [] + + for group in self.optimizeResampling(dials, sr): + frequencies = sorted([f["frequency"] for f in group]) + min = frequencies[0] + max = frequencies[-1] + cf = (min + max) / 2 + bw = max - min + logger.debug("group center frequency: {0}, bandwidth: {1}".format(cf, bw)) + resampler_props = PropertyManager() + resampler_props["center_freq"] = cf + # TODO the + 24000 is a temporary fix since the resampling optimizer does not account for required bandwidths + resampler_props["samp_rate"] = bw + 24000 + resampler = Resampler(resampler_props, self.getAvailablePort(), self.source) + resampler.start() + self.services.append(resampler) + + for dial in group: + self.services.append(self.setupService(dial["mode"], dial["frequency"], resampler)) + + def optimizeResampling(self, freqs, bandwidth): + freqs = sorted(freqs, key=lambda f: f["frequency"]) + distances = [{ + "frequency": freqs[i]["frequency"], + "distance": freqs[i+1]["frequency"] - freqs[i]["frequency"], + } for i in range(0, len(freqs)-1)] + + distances = [d for d in distances if d["distance"] > 0] + + distances = sorted(distances, key=lambda f: f["distance"], reverse=True) + + + def calculate_usage(num_splits): + splits = sorted([f["frequency"] for f in distances[0:num_splits]]) + previous = 0 + groups = [] + for split in splits: + groups.append([f for f in freqs if previous < f["frequency"] <= split]) + previous = split + groups.append([f for f in freqs if previous < f["frequency"]]) + + def get_bandwitdh(group): + freqs = sorted([f["frequency"] for f in group]) + # the group will process the full BW once, plus the reduced BW once for each group member + return bandwidth + len(group) * (freqs[-1] - freqs[0] + 24000) + + total_bandwidth = sum([get_bandwitdh(group) for group in groups]) + return { + "num_splits": num_splits, + "total_bandwidth": total_bandwidth, + "groups": groups, + } + + + usages = [calculate_usage(i) for i in range(0, len(freqs))] + # this is simulating no resampling. i haven't seen this as the best result yet + usages += [{ + "num_splits": None, + "total_bandwidth": bandwidth * len(freqs), + "groups": [freqs] + }] + results = sorted(usages, key=lambda f: f["total_bandwidth"]) + + for r in results: + logger.debug("splits: {0}, total: {1}".format(r["num_splits"], r["total_bandwidth"])) + + return results[0]["groups"] + + def setupService(self, mode, frequency, source): logger.debug("setting up service {0} on frequency {1}".format(mode, frequency)) # TODO selecting outputs will need some more intelligence here if mode == "packet": @@ -101,8 +185,8 @@ class ServiceHandler(object): else: output = WsjtServiceOutput(frequency) d = dsp(output) - d.nc_port = self.source.getPort() - d.set_offset_freq(frequency - self.source.getProps()["center_freq"]) + d.nc_port = source.getPort() + d.set_offset_freq(frequency - source.getProps()["center_freq"]) if mode == "packet": d.set_demodulator("nfm") d.set_bpf(-4000, 4000) @@ -111,7 +195,7 @@ class ServiceHandler(object): d.set_bpf(0, 3000) d.set_secondary_demodulator(mode) d.set_audio_compression("none") - d.set_samp_rate(self.source.getProps()["samp_rate"]) + d.set_samp_rate(source.getProps()["samp_rate"]) d.start() return d diff --git a/owrx/source.py b/owrx/source.py index e867af6..8a1be3f 100644 --- a/owrx/source.py +++ b/owrx/source.py @@ -275,6 +275,93 @@ class SdrSource(object): c.write_spectrum_data(data) +class Resampler(SdrSource): + def __init__(self, props, port, sdr): + sdrProps = sdr.getProps() + self.shift = (sdrProps["center_freq"] - props["center_freq"]) / sdrProps["samp_rate"] + self.decimation = int(float(sdrProps["samp_rate"]) / props["samp_rate"]) + if_samp_rate = sdrProps["samp_rate"] / self.decimation + self.transition_bw = 0.15 * (if_samp_rate / float(sdrProps["samp_rate"])) + props["samp_rate"] = if_samp_rate + + self.sdr = sdr + super().__init__(props, port) + + def start(self): + self.modificationLock.acquire() + if self.monitor: + self.modificationLock.release() + return + + props = self.rtlProps + + resampler_command = [ + "nc -v 127.0.0.1 {nc_port}".format(nc_port=self.sdr.getPort()), + "csdr shift_addition_cc {shift}".format(shift=self.shift), + "csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING".format( + decimation=self.decimation, + ddc_transition_bw=self.transition_bw, + ), + ] + + nmux_bufcnt = nmux_bufsize = 0 + while nmux_bufsize < props["samp_rate"] / 4: + nmux_bufsize += 4096 + while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6: + nmux_bufcnt += 1 + if nmux_bufcnt == 0 or nmux_bufsize == 0: + logger.error( + "Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py" + ) + self.modificationLock.release() + return + logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt)) + resampler_command += ["nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % ( + nmux_bufsize, + nmux_bufcnt, + self.port, + )] + cmd = " | ".join(resampler_command) + logger.debug("resampler command: %s", cmd) + self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp) + logger.info("Started resampler source: " + cmd) + + available = False + + def wait_for_process_to_end(): + rc = self.process.wait() + logger.debug("shut down with RC={0}".format(rc)) + self.monitor = None + + self.monitor = threading.Thread(target=wait_for_process_to_end) + self.monitor.start() + + retries = 1000 + while retries > 0: + retries -= 1 + if self.monitor is None: + break + testsock = socket.socket() + try: + testsock.connect(("127.0.0.1", self.getPort())) + testsock.close() + available = True + break + except: + time.sleep(0.1) + + self.modificationLock.release() + + if not available: + raise SdrSourceException("resampler source failed to start up") + + for c in self.clients: + c.onSdrAvailable() + + def activateProfile(self, profile_id=None): + pass + + class RtlSdrSource(SdrSource): def getCommand(self): return "rtl_sdr -s {samp_rate} -f {center_freq} -p {ppm} -g {rf_gain} -"