diff --git a/owrx/websocket.py b/owrx/websocket.py index 4908e72..46ec6c8 100644 --- a/owrx/websocket.py +++ b/owrx/websocket.py @@ -31,6 +31,8 @@ class Drained(WebSocketException): class WebSocketClosed(WebSocketException): pass +class InterruptOrTimeout(WebSocketException): + pass class WebSocketConnection(object): connections = [] @@ -175,60 +177,67 @@ class WebSocketConnection(object): def protected_read(num): data = self.handler.rfile.read(num) if data is None: - raise Drained() - if len(data) != num: + data = bytes() + while self.open and len(data) < num: + (read, _, _) = select.select([self.interruptPipeRecv, self.handler.rfile], [], [], 15) + if self.handler.rfile in read: + data += self.handler.rfile.read(num - len(data)) + else: + if len(data) == 0: + raise InterruptOrTimeout() + else: + raise IncompleteRead() + if len(data) < num: raise IncompleteRead() return data self.open = True while self.open: - (read, _, _) = select.select([self.interruptPipeRecv, self.handler.rfile], [], [], 15) - if self.handler.rfile in read: - available = True + try: + header = protected_read(2) self.resetPing() - while self.open and available: + opcode = header[0] & 0x0F + length = header[1] & 0x7F + mask = (header[1] & 0x80) >> 7 + if length == 126: + header = protected_read(2) + length = (header[0] << 8) + header[1] + if mask: + masking_key = protected_read(4) + data = None + if length > 0: + data = protected_read(length) + if mask: + data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)]) + if opcode == OPCODE_TEXT_MESSAGE: + message = data.decode("utf-8") try: - header = protected_read(2) - opcode = header[0] & 0x0F - length = header[1] & 0x7F - mask = (header[1] & 0x80) >> 7 - if length == 126: - header = protected_read(2) - length = (header[0] << 8) + header[1] - if mask: - masking_key = protected_read(4) - data = protected_read(length) - if mask: - data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)]) - if opcode == OPCODE_TEXT_MESSAGE: - message = data.decode("utf-8") - try: - self.messageHandler.handleTextMessage(self, message) - except Exception: - logger.exception("Exception in websocket handler handleTextMessage()") - elif opcode == OPCODE_BINARY_MESSAGE: - try: - self.messageHandler.handleBinaryMessage(self, data) - except Exception: - logger.exception("Exception in websocket handler handleBinaryMessage()") - elif opcode == OPCODE_PING: - self.sendPong() - elif opcode == OPCODE_PONG: - # since every read resets the ping timer, there's nothing to do here. - pass - elif opcode == OPCODE_CLOSE: - logger.debug("websocket close frame received; closing connection") - self.open = False - else: - logger.warning("unsupported opcode: {0}".format(opcode)) - except Drained: - available = False - except IncompleteRead: - logger.warning("incomplete read on websocket; closing connection") - self.open = False - except OSError: - logger.exception("OSError while reading data; closing connection") - self.open = False + self.messageHandler.handleTextMessage(self, message) + except Exception: + logger.exception("Exception in websocket handler handleTextMessage()") + elif opcode == OPCODE_BINARY_MESSAGE: + try: + self.messageHandler.handleBinaryMessage(self, data) + except Exception: + logger.exception("Exception in websocket handler handleBinaryMessage()") + elif opcode == OPCODE_PING: + self.sendPong() + elif opcode == OPCODE_PONG: + # since every read resets the ping timer, there's nothing to do here. + pass + elif opcode == OPCODE_CLOSE: + logger.debug("websocket close frame received; closing connection") + self.open = False + else: + logger.warning("unsupported opcode: {0}".format(opcode)) + except InterruptOrTimeout: + pass + except IncompleteRead: + logger.warning("incomplete read on websocket; closing connection") + self.open = False + except OSError: + logger.exception("OSError while reading data; closing connection") + self.open = False self.interruptPipeSend.close() self.interruptPipeSend = None