From 95253e40bd1fe8a94084d18d0db0f2aa370360dd Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Thu, 31 Oct 2019 22:24:31 +0100 Subject: [PATCH] organize timers and threads to get proper shutdown --- openwebrx.py | 2 ++ owrx/meta.py | 2 +- owrx/pskreporter.py | 24 +++++++++++++++++++----- owrx/service.py | 15 +++++++++++++-- owrx/source.py | 2 +- owrx/wsjt.py | 33 +++++++++++++-------------------- 6 files changed, 49 insertions(+), 29 deletions(-) diff --git a/openwebrx.py b/openwebrx.py index 4d755be..a4d33dd 100755 --- a/openwebrx.py +++ b/openwebrx.py @@ -9,6 +9,7 @@ from socketserver import ThreadingMixIn from owrx.sdrhu import SdrHuUpdater from owrx.service import Services from owrx.websocket import WebSocketConnection +from owrx.pskreporter import PskReporter import logging @@ -61,3 +62,4 @@ if __name__ == "__main__": except KeyboardInterrupt: WebSocketConnection.closeAll() Services.stop() + PskReporter.stop() diff --git a/owrx/meta.py b/owrx/meta.py index f4979cf..f8f85a0 100644 --- a/owrx/meta.py +++ b/owrx/meta.py @@ -62,7 +62,7 @@ class DmrMetaEnricher(object): cache = DmrCache.getSharedInstance() if not cache.isValid(id): if not id in self.threads: - self.threads[id] = threading.Thread(target=self.downloadRadioIdData, args=[id]) + self.threads[id] = threading.Thread(target=self.downloadRadioIdData, args=[id], daemon=True) self.threads[id].start() return None data = cache.get(id) diff --git a/owrx/pskreporter.py b/owrx/pskreporter.py index cb9797b..40caa58 100644 --- a/owrx/pskreporter.py +++ b/owrx/pskreporter.py @@ -3,7 +3,6 @@ import threading import time import random import socket -from sched import scheduler from owrx.config import PropertyManager from owrx.version import openwebrx_version from owrx.locator import Locator @@ -20,6 +19,9 @@ class PskReporterDummy(object): def spot(self, spot): pass + def cancelTimer(self): + pass + class PskReporter(object): sharedInstance = None @@ -37,24 +39,31 @@ class PskReporter(object): PskReporter.sharedInstance = PskReporterDummy() return PskReporter.sharedInstance + @staticmethod + def stop(): + if PskReporter.sharedInstance: + PskReporter.sharedInstance.cancelTimer() + def __init__(self): self.spots = [] self.spotLock = threading.Lock() self.uploader = Uploader() - self.scheduler = scheduler(time.time, time.sleep) - self.scheduleNextUpload() - threading.Thread(target=self.scheduler.run).start() + self.timer = None def scheduleNextUpload(self): + if self.timer: + return delay = PskReporter.interval + random.uniform(0, 30) logger.debug("scheduling next pskreporter upload in %f seconds", delay) - self.scheduler.enter(delay, 1, self.upload) + self.timer = threading.Timer(delay, self.upload) + self.timer.start() def spot(self, spot): if not spot["mode"] in PskReporter.supportedModes: return with self.spotLock: self.spots.append(spot) + self.scheduleNextUpload() def upload(self): try: @@ -67,8 +76,13 @@ class PskReporter(object): except Exception: logger.exception("Failed to upload spots") + self.timer = None self.scheduleNextUpload() + def cancelTimer(self): + if self.timer: + self.timer.cancel() + class Uploader(object): receieverDelimiter = [0x99, 0x92] diff --git a/owrx/service.py b/owrx/service.py index 16a6359..1bc5a8a 100644 --- a/owrx/service.py +++ b/owrx/service.py @@ -115,6 +115,10 @@ class ServiceScheduler(object): self.selectionTimer = None self.scheduleSelection() + def shutdown(self): + self.cancelTimer() + self.source.removeClient(self) + def scheduleSelection(self, time=None): seconds = 10 if time is not None: @@ -177,8 +181,9 @@ class ServiceHandler(object): props.collect("center_freq", "samp_rate").wire(self.onFrequencyChange) if self.source.isAvailable(): self.scheduleServiceStartup() + self.scheduler = None if "schedule" in props: - ServiceScheduler(self.source, props["schedule"]) + self.scheduler = ServiceScheduler(self.source, props["schedule"]) def isActive(self): return False @@ -214,6 +219,12 @@ class ServiceHandler(object): return mode in available + def shutdown(self): + self.stopServices() + self.source.removeClient(self) + if self.scheduler: + self.scheduler.shutdown() + def stopServices(self): with self.lock: services = self.services @@ -383,7 +394,7 @@ class Services(object): @staticmethod def stop(): for handler in Services.handlers: - handler.stopServices() + handler.shutdown() Services.handlers = [] diff --git a/owrx/source.py b/owrx/source.py index 2d576ce..bb7c6bc 100644 --- a/owrx/source.py +++ b/owrx/source.py @@ -136,7 +136,7 @@ class SdrSource(object): if profile_id is None: profile_id = list(profiles.keys())[0] if profile_id == self.profile_id: - return; + return logger.debug("activating profile {0}".format(profile_id)) self.profile_id = profile_id profile = profiles[profile_id] diff --git a/owrx/wsjt.py b/owrx/wsjt.py index 6539f84..fc8dd7d 100644 --- a/owrx/wsjt.py +++ b/owrx/wsjt.py @@ -1,8 +1,6 @@ import threading import wave -from datetime import datetime, timedelta, date, timezone -import time -import sched +from datetime import datetime, timedelta, timezone import subprocess import os from multiprocessing.connection import Pipe @@ -93,8 +91,7 @@ class WsjtChopper(threading.Thread): self.tmp_dir = PropertyManager.getSharedInstance()["temporary_directory"] (self.wavefilename, self.wavefile) = self.getWaveFile() self.switchingLock = threading.Lock() - self.scheduler = sched.scheduler(time.time, time.sleep) - self.schedulerLock = threading.Lock() + self.timer = None (self.outputReader, self.outputWriter) = Pipe() self.doRun = True super().__init__() @@ -110,27 +107,23 @@ class WsjtChopper(threading.Thread): return filename, wavefile def getNextDecodingTime(self): - t = datetime.now() + t = datetime.utcnow() zeroed = t.replace(minute=0, second=0, microsecond=0) delta = t - zeroed seconds = (int(delta.total_seconds() / self.interval) + 1) * self.interval t = zeroed + timedelta(seconds=seconds) logger.debug("scheduling: {0}".format(t)) - return t.timestamp() + return t - def startScheduler(self): - self._scheduleNextSwitch() - threading.Thread(target=self.scheduler.run).start() - - def emptyScheduler(self): - with self.schedulerLock: - for event in self.scheduler.queue: - self.scheduler.cancel(event) + def cancelTimer(self): + if self.timer: + self.timer.cancel() def _scheduleNextSwitch(self): - with self.schedulerLock: - if self.doRun: - self.scheduler.enterabs(self.getNextDecodingTime(), 1, self.switchFiles) + if self.doRun: + delta = self.getNextDecodingTime() - datetime.utcnow() + self.timer = threading.Timer(delta.total_seconds(), self.switchFiles) + self.timer.start() def switchFiles(self): self.switchingLock.acquire() @@ -169,7 +162,7 @@ class WsjtChopper(threading.Thread): def run(self) -> None: logger.debug("WSJT chopper starting up") - self.startScheduler() + self._scheduleNextSwitch() while self.doRun: data = self.source.read(256) if data is None or (isinstance(data, bytes) and len(data) == 0): @@ -182,7 +175,7 @@ class WsjtChopper(threading.Thread): logger.debug("WSJT chopper shutting down") self.outputReader.close() self.outputWriter.close() - self.emptyScheduler() + self.cancelTimer() try: os.unlink(self.wavefilename) except Exception: