shut down multiprocessing queue explicitly using a poison pill
This commit is contained in:
		@@ -22,6 +22,8 @@ import logging
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
logger = logging.getLogger(__name__)
 | 
					logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					PoisonPill = object()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Client(ABC):
 | 
					class Client(ABC):
 | 
				
			||||||
    def __init__(self, conn):
 | 
					    def __init__(self, conn):
 | 
				
			||||||
@@ -33,13 +35,15 @@ class Client(ABC):
 | 
				
			|||||||
            while run:
 | 
					            while run:
 | 
				
			||||||
                try:
 | 
					                try:
 | 
				
			||||||
                    data = self.multithreadingQueue.get()
 | 
					                    data = self.multithreadingQueue.get()
 | 
				
			||||||
                    self.send(data)
 | 
					                    if data is PoisonPill:
 | 
				
			||||||
 | 
					                        run = False
 | 
				
			||||||
 | 
					                    else:
 | 
				
			||||||
 | 
					                        self.send(data)
 | 
				
			||||||
 | 
					                    self.multithreadingQueue.task_done()
 | 
				
			||||||
                except (EOFError, OSError, ValueError):
 | 
					                except (EOFError, OSError, ValueError):
 | 
				
			||||||
                    run = False
 | 
					                    run = False
 | 
				
			||||||
                except Exception:
 | 
					                except Exception:
 | 
				
			||||||
                    logger.exception("Exception on client multithreading queue")
 | 
					                    logger.exception("Exception on client multithreading queue")
 | 
				
			||||||
                finally:
 | 
					 | 
				
			||||||
                    self.multithreadingQueue.task_done()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # unset the queue object to free shared memory file descriptors
 | 
					            # unset the queue object to free shared memory file descriptors
 | 
				
			||||||
            self.multithreadingQueue = None
 | 
					            self.multithreadingQueue = None
 | 
				
			||||||
@@ -53,6 +57,8 @@ class Client(ABC):
 | 
				
			|||||||
            self.close()
 | 
					            self.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def close(self):
 | 
					    def close(self):
 | 
				
			||||||
 | 
					        if self.multithreadingQueue is not None:
 | 
				
			||||||
 | 
					            self.multithreadingQueue.put(PoisonPill)
 | 
				
			||||||
        self.conn.close()
 | 
					        self.conn.close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def mp_send(self, data):
 | 
					    def mp_send(self, data):
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user