organize timers and threads to get proper shutdown

This commit is contained in:
Jakob Ketterl 2019-10-31 22:24:31 +01:00
parent af1a99c130
commit 95253e40bd
6 changed files with 49 additions and 29 deletions

View File

@ -9,6 +9,7 @@ from socketserver import ThreadingMixIn
from owrx.sdrhu import SdrHuUpdater
from owrx.service import Services
from owrx.websocket import WebSocketConnection
from owrx.pskreporter import PskReporter
import logging
@ -61,3 +62,4 @@ if __name__ == "__main__":
except KeyboardInterrupt:
WebSocketConnection.closeAll()
Services.stop()
PskReporter.stop()

View File

@ -62,7 +62,7 @@ class DmrMetaEnricher(object):
cache = DmrCache.getSharedInstance()
if not cache.isValid(id):
if not id in self.threads:
self.threads[id] = threading.Thread(target=self.downloadRadioIdData, args=[id])
self.threads[id] = threading.Thread(target=self.downloadRadioIdData, args=[id], daemon=True)
self.threads[id].start()
return None
data = cache.get(id)

View File

@ -3,7 +3,6 @@ import threading
import time
import random
import socket
from sched import scheduler
from owrx.config import PropertyManager
from owrx.version import openwebrx_version
from owrx.locator import Locator
@ -20,6 +19,9 @@ class PskReporterDummy(object):
def spot(self, spot):
pass
def cancelTimer(self):
pass
class PskReporter(object):
sharedInstance = None
@ -37,24 +39,31 @@ class PskReporter(object):
PskReporter.sharedInstance = PskReporterDummy()
return PskReporter.sharedInstance
@staticmethod
def stop():
if PskReporter.sharedInstance:
PskReporter.sharedInstance.cancelTimer()
def __init__(self):
self.spots = []
self.spotLock = threading.Lock()
self.uploader = Uploader()
self.scheduler = scheduler(time.time, time.sleep)
self.scheduleNextUpload()
threading.Thread(target=self.scheduler.run).start()
self.timer = None
def scheduleNextUpload(self):
if self.timer:
return
delay = PskReporter.interval + random.uniform(0, 30)
logger.debug("scheduling next pskreporter upload in %f seconds", delay)
self.scheduler.enter(delay, 1, self.upload)
self.timer = threading.Timer(delay, self.upload)
self.timer.start()
def spot(self, spot):
if not spot["mode"] in PskReporter.supportedModes:
return
with self.spotLock:
self.spots.append(spot)
self.scheduleNextUpload()
def upload(self):
try:
@ -67,8 +76,13 @@ class PskReporter(object):
except Exception:
logger.exception("Failed to upload spots")
self.timer = None
self.scheduleNextUpload()
def cancelTimer(self):
if self.timer:
self.timer.cancel()
class Uploader(object):
receieverDelimiter = [0x99, 0x92]

View File

@ -115,6 +115,10 @@ class ServiceScheduler(object):
self.selectionTimer = None
self.scheduleSelection()
def shutdown(self):
self.cancelTimer()
self.source.removeClient(self)
def scheduleSelection(self, time=None):
seconds = 10
if time is not None:
@ -177,8 +181,9 @@ class ServiceHandler(object):
props.collect("center_freq", "samp_rate").wire(self.onFrequencyChange)
if self.source.isAvailable():
self.scheduleServiceStartup()
self.scheduler = None
if "schedule" in props:
ServiceScheduler(self.source, props["schedule"])
self.scheduler = ServiceScheduler(self.source, props["schedule"])
def isActive(self):
return False
@ -214,6 +219,12 @@ class ServiceHandler(object):
return mode in available
def shutdown(self):
self.stopServices()
self.source.removeClient(self)
if self.scheduler:
self.scheduler.shutdown()
def stopServices(self):
with self.lock:
services = self.services
@ -383,7 +394,7 @@ class Services(object):
@staticmethod
def stop():
for handler in Services.handlers:
handler.stopServices()
handler.shutdown()
Services.handlers = []

View File

@ -136,7 +136,7 @@ class SdrSource(object):
if profile_id is None:
profile_id = list(profiles.keys())[0]
if profile_id == self.profile_id:
return;
return
logger.debug("activating profile {0}".format(profile_id))
self.profile_id = profile_id
profile = profiles[profile_id]

View File

@ -1,8 +1,6 @@
import threading
import wave
from datetime import datetime, timedelta, date, timezone
import time
import sched
from datetime import datetime, timedelta, timezone
import subprocess
import os
from multiprocessing.connection import Pipe
@ -93,8 +91,7 @@ class WsjtChopper(threading.Thread):
self.tmp_dir = PropertyManager.getSharedInstance()["temporary_directory"]
(self.wavefilename, self.wavefile) = self.getWaveFile()
self.switchingLock = threading.Lock()
self.scheduler = sched.scheduler(time.time, time.sleep)
self.schedulerLock = threading.Lock()
self.timer = None
(self.outputReader, self.outputWriter) = Pipe()
self.doRun = True
super().__init__()
@ -110,27 +107,23 @@ class WsjtChopper(threading.Thread):
return filename, wavefile
def getNextDecodingTime(self):
t = datetime.now()
t = datetime.utcnow()
zeroed = t.replace(minute=0, second=0, microsecond=0)
delta = t - zeroed
seconds = (int(delta.total_seconds() / self.interval) + 1) * self.interval
t = zeroed + timedelta(seconds=seconds)
logger.debug("scheduling: {0}".format(t))
return t.timestamp()
return t
def startScheduler(self):
self._scheduleNextSwitch()
threading.Thread(target=self.scheduler.run).start()
def emptyScheduler(self):
with self.schedulerLock:
for event in self.scheduler.queue:
self.scheduler.cancel(event)
def cancelTimer(self):
if self.timer:
self.timer.cancel()
def _scheduleNextSwitch(self):
with self.schedulerLock:
if self.doRun:
self.scheduler.enterabs(self.getNextDecodingTime(), 1, self.switchFiles)
if self.doRun:
delta = self.getNextDecodingTime() - datetime.utcnow()
self.timer = threading.Timer(delta.total_seconds(), self.switchFiles)
self.timer.start()
def switchFiles(self):
self.switchingLock.acquire()
@ -169,7 +162,7 @@ class WsjtChopper(threading.Thread):
def run(self) -> None:
logger.debug("WSJT chopper starting up")
self.startScheduler()
self._scheduleNextSwitch()
while self.doRun:
data = self.source.read(256)
if data is None or (isinstance(data, bytes) and len(data) == 0):
@ -182,7 +175,7 @@ class WsjtChopper(threading.Thread):
logger.debug("WSJT chopper shutting down")
self.outputReader.close()
self.outputWriter.close()
self.emptyScheduler()
self.cancelTimer()
try:
os.unlink(self.wavefilename)
except Exception: