From 8df4f9ce5252ac20a7777ec0b9efd1bfebe9de87 Mon Sep 17 00:00:00 2001 From: Jakob Ketterl Date: Mon, 16 Sep 2019 00:31:35 +0200 Subject: [PATCH] add the ability to schedule profiles to be used when sources are idle --- owrx/service.py | 177 +++++++++++++++++++++++++++++++++++++++++------- owrx/source.py | 15 +++- 2 files changed, 165 insertions(+), 27 deletions(-) diff --git a/owrx/service.py b/owrx/service.py index 89e68aa..752d1b3 100644 --- a/owrx/service.py +++ b/owrx/service.py @@ -1,5 +1,6 @@ import threading import socket +from datetime import datetime, timezone, timedelta from owrx.source import SdrService from owrx.bands import Bandplan from csdr import dsp, output @@ -44,32 +45,154 @@ class AprsServiceOutput(ServiceOutput): return t == "packet_demod" +class ScheduleEntry(object): + def __init__(self, startTime, endTime, profile): + self.startTime = startTime + self.endTime = endTime + self.profile = profile + + def isCurrent(self, time): + if self.startTime < self.endTime: + return self.startTime <= time < self.endTime + else: + return self.startTime <= time or time < self.endTime + + def getProfile(self): + return self.profile + + def getScheduledEnd(self): + now = datetime.utcnow() + end = now.combine(date=now.date(), time=self.endTime) + while end < now: + end += timedelta(days=1) + return end + + def getNextActivation(self): + now = datetime.utcnow() + start = now.combine(date=now.date(), time=self.startTime) + while start < now: + start += timedelta(days=1) + return start + + +class Schedule(object): + @staticmethod + def parse(scheduleDict): + entries = [] + for time, profile in scheduleDict.items(): + if len(time) != 9: + logger.warning("invalid schedule spec: %s", time) + continue + + startTime = datetime.strptime(time[0:4], "%H%M").replace(tzinfo=timezone.utc).time() + endTime = datetime.strptime(time[5:9], "%H%M").replace(tzinfo=timezone.utc).time() + entries.append(ScheduleEntry(startTime, endTime, profile)) + return Schedule(entries) + + def __init__(self, entries): + self.entries = entries + + def getCurrentEntry(self): + current = [p for p in self.entries if p.isCurrent(datetime.utcnow().time())] + if current: + return current[0] + return None + + def getNextEntry(self): + s = sorted(self.entries, key=lambda e: e.getNextActivation()) + if s: + return s[0] + return None + + +class ServiceScheduler(object): + def __init__(self, source, schedule): + self.source = source + self.schedule = Schedule.parse(schedule) + self.active = False + self.source.addClient(self) + self.selectionTimer = None + self.scheduleSelection() + + def scheduleSelection(self, time=None): + seconds = 10 + if time is not None: + delta = time - datetime.utcnow() + seconds = delta.total_seconds() + if self.selectionTimer: + self.selectionTimer.cancel() + self.selectionTimer = threading.Timer(seconds, self.selectProfile) + self.selectionTimer.start() + + def isActive(self): + return self.active + + def onSdrAvailable(self): + pass + + def onSdrUnavailable(self): + self.scheduleSelection() + + def selectProfile(self): + self.active = False + if self.source.hasActiveClients(): + logger.debug("source has active clients; not touching") + return + logger.debug("source seems to be idle, selecting profile for background services") + entry = self.schedule.getCurrentEntry() + + if entry is None: + logger.debug("schedule did not return a profile. checking next entry...") + nextEntry = self.schedule.getNextEntry() + if nextEntry is not None: + self.scheduleSelection(nextEntry.getNextActivation()) + return + + logger.debug("scheduling end for current profile: %s", entry.getScheduledEnd()) + self.scheduleSelection(entry.getScheduledEnd()) + + try: + self.active = True + self.source.activateProfile(entry.getProfile()) + self.source.start() + except KeyError: + pass + + class ServiceHandler(object): def __init__(self, source): + self.lock = threading.Lock() self.services = [] self.source = source self.startupTimer = None self.source.addClient(self) - self.source.getProps().collect("center_freq", "samp_rate").wire(self.onFrequencyChange) - self.scheduleServiceStartup() + props = self.source.getProps() + props.collect("center_freq", "samp_rate").wire(self.onFrequencyChange) + if self.source.isAvailable(): + self.scheduleServiceStartup() + if "schedule" in props: + ServiceScheduler(self.source, props["schedule"]) + + def isActive(self): + return False def onSdrAvailable(self): self.scheduleServiceStartup() def onSdrUnavailable(self): + logger.debug("sdr source becoming unavailable; stopping services.") self.stopServices() def isSupported(self, mode): return mode in PropertyManager.getSharedInstance()["services_decoders"] def stopServices(self): - for service in self.services: - service.stop() - self.services = [] + with self.lock: + services = self.services + self.services = [] - def startServices(self): - for service in self.services: - service.start() + for service in services: + service.stop() def onFrequencyChange(self, key, value): self.stopServices() @@ -94,6 +217,9 @@ class ServiceHandler(object): def updateServices(self): logger.debug("re-scheduling services due to sdr changes") self.stopServices() + if not self.source.isAvailable(): + logger.debug("sdr source is unavailable") + return cf = self.source.getProps()["center_freq"] sr = self.source.getProps()["samp_rate"] srh = sr / 2 @@ -109,25 +235,26 @@ class ServiceHandler(object): logger.debug("no services available") return - self.services = [] + with self.lock: + self.services = [] - for group in self.optimizeResampling(dials, sr): - frequencies = sorted([f["frequency"] for f in group]) - min = frequencies[0] - max = frequencies[-1] - cf = (min + max) / 2 - bw = max - min - logger.debug("group center frequency: {0}, bandwidth: {1}".format(cf, bw)) - resampler_props = PropertyManager() - resampler_props["center_freq"] = cf - # TODO the + 24000 is a temporary fix since the resampling optimizer does not account for required bandwidths - resampler_props["samp_rate"] = bw + 24000 - resampler = Resampler(resampler_props, self.getAvailablePort(), self.source) - resampler.start() - self.services.append(resampler) + for group in self.optimizeResampling(dials, sr): + frequencies = sorted([f["frequency"] for f in group]) + min = frequencies[0] + max = frequencies[-1] + cf = (min + max) / 2 + bw = max - min + logger.debug("group center frequency: {0}, bandwidth: {1}".format(cf, bw)) + resampler_props = PropertyManager() + resampler_props["center_freq"] = cf + # TODO the + 24000 is a temporary fix since the resampling optimizer does not account for required bandwidths + resampler_props["samp_rate"] = bw + 24000 + resampler = Resampler(resampler_props, self.getAvailablePort(), self.source) + resampler.start() + self.services.append(resampler) - for dial in group: - self.services.append(self.setupService(dial["mode"], dial["frequency"], resampler)) + for dial in group: + self.services.append(self.setupService(dial["mode"], dial["frequency"], resampler)) def optimizeResampling(self, freqs, bandwidth): freqs = sorted(freqs, key=lambda f: f["frequency"]) diff --git a/owrx/source.py b/owrx/source.py index 0ec65fa..ba31bdc 100644 --- a/owrx/source.py +++ b/owrx/source.py @@ -243,16 +243,21 @@ class SdrSource(object): def sleepOnRestart(self): pass + def hasActiveClients(self): + activeClients = [c for c in self.clients if c.isActive()] + return len(activeClients) > 0 + def addClient(self, c): self.clients.append(c) - self.start() + if self.hasActiveClients(): + self.start() def removeClient(self, c): try: self.clients.remove(c) except ValueError: pass - if not self.clients: + if not self.hasActiveClients(): self.stop() def addSpectrumClient(self, c): @@ -478,6 +483,9 @@ class SpectrumThread(csdr.output): c.cancel() self.subscriptions = [] + def isActive(self): + return True + def onSdrAvailable(self): self.dsp.start() @@ -606,6 +614,9 @@ class DspManager(csdr.output): def setProperty(self, prop, value): self.localProps.getProperty(prop).setValue(value) + def isActive(self): + return True + def onSdrAvailable(self): logger.debug("received onSdrAvailable, attempting DspSource restart") self.dsp.start()