distinguish between error condition and normal socket close

This commit is contained in:
Jakob Ketterl 2022-01-11 19:57:52 +01:00
parent f3dcf5c320
commit 5adb53d990
2 changed files with 38 additions and 28 deletions

View File

@ -56,9 +56,10 @@ class Client(Handler, metaclass=ABCMeta):
try: try:
self.conn.send(data) self.conn.send(data)
except IOError: except IOError:
self.close() logger.exception("error in Client::send()")
self.close(error=True)
def close(self): def close(self, error: bool = False):
if self.multithreadingQueue is not None: if self.multithreadingQueue is not None:
while True: while True:
try: try:
@ -70,7 +71,7 @@ class Client(Handler, metaclass=ABCMeta):
except Full: except Full:
# this shouldn't happen, we just emptied the queue, but it's not worth risking the exception # this shouldn't happen, we just emptied the queue, but it's not worth risking the exception
logger.exception("impossible queue state: Full after Empty") logger.exception("impossible queue state: Full after Empty")
self.conn.close() self.conn.close(socketError=error)
def mp_send(self, data): def mp_send(self, data):
if self.multithreadingQueue is None: if self.multithreadingQueue is None:
@ -78,7 +79,7 @@ class Client(Handler, metaclass=ABCMeta):
try: try:
self.multithreadingQueue.put(data, block=False) self.multithreadingQueue.put(data, block=False)
except Full: except Full:
self.close() self.close(error=True)
@abstractmethod @abstractmethod
def handleTextMessage(self, conn, message): def handleTextMessage(self, conn, message):
@ -107,9 +108,9 @@ class OpenWebRxClient(Client, metaclass=ABCMeta):
def write_receiver_details(self, details): def write_receiver_details(self, details):
self.send({"type": "receiver_details", "value": details}) self.send({"type": "receiver_details", "value": details})
def close(self): def close(self, error: bool = False):
self._detailsSubscription.cancel() self._detailsSubscription.cancel()
super().close() super().close(error)
class OpenWebRxReceiverClient(OpenWebRxClient, SdrSourceEventClient): class OpenWebRxReceiverClient(OpenWebRxClient, SdrSourceEventClient):
@ -339,7 +340,7 @@ class OpenWebRxReceiverClient(OpenWebRxClient, SdrSourceEventClient):
def handleNoSdrsAvailable(self): def handleNoSdrsAvailable(self):
self.write_sdr_error("No SDR Devices available") self.write_sdr_error("No SDR Devices available")
def close(self): def close(self, error: bool = False):
if self.sdr is not None: if self.sdr is not None:
self.sdr.removeClient(self) self.sdr.removeClient(self)
self.stopDsp() self.stopDsp()
@ -350,7 +351,7 @@ class OpenWebRxReceiverClient(OpenWebRxClient, SdrSourceEventClient):
if self.bookmarkSub is not None: if self.bookmarkSub is not None:
self.bookmarkSub.cancel() self.bookmarkSub.cancel()
self.bookmarkSub = None self.bookmarkSub = None
super().close() super().close(error)
def stopDsp(self): def stopDsp(self):
with self.dspLock: with self.dspLock:
@ -466,9 +467,9 @@ class MapConnection(OpenWebRxClient):
def handleTextMessage(self, conn, message): def handleTextMessage(self, conn, message):
pass pass
def close(self): def close(self, error: bool = False):
Map.getSharedInstance().removeClient(self) Map.getSharedInstance().removeClient(self)
super().close() super().close(error)
def write_config(self, cfg): def write_config(self, cfg):
self.send({"type": "config", "value": cfg}) self.send({"type": "config", "value": cfg})

View File

@ -62,6 +62,7 @@ class WebSocketConnection(object):
self.setMessageHandler(messageHandler) self.setMessageHandler(messageHandler)
(self.interruptPipeRecv, self.interruptPipeSend) = Pipe(duplex=False) (self.interruptPipeRecv, self.interruptPipeSend) = Pipe(duplex=False)
self.open = True self.open = True
self.socketError = False
self.sendLock = threading.Lock() self.sendLock = threading.Lock()
headers = {key.lower(): value for key, value in self.handler.headers.items()} headers = {key.lower(): value for key, value in self.handler.headers.items()}
@ -136,30 +137,30 @@ class WebSocketConnection(object):
for i in range(0, len(input), n): for i in range(0, len(input), n):
yield input[i: i + n] yield input[i: i + n]
try: with self.sendLock:
with self.sendLock: if self.socketError:
if not self.open: logger.warning("_sendBytes() after socket error, ignoring")
logger.warning("_sendBytes() after connection was closed, ignoring") else:
else: try:
for chunk in chunks(data_to_send, 1024): for chunk in chunks(data_to_send, 1024):
(_, write, _) = select.select([], [self.handler.wfile], [], 10) (_, write, _) = select.select([], [self.handler.wfile], [], 10)
if self.handler.wfile in write: if self.handler.wfile in write:
written = self.handler.wfile.write(chunk) written = self.handler.wfile.write(chunk)
if written != len(chunk): if written != len(chunk):
logger.error("incomplete write! closing socket!") logger.error("incomplete write! closing socket!")
self.close() self.close(socketError=True)
break break
else: else:
logger.debug("socket not returned from select; closing") logger.debug("socket not returned from select; closing")
self.close() self.close(socketError=True)
break break
# these exception happen when the socket is closed # these exception happen when the socket is closed
except OSError: except OSError:
logger.exception("OSError while writing data") logger.exception("OSError while writing data")
self.close() self.close(socketError=True)
except ValueError: except ValueError:
logger.exception("ValueError while writing data") logger.exception("ValueError while writing data")
self.close() self.close(socketError=True)
def interrupt(self): def interrupt(self):
if self.interruptPipeSend is None: if self.interruptPipeSend is None:
@ -177,10 +178,13 @@ class WebSocketConnection(object):
self.messageHandler.handleClose() self.messageHandler.handleClose()
self.cancelPing() self.cancelPing()
logger.debug("websocket loop ended; sending close frame") if self.socketError:
logger.debug("websocket closed in error, skipping close frame")
else:
logger.debug("websocket loop ended; sending close frame")
header = self.get_header(0, OPCODE_CLOSE) header = self.get_header(0, OPCODE_CLOSE)
self._sendBytes(header) self._sendBytes(header)
try: try:
WebSocketConnection.connections.remove(self) WebSocketConnection.connections.remove(self)
@ -242,9 +246,11 @@ class WebSocketConnection(object):
available = False available = False
except IncompleteRead: except IncompleteRead:
logger.warning("incomplete read on websocket; closing connection") logger.warning("incomplete read on websocket; closing connection")
self.socketError = True
self.open = False self.open = False
except OSError: except OSError:
logger.exception("OSError while reading data; closing connection") logger.exception("OSError while reading data; closing connection")
self.socketError = True
self.open = False self.open = False
self.interruptPipeSend.close() self.interruptPipeSend.close()
@ -259,7 +265,10 @@ class WebSocketConnection(object):
self.interruptPipeRecv.close() self.interruptPipeRecv.close()
self.interruptPipeRecv = None self.interruptPipeRecv = None
def close(self): def close(self, socketError: bool = False):
# only set flag if it is True
if socketError:
self.socketError = True
if not self.open: if not self.open:
return return
self.open = False self.open = False