WebSocket, JavaScript fixes, etc.
This commit is contained in:
		| @@ -1040,8 +1040,9 @@ function waterfall_add_queue(what) | ||||
| function waterfall_dequeue() | ||||
| { | ||||
| 	if(waterfall_queue.length) waterfall_add(waterfall_queue.shift()); | ||||
| 	if(waterfall_queue.length>fft_fps/2) //in case of emergency  | ||||
| 	if(waterfall_queue.length>Math.max(fft_fps/2,8)) //in case of emergency  | ||||
| 	{ | ||||
| 		console.log(waterfall_queue.length); | ||||
| 		add_problem("fft overflow"); | ||||
| 		while(waterfall_queue.length) waterfall_add(waterfall_queue.shift()); | ||||
| 	} | ||||
| @@ -1213,6 +1214,9 @@ function webrx_set_param(what, value) | ||||
|  | ||||
| function audio_init() | ||||
| { | ||||
| 	audio_debug_time_start=(new Date()).getTime(); | ||||
| 	audio_debug_time_last_start=audio_debug_time_start; | ||||
|  | ||||
| 	//https://github.com/0xfe/experiments/blob/master/www/tone/js/sinewave.js | ||||
| 	audio_initialized=1; // only tell on_ws_recv() not to call it again | ||||
| 	try  | ||||
| @@ -1563,13 +1567,18 @@ var rt = function (s,n) {return s.replace(/[a-zA-Z]/g,function(c){return String. | ||||
| var irt = function (s,n) {return s.replace(/[a-zA-Z]/g,function(c){return String.fromCharCode((c>="a"?97:65)<=(c=c.charCodeAt(0)-n)?c:c+26);});} | ||||
| var sendmail2 = function (s) { window.location.href="mailto:"+irt(s.replace("=",String.fromCharCode(0100)).replace("$","."),8); } | ||||
|  | ||||
| var audio_debug_time_taken=0; | ||||
| var audio_debug_time_start=0; | ||||
| var audio_debug_time_last_start=0; | ||||
|  | ||||
| function debug_audio() | ||||
| { | ||||
| 	audio_debug_time_taken+=1; | ||||
| 	if(audio_debug_time_start==0) return; //audio_init has not been called | ||||
| 	time_now=(new Date()).getTime(); | ||||
| 	audio_debug_time_since_last_call=(time_now-audio_debug_time_last_start)/1000; | ||||
| 	audio_debug_time_last_start=time_now; //now | ||||
| 	audio_debug_time_taken=(time_now-audio_debug_time_start)/1000; | ||||
| 	e("openwebrx-audio-sps").innerHTML= | ||||
| 		"audio recv. at "+audio_buffer_current_size_debug.toString()+" sps ("+ | ||||
| 		"audio recv. at "+(audio_buffer_current_size_debug/audio_debug_time_since_last_call).toFixed(0)+" sps ("+ | ||||
| 		(audio_buffer_all_size_debug/audio_debug_time_taken).toFixed(1)+" sps avg.), feed at "+ | ||||
| 		((audio_buffer_current_count_debug*audio_buffer_size)/audio_debug_time_taken).toFixed(1)+" sps output"; | ||||
| 	audio_buffer_current_size_debug=0; | ||||
|   | ||||
							
								
								
									
										12
									
								
								openwebrx.py
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								openwebrx.py
									
									
									
									
									
								
							| @@ -158,6 +158,7 @@ def spectrum_thread_function(): | ||||
| 			i-=correction | ||||
| 			if (clients[i].ws_started): | ||||
| 				if clients[i].spectrum_queue.full(): | ||||
| 					print "[openwebrx-spectrum] client spectrum queue full, closing it." | ||||
| 					close_client(i, False) | ||||
| 					correction+=1 | ||||
| 				else: | ||||
| @@ -192,6 +193,7 @@ def cleanup_clients(): | ||||
| 		i-=correction | ||||
| 		#print "cleanup_clients:: len(clients)=", len(clients), "i=", i | ||||
| 		if (not clients[i].ws_started) and (time.time()-clients[i].gen_time)>180: | ||||
| 			print "[openwebrx] cleanup_clients :: client timeout to open WebSocket" | ||||
| 			close_client(i, False) | ||||
| 			correction+=1 | ||||
| 	clients_mutex.release() | ||||
| @@ -200,12 +202,13 @@ def generate_client_id(ip): | ||||
| 	#add a client | ||||
| 	global clients | ||||
| 	global clients_mutex | ||||
| 	new_client=namedtuple("ClientStruct", "id gen_time ws_started sprectum_queue ip")	 | ||||
| 	new_client=namedtuple("ClientStruct", "id gen_time ws_started sprectum_queue ip closed")	 | ||||
| 	new_client.id=md5.md5(str(random.random())).hexdigest() | ||||
| 	new_client.gen_time=time.time() | ||||
| 	new_client.ws_started=False # to check whether client has ever tried to open the websocket | ||||
| 	new_client.spectrum_queue=Queue.Queue(1000) | ||||
| 	new_client.ip=ip | ||||
| 	new_client.closed=[False] #byref, not exactly sure if required | ||||
| 	clients_mutex.acquire() | ||||
| 	clients.append(new_client) | ||||
| 	log_client(new_client,"client added. Clients now: {0}".format(len(clients))) | ||||
| @@ -218,6 +221,7 @@ def close_client(i, use_mutex=True): | ||||
| 	global clients | ||||
| 	log_client(clients[i],"client being closed.") | ||||
| 	if use_mutex: clients_mutex.acquire() | ||||
| 	clients[i].closed[0]=True | ||||
| 	del clients[i] | ||||
| 	if use_mutex: clients_mutex.release() | ||||
| 	 | ||||
| @@ -256,7 +260,7 @@ class WebRXHandler(BaseHTTPRequestHandler): | ||||
|  | ||||
| 					# ========= Client handshake ========= | ||||
| 					if myclient.ws_started: | ||||
| 						print "[openwebrx-httpd] error: second client ws connection, throwing it." | ||||
| 						print "[openwebrx-httpd] error: second WS connection with the same client id, throwing it." | ||||
| 						self.send_error(400, 'Bad request.') #client already started | ||||
| 						return | ||||
| 					rxws.send(self, "CLIENT DE SERVER openwebrx.py") | ||||
| @@ -277,6 +281,10 @@ class WebRXHandler(BaseHTTPRequestHandler): | ||||
| 					dsp.start() | ||||
| 					 | ||||
| 					while True: | ||||
| 						if myclient.closed[0]:  | ||||
| 							print "[openwebrx-httpd:ws] client closed by other thread" | ||||
| 							break | ||||
|  | ||||
| 						# ========= send audio ========= | ||||
| 						temp_audio_data=dsp.read(1024*8) | ||||
| 						rxws.send(self, temp_audio_data, "AUD ") | ||||
|   | ||||
							
								
								
									
										38
									
								
								rxws.py
									
									
									
									
									
								
							
							
						
						
									
										38
									
								
								rxws.py
									
									
									
									
									
								
							| @@ -24,6 +24,7 @@ Authors: | ||||
| import base64 | ||||
| import sha | ||||
| import select | ||||
| import code | ||||
|  | ||||
| class WebSocketException(Exception): | ||||
| 	pass | ||||
| @@ -43,7 +44,7 @@ def handshake(myself): | ||||
| 	ws_key=h_value("sec-websocket-key") | ||||
| 	ws_key_toreturn=base64.b64encode(sha.new(ws_key+"258EAFA5-E914-47DA-95CA-C5AB0DC85B11").digest()) | ||||
| 	#A sample list of keys we get: [('origin', 'http://localhost:8073'), ('upgrade', 'websocket'), ('sec-websocket-extensions', 'x-webkit-deflate-frame'), ('sec-websocket-version', '13'), ('host', 'localhost:8073'), ('sec-websocket-key', 't9J1rgy4fc9fg2Hshhnkmg=='), ('connection', 'Upgrade'), ('pragma', 'no-cache'), ('cache-control', 'no-cache')] | ||||
| 	myself.connection.send("HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "+ws_key_toreturn+"\r\nCQ-CQ-de: HA5KFU\r\n\r\n") | ||||
| 	myself.wfile.write("HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "+ws_key_toreturn+"\r\nCQ-CQ-de: HA5KFU\r\n\r\n") | ||||
|  | ||||
| def get_header(size): | ||||
| 	#this does something similar: https://github.com/lemmingzshadow/php-websocket/blob/master/server/lib/WebSocket/Connection.php | ||||
| @@ -84,12 +85,31 @@ def xxd(data): | ||||
| 		output+=hex(ord(d))[2:].zfill(2)+" "  | ||||
| 	return output | ||||
|  | ||||
| #for R/W the WebSocket, use recv/send | ||||
| #for reading the TCP socket, use readsock  | ||||
| #for writing the TCP socket, use myself.wfile.write and flush | ||||
|  | ||||
| def readsock(myself,size,blocking): | ||||
| 	#http://thenestofheliopolis.blogspot.hu/2011/01/how-to-implement-non-blocking-two-way.html | ||||
| 	if blocking: | ||||
| 		return myself.rfile.read(size) | ||||
| 	else: | ||||
| 		poll = select.poll() | ||||
| 		poll.register(myself.rfile.fileno(), select.POLLIN or select.POLLPRI) | ||||
| 		fd = poll.poll(0) #timeout is 0 | ||||
| 		if len(fd): | ||||
| 			f = fd[0] | ||||
| 			if f[1] > 0: | ||||
| 				return myself.rfile.read(size) | ||||
| 	return "" | ||||
|  | ||||
|  | ||||
| def recv(myself, blocking=False, debug=False): | ||||
| 	bufsize=70000 | ||||
| 	myself.connection.setblocking(blocking) | ||||
| 	#myself.connection.setblocking(blocking) #umm... we cannot do that with rfile | ||||
| 	if debug: print "ws_recv begin" | ||||
| 	try: | ||||
| 		data=myself.connection.recv(6) | ||||
| 		data=readsock(myself,6,blocking) | ||||
| 		#print "rxws.recv bytes:",xxd(data)	 | ||||
| 	except: | ||||
| 		if debug: print "ws_recv error"	 | ||||
| @@ -99,7 +119,7 @@ def recv(myself, blocking=False, debug=False): | ||||
| 	fin=ord(data[0])&128!=0 | ||||
| 	is_text_frame=ord(data[0])&15==1 | ||||
| 	length=ord(data[1])&0x7f | ||||
| 	data+=myself.connection.recv(length) | ||||
| 	data+=readsock(myself,length,blocking) | ||||
| 	#print "rxws.recv length is ",length," (multiple packets together?) len(data) =",len(data) | ||||
| 	has_one_byte_length=length<125 | ||||
| 	masked=ord(data[1])&0x80!=0 | ||||
| @@ -119,7 +139,9 @@ def recv(myself, blocking=False, debug=False): | ||||
|  | ||||
|  | ||||
| def flush(myself):  | ||||
| 	lR,lW,lX = select.select([],[myself.connection,],[],60) | ||||
| 	myself.wfile.flush() | ||||
| 	#or the socket, not the rfile: | ||||
| 	#lR,lW,lX = select.select([],[myself.connection,],[],60) | ||||
| 	 | ||||
|  | ||||
| def send(myself, data, begin_id="", debug=0): | ||||
| @@ -133,13 +155,15 @@ def send(myself, data, begin_id="", debug=0): | ||||
| 			data_to_send=begin_id+data[counter:counter+base_frame_size-len(begin_id)] | ||||
| 			header=get_header(len(data_to_send)) | ||||
| 			flush(myself) | ||||
| 			myself.connection.send(header+data_to_send) | ||||
| 			myself.wfile.write(header+data_to_send) | ||||
| 			flush(myself) | ||||
| 			if debug: print "rxws.send ==================== #1 if branch :: from={0} to={1} dlen={2} hlen={3}".format(counter,counter+base_frame_size-len(begin_id),len(data_to_send),len(header)) | ||||
| 		else: | ||||
| 			data_to_send=begin_id+data[counter:] | ||||
| 			header=get_header(len(data_to_send)) | ||||
| 			flush(myself) | ||||
| 			myself.connection.send(header+data_to_send) | ||||
| 			myself.wfile.write(header+data_to_send) | ||||
| 			flush(myself) | ||||
| 			if debug: print "rxws.send :: #2 else branch :: dlen={0} hlen={1}".format(len(data_to_send),len(header)) | ||||
| 			#if debug: print "header:\n"+xxdg(header)+"\n\nws data:\n"+xxdg(data_to_send) | ||||
| 			break | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 ha7ilm
					ha7ilm