From fa09f9b9d23f0c71410fceaa8392e1561012f9d5 Mon Sep 17 00:00:00 2001 From: ha7ilm Date: Sun, 24 Jan 2016 00:03:08 +0100 Subject: [PATCH] Removed rtl_mus in favor of ncat. --- README.md | 13 +- config_rtl.py | 103 ------- config_webrx.py | 17 +- openwebrx.py | 15 +- plugins/dsp/csdr/plugin.py | 6 +- rtl_mus.py | 539 ------------------------------------- 6 files changed, 21 insertions(+), 672 deletions(-) delete mode 100644 config_rtl.py delete mode 100644 rtl_mus.py diff --git a/README.md b/README.md index f80c2f2..d0ea8d2 100644 --- a/README.md +++ b/README.md @@ -24,10 +24,13 @@ It has the following features: **News (2015-09-01)** - The DDC in *csdr* has been hand-optimized for ARM NEON, so it runs 3× faster on the Raspberry Pi than before. -- Also we use *ncat* instead of *rtl_mus*, and it is also 3× faster. +- Also we use *ncat* instead of *rtl_mus*, and it is 3× faster. - OpenWebRX now supports URLs like: http://localhost:8073/#freq=145555000,mod=usb -- When upgrading OpenWebRX, please make sure that you upgrade *csdr*, and install the new (optional) dependency *ncat*! +**News (2016-01-23)** +- *ncat* is now a requirement for OpenWebRX. + +When upgrading OpenWebRX, please make sure that you upgrade *csdr*, and install the new (optional) dependency *ncat*! ## Setup @@ -37,7 +40,11 @@ First you will need to install the dependencies: - libcsdr - rtl-sdr -- ncat (on Debian/Ubuntu, it is in the *nmap* package). *(It is optional, but highly advised.)* +- ncat (On Debian/Ubuntu, it is in the *nmap* package). + +> By the way, *nmap* is tool commonly used for auditing network security, and it is not used by OpenWebRX in any way. We need it because the *ncat* command is packaged with it. +> +> *ncat* is a better *netcat* alternative, which is used by OpenWebRX for internally distributing the I/Q data stream. It also solves the problem of having different versions of *netcat* on different Linux distributions, which are not compatible by their command-line arguments. After cloning this repository and connecting an RTL-SDR dongle to your computer, you can run the server: diff --git a/config_rtl.py b/config_rtl.py deleted file mode 100644 index d31d951..0000000 --- a/config_rtl.py +++ /dev/null @@ -1,103 +0,0 @@ -''' - This file is part of RTL Multi-User Server, - that makes multi-user access to your DVB-T dongle used as an SDR. - Copyright (c) 2013-2015 by Andras Retzler - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . - - ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - - In addition, as a special exception, the copyright holders - state that config_rtl.py and config_webrx.py are not part of the - Corresponding Source defined in GNU AGPL version 3 section 1. - - (It means that you do not have to redistribute config_rtl.py and - config_webrx.py if you make any changes to these two configuration files, - and use them for running your own web service with OpenWebRX.) -''' - -my_ip='127.0.0.1' # leave blank for listening on all interfaces -my_listening_port = 4951 - -rtl_tcp_host,rtl_tcp_port='localhost',8888 - -send_first="" -#send_first=chr(9)+chr(0)+chr(0)+chr(0)+chr(1) # set direct sampling - -setuid_on_start = 0 # we normally start with root privileges and setuid() to another user -uid = 999 # determine by issuing: $ id -u username -ignore_clients_without_commands = 1 # we won't serve data to telnet sessions and things like that - # we'll start to serve data after getting the first valid command - -freq_allowed_ranges = [[0,2200000000]] - -client_cant_set_until=0 -first_client_can_set=True # openwebrx - spectrum thread will set things on start # no good, clients set parameters and things -buffer_size=25000000 # per client -log_file_path = "/dev/null" # Might be set to /dev/null to turn off logging - -''' -Allow any host to connect: - use_ip_access_control=0 - -Allow from specific ranges: - use_ip_access_control=1 - order_allow_deny=0 # deny and then allow - denied_ip_ranges=() # deny from all - allowed_ip_ranges=('192.168.','44.','127.0.0.1') # allow only from ... - -Deny from specific ranges: - use_ip_access_control=1 - order_allow_deny=0 # allow and then deny - allowed_ip_ranges=() # allow from all - denied_ip_ranges=('192.168.') # deny any hosts from ... -''' -use_ip_access_control=1 #You may want to open up the I/Q server to the public, then set this to zero. -order_allow_deny=0 -denied_ip_ranges=() # deny from all -allowed_ip_ranges=('127.0.0.1') # allow only local connections (from openwebrx). -allow_gain_set=1 - -use_dsp_command=False # you can process raw I/Q data with a custom command that starts a process that we can pipe the data into, and also pipe out of. -debug_dsp_command=False # show sample rate before and after the dsp command -dsp_command="" - -''' -Example DSP commands: - * Compress I/Q data with FLAC: - flac --force-raw-format --channels 2 --sample-rate=250000 --sign=unsigned --bps=8 --endian=little -o - - - * Decompress FLAC-coded I/Q data: - flac --force-raw-format --decode --endian=little --sign=unsigned - - -''' -watchdog_interval=0 -reconnect_interval=10 -''' -If there's no input I/Q data after N seconds, input will be filled with zero samples, -so that GNU Radio won't fail in OpenWebRX. It may reconnect rtl_tcp_thread. -If watchdog_interval is 0, then watchdog thread is not started. - -''' -cache_full_behaviour=2 -''' - 0 = drop samples - 1 = close client - 2 = openwebrx: don't care about that client until it wants samples again (gr-osmosdr bug workaround) -''' - -rtl_tcp_password=None -''' -This one applies to a special version of rtl_tcp that has authentication. -# You can find more info here: https://github.com/ha7ilm/rtl-sdr -# If it is set to a string (e.g. rtl_tcp_password="changeme"), rtl_mus will try to authenticate against the rtl_tcp server. -''' diff --git a/config_webrx.py b/config_webrx.py index a4a6fdf..b79dbdc 100644 --- a/config_webrx.py +++ b/config_webrx.py @@ -66,8 +66,8 @@ sdrhu_public_listing = False dsp_plugin="csdr" fft_fps=9 fft_size=4096 -#samp_rate = 2048000 -samp_rate = 250000 +samp_rate = 2048000 +#samp_rate = 250000 center_freq = 145525000 rf_gain = 5 @@ -82,22 +82,18 @@ start_rtl_thread=True # >> RTL-SDR via rtl_sdr -start_rtl_command="rtl_sdr -s {samp_rate} -f {center_freq} -p {ppm} -g {rf_gain} - | nc -vvl 127.0.0.1 8888".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate, ppm=ppm) +start_rtl_command="rtl_sdr -s {samp_rate} -f {center_freq} -p {ppm} -g {rf_gain} -".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate, ppm=ppm) format_conversion="csdr convert_u8_f" # >> Sound card SDR (needs ALSA) #I did not have the chance to properly test it. #samp_rate = 96000 -#start_rtl_command="arecord -f S16_LE -r {samp_rate} -c2 - | nc -vvl 127.0.0.1 8888".format(samp_rate=samp_rate) +#start_rtl_command="arecord -f S16_LE -r {samp_rate} -c2 -".format(samp_rate=samp_rate) #format_conversion="csdr convert_i16_f | csdr gain_ff 30" -# >> RTL_SDR via rtl_tcp -#start_rtl_command="rtl_tcp -s {samp_rate} -f {center_freq} -g {rf_gain} -P {ppm} -p 8888".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate, ppm=ppm) -#format_conversion="csdr convert_u8_f" - # >> /dev/urandom test signal source #samp_rate = 2400000 -#start_rtl_command="cat /dev/urandom | (pv -qL `python -c 'print int({samp_rate} * 2.2)'` 2>&1) | nc -vvl 127.0.0.1 8888".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate) +#start_rtl_command="cat /dev/urandom | (pv -qL `python -c 'print int({samp_rate} * 2.2)'` 2>&1)".format(rf_gain=rf_gain, center_freq=center_freq, samp_rate=samp_rate) #format_conversion="csdr convert_u8_f" #You can use other SDR hardware as well, by giving your own command that outputs the I/Q samples... @@ -112,5 +108,4 @@ client_audio_buffer_size = 5 start_freq = center_freq start_mod = "nfm" #nfm, am, lsb, usb, cw -iq_server_port = 4951 -# (if ncat is not available on your system, rtl_mus will be used, thus you will have to set the same port as "my_listening_port" in config_rtl.py as well) +iq_server_port = 4951 #TCP port for ncat to listen on. It will send I/Q data over its connections, for internal use in OpenWebRX. It is only accessible from the localhost by default. diff --git a/openwebrx.py b/openwebrx.py index 38b53b6..117f327 100755 --- a/openwebrx.py +++ b/openwebrx.py @@ -116,21 +116,14 @@ def main(): pass #Start rtl thread + if os.system("ncat --version > /dev/null") == 32512: #check for ncat + print "[openwebrx-main] Error: ncat not detected, please install it! The ncat tool is a netcat alternative, used for distributing the I/Q data stream. It is usually available in the nmap package (sudo apt-get install nmap). For more explanation, look into the README.md" + return if cfg.start_rtl_thread: + cfg.start_rtl_command += "| ncat -4l %d -k --send-only --allow 127.0.0.1" % cfg.iq_server_port rtl_thread=threading.Thread(target = lambda:subprocess.Popen(cfg.start_rtl_command, shell=True), args=()) rtl_thread.start() print "[openwebrx-main] Started rtl_thread: "+cfg.start_rtl_command - - #Run rtl_mus.py in a different OS thread - python_command="pypy" if pypy else "python2" - rtl_mus_cmd = python_command+" rtl_mus.py config_rtl" - if os.system("ncat --version > /dev/null") != 32512: - print "[openwebrx-main] ncat detected, using it instead of rtl_mus:" - rtl_mus_cmd = "ncat localhost 8888 | ncat -4l %d -k --send-only --allow 127.0.0.1 " % cfg.iq_server_port - print rtl_mus_cmd - rtl_mus_thread=threading.Thread(target = lambda:subprocess.Popen(rtl_mus_cmd, shell=True), args=()) - rtl_mus_thread.start() # The new feature in GNU Radio 3.7: top_block() locks up ALL python threads until it gets the TCP connection. - print "[openwebrx-main] Started rtl_mus." time.sleep(1) #wait until it really starts #Initialize clients diff --git a/plugins/dsp/csdr/plugin.py b/plugins/dsp/csdr/plugin.py index 091b218..ecbfffc 100644 --- a/plugins/dsp/csdr/plugin.py +++ b/plugins/dsp/csdr/plugin.py @@ -46,13 +46,9 @@ class dsp_plugin: self.format_conversion = "csdr convert_u8_f" self.base_bufsize = 512 self.nc_port = 4951 - try: - subprocess.Popen("nc",stdout=subprocess.PIPE,stderr=subprocess.PIPE).kill() - except: - print "[openwebrx-plugin:csdr] error: netcat not found, please install netcat!" def chain(self,which): - any_chain_base="nc -v 127.0.0.1 {nc_port} | csdr setbuf {start_bufsize} | csdr through | "+self.format_conversion+(" | " if self.format_conversion!="" else "") ##"csdr flowcontrol {flowcontrol} auto 1.5 10 | " + any_chain_base="ncat -v 127.0.0.1 {nc_port} | csdr setbuf {start_bufsize} | csdr through | "+self.format_conversion+(" | " if self.format_conversion!="" else "") ##"csdr flowcontrol {flowcontrol} auto 1.5 10 | " if which == "fft": fft_chain_base = "sleep 1; "+any_chain_base+"csdr fft_cc {fft_size} {fft_block_size} | csdr logpower_cf -70 | csdr fft_exchange_sides_ff {fft_size}" if self.fft_compression=="adpcm": diff --git a/rtl_mus.py b/rtl_mus.py deleted file mode 100644 index 9f5e230..0000000 --- a/rtl_mus.py +++ /dev/null @@ -1,539 +0,0 @@ -''' - This file is part of RTL Multi-User Server, - that makes multi-user access to your DVB-T dongle used as an SDR. - Copyright (c) 2013-2015 by Andras Retzler - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . - ------ - -2013-11? Asyncore version -2014-03 Fill with null on no data - -''' - -import socket -import sys -import array -import time -import logging -import os -import time -import subprocess -import fcntl -import thread -import pdb -import asyncore -import multiprocessing -import signal -#pypy compatiblity -try: import dl -except: pass - -import code -import traceback - -def handle_signal(signal, frame): - log.info("Ctrl+C: aborting.") - os._exit(1) #not too graceful exit - -def ip_match(this,ip_ranges,for_allow): - if not len(ip_ranges): - return 1 #empty list matches all ip addresses - for ip_range in ip_ranges: - #print this[0:len(ip_range)], ip_range - if this[0:len(ip_range)]==ip_range: - return 1 - return 0 - -def ip_access_control(ip): - if(not cfg.use_ip_access_control): return 1 - allowed=0 - if(cfg.order_allow_deny): - if ip_match(ip,cfg.allowed_ip_ranges,1): allowed=1 - if ip_match(ip,cfg.denied_ip_ranges,0): allowed=0 - else: - if ip_match(ip,cfg.denied_ip_ranges,0): - allowed=0 - if ip_match(ip,cfg.allowed_ip_ranges,1): - allowed=1 - return allowed - -def add_data_to_clients(new_data): - # might be called from: - # -> dsp_read - # -> rtl_tcp_asyncore.handle_read - global clients - global clients_mutex - clients_mutex.acquire() - for client in clients: - #print "client %d size: %d"%(client[0].ident,client[0].waiting_data.qsize()) - if(client[0].waiting_data.full()): - if cfg.cache_full_behaviour == 0: - log.error("client cache full, dropping samples: "+str(client[0].ident)+"@"+client[0].socket[1][0]) - while not client[0].waiting_data.empty(): # clear queue - client[0].waiting_data.get(False, None) - elif cfg.cache_full_behaviour == 1: - #rather closing client: - log.error("client cache full, dropping client: "+str(client[0].ident)+"@"+client[0].socket[1][0]) - client[0].close(False) - elif cfg.cache_full_behaviour == 2: - pass #client cache full, just not taking care - else: log.error("invalid value for cfg.cache_full_behaviour") - else: - client[0].waiting_data.put(new_data) - clients_mutex.release() - -def dsp_read_thread(): - global proc - global dsp_data_count - while True: - try: - my_buffer=proc.stdout.read(1024) - except IOError: - log.error("DSP subprocess is not ready for reading.") - time.sleep(1) - continue - add_data_to_clients(my_buffer) - if cfg.debug_dsp_command: - dsp_data_count+=len(my_buffer) - -def dsp_write_thread(): - global proc - global dsp_input_queue - global original_data_count - while True: - try: - my_buffer=dsp_input_queue.get(timeout=0.3) - except: - continue - proc.stdin.write(my_buffer) - proc.stdin.flush() - if cfg.debug_dsp_command: - original_data_count+=len(my_buffer) - -class client_handler(asyncore.dispatcher): - - def __init__(self,client_param): - self.client=client_param - self.client[0].asyncore=self - self.sent_dongle_id=False - self.last_waiting_buffer="" - asyncore.dispatcher.__init__(self, self.client[0].socket[0]) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - - def handle_read(self): - global commands - new_command = self.recv(5) - if len(new_command)>=5: - if handle_command(new_command, self.client): - commands.put(new_command) - - def handle_error(self): - exc_type, exc_value, exc_traceback = sys.exc_info() - log.info("client error: "+str(self.client[0].ident)+"@"+self.client[0].socket[1][0]) - traceback.print_tb(exc_traceback) - self.close() - - def handle_close(self): - self.client[0].close() - log.info("client disconnected: "+str(self.client[0].ident)+"@"+self.client[0].socket[1][0]) - - def writable(self): - #print "queryWritable",not self.client[0].waiting_data.empty() - return not self.client[0].waiting_data.empty() - - def handle_write(self): - global last_waiting - global rtl_dongle_identifier - global sample_rate - if not self.sent_dongle_id: - self.send(rtl_dongle_identifier) - self.sent_dongle_id=True - return - #print "write2client",self.client[0].waiting_data.qsize() - next=self.last_waiting_buffer+self.client[0].waiting_data.get() - sent=asyncore.dispatcher.send(self, next) - self.last_waiting_buffer=next[sent:] - -class server_asyncore(asyncore.dispatcher): - - def __init__(self): - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind((cfg.my_ip, cfg.my_listening_port)) - self.listen(5) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - log.info("Server listening on port: "+str(cfg.my_listening_port)) - - def handle_accept(self): - global max_client_id - global clients_mutex - global clients - my_client=[client()] - my_client[0].socket=self.accept() - if (my_client[0].socket is None): # not sure if required - return - if (ip_access_control(my_client[0].socket[1][0])): - my_client[0].ident=max_client_id - max_client_id+=1 - my_client[0].start_time=time.time() - my_client[0].waiting_data=multiprocessing.Queue(500) - clients_mutex.acquire() - clients.append(my_client) - clients_mutex.release() - handler = client_handler(my_client) - log.info("client accepted: "+str(len(clients)-1)+"@"+my_client[0].socket[1][0]+":"+str(my_client[0].socket[1][1])+" users now: "+str(len(clients))) - else: - log.info("client denied: "+str(len(clients)-1)+"@"+my_client[0].socket[1][0]+":"+str(my_client[0].socket[1][1])+" blocked by ip") - my_client.socket.close() - -rtl_tcp_resetting=False #put me away - -def rtl_tcp_asyncore_reset(timeout): - global rtl_tcp_core - global rtl_tcp_resetting - if rtl_tcp_resetting: return - #print "rtl_tcp_asyncore_reset" - rtl_tcp_resetting=True - time.sleep(timeout) - try: - rtl_tcp_core.close() - except: - pass - try: - del rtl_tcp_core - except: - pass - rtl_tcp_core=rtl_tcp_asyncore() - #print asyncore.socket_map - rtl_tcp_resetting=False - -class rtl_tcp_asyncore(asyncore.dispatcher): - def __init__(self): - global server_missing_logged - asyncore.dispatcher.__init__(self) - self.password_sent = False - self.ok=True - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - try: - self.connect((cfg.rtl_tcp_host, cfg.rtl_tcp_port)) - self.socket.settimeout(0.1) - except: - log.error("rtl_tcp connection refused. Retrying.") - thread.start_new_thread(rtl_tcp_asyncore_reset, (1,)) - self.close() - return - - def handle_error(self): - global server_missing_logged - global rtl_tcp_connected - rtl_tcp_connected=False - exc_type, exc_value, exc_traceback = sys.exc_info() - self.ok=False - server_is_missing=hasattr(exc_value,"errno") and exc_value.errno==111 - if (not server_is_missing) or (not server_missing_logged): - log.error("with rtl_tcp host connection: "+str(exc_value)) - #traceback.print_tb(exc_traceback) - server_missing_logged|=server_is_missing - try: - self.close() - except: - pass - thread.start_new_thread(rtl_tcp_asyncore_reset, (2,)) - - def handle_connect(self): - global server_missing_logged - global rtl_tcp_connected - self.socket.settimeout(0.1) - self.password_sent = False - rtl_tcp_connected=True - if self.ok: - log.info("rtl_tcp host connection estabilished") - server_missing_logged=False - - def handle_close(self): - global rtl_tcp_connected - global rtl_tcp_core - rtl_tcp_connected=False - log.error("rtl_tcp host connection has closed, now trying to reopen") - try: - self.close() - except: - pass - thread.start_new_thread(rtl_tcp_asyncore_reset, (2,)) - - def handle_read(self): - global rtl_dongle_identifier - global dsp_input_queue - global watchdog_data_count - if(len(rtl_dongle_identifier)==0): - rtl_dongle_identifier=self.recv(12) - return - new_data_buffer=self.recv(1024*16) - if cfg.watchdog_interval: - watchdog_data_count+=1024*16 - if cfg.use_dsp_command: - dsp_input_queue.put(new_data_buffer) - #print "did put anyway" - else: - add_data_to_clients(new_data_buffer) - - def writable(self): - - #check if any new commands to write - global commands - return (not self.password_sent and cfg.rtl_tcp_password != None) or not commands.empty() - - def handle_write(self): - if(not self.password_sent and cfg.rtl_tcp_password != None): - log.info("Sending rtl_tcp_password...") - self.send(cfg.rtl_tcp_password) - self.password_sent = True - global commands - while not commands.empty(): - mcmd=commands.get() - self.send(mcmd) - -def xxd(data): - #diagnostic purposes only - output="" - for d in data: - output+=hex(ord(d))[2:].zfill(2)+" " - return output - -def handle_command(command, client_param): - global sample_rate - client=client_param[0] - param=array.array("I", command[1:5])[0] - param=socket.ntohl(param) - command_id=ord(command[0]) - client_info=str(client.ident)+"@"+client.socket[1][0]+":"+str(client.socket[1][1]) - if(time.time()-client.start_time client can't set anything until "+str(cfg.client_cant_set_until)+" seconds") - return 0 - if command_id == 1: - if max(map((lambda r: param>=r[0] and param<=r[1]),cfg.freq_allowed_ranges)): - log.debug("allow: "+client_info+" -> set freq "+str(param)) - return 1 - else: - log.debug("deny: "+client_info+" -> set freq - out of range: "+str(param)) - elif command_id == 2: - log.debug("deny: "+client_info+" -> set sample rate: "+str(param)) - sample_rate=param - return 0 # ordinary clients are not allowed to do this - elif command_id == 3: - log.debug("deny/allow: "+client_info+" -> set gain mode: "+str(param)) - return cfg.allow_gain_set - elif command_id == 4: - log.debug("deny/allow: "+client_info+" -> set gain: "+str(param)) - return cfg.allow_gain_set - elif command_id == 5: - log.debug("deny: "+client_info+" -> set freq correction: "+str(param)) - return 0 - elif command_id == 6: - log.debug("deny/allow: set if stage gain") - return cfg.allow_gain_set - elif command_id == 7: - log.debug("deny: set test mode") - return 0 - elif command_id == 8: - log.debug("deny/allow: set agc mode") - return cfg.allow_gain_set - elif command_id == 9: - log.debug("deny: set direct sampling") - return 0 - elif command_id == 10: - log.debug("deny: set offset tuning") - return 0 - elif command_id == 11: - log.debug("deny: set rtl xtal") - return 0 - elif command_id == 12: - log.debug("deny: set tuner xtal") - return 0 - elif command_id == 13: - log.debug("deny/allow: set tuner gain by index") - return cfg.allow_gain_set - else: - log.debug("deny: "+client_info+" sent an ivalid command: "+str(param)) - return 0 - -def watchdog_thread(): - global rtl_tcp_connected - global rtl_tcp_core - global watchdog_data_count - global sample_rate - zero_buffer_size=16348 - second_frac=10 - zero_buffer='\x7f'*zero_buffer_size - watchdog_data_count=0 - rtl_tcp_connected=False - null_fill=False - time.sleep(4) # wait before activating this thread - log.info("watchdog started") - first_start=True - n=0 - while True: - wait_altogether=cfg.watchdog_interval if rtl_tcp_connected or first_start else cfg.reconnect_interval - first_start=False - if null_fill: - log.error("watchdog: filling buffer with zeros.") - while wait_altogether>0: - wait_altogether-=1.0/second_frac - for i in range(0,((2*sample_rate)/second_frac)/zero_buffer_size): - add_data_to_clients(zero_buffer) - n+=len(zero_buffer) - time.sleep(0) #yield - if watchdog_data_count: break - if watchdog_data_count: break - time.sleep(1.0/second_frac) - #print "sent altogether",n - else: - time.sleep(wait_altogether) - null_fill=not watchdog_data_count - if not watchdog_data_count: - log.error("watchdog: restarting rtl_tcp_asyncore() now.") - rtl_tcp_asyncore_reset(0) - watchdog_data_count=0 - - - -def dsp_debug_thread(): - global dsp_data_count - global original_data_count - while 1: - time.sleep(1) - print "[rtl-mus] DSP | Original data: "+str(int(original_data_count/1000))+"kB/sec | Processed data: "+str(int(dsp_data_count/1000))+"kB/sec" - dsp_data_count = original_data_count=0 - -class client: - ident=None #id - to_close=False - waiting_data=None - start_time=None - socket=None - asyncore=None - - def close(self, use_mutex=True): - global clients_mutex - global clients - if use_mutex: clients_mutex.acquire() - correction=0 - for i in range(0,len(clients)): - i-=correction - if clients[i][0].ident==self.ident: - try: - self.socket.close() - except: - pass - try: - self.asyncore.close() - del self.asyncore - except: - pass - del clients[i] - correction+=1 - if use_mutex: clients_mutex.release() - - -def main(): - global server_missing_logged - global rtl_dongle_identifier - global log - global clients - global clients_mutex - global original_data_count - global dsp_input_queue - global dsp_data_count - global proc - global commands - global max_client_id - global rtl_tcp_core - global sample_rate - - #Set signal handler - signal.signal(signal.SIGINT, handle_signal) #http://stackoverflow.com/questions/1112343/how-do-i-capture-sigint-in-python - - # set up logging - log = logging.getLogger("rtl_mus") - log.setLevel(logging.DEBUG) - formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') - stream_handler = logging.StreamHandler() - stream_handler.setLevel(logging.DEBUG) - stream_handler.setFormatter(formatter) - log.addHandler(stream_handler) - file_handler = logging.FileHandler(cfg.log_file_path) - file_handler.setLevel(logging.INFO) - file_handler.setFormatter(formatter) - log.addHandler(file_handler) - log.info("Server is UP") - - server_missing_logged=0 # Not to flood the screen with messages related to rtl_tcp disconnect - rtl_dongle_identifier='' # rtl_tcp sends some identifier on dongle type and gain values in the first few bytes right after connection - clients=[] - dsp_data_count=original_data_count=0 - commands=multiprocessing.Queue() - dsp_input_queue=multiprocessing.Queue() - clients_mutex=multiprocessing.Lock() - max_client_id=0 - sample_rate=250000 # so far only watchdog thread uses it to fill buffer up with zeros on missing input - - # start dsp threads - if cfg.use_dsp_command: - print "[rtl_mus] Opening DSP process..." - proc = subprocess.Popen (cfg.dsp_command.split(" "), stdin = subprocess.PIPE, stdout = subprocess.PIPE) #!! should fix the split :-S - dsp_read_thread_v=thread.start_new_thread(dsp_read_thread, ()) - dsp_write_thread_v=thread.start_new_thread(dsp_write_thread, ()) - if cfg.debug_dsp_command: - dsp_debug_thread_v=thread.start_new_thread(dsp_debug_thread,()) - - # start watchdog thread - if cfg.watchdog_interval != 0: - watchdog_thread_v=thread.start_new_thread(watchdog_thread,()) - - # start asyncores - rtl_tcp_core = rtl_tcp_asyncore() - server_core = server_asyncore() - - asyncore.loop(0.1) - - -if __name__=="__main__": - print - print "rtl_mus: Multi-User I/Q Data Server for RTL-SDR v0.22, made at HA5KFU Amateur Radio Club (http://ha5kfu.hu)" - print " code by Andras Retzler, HA7ILM " - print " distributed under GNU GPL v3" - print - - try: - for libcpath in ["/lib/i386-linux-gnu/libc.so.6","/lib/libc.so.6"]: - if os.path.exists(libcpath): - libc = dl.open(libcpath) - libc.call("prctl", 15, "rtl_mus", 0, 0, 0) - break - except: - pass - - # === Load configuration script === - if len(sys.argv)==1: - print "[rtl_mus] Warning! Configuration script not specified. I will use: \"config_rtl.py\"" - config_script="config_rtl" - else: - config_script=sys.argv[1] - cfg=__import__(config_script) - if cfg.setuid_on_start: - os.setuid(cfg.uid) - main()