we got fft

This commit is contained in:
Jakob Ketterl 2019-05-04 20:26:11 +02:00
parent 89690d214d
commit 1f909080db
8 changed files with 254 additions and 47 deletions

View File

@ -175,7 +175,7 @@ iq_server_port = 4951 #TCP port for ncat to listen on. It will send I/Q data ove
#A guide is available to help you set these values: https://github.com/simonyiszk/openwebrx/wiki/Calibrating-waterfall-display-levels #A guide is available to help you set these values: https://github.com/simonyiszk/openwebrx/wiki/Calibrating-waterfall-display-levels
### default theme by teejez: ### default theme by teejez:
waterfall_colors = "[0x000000ff,0x0000ffff,0x00ffffff,0x00ff00ff,0xffff00ff,0xff0000ff,0xff00ffff,0xffffffff]" waterfall_colors = [0x000000ff,0x0000ffff,0x00ffffff,0x00ff00ff,0xffff00ff,0xff0000ff,0xff00ffff,0xffffffff]
waterfall_min_level = -88 #in dB waterfall_min_level = -88 #in dB
waterfall_max_level = -20 waterfall_max_level = -20
waterfall_auto_level_margin = (5, 40) waterfall_auto_level_margin = (5, 40)

14
csdr.py
View File

@ -129,7 +129,7 @@ class dsp:
def start_secondary_demodulator(self): def start_secondary_demodulator(self):
if(not self.secondary_demodulator): return if(not self.secondary_demodulator): return
print "[openwebrx] starting secondary demodulator from IF input sampled at %d"%self.if_samp_rate() print("[openwebrx] starting secondary demodulator from IF input sampled at %d"%self.if_samp_rate())
secondary_command_fft=self.secondary_chain("fft") secondary_command_fft=self.secondary_chain("fft")
secondary_command_demod=self.secondary_chain(self.secondary_demodulator) secondary_command_demod=self.secondary_chain(self.secondary_demodulator)
self.try_create_pipes(self.secondary_pipe_names, secondary_command_demod + secondary_command_fft) self.try_create_pipes(self.secondary_pipe_names, secondary_command_demod + secondary_command_fft)
@ -150,16 +150,16 @@ class dsp:
if_samp_rate=self.if_samp_rate() if_samp_rate=self.if_samp_rate()
) )
print "[openwebrx-dsp-plugin:csdr] secondary command (fft) =", secondary_command_fft print("[openwebrx-dsp-plugin:csdr] secondary command (fft) =", secondary_command_fft)
print "[openwebrx-dsp-plugin:csdr] secondary command (demod) =", secondary_command_demod print("[openwebrx-dsp-plugin:csdr] secondary command (demod) =", secondary_command_demod)
#code.interact(local=locals()) #code.interact(local=locals())
my_env=os.environ.copy() my_env=os.environ.copy()
#if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1"; #if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1";
if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"]="1"; if self.csdr_print_bufsizes: my_env["CSDR_PRINT_BUFSIZES"]="1";
self.secondary_process_fft = subprocess.Popen(secondary_command_fft, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env) self.secondary_process_fft = subprocess.Popen(secondary_command_fft, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env)
print "[openwebrx-dsp-plugin:csdr] Popen on secondary command (fft)" print("[openwebrx-dsp-plugin:csdr] Popen on secondary command (fft)")
self.secondary_process_demod = subprocess.Popen(secondary_command_demod, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env) #TODO digimodes self.secondary_process_demod = subprocess.Popen(secondary_command_demod, stdout=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp, env=my_env) #TODO digimodes
print "[openwebrx-dsp-plugin:csdr] Popen on secondary command (demod)" #TODO digimodes print("[openwebrx-dsp-plugin:csdr] Popen on secondary command (demod)") #TODO digimodes
self.secondary_processes_running = True self.secondary_processes_running = True
#open control pipes for csdr and send initialization data #open control pipes for csdr and send initialization data
@ -313,7 +313,7 @@ class dsp:
pipe_path = getattr(self,pipe_name,None) pipe_path = getattr(self,pipe_name,None)
if pipe_path: if pipe_path:
try: os.unlink(pipe_path) try: os.unlink(pipe_path)
except Exception as e: print "[openwebrx-dsp-plugin:csdr] try_delete_pipes() ::", e except Exception as e: print("[openwebrx-dsp-plugin:csdr] try_delete_pipes() ::", e)
def set_pipe_nonblocking(self, pipe): def set_pipe_nonblocking(self, pipe):
flags = fcntl.fcntl(pipe, fcntl.F_GETFL) flags = fcntl.fcntl(pipe, fcntl.F_GETFL)
@ -354,7 +354,7 @@ class dsp:
flowcontrol=int(self.samp_rate*2), start_bufsize=self.base_bufsize*self.decimation, nc_port=self.nc_port, \ flowcontrol=int(self.samp_rate*2), start_bufsize=self.base_bufsize*self.decimation, nc_port=self.nc_port, \
squelch_pipe=self.squelch_pipe, smeter_pipe=self.smeter_pipe, iqtee_pipe=self.iqtee_pipe, iqtee2_pipe=self.iqtee2_pipe ) squelch_pipe=self.squelch_pipe, smeter_pipe=self.smeter_pipe, iqtee_pipe=self.iqtee_pipe, iqtee2_pipe=self.iqtee2_pipe )
print "[openwebrx-dsp-plugin:csdr] Command =",command print("[openwebrx-dsp-plugin:csdr] Command =",command)
#code.interact(local=locals()) #code.interact(local=locals())
my_env=os.environ.copy() my_env=os.environ.copy()
if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1"; if self.csdr_dynamic_bufsize: my_env["CSDR_DYNAMIC_BUFSIZE_ON"]="1";

View File

@ -1184,6 +1184,25 @@ function on_ws_recv(evt)
} }
} else if (evt.data instanceof ArrayBuffer) { } else if (evt.data instanceof ArrayBuffer) {
// binary messages // binary messages
type = new Uint8Array(evt.data, 0, 1)[0]
data = evt.data.slice(1)
switch (type) {
case 1:
if (fft_compression=="none") {
waterfall_add_queue(new Float32Array(data));
} else if (fft_compression == "adpcm") {
fft_codec.reset();
var waterfall_i16=fft_codec.decode(new Uint8Array(data));
var waterfall_f32=new Float32Array(waterfall_i16.length-COMPRESS_FFT_PAD_N);
for(var i=0;i<waterfall_i16.length;i++) waterfall_f32[i]=waterfall_i16[i+COMPRESS_FFT_PAD_N]/100;
waterfall_add_queue(waterfall_f32);
}
break;
default:
console.warn('unknown type of binary message: ' + type)
}
} }
return return
if(!(evt.data instanceof ArrayBuffer)) { divlog("on_ws_recv(): Not ArrayBuffer received...",1); return; } if(!(evt.data instanceof ArrayBuffer)) { divlog("on_ws_recv(): Not ArrayBuffer received...",1); return; }
@ -1656,8 +1675,8 @@ function audio_preinit()
audio_calculate_resampling(audio_context.sampleRate); audio_calculate_resampling(audio_context.sampleRate);
audio_resampler = new sdrjs.RationalResamplerFF(audio_client_resampling_factor,1); audio_resampler = new sdrjs.RationalResamplerFF(audio_client_resampling_factor,1);
ws.send("SET output_rate="+audio_server_output_rate.toString()+" action=start"); //now we'll get AUD packets as well
ws.send(JSON.stringify({"type":"start","params":{"output_rate":audio_server_output_rate}}))
} }
function audio_init() function audio_init()
@ -1982,18 +2001,14 @@ function waterfall_add(data)
waterfall_image.data[base+x*4+i] = ((color>>>0)>>((3-i)*8))&0xff; waterfall_image.data[base+x*4+i] = ((color>>>0)>>((3-i)*8))&0xff;
}*/ }*/
if(mathbox_mode==MATHBOX_MODES.WATERFALL) if (mathbox_mode==MATHBOX_MODES.WATERFALL) {
{
//Handle mathbox //Handle mathbox
for(var i=0;i<fft_size;i++) mathbox_data[i+mathbox_data_index*fft_size]=data[i]; for(var i=0;i<fft_size;i++) mathbox_data[i+mathbox_data_index*fft_size]=data[i];
mathbox_shift(); mathbox_shift();
} } else {
else
{
//Add line to waterfall image //Add line to waterfall image
oneline_image = canvas_context.createImageData(w,1); oneline_image = canvas_context.createImageData(w,1);
for(x=0;x<w;x++) for (x=0;x<w;x++) {
{
color=waterfall_mkcolor(data[x]); color=waterfall_mkcolor(data[x]);
for(i=0;i<4;i++) for(i=0;i<4;i++)
oneline_image.data[x*4+i] = ((color>>>0)>>((3-i)*8))&0xff; oneline_image.data[x*4+i] = ((color>>>0)>>((3-i)*8))&0xff;

View File

@ -21,3 +21,6 @@ class PropertyManager(object):
if not name in self.properties: if not name in self.properties:
self.properties[name] = Property() self.properties[name] = Property()
return self.properties[name] return self.properties[name]
def getPropertyValue(self, name):
return self.getProperty(name).getValue()

View File

@ -1,6 +1,9 @@
import mimetypes import mimetypes
from owrx.websocket import WebSocketConnection from owrx.websocket import WebSocketConnection
from owrx.config import PropertyManager from owrx.config import PropertyManager
from owrx.source import SpectrumThread
import csdr
import json
class Controller(object): class Controller(object):
def __init__(self, handler, matches): def __init__(self, handler, matches):
@ -44,12 +47,18 @@ class AssetsController(Controller):
filename = self.matches.group(1) filename = self.matches.group(1)
self.serve_file(filename) self.serve_file(filename)
class SpectrumForwarder(object):
def __init__(self, conn):
self.conn = conn
def write_spectrum_data(self, data):
self.conn.send(bytes([0x01]) + data)
class WebSocketController(Controller): class WebSocketMessageHandler(object):
def handle_request(self): def __init__(self):
conn = WebSocketConnection(self.handler) self.forwarder = None
conn.send("CLIENT DE SERVER openwebrx.py")
def handleTextMessage(self, conn, message):
if (message[:16] == "SERVER DE CLIENT"):
config = {} config = {}
pm = PropertyManager.getSharedInstance() pm = PropertyManager.getSharedInstance()
@ -60,3 +69,44 @@ class WebSocketController(Controller):
config[key] = pm.getProperty(key).getValue() config[key] = pm.getProperty(key).getValue()
conn.send({"type":"config","value":config}) conn.send({"type":"config","value":config})
print("client connection intitialized")
dsp = self.dsp = csdr.dsp()
dsp_initialized=False
dsp.set_audio_compression(pm.getPropertyValue("audio_compression"))
dsp.set_fft_compression(pm.getPropertyValue("fft_compression")) #used by secondary chains
dsp.set_format_conversion(pm.getPropertyValue("format_conversion"))
dsp.set_offset_freq(0)
dsp.set_bpf(-4000,4000)
dsp.set_secondary_fft_size(pm.getPropertyValue("digimodes_fft_size"))
dsp.nc_port=pm.getPropertyValue("iq_server_port")
dsp.csdr_dynamic_bufsize = pm.getPropertyValue("csdr_dynamic_bufsize")
dsp.csdr_print_bufsizes = pm.getPropertyValue("csdr_print_bufsizes")
dsp.csdr_through = pm.getPropertyValue("csdr_through")
do_secondary_demod=False
self.forwarder = SpectrumForwarder(conn)
SpectrumThread.getSharedInstance().add_client(self.forwarder)
else:
try:
message = json.loads(message)
if message["type"] == "start":
self.dsp.set_samp_rate(message["params"]["output_rate"])
self.dsp.start()
except json.JSONDecodeError:
print("message is not json: {0}".format(message))
def handleBinaryMessage(self, conn, data):
print("unsupported binary message, discarding")
def handleClose(self, conn):
if self.forwarder:
SpectrumThread.getSharedInstance().remove_client(self.forwarder)
class WebSocketController(Controller):
def handle_request(self):
conn = WebSocketConnection(self.handler, WebSocketMessageHandler())
conn.send("CLIENT DE SERVER openwebrx.py")
# enter read loop
conn.read_loop()

101
owrx/source.py Normal file
View File

@ -0,0 +1,101 @@
import subprocess
from owrx.config import PropertyManager
import threading
import csdr
import time
class RtlNmuxSource(object):
def __init__(self):
pm = PropertyManager.getSharedInstance()
nmux_bufcnt = nmux_bufsize = 0
while nmux_bufsize < pm.getPropertyValue("samp_rate")/4: nmux_bufsize += 4096
while nmux_bufsize * nmux_bufcnt < pm.getPropertyValue("nmux_memory") * 1e6: nmux_bufcnt += 1
if nmux_bufcnt == 0 or nmux_bufsize == 0:
print("[openwebrx-main] Error: nmux_bufsize or nmux_bufcnt is zero. These depend on nmux_memory and samp_rate options in config_webrx.py")
return
print("[openwebrx-main] nmux_bufsize = %d, nmux_bufcnt = %d" % (nmux_bufsize, nmux_bufcnt))
cmd = pm.getPropertyValue("start_rtl_command") + "| nmux --bufsize %d --bufcnt %d --port %d --address 127.0.0.1" % (nmux_bufsize, nmux_bufcnt, pm.getPropertyValue("iq_server_port"))
subprocess.Popen(cmd, shell=True)
print("[openwebrx-main] Started rtl source: " + cmd)
class SpectrumThread(threading.Thread):
sharedInstance = None
@staticmethod
def getSharedInstance():
if SpectrumThread.sharedInstance is None:
SpectrumThread.sharedInstance = SpectrumThread()
SpectrumThread.sharedInstance.start()
return SpectrumThread.sharedInstance
def __init__(self):
self.clients = []
self.doRun = True
super().__init__()
def run(self):
pm = PropertyManager.getSharedInstance()
samp_rate = pm.getPropertyValue("samp_rate")
fft_size = pm.getPropertyValue("fft_size")
fft_fps = pm.getPropertyValue("fft_fps")
fft_voverlap_factor = pm.getPropertyValue("fft_voverlap_factor")
fft_compression = pm.getPropertyValue("fft_compression")
format_conversion = pm.getPropertyValue("format_conversion")
spectrum_dsp=dsp=csdr.dsp()
dsp.nc_port = pm.getPropertyValue("iq_server_port")
dsp.set_demodulator("fft")
dsp.set_samp_rate(samp_rate)
dsp.set_fft_size(fft_size)
dsp.set_fft_fps(fft_fps)
dsp.set_fft_averages(int(round(1.0 * samp_rate / fft_size / fft_fps / (1.0 - fft_voverlap_factor))) if fft_voverlap_factor>0 else 0)
dsp.set_fft_compression(fft_compression)
dsp.set_format_conversion(format_conversion)
dsp.csdr_dynamic_bufsize = pm.getPropertyValue("csdr_dynamic_bufsize")
dsp.csdr_print_bufsizes = pm.getPropertyValue("csdr_print_bufsizes")
dsp.csdr_through = pm.getPropertyValue("csdr_through")
sleep_sec=0.87/fft_fps
print("[openwebrx-spectrum] Spectrum thread initialized successfully.")
dsp.start()
if pm.getPropertyValue("csdr_dynamic_bufsize"):
dsp.read(8) #dummy read to skip bufsize & preamble
print("[openwebrx-spectrum] Note: CSDR_DYNAMIC_BUFSIZE_ON = 1")
print("[openwebrx-spectrum] Spectrum thread started.")
bytes_to_read=int(dsp.get_fft_bytes_to_read())
spectrum_thread_counter=0
while self.doRun:
data=dsp.read(bytes_to_read)
#print("gotcha",len(data),"bytes of spectrum data via spectrum_thread_function()")
if spectrum_thread_counter >= fft_fps:
spectrum_thread_counter=0
else: spectrum_thread_counter+=1
for c in self.clients:
c.write_spectrum_data(data)
'''
correction=0
for i in range(0,len(clients)):
i-=correction
if (clients[i].ws_started):
if clients[i].spectrum_queue.full():
print "[openwebrx-spectrum] client spectrum queue full, closing it."
close_client(i, False)
correction+=1
else:
clients[i].spectrum_queue.put([data]) # add new string by "reference" to all clients
'''
print("spectrum thread shut down")
def add_client(self, c):
self.clients.append(c)
def remove_client(self, c):
self.clients.remove(c)
if not self.clients:
self.shutdown()
def shutdown(self):
print("shutting down spectrum thread")
SpectrumThread.sharedInstance = None
self.doRun = False

View File

@ -3,8 +3,9 @@ import hashlib
import json import json
class WebSocketConnection(object): class WebSocketConnection(object):
def __init__(self, handler): def __init__(self, handler, messageHandler):
self.handler = handler self.handler = handler
self.messageHandler = messageHandler
my_headers = self.handler.headers.items() my_headers = self.handler.headers.items()
my_header_keys = list(map(lambda x:x[0],my_headers)) my_header_keys = list(map(lambda x:x[0],my_headers))
h_key_exists = lambda x:my_header_keys.count(x) h_key_exists = lambda x:my_header_keys.count(x)
@ -40,8 +41,35 @@ class WebSocketConnection(object):
else: else:
header = self.get_header(len(data), 2) header = self.get_header(len(data), 2)
self.handler.wfile.write(header) self.handler.wfile.write(header)
self.handler.wfile.write(data.encode()) self.handler.wfile.write(data)
self.handler.wfile.flush() self.handler.wfile.flush()
def read_loop(self):
open = True
while (open):
header = self.handler.rfile.read(2)
opcode = header[0] & 0x0F
length = header[1] & 0x7F
mask = (header[1] & 0x80) >> 7
if (length == 126):
header = self.handler.rfile.read(2)
length = (header[0] << 8) + header[1]
if (mask):
masking_key = self.handler.rfile.read(4)
data = self.handler.rfile.read(length)
print("opcode: {0}, length: {1}, mask: {2}".format(opcode, length, mask))
if (mask):
data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)])
if (opcode == 1):
message = data.decode('utf-8')
self.messageHandler.handleTextMessage(self, message)
elif (opcode == 2):
self.messageHandler.handleBinaryMessage(self, data)
elif (opcode == 8):
open = False
self.messageHandler.handleClose(self)
else:
print("unsupported opcode: {0}".format(opcode))
class WebSocketException(Exception): class WebSocketException(Exception):
pass pass

View File

@ -1,13 +1,23 @@
from http.server import HTTPServer from http.server import HTTPServer
from owrx.http import RequestHandler from owrx.http import RequestHandler
from owrx.config import PropertyManager from owrx.config import PropertyManager
from owrx.source import RtlNmuxSource, SpectrumThread
from socketserver import ThreadingMixIn
cfg=__import__("config_webrx") class ThreadedHttpServer(ThreadingMixIn, HTTPServer):
pm = PropertyManager.getSharedInstance() pass
for name, value in cfg.__dict__.items():
def main():
cfg=__import__("config_webrx")
pm = PropertyManager.getSharedInstance()
for name, value in cfg.__dict__.items():
if (name.startswith("__")): continue if (name.startswith("__")): continue
pm.getProperty(name).setValue(value) pm.getProperty(name).setValue(value)
server = HTTPServer(('0.0.0.0', 3000), RequestHandler) RtlNmuxSource()
server.serve_forever()
server = ThreadedHttpServer(('0.0.0.0', 3000), RequestHandler)
server.serve_forever()
if __name__=="__main__":
main()