Merge branch 'develop' into packet
This commit is contained in:
commit
25a1d06dcb
9
csdr.py
9
csdr.py
@ -131,11 +131,10 @@ class dsp(object):
|
||||
if self.fft_compression == "adpcm":
|
||||
chain += ["csdr compress_fft_adpcm_f_u8 {fft_size}"]
|
||||
return chain
|
||||
chain += [
|
||||
"csdr shift_addition_cc --fifo {shift_pipe}",
|
||||
"csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING",
|
||||
"csdr bandpass_fir_fft_cc --fifo {bpf_pipe} {bpf_transition_bw} HAMMING",
|
||||
]
|
||||
chain += ["csdr shift_addition_cc --fifo {shift_pipe}"]
|
||||
if self.decimation > 1:
|
||||
chain += ["csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING"]
|
||||
chain += ["csdr bandpass_fir_fft_cc --fifo {bpf_pipe} {bpf_transition_bw} HAMMING"]
|
||||
if self.output.supports_type("smeter"):
|
||||
chain += [
|
||||
"csdr squelch_and_smeter_cc --fifo {squelch_pipe} --outfifo {smeter_pipe} 5 {smeter_report_every}"
|
||||
|
@ -1,10 +1,12 @@
|
||||
import threading
|
||||
import socket
|
||||
from owrx.source import SdrService
|
||||
from owrx.bands import Bandplan
|
||||
from csdr import dsp, output
|
||||
from owrx.wsjt import WsjtParser
|
||||
from owrx.aprs import AprsParser
|
||||
from owrx.config import PropertyManager
|
||||
from owrx.source import Resampler
|
||||
|
||||
import logging
|
||||
|
||||
@ -81,19 +83,101 @@ class ServiceHandler(object):
|
||||
self.startupTimer = threading.Timer(10, self.updateServices)
|
||||
self.startupTimer.start()
|
||||
|
||||
def getAvailablePort(self):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.bind(("", 0))
|
||||
s.listen(1)
|
||||
port = s.getsockname()[1]
|
||||
s.close()
|
||||
return port
|
||||
|
||||
def updateServices(self):
|
||||
logger.debug("re-scheduling services due to sdr changes")
|
||||
self.stopServices()
|
||||
cf = self.source.getProps()["center_freq"]
|
||||
srh = self.source.getProps()["samp_rate"] / 2
|
||||
sr = self.source.getProps()["samp_rate"]
|
||||
srh = sr / 2
|
||||
frequency_range = (cf - srh, cf + srh)
|
||||
self.services = [
|
||||
self.setupService(dial["mode"], dial["frequency"])
|
||||
|
||||
dials = [
|
||||
dial
|
||||
for dial in Bandplan.getSharedInstance().collectDialFrequencies(frequency_range)
|
||||
if self.isSupported(dial["mode"])
|
||||
]
|
||||
|
||||
def setupService(self, mode, frequency):
|
||||
if not dials:
|
||||
logger.debug("no services available")
|
||||
return
|
||||
|
||||
self.services = []
|
||||
|
||||
for group in self.optimizeResampling(dials, sr):
|
||||
frequencies = sorted([f["frequency"] for f in group])
|
||||
min = frequencies[0]
|
||||
max = frequencies[-1]
|
||||
cf = (min + max) / 2
|
||||
bw = max - min
|
||||
logger.debug("group center frequency: {0}, bandwidth: {1}".format(cf, bw))
|
||||
resampler_props = PropertyManager()
|
||||
resampler_props["center_freq"] = cf
|
||||
# TODO the + 24000 is a temporary fix since the resampling optimizer does not account for required bandwidths
|
||||
resampler_props["samp_rate"] = bw + 24000
|
||||
resampler = Resampler(resampler_props, self.getAvailablePort(), self.source)
|
||||
resampler.start()
|
||||
self.services.append(resampler)
|
||||
|
||||
for dial in group:
|
||||
self.services.append(self.setupService(dial["mode"], dial["frequency"], resampler))
|
||||
|
||||
def optimizeResampling(self, freqs, bandwidth):
|
||||
freqs = sorted(freqs, key=lambda f: f["frequency"])
|
||||
distances = [{
|
||||
"frequency": freqs[i]["frequency"],
|
||||
"distance": freqs[i+1]["frequency"] - freqs[i]["frequency"],
|
||||
} for i in range(0, len(freqs)-1)]
|
||||
|
||||
distances = [d for d in distances if d["distance"] > 0]
|
||||
|
||||
distances = sorted(distances, key=lambda f: f["distance"], reverse=True)
|
||||
|
||||
|
||||
def calculate_usage(num_splits):
|
||||
splits = sorted([f["frequency"] for f in distances[0:num_splits]])
|
||||
previous = 0
|
||||
groups = []
|
||||
for split in splits:
|
||||
groups.append([f for f in freqs if previous < f["frequency"] <= split])
|
||||
previous = split
|
||||
groups.append([f for f in freqs if previous < f["frequency"]])
|
||||
|
||||
def get_bandwitdh(group):
|
||||
freqs = sorted([f["frequency"] for f in group])
|
||||
# the group will process the full BW once, plus the reduced BW once for each group member
|
||||
return bandwidth + len(group) * (freqs[-1] - freqs[0] + 24000)
|
||||
|
||||
total_bandwidth = sum([get_bandwitdh(group) for group in groups])
|
||||
return {
|
||||
"num_splits": num_splits,
|
||||
"total_bandwidth": total_bandwidth,
|
||||
"groups": groups,
|
||||
}
|
||||
|
||||
|
||||
usages = [calculate_usage(i) for i in range(0, len(freqs))]
|
||||
# this is simulating no resampling. i haven't seen this as the best result yet
|
||||
usages += [{
|
||||
"num_splits": None,
|
||||
"total_bandwidth": bandwidth * len(freqs),
|
||||
"groups": [freqs]
|
||||
}]
|
||||
results = sorted(usages, key=lambda f: f["total_bandwidth"])
|
||||
|
||||
for r in results:
|
||||
logger.debug("splits: {0}, total: {1}".format(r["num_splits"], r["total_bandwidth"]))
|
||||
|
||||
return results[0]["groups"]
|
||||
|
||||
def setupService(self, mode, frequency, source):
|
||||
logger.debug("setting up service {0} on frequency {1}".format(mode, frequency))
|
||||
# TODO selecting outputs will need some more intelligence here
|
||||
if mode == "packet":
|
||||
@ -101,8 +185,8 @@ class ServiceHandler(object):
|
||||
else:
|
||||
output = WsjtServiceOutput(frequency)
|
||||
d = dsp(output)
|
||||
d.nc_port = self.source.getPort()
|
||||
d.set_offset_freq(frequency - self.source.getProps()["center_freq"])
|
||||
d.nc_port = source.getPort()
|
||||
d.set_offset_freq(frequency - source.getProps()["center_freq"])
|
||||
if mode == "packet":
|
||||
d.set_demodulator("nfm")
|
||||
d.set_bpf(-4000, 4000)
|
||||
@ -111,7 +195,7 @@ class ServiceHandler(object):
|
||||
d.set_bpf(0, 3000)
|
||||
d.set_secondary_demodulator(mode)
|
||||
d.set_audio_compression("none")
|
||||
d.set_samp_rate(self.source.getProps()["samp_rate"])
|
||||
d.set_samp_rate(source.getProps()["samp_rate"])
|
||||
d.start()
|
||||
return d
|
||||
|
||||
|
@ -275,6 +275,93 @@ class SdrSource(object):
|
||||
c.write_spectrum_data(data)
|
||||
|
||||
|
||||
class Resampler(SdrSource):
|
||||
def __init__(self, props, port, sdr):
|
||||
sdrProps = sdr.getProps()
|
||||
self.shift = (sdrProps["center_freq"] - props["center_freq"]) / sdrProps["samp_rate"]
|
||||
self.decimation = int(float(sdrProps["samp_rate"]) / props["samp_rate"])
|
||||
if_samp_rate = sdrProps["samp_rate"] / self.decimation
|
||||
self.transition_bw = 0.15 * (if_samp_rate / float(sdrProps["samp_rate"]))
|
||||
props["samp_rate"] = if_samp_rate
|
||||
|
||||
self.sdr = sdr
|
||||
super().__init__(props, port)
|
||||
|
||||
def start(self):
|
||||
self.modificationLock.acquire()
|
||||
if self.monitor:
|
||||
self.modificationLock.release()
|
||||
return
|
||||
|
||||
props = self.rtlProps
|
||||
|
||||
resampler_command = [
|
||||
"nc -v 127.0.0.1 {nc_port}".format(nc_port=self.sdr.getPort()),
|
||||
"csdr shift_addition_cc {shift}".format(shift=self.shift),
|
||||
"csdr fir_decimate_cc {decimation} {ddc_transition_bw} HAMMING".format(
|
||||
decimation=self.decimation,
|
||||
ddc_transition_bw=self.transition_bw,
|
||||
),
|
||||
]
|
||||
|
||||
nmux_bufcnt = nmux_bufsize = 0
|
||||
while nmux_bufsize < props["samp_rate"] / 4:
|
||||
nmux_bufsize += 4096
|
||||
while nmux_bufsize * nmux_bufcnt < props["nmux_memory"] * 1e6:
|
||||
nmux_bufcnt += 1
|
||||
if nmux_bufcnt == 0 or nmux_bufsize == 0:
|
||||
logger.error(
|
||||
"Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py"
|
||||
)
|
||||
self.modificationLock.release()
|
||||
return
|
||||
logger.debug("nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt))
|
||||
resampler_command += ["nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (
|
||||
nmux_bufsize,
|
||||
nmux_bufcnt,
|
||||
self.port,
|
||||
)]
|
||||
cmd = " | ".join(resampler_command)
|
||||
logger.debug("resampler command: %s", cmd)
|
||||
self.process = subprocess.Popen(cmd, shell=True, preexec_fn=os.setpgrp)
|
||||
logger.info("Started resampler source: " + cmd)
|
||||
|
||||
available = False
|
||||
|
||||
def wait_for_process_to_end():
|
||||
rc = self.process.wait()
|
||||
logger.debug("shut down with RC={0}".format(rc))
|
||||
self.monitor = None
|
||||
|
||||
self.monitor = threading.Thread(target=wait_for_process_to_end)
|
||||
self.monitor.start()
|
||||
|
||||
retries = 1000
|
||||
while retries > 0:
|
||||
retries -= 1
|
||||
if self.monitor is None:
|
||||
break
|
||||
testsock = socket.socket()
|
||||
try:
|
||||
testsock.connect(("127.0.0.1", self.getPort()))
|
||||
testsock.close()
|
||||
available = True
|
||||
break
|
||||
except:
|
||||
time.sleep(0.1)
|
||||
|
||||
self.modificationLock.release()
|
||||
|
||||
if not available:
|
||||
raise SdrSourceException("resampler source failed to start up")
|
||||
|
||||
for c in self.clients:
|
||||
c.onSdrAvailable()
|
||||
|
||||
def activateProfile(self, profile_id=None):
|
||||
pass
|
||||
|
||||
|
||||
class RtlSdrSource(SdrSource):
|
||||
def getCommand(self):
|
||||
return "rtl_sdr -s {samp_rate} -f {center_freq} -p {ppm} -g {rf_gain} -"
|
||||
|
Loading…
Reference in New Issue
Block a user