un-couple messaging between connections; use non-blocking io

This commit is contained in:
Jakob Ketterl 2019-09-21 22:10:16 +02:00
parent 6ec85aa349
commit 1ed69de5b0
2 changed files with 92 additions and 47 deletions

View File

@ -3,8 +3,10 @@ from owrx.source import DspManager, CpuUsageThread, SdrService, ClientRegistry
from owrx.feature import FeatureDetector from owrx.feature import FeatureDetector
from owrx.version import openwebrx_version from owrx.version import openwebrx_version
from owrx.bands import Bandplan from owrx.bands import Bandplan
import json
from owrx.map import Map from owrx.map import Map
from multiprocessing import Queue
import json
import threading
import logging import logging
@ -14,20 +16,29 @@ logger = logging.getLogger(__name__)
class Client(object): class Client(object):
def __init__(self, conn): def __init__(self, conn):
self.conn = conn self.conn = conn
self.multiprocessingPipe = Queue()
def mp_passthru():
run = True
while run:
try:
data = self.multiprocessingPipe.get()
self.protected_send(data)
except (EOFError, OSError):
run = False
self.passThruThread = threading.Thread(target=mp_passthru)
self.passThruThread.start()
def protected_send(self, data): def protected_send(self, data):
try: self.conn.protected_send(data)
self.conn.send(data)
# these exception happen when the socket is closed
except OSError:
logger.exception("OSError while sending data")
self.close()
except ValueError:
logger.exception("ValueError while sending data")
self.close()
def close(self): def close(self):
self.conn.close() self.conn.close()
self.multiprocessingPipe.close()
def mp_send(self, data):
self.multiprocessingPipe.put(data, block=False)
class OpenWebRxReceiverClient(Client): class OpenWebRxReceiverClient(Client):
@ -171,10 +182,10 @@ class OpenWebRxReceiverClient(Client):
self.protected_send({"type": "smeter", "value": level}) self.protected_send({"type": "smeter", "value": level})
def write_cpu_usage(self, usage): def write_cpu_usage(self, usage):
self.protected_send({"type": "cpuusage", "value": usage}) self.mp_send({"type": "cpuusage", "value": usage})
def write_clients(self, clients): def write_clients(self, clients):
self.protected_send({"type": "clients", "value": clients}) self.mp_send({"type": "clients", "value": clients})
def write_secondary_fft(self, data): def write_secondary_fft(self, data):
self.protected_send(bytes([0x03]) + data) self.protected_send(bytes([0x03]) + data)
@ -227,7 +238,7 @@ class MapConnection(Client):
self.protected_send({"type": "config", "value": cfg}) self.protected_send({"type": "config", "value": cfg})
def write_update(self, update): def write_update(self, update):
self.protected_send({"type": "update", "value": update}) self.mp_send({"type": "update", "value": update})
class WebSocketMessageHandler(object): class WebSocketMessageHandler(object):
@ -289,3 +300,7 @@ class WebSocketMessageHandler(object):
def handleBinaryMessage(self, conn, data): def handleBinaryMessage(self, conn, data):
logger.error("unsupported binary message, discarding") logger.error("unsupported binary message, discarding")
def handleClose(self):
if self.client:
self.client.close()

View File

@ -3,6 +3,7 @@ import hashlib
import json import json
import os import os
import select import select
import threading
import logging import logging
@ -16,9 +17,11 @@ class IncompleteRead(Exception):
class WebSocketConnection(object): class WebSocketConnection(object):
def __init__(self, handler, messageHandler): def __init__(self, handler, messageHandler):
self.handler = handler self.handler = handler
self.handler.connection.setblocking(0)
self.messageHandler = messageHandler self.messageHandler = messageHandler
(self.interruptPipeRecv, self.interruptPipeSend) = os.pipe() (self.interruptPipeRecv, self.interruptPipeSend) = os.pipe()
self.open = True self.open = True
self.sendLock = threading.Lock()
my_headers = self.handler.headers.items() my_headers = self.handler.headers.items()
my_header_keys = list(map(lambda x: x[0], my_headers)) my_header_keys = list(map(lambda x: x[0], my_headers))
h_key_exists = lambda x: my_header_keys.count(x) h_key_exists = lambda x: my_header_keys.count(x)
@ -79,53 +82,80 @@ class WebSocketConnection(object):
else: else:
header = self.get_header(len(data), 2) header = self.get_header(len(data), 2)
data_to_send = header + data data_to_send = header + data
written = self.handler.wfile.write(data_to_send)
if written != len(data_to_send): def chunks(l, n):
logger.error("incomplete write! closing socket!") """Yield successive n-sized chunks from l."""
self.close() for i in range(0, len(l), n):
else: yield l[i : i + n]
self.handler.wfile.flush()
with self.sendLock:
for chunk in chunks(data_to_send, 1024):
(_, write, _) = select.select([], [self.handler.wfile], [], 10)
if self.handler.wfile in write:
written = self.handler.wfile.write(chunk)
if written != len(chunk):
logger.error("incomplete write! closing socket!")
self.close()
else:
logger.debug("socket not returned from select; closing")
self.close()
def protected_read(self, num): def protected_read(self, num):
data = self.handler.rfile.read(num) data = self.handler.rfile.read(num)
if len(data) != num: if data is None or len(data) != num:
raise IncompleteRead() raise IncompleteRead()
return data return data
def protected_send(self, data):
try:
self.send(data)
# these exception happen when the socket is closed
except OSError:
logger.exception("OSError while writing data")
self.close()
except ValueError:
logger.exception("ValueError while writing data")
self.close()
def interrupt(self): def interrupt(self):
os.write(self.interruptPipeSend, bytes(0x00)) os.write(self.interruptPipeSend, bytes(0x00))
def read_loop(self): def read_loop(self):
self.open = True self.open = True
while self.open: while self.open:
try: (read, _, _) = select.select([self.interruptPipeRecv, self.handler.rfile], [], [])
(read, _, _) = select.select([self.interruptPipeRecv, self.handler.rfile], [], []) if self.handler.rfile in read:
if read[0] == self.handler.rfile: available = True
header = self.protected_read(2) while available:
opcode = header[0] & 0x0F try:
length = header[1] & 0x7F
mask = (header[1] & 0x80) >> 7
if length == 126:
header = self.protected_read(2) header = self.protected_read(2)
length = (header[0] << 8) + header[1] opcode = header[0] & 0x0F
if mask: length = header[1] & 0x7F
masking_key = self.protected_read(4) mask = (header[1] & 0x80) >> 7
data = self.protected_read(length) if length == 126:
if mask: header = self.protected_read(2)
data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)]) length = (header[0] << 8) + header[1]
if opcode == 1: if mask:
message = data.decode("utf-8") masking_key = self.protected_read(4)
self.messageHandler.handleTextMessage(self, message) data = self.protected_read(length)
elif opcode == 2: if mask:
self.messageHandler.handleBinaryMessage(self, data) data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)])
elif opcode == 8: if opcode == 1:
logger.debug("websocket close frame received; closing connection") message = data.decode("utf-8")
self.open = False self.messageHandler.handleTextMessage(self, message)
else: elif opcode == 2:
logger.warning("unsupported opcode: {0}".format(opcode)) self.messageHandler.handleBinaryMessage(self, data)
except IncompleteRead: elif opcode == 8:
logger.warning("incomplete websocket read; closing socket") logger.debug("websocket close frame received; closing connection")
self.open = False self.open = False
else:
logger.warning("unsupported opcode: {0}".format(opcode))
except IncompleteRead:
available = False
logger.debug("websocket loop ended; shutting down")
self.messageHandler.handleClose()
logger.debug("websocket loop ended; sending close frame") logger.debug("websocket loop ended; sending close frame")