attempt to reduce cpu usage by pre-selecting parts of the spectrum with
resamplers
This commit is contained in:
		@@ -1,9 +1,11 @@
 | 
				
			|||||||
import threading
 | 
					import threading
 | 
				
			||||||
 | 
					import socket
 | 
				
			||||||
from owrx.source import SdrService
 | 
					from owrx.source import SdrService
 | 
				
			||||||
from owrx.bands import Bandplan
 | 
					from owrx.bands import Bandplan
 | 
				
			||||||
from csdr import dsp, output
 | 
					from csdr import dsp, output
 | 
				
			||||||
from owrx.wsjt import WsjtParser
 | 
					from owrx.wsjt import WsjtParser
 | 
				
			||||||
from owrx.config import PropertyManager
 | 
					from owrx.config import PropertyManager
 | 
				
			||||||
 | 
					from owrx.source import Resampler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import logging
 | 
					import logging
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -63,28 +65,110 @@ class ServiceHandler(object):
 | 
				
			|||||||
        self.startupTimer = threading.Timer(10, self.updateServices)
 | 
					        self.startupTimer = threading.Timer(10, self.updateServices)
 | 
				
			||||||
        self.startupTimer.start()
 | 
					        self.startupTimer.start()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def getAvailablePort(self):
 | 
				
			||||||
 | 
					        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 | 
				
			||||||
 | 
					        s.bind(("", 0))
 | 
				
			||||||
 | 
					        s.listen(1)
 | 
				
			||||||
 | 
					        port = s.getsockname()[1]
 | 
				
			||||||
 | 
					        s.close()
 | 
				
			||||||
 | 
					        return port
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def updateServices(self):
 | 
					    def updateServices(self):
 | 
				
			||||||
        logger.debug("re-scheduling services due to sdr changes")
 | 
					        logger.debug("re-scheduling services due to sdr changes")
 | 
				
			||||||
        self.stopServices()
 | 
					        self.stopServices()
 | 
				
			||||||
        cf = self.source.getProps()["center_freq"]
 | 
					        cf = self.source.getProps()["center_freq"]
 | 
				
			||||||
        srh = self.source.getProps()["samp_rate"] / 2
 | 
					        sr = self.source.getProps()["samp_rate"]
 | 
				
			||||||
 | 
					        srh = sr / 2
 | 
				
			||||||
        frequency_range = (cf - srh, cf + srh)
 | 
					        frequency_range = (cf - srh, cf + srh)
 | 
				
			||||||
        self.services = [
 | 
					
 | 
				
			||||||
            self.setupService(dial["mode"], dial["frequency"])
 | 
					        dials = [
 | 
				
			||||||
 | 
					            dial
 | 
				
			||||||
            for dial in Bandplan.getSharedInstance().collectDialFrequencies(frequency_range)
 | 
					            for dial in Bandplan.getSharedInstance().collectDialFrequencies(frequency_range)
 | 
				
			||||||
            if self.isSupported(dial["mode"])
 | 
					            if self.isSupported(dial["mode"])
 | 
				
			||||||
        ]
 | 
					        ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def setupService(self, mode, frequency):
 | 
					        if not dials:
 | 
				
			||||||
 | 
					            logger.debug("no services available")
 | 
				
			||||||
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.services = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for group in self.optimizeResampling(dials, sr):
 | 
				
			||||||
 | 
					            frequencies = sorted([f["frequency"] for f in group])
 | 
				
			||||||
 | 
					            min = frequencies[0]
 | 
				
			||||||
 | 
					            max = frequencies[-1]
 | 
				
			||||||
 | 
					            cf = (min + max) / 2
 | 
				
			||||||
 | 
					            bw = max - min
 | 
				
			||||||
 | 
					            logger.debug("group center frequency: {0}, bandwidth: {1}".format(cf, bw))
 | 
				
			||||||
 | 
					            resampler_props = PropertyManager()
 | 
				
			||||||
 | 
					            resampler_props["center_freq"] = cf
 | 
				
			||||||
 | 
					            # TODO the + 24000 is a temporary fix since the resampling optimizer does not account for required bandwidths
 | 
				
			||||||
 | 
					            resampler_props["samp_rate"] = bw + 24000
 | 
				
			||||||
 | 
					            resampler = Resampler(resampler_props, self.getAvailablePort(), self.source)
 | 
				
			||||||
 | 
					            resampler.start()
 | 
				
			||||||
 | 
					            self.services.append(resampler)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            for dial in group:
 | 
				
			||||||
 | 
					                self.services.append(self.setupService(dial["mode"], dial["frequency"], resampler))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def optimizeResampling(self, freqs, bandwidth):
 | 
				
			||||||
 | 
					        freqs = sorted(freqs, key=lambda f: f["frequency"])
 | 
				
			||||||
 | 
					        distances = [{
 | 
				
			||||||
 | 
					            "frequency": freqs[i]["frequency"],
 | 
				
			||||||
 | 
					            "distance": freqs[i+1]["frequency"] - freqs[i]["frequency"],
 | 
				
			||||||
 | 
					        } for i in range(0, len(freqs)-1)]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        distances = [d for d in distances if d["distance"] > 0]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        distances = sorted(distances, key=lambda f: f["distance"], reverse=True)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        def calculate_usage(num_splits):
 | 
				
			||||||
 | 
					            splits = sorted([f["frequency"] for f in distances[0:num_splits]])
 | 
				
			||||||
 | 
					            previous = 0
 | 
				
			||||||
 | 
					            groups = []
 | 
				
			||||||
 | 
					            for split in splits:
 | 
				
			||||||
 | 
					                groups.append([f for f in freqs if previous < f["frequency"] <= split])
 | 
				
			||||||
 | 
					                previous = split
 | 
				
			||||||
 | 
					            groups.append([f for f in freqs if previous < f["frequency"]])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            def get_bandwitdh(group):
 | 
				
			||||||
 | 
					                freqs = sorted([f["frequency"] for f in group])
 | 
				
			||||||
 | 
					                # the group will process the full BW once, plus the reduced BW once for each group member
 | 
				
			||||||
 | 
					                return bandwidth + len(group) * (freqs[-1] - freqs[0] + 24000)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            total_bandwidth = sum([get_bandwitdh(group) for group in groups])
 | 
				
			||||||
 | 
					            return {
 | 
				
			||||||
 | 
					                "num_splits": num_splits,
 | 
				
			||||||
 | 
					                "total_bandwidth": total_bandwidth,
 | 
				
			||||||
 | 
					                "groups": groups,
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        usages = [calculate_usage(i) for i in range(0, len(freqs))]
 | 
				
			||||||
 | 
					        # this is simulating no resampling. i haven't seen this as the best result yet
 | 
				
			||||||
 | 
					        usages += [{
 | 
				
			||||||
 | 
					            "num_splits": None,
 | 
				
			||||||
 | 
					            "total_bandwidth": bandwidth * len(freqs),
 | 
				
			||||||
 | 
					            "groups": [freqs]
 | 
				
			||||||
 | 
					        }]
 | 
				
			||||||
 | 
					        results = sorted(usages, key=lambda f: f["total_bandwidth"])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for r in results:
 | 
				
			||||||
 | 
					            logger.debug("splits: {0}, total: {1}".format(r["num_splits"], r["total_bandwidth"]))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return results[0]["groups"]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def setupService(self, mode, frequency, source):
 | 
				
			||||||
        logger.debug("setting up service {0} on frequency {1}".format(mode, frequency))
 | 
					        logger.debug("setting up service {0} on frequency {1}".format(mode, frequency))
 | 
				
			||||||
        d = dsp(ServiceOutput(frequency))
 | 
					        d = dsp(ServiceOutput(frequency))
 | 
				
			||||||
        d.nc_port = self.source.getPort()
 | 
					        d.nc_port = source.getPort()
 | 
				
			||||||
        d.set_offset_freq(frequency - self.source.getProps()["center_freq"])
 | 
					        d.set_offset_freq(frequency - source.getProps()["center_freq"])
 | 
				
			||||||
        d.set_demodulator("usb")
 | 
					        d.set_demodulator("usb")
 | 
				
			||||||
        d.set_bpf(0, 3000)
 | 
					        d.set_bpf(0, 3000)
 | 
				
			||||||
        d.set_secondary_demodulator(mode)
 | 
					        d.set_secondary_demodulator(mode)
 | 
				
			||||||
        d.set_audio_compression("none")
 | 
					        d.set_audio_compression("none")
 | 
				
			||||||
        d.set_samp_rate(self.source.getProps()["samp_rate"])
 | 
					        d.set_samp_rate(source.getProps()["samp_rate"])
 | 
				
			||||||
        d.start()
 | 
					        d.start()
 | 
				
			||||||
        return d
 | 
					        return d
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -274,6 +274,93 @@ class SdrSource(object):
 | 
				
			|||||||
            c.write_spectrum_data(data)
 | 
					            c.write_spectrum_data(data)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Resampler(SdrSource):
 | 
				
			||||||
 | 
					    def __init__(self, props, port, sdr):
 | 
				
			||||||
 | 
					        sdrProps = sdr.getProps()
 | 
				
			||||||
 | 
					        self.shift = (sdrProps["center_freq"] - props["center_freq"]) / sdrProps["samp_rate"]
 | 
				
			||||||
 | 
					        self.decimation = int(float(sdrProps["samp_rate"]) / props["samp_rate"])
 | 
				
			||||||
 | 
					        if_samp_rate = sdrProps["samp_rate"] / self.decimation
 | 
				
			||||||
 | 
					        self.transition_bw = 0.15 * (if_samp_rate / float(sdrProps["samp_rate"]))
 | 
				
			||||||
 | 
					        props["samp_rate"] = if_samp_rate
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.sdr = sdr
 | 
				
			||||||
 | 
					        super().__init__(props, port)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def start(self):
 | 
				
			||||||
 | 
					        self.modificationLock.acquire()
 | 
				
			||||||
 | 
					        if self.monitor:
 | 
				
			||||||
 | 
					            self.modificationLock.release()
 | 
				
			||||||
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        props = self.rtlProps
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        resampler_command = [
 | 
				
			||||||
 | 
					            "nc -v 127.0.0.1 {nc_port}".format(nc_port=self.sdr.getPort()),
 | 
				
			||||||
 | 
					            "csdr shift_addition_cc {shift}".format(shift=self.shift),
 | 
				
			||||||
 | 
					            "csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING".format(
 | 
				
			||||||
 | 
					                decimation=self.decimation,
 | 
				
			||||||
 | 
					                ddc_transition_bw=self.transition_bw,
 | 
				
			||||||
 | 
					            ),
 | 
				
			||||||
 | 
					        ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        nmux_bufcnt = nmux_bufsize = 0
 | 
				
			||||||
 | 
					        while nmux_bufsize < props["samp_rate"] / 4:
 | 
				
			||||||
 | 
					            nmux_bufsize += 4096
 | 
				
			||||||
 | 
					        while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6:
 | 
				
			||||||
 | 
					            nmux_bufcnt += 1
 | 
				
			||||||
 | 
					        if nmux_bufcnt == 0 or nmux_bufsize == 0:
 | 
				
			||||||
 | 
					            logger.error(
 | 
				
			||||||
 | 
					                "Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py"
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            self.modificationLock.release()
 | 
				
			||||||
 | 
					            return
 | 
				
			||||||
 | 
					        logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt))
 | 
				
			||||||
 | 
					        resampler_command += ["nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (
 | 
				
			||||||
 | 
					            nmux_bufsize,
 | 
				
			||||||
 | 
					            nmux_bufcnt,
 | 
				
			||||||
 | 
					            self.port,
 | 
				
			||||||
 | 
					        )]
 | 
				
			||||||
 | 
					        cmd = " | ".join(resampler_command)
 | 
				
			||||||
 | 
					        logger.debug("resampler command: %s", cmd)
 | 
				
			||||||
 | 
					        self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
 | 
				
			||||||
 | 
					        logger.info("Started resampler source: " + cmd)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        available = False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        def wait_for_process_to_end():
 | 
				
			||||||
 | 
					            rc = self.process.wait()
 | 
				
			||||||
 | 
					            logger.debug("shut down with RC={0}".format(rc))
 | 
				
			||||||
 | 
					            self.monitor = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.monitor = threading.Thread(target=wait_for_process_to_end)
 | 
				
			||||||
 | 
					        self.monitor.start()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        retries = 1000
 | 
				
			||||||
 | 
					        while retries > 0:
 | 
				
			||||||
 | 
					            retries -= 1
 | 
				
			||||||
 | 
					            if self.monitor is None:
 | 
				
			||||||
 | 
					                break
 | 
				
			||||||
 | 
					            testsock = socket.socket()
 | 
				
			||||||
 | 
					            try:
 | 
				
			||||||
 | 
					                testsock.connect(("127.0.0.1", self.getPort()))
 | 
				
			||||||
 | 
					                testsock.close()
 | 
				
			||||||
 | 
					                available = True
 | 
				
			||||||
 | 
					                break
 | 
				
			||||||
 | 
					            except:
 | 
				
			||||||
 | 
					                time.sleep(0.1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.modificationLock.release()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if not available:
 | 
				
			||||||
 | 
					            raise SdrSourceException("resampler source failed to start up")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        for c in self.clients:
 | 
				
			||||||
 | 
					            c.onSdrAvailable()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def activateProfile(self, profile_id=None):
 | 
				
			||||||
 | 
					        pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class RtlSdrSource(SdrSource):
 | 
					class RtlSdrSource(SdrSource):
 | 
				
			||||||
    def getCommand(self):
 | 
					    def getCommand(self):
 | 
				
			||||||
        return "rtl_sdr -s {samp_rate} -f {center_freq} -p {ppm} -g {rf_gain} -"
 | 
					        return "rtl_sdr -s {samp_rate} -f {center_freq} -p {ppm} -g {rf_gain} -"
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user