Improve web socket implementation to wait for message body after receiving header

This commit is contained in:
Max Sasaga 2020-11-16 23:37:36 +02:00
parent 2579b9be26
commit 14f6c25eb6

View File

@ -31,6 +31,8 @@ class Drained(WebSocketException):
class WebSocketClosed(WebSocketException): class WebSocketClosed(WebSocketException):
pass pass
class InterruptOrTimeout(WebSocketException):
pass
class WebSocketConnection(object): class WebSocketConnection(object):
connections = [] connections = []
@ -175,60 +177,67 @@ class WebSocketConnection(object):
def protected_read(num): def protected_read(num):
data = self.handler.rfile.read(num) data = self.handler.rfile.read(num)
if data is None: if data is None:
raise Drained() data = bytes()
if len(data) != num: while self.open and len(data) < num:
(read, _, _) = select.select([self.interruptPipeRecv, self.handler.rfile], [], [], 15)
if self.handler.rfile in read:
data += self.handler.rfile.read(num - len(data))
else:
if len(data) == 0:
raise InterruptOrTimeout()
else:
raise IncompleteRead()
if len(data) < num:
raise IncompleteRead() raise IncompleteRead()
return data return data
self.open = True self.open = True
while self.open: while self.open:
(read, _, _) = select.select([self.interruptPipeRecv, self.handler.rfile], [], [], 15) try:
if self.handler.rfile in read: header = protected_read(2)
available = True
self.resetPing() self.resetPing()
while self.open and available: opcode = header[0] & 0x0F
length = header[1] & 0x7F
mask = (header[1] & 0x80) >> 7
if length == 126:
header = protected_read(2)
length = (header[0] << 8) + header[1]
if mask:
masking_key = protected_read(4)
data = None
if length > 0:
data = protected_read(length)
if mask:
data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)])
if opcode == OPCODE_TEXT_MESSAGE:
message = data.decode("utf-8")
try: try:
header = protected_read(2) self.messageHandler.handleTextMessage(self, message)
opcode = header[0] & 0x0F except Exception:
length = header[1] & 0x7F logger.exception("Exception in websocket handler handleTextMessage()")
mask = (header[1] & 0x80) >> 7 elif opcode == OPCODE_BINARY_MESSAGE:
if length == 126: try:
header = protected_read(2) self.messageHandler.handleBinaryMessage(self, data)
length = (header[0] << 8) + header[1] except Exception:
if mask: logger.exception("Exception in websocket handler handleBinaryMessage()")
masking_key = protected_read(4) elif opcode == OPCODE_PING:
data = protected_read(length) self.sendPong()
if mask: elif opcode == OPCODE_PONG:
data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)]) # since every read resets the ping timer, there's nothing to do here.
if opcode == OPCODE_TEXT_MESSAGE: pass
message = data.decode("utf-8") elif opcode == OPCODE_CLOSE:
try: logger.debug("websocket close frame received; closing connection")
self.messageHandler.handleTextMessage(self, message) self.open = False
except Exception: else:
logger.exception("Exception in websocket handler handleTextMessage()") logger.warning("unsupported opcode: {0}".format(opcode))
elif opcode == OPCODE_BINARY_MESSAGE: except InterruptOrTimeout:
try: pass
self.messageHandler.handleBinaryMessage(self, data) except IncompleteRead:
except Exception: logger.warning("incomplete read on websocket; closing connection")
logger.exception("Exception in websocket handler handleBinaryMessage()") self.open = False
elif opcode == OPCODE_PING: except OSError:
self.sendPong() logger.exception("OSError while reading data; closing connection")
elif opcode == OPCODE_PONG: self.open = False
# since every read resets the ping timer, there's nothing to do here.
pass
elif opcode == OPCODE_CLOSE:
logger.debug("websocket close frame received; closing connection")
self.open = False
else:
logger.warning("unsupported opcode: {0}".format(opcode))
except Drained:
available = False
except IncompleteRead:
logger.warning("incomplete read on websocket; closing connection")
self.open = False
except OSError:
logger.exception("OSError while reading data; closing connection")
self.open = False
self.interruptPipeSend.close() self.interruptPipeSend.close()
self.interruptPipeSend = None self.interruptPipeSend = None