hand over message handling after initial handshake instead of delegating
This commit is contained in:
parent
42c59a3aa0
commit
cbc7b73b1d
@ -39,6 +39,12 @@ class Client(object):
|
|||||||
def mp_send(self, data):
|
def mp_send(self, data):
|
||||||
self.multiprocessingPipe.put(data, block=False)
|
self.multiprocessingPipe.put(data, block=False)
|
||||||
|
|
||||||
|
def handleBinaryMessage(self, conn, data):
|
||||||
|
logger.error("unsupported binary message, discarding")
|
||||||
|
|
||||||
|
def handleClose(self):
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
|
||||||
class OpenWebRxReceiverClient(Client):
|
class OpenWebRxReceiverClient(Client):
|
||||||
config_keys = [
|
config_keys = [
|
||||||
@ -100,6 +106,35 @@ class OpenWebRxReceiverClient(Client):
|
|||||||
|
|
||||||
CpuUsageThread.getSharedInstance().add_client(self)
|
CpuUsageThread.getSharedInstance().add_client(self)
|
||||||
|
|
||||||
|
def handleTextMessage(self, conn, message):
|
||||||
|
try:
|
||||||
|
message = json.loads(message)
|
||||||
|
if "type" in message:
|
||||||
|
if message["type"] == "dspcontrol":
|
||||||
|
if "action" in message and message["action"] == "start":
|
||||||
|
self.startDsp()
|
||||||
|
|
||||||
|
if "params" in message:
|
||||||
|
params = message["params"]
|
||||||
|
self.setDspProperties(params)
|
||||||
|
|
||||||
|
if message["type"] == "config":
|
||||||
|
if "params" in message:
|
||||||
|
self.setParams(message["params"])
|
||||||
|
if message["type"] == "setsdr":
|
||||||
|
if "params" in message:
|
||||||
|
self.setSdr(message["params"]["sdr"])
|
||||||
|
if message["type"] == "selectprofile":
|
||||||
|
if "params" in message and "profile" in message["params"]:
|
||||||
|
profile = message["params"]["profile"].split("|")
|
||||||
|
self.setSdr(profile[0])
|
||||||
|
self.sdr.activateProfile(profile[1])
|
||||||
|
else:
|
||||||
|
logger.warning("received message without type: {0}".format(message))
|
||||||
|
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logger.warning("message is not json: {0}".format(message))
|
||||||
|
|
||||||
def setSdr(self, id=None):
|
def setSdr(self, id=None):
|
||||||
next = SdrService.getSource(id)
|
next = SdrService.getSource(id)
|
||||||
if next == self.sdr:
|
if next == self.sdr:
|
||||||
@ -229,6 +264,9 @@ class MapConnection(Client):
|
|||||||
|
|
||||||
Map.getSharedInstance().addClient(self)
|
Map.getSharedInstance().addClient(self)
|
||||||
|
|
||||||
|
def handleTextMessage(self, conn, message):
|
||||||
|
pass
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
Map.getSharedInstance().removeClient(self)
|
Map.getSharedInstance().removeClient(self)
|
||||||
super().close()
|
super().close()
|
||||||
@ -243,8 +281,6 @@ class MapConnection(Client):
|
|||||||
class WebSocketMessageHandler(object):
|
class WebSocketMessageHandler(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.handshake = None
|
self.handshake = None
|
||||||
self.client = None
|
|
||||||
self.dsp = None
|
|
||||||
|
|
||||||
def handleTextMessage(self, conn, message):
|
def handleTextMessage(self, conn, message):
|
||||||
if message[:16] == "SERVER DE CLIENT":
|
if message[:16] == "SERVER DE CLIENT":
|
||||||
@ -256,50 +292,18 @@ class WebSocketMessageHandler(object):
|
|||||||
|
|
||||||
if "type" in self.handshake:
|
if "type" in self.handshake:
|
||||||
if self.handshake["type"] == "receiver":
|
if self.handshake["type"] == "receiver":
|
||||||
self.client = OpenWebRxReceiverClient(conn)
|
client = OpenWebRxReceiverClient(conn)
|
||||||
if self.handshake["type"] == "map":
|
if self.handshake["type"] == "map":
|
||||||
self.client = MapConnection(conn)
|
client = MapConnection(conn)
|
||||||
# backwards compatibility
|
# backwards compatibility
|
||||||
else:
|
else:
|
||||||
self.client = OpenWebRxReceiverClient(conn)
|
client = OpenWebRxReceiverClient(conn)
|
||||||
|
|
||||||
|
# hand off all further communication to the correspondig connection
|
||||||
|
conn.setMessageHandler(client)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
if not self.handshake:
|
if not self.handshake:
|
||||||
logger.warning("not answering client request since handshake is not complete")
|
logger.warning("not answering client request since handshake is not complete")
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
|
||||||
message = json.loads(message)
|
|
||||||
if "type" in message:
|
|
||||||
if message["type"] == "dspcontrol":
|
|
||||||
if "action" in message and message["action"] == "start":
|
|
||||||
self.client.startDsp()
|
|
||||||
|
|
||||||
if "params" in message:
|
|
||||||
params = message["params"]
|
|
||||||
self.client.setDspProperties(params)
|
|
||||||
|
|
||||||
if message["type"] == "config":
|
|
||||||
if "params" in message:
|
|
||||||
self.client.setParams(message["params"])
|
|
||||||
if message["type"] == "setsdr":
|
|
||||||
if "params" in message:
|
|
||||||
self.client.setSdr(message["params"]["sdr"])
|
|
||||||
if message["type"] == "selectprofile":
|
|
||||||
if "params" in message and "profile" in message["params"]:
|
|
||||||
profile = message["params"]["profile"].split("|")
|
|
||||||
self.client.setSdr(profile[0])
|
|
||||||
self.client.sdr.activateProfile(profile[1])
|
|
||||||
else:
|
|
||||||
logger.warning("received message without type: {0}".format(message))
|
|
||||||
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
logger.warning("message is not json: {0}".format(message))
|
|
||||||
|
|
||||||
def handleBinaryMessage(self, conn, data):
|
|
||||||
logger.error("unsupported binary message, discarding")
|
|
||||||
|
|
||||||
def handleClose(self):
|
|
||||||
if self.client:
|
|
||||||
self.client.close()
|
|
||||||
|
@ -38,7 +38,7 @@ class WebSocketConnection(object):
|
|||||||
def __init__(self, handler, messageHandler):
|
def __init__(self, handler, messageHandler):
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
self.handler.connection.setblocking(0)
|
self.handler.connection.setblocking(0)
|
||||||
self.messageHandler = messageHandler
|
self.setMessageHandler(messageHandler)
|
||||||
(self.interruptPipeRecv, self.interruptPipeSend) = Pipe(duplex=False)
|
(self.interruptPipeRecv, self.interruptPipeSend) = Pipe(duplex=False)
|
||||||
self.open = True
|
self.open = True
|
||||||
self.sendLock = threading.Lock()
|
self.sendLock = threading.Lock()
|
||||||
@ -64,6 +64,9 @@ class WebSocketConnection(object):
|
|||||||
self.pingTimer = None
|
self.pingTimer = None
|
||||||
self.resetPing()
|
self.resetPing()
|
||||||
|
|
||||||
|
def setMessageHandler(self, messageHandler):
|
||||||
|
self.messageHandler = messageHandler
|
||||||
|
|
||||||
def get_header(self, size, opcode):
|
def get_header(self, size, opcode):
|
||||||
ws_first_byte = 0b10000000 | (opcode & 0x0F)
|
ws_first_byte = 0b10000000 | (opcode & 0x0F)
|
||||||
if size > 2 ** 16 - 1:
|
if size > 2 ** 16 - 1:
|
||||||
|
Loading…
Reference in New Issue
Block a user