try to avoid stressing out the cpu by using a proper queue

This commit is contained in:
Jakob Ketterl 2019-08-22 21:16:43 +02:00
parent faaef9d9f8
commit 24d134ad6c

View File

@ -8,6 +8,7 @@ import os
from multiprocessing.connection import Pipe
from owrx.map import Map, LocatorLocation
import re
from queue import Queue
from owrx.config import PropertyManager
from owrx.bands import Bandplan
from owrx.metrics import Metrics
@ -17,6 +18,42 @@ import logging
logger = logging.getLogger(__name__)
class WsjtQueueWorker(threading.Thread):
def __init__(self, queue):
self.queue = queue
self.doRun = True
super().__init__(daemon=True)
def run(self) -> None:
while self.doRun:
(processor, file) = self.queue.get()
logger.debug("processing file %s", file)
processor.decode(file)
self.queue.task_done()
class WsjtQueue(Queue):
sharedInstance = None
@staticmethod
def getSharedInstance():
if WsjtQueue.sharedInstance is None:
WsjtQueue.sharedInstance = WsjtQueue(maxsize=10, workers=2)
return WsjtQueue.sharedInstance
def __init__(self, maxsize, workers):
super().__init__(maxsize)
self.workers = [self.newWorker() for _ in range(0, workers)]
def put(self, item):
super(WsjtQueue, self).put(item, block=False)
def newWorker(self):
worker = WsjtQueueWorker(self)
worker.start()
return worker
class WsjtChopper(threading.Thread):
def __init__(self, source):
self.source = source
@ -24,7 +61,6 @@ class WsjtChopper(threading.Thread):
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock = threading.Lock()
self.scheduler = sched.scheduler(time.time, time.sleep)
self.fileQueue = []
(self.outputReader, self.outputWriter) = Pipe()
self.doRun = True
super().__init__()
@ -67,7 +103,7 @@ class WsjtChopper(threading.Thread):
self.switchingLock.release()
file.close()
self.fileQueue.append(filename)
WsjtQueue.getSharedInstance().put((self, filename))
self._scheduleNextSwitch()
def decoder_commandline(self, file):
@ -76,23 +112,17 @@ class WsjtChopper(threading.Thread):
"""
return []
def decode(self):
def decode_and_unlink(file):
decoder = subprocess.Popen(self.decoder_commandline(file), stdout=subprocess.PIPE, cwd=self.tmp_dir, preexec_fn=lambda : os.nice(10))
while True:
line = decoder.stdout.readline()
if line is None or (isinstance(line, bytes) and len(line) == 0):
break
self.outputWriter.send(line)
rc = decoder.wait()
if rc != 0:
logger.warning("decoder return code: %i", rc)
os.unlink(file)
if self.fileQueue:
file = self.fileQueue.pop()
logger.debug("processing file {0}".format(file))
threading.Thread(target=decode_and_unlink, args=[file]).start()
def decode(self, file):
decoder = subprocess.Popen(self.decoder_commandline(file), stdout=subprocess.PIPE, cwd=self.tmp_dir, preexec_fn=lambda : os.nice(10))
while True:
line = decoder.stdout.readline()
if line is None or (isinstance(line, bytes) and len(line) == 0):
break
self.outputWriter.send(line)
rc = decoder.wait()
if rc != 0:
logger.warning("decoder return code: %i", rc)
os.unlink(file)
def run(self) -> None:
logger.debug("WSJT chopper starting up")
@ -106,7 +136,6 @@ class WsjtChopper(threading.Thread):
self.wavefile.writeframes(data)
self.switchingLock.release()
self.decode()
logger.debug("WSJT chopper shutting down")
self.outputReader.close()
self.outputWriter.close()