add the ability to schedule profiles to be used when sources are idle
This commit is contained in:
		
							
								
								
									
										177
									
								
								owrx/service.py
									
									
									
									
									
								
							
							
						
						
									
										177
									
								
								owrx/service.py
									
									
									
									
									
								
							| @@ -1,5 +1,6 @@ | ||||
| import threading | ||||
| import socket | ||||
| from datetime import datetime, timezone, timedelta | ||||
| from owrx.source import SdrService | ||||
| from owrx.bands import Bandplan | ||||
| from csdr import dsp, output | ||||
| @@ -44,32 +45,154 @@ class AprsServiceOutput(ServiceOutput): | ||||
|         return t == "packet_demod" | ||||
|  | ||||
|  | ||||
| class ScheduleEntry(object): | ||||
|     def __init__(self, startTime, endTime, profile): | ||||
|         self.startTime = startTime | ||||
|         self.endTime = endTime | ||||
|         self.profile = profile | ||||
|  | ||||
|     def isCurrent(self, time): | ||||
|         if self.startTime < self.endTime: | ||||
|             return self.startTime <= time < self.endTime | ||||
|         else: | ||||
|             return self.startTime <= time or time < self.endTime | ||||
|  | ||||
|     def getProfile(self): | ||||
|         return self.profile | ||||
|  | ||||
|     def getScheduledEnd(self): | ||||
|         now = datetime.utcnow() | ||||
|         end = now.combine(date=now.date(), time=self.endTime) | ||||
|         while end < now: | ||||
|             end += timedelta(days=1) | ||||
|         return end | ||||
|  | ||||
|     def getNextActivation(self): | ||||
|         now = datetime.utcnow() | ||||
|         start = now.combine(date=now.date(), time=self.startTime) | ||||
|         while start < now: | ||||
|             start += timedelta(days=1) | ||||
|         return start | ||||
|  | ||||
|  | ||||
| class Schedule(object): | ||||
|     @staticmethod | ||||
|     def parse(scheduleDict): | ||||
|         entries = [] | ||||
|         for time, profile in scheduleDict.items(): | ||||
|             if len(time) != 9: | ||||
|                 logger.warning("invalid schedule spec: %s", time) | ||||
|                 continue | ||||
|  | ||||
|             startTime = datetime.strptime(time[0:4], "%H%M").replace(tzinfo=timezone.utc).time() | ||||
|             endTime = datetime.strptime(time[5:9], "%H%M").replace(tzinfo=timezone.utc).time() | ||||
|             entries.append(ScheduleEntry(startTime, endTime, profile)) | ||||
|         return Schedule(entries) | ||||
|  | ||||
|     def __init__(self, entries): | ||||
|         self.entries = entries | ||||
|  | ||||
|     def getCurrentEntry(self): | ||||
|         current = [p for p in self.entries if p.isCurrent(datetime.utcnow().time())] | ||||
|         if current: | ||||
|             return current[0] | ||||
|         return None | ||||
|  | ||||
|     def getNextEntry(self): | ||||
|         s = sorted(self.entries, key=lambda e: e.getNextActivation()) | ||||
|         if s: | ||||
|             return s[0] | ||||
|         return None | ||||
|  | ||||
|  | ||||
| class ServiceScheduler(object): | ||||
|     def __init__(self, source, schedule): | ||||
|         self.source = source | ||||
|         self.schedule = Schedule.parse(schedule) | ||||
|         self.active = False | ||||
|         self.source.addClient(self) | ||||
|         self.selectionTimer = None | ||||
|         self.scheduleSelection() | ||||
|  | ||||
|     def scheduleSelection(self, time=None): | ||||
|         seconds = 10 | ||||
|         if time is not None: | ||||
|             delta = time - datetime.utcnow() | ||||
|             seconds = delta.total_seconds() | ||||
|         if self.selectionTimer: | ||||
|             self.selectionTimer.cancel() | ||||
|         self.selectionTimer = threading.Timer(seconds, self.selectProfile) | ||||
|         self.selectionTimer.start() | ||||
|  | ||||
|     def isActive(self): | ||||
|         return self.active | ||||
|  | ||||
|     def onSdrAvailable(self): | ||||
|         pass | ||||
|  | ||||
|     def onSdrUnavailable(self): | ||||
|         self.scheduleSelection() | ||||
|  | ||||
|     def selectProfile(self): | ||||
|         self.active = False | ||||
|         if self.source.hasActiveClients(): | ||||
|             logger.debug("source has active clients; not touching") | ||||
|             return | ||||
|         logger.debug("source seems to be idle, selecting profile for background services") | ||||
|         entry = self.schedule.getCurrentEntry() | ||||
|  | ||||
|         if entry is None: | ||||
|             logger.debug("schedule did not return a profile. checking next entry...") | ||||
|             nextEntry = self.schedule.getNextEntry() | ||||
|             if nextEntry is not None: | ||||
|                 self.scheduleSelection(nextEntry.getNextActivation()) | ||||
|             return | ||||
|  | ||||
|         logger.debug("scheduling end for current profile: %s", entry.getScheduledEnd()) | ||||
|         self.scheduleSelection(entry.getScheduledEnd()) | ||||
|  | ||||
|         try: | ||||
|             self.active = True | ||||
|             self.source.activateProfile(entry.getProfile()) | ||||
|             self.source.start() | ||||
|         except KeyError: | ||||
|             pass | ||||
|  | ||||
|  | ||||
| class ServiceHandler(object): | ||||
|     def __init__(self, source): | ||||
|         self.lock = threading.Lock() | ||||
|         self.services = [] | ||||
|         self.source = source | ||||
|         self.startupTimer = None | ||||
|         self.source.addClient(self) | ||||
|         self.source.getProps().collect("center_freq", "samp_rate").wire(self.onFrequencyChange) | ||||
|         self.scheduleServiceStartup() | ||||
|         props = self.source.getProps() | ||||
|         props.collect("center_freq", "samp_rate").wire(self.onFrequencyChange) | ||||
|         if self.source.isAvailable(): | ||||
|             self.scheduleServiceStartup() | ||||
|         if "schedule" in props: | ||||
|             ServiceScheduler(self.source, props["schedule"]) | ||||
|  | ||||
|     def isActive(self): | ||||
|         return False | ||||
|  | ||||
|     def onSdrAvailable(self): | ||||
|         self.scheduleServiceStartup() | ||||
|  | ||||
|     def onSdrUnavailable(self): | ||||
|         logger.debug("sdr source becoming unavailable; stopping services.") | ||||
|         self.stopServices() | ||||
|  | ||||
|     def isSupported(self, mode): | ||||
|         return mode in PropertyManager.getSharedInstance()["services_decoders"] | ||||
|  | ||||
|     def stopServices(self): | ||||
|         for service in self.services: | ||||
|             service.stop() | ||||
|         self.services = [] | ||||
|         with self.lock: | ||||
|             services = self.services | ||||
|             self.services = [] | ||||
|  | ||||
|     def startServices(self): | ||||
|         for service in self.services: | ||||
|             service.start() | ||||
|         for service in services: | ||||
|             service.stop() | ||||
|  | ||||
|     def onFrequencyChange(self, key, value): | ||||
|         self.stopServices() | ||||
| @@ -94,6 +217,9 @@ class ServiceHandler(object): | ||||
|     def updateServices(self): | ||||
|         logger.debug("re-scheduling services due to sdr changes") | ||||
|         self.stopServices() | ||||
|         if not self.source.isAvailable(): | ||||
|             logger.debug("sdr source is unavailable") | ||||
|             return | ||||
|         cf = self.source.getProps()["center_freq"] | ||||
|         sr = self.source.getProps()["samp_rate"] | ||||
|         srh = sr / 2 | ||||
| @@ -109,25 +235,26 @@ class ServiceHandler(object): | ||||
|             logger.debug("no services available") | ||||
|             return | ||||
|  | ||||
|         self.services = [] | ||||
|         with self.lock: | ||||
|             self.services = [] | ||||
|  | ||||
|         for group in self.optimizeResampling(dials, sr): | ||||
|             frequencies = sorted([f["frequency"] for f in group]) | ||||
|             min = frequencies[0] | ||||
|             max = frequencies[-1] | ||||
|             cf = (min + max) / 2 | ||||
|             bw = max - min | ||||
|             logger.debug("group center frequency: {0}, bandwidth: {1}".format(cf, bw)) | ||||
|             resampler_props = PropertyManager() | ||||
|             resampler_props["center_freq"] = cf | ||||
|             # TODO the + 24000 is a temporary fix since the resampling optimizer does not account for required bandwidths | ||||
|             resampler_props["samp_rate"] = bw + 24000 | ||||
|             resampler = Resampler(resampler_props, self.getAvailablePort(), self.source) | ||||
|             resampler.start() | ||||
|             self.services.append(resampler) | ||||
|             for group in self.optimizeResampling(dials, sr): | ||||
|                 frequencies = sorted([f["frequency"] for f in group]) | ||||
|                 min = frequencies[0] | ||||
|                 max = frequencies[-1] | ||||
|                 cf = (min + max) / 2 | ||||
|                 bw = max - min | ||||
|                 logger.debug("group center frequency: {0}, bandwidth: {1}".format(cf, bw)) | ||||
|                 resampler_props = PropertyManager() | ||||
|                 resampler_props["center_freq"] = cf | ||||
|                 # TODO the + 24000 is a temporary fix since the resampling optimizer does not account for required bandwidths | ||||
|                 resampler_props["samp_rate"] = bw + 24000 | ||||
|                 resampler = Resampler(resampler_props, self.getAvailablePort(), self.source) | ||||
|                 resampler.start() | ||||
|                 self.services.append(resampler) | ||||
|  | ||||
|             for dial in group: | ||||
|                 self.services.append(self.setupService(dial["mode"], dial["frequency"], resampler)) | ||||
|                 for dial in group: | ||||
|                     self.services.append(self.setupService(dial["mode"], dial["frequency"], resampler)) | ||||
|  | ||||
|     def optimizeResampling(self, freqs, bandwidth): | ||||
|         freqs = sorted(freqs, key=lambda f: f["frequency"]) | ||||
|   | ||||
| @@ -243,16 +243,21 @@ class SdrSource(object): | ||||
|     def sleepOnRestart(self): | ||||
|         pass | ||||
|  | ||||
|     def hasActiveClients(self): | ||||
|         activeClients = [c for c in self.clients if c.isActive()] | ||||
|         return len(activeClients) > 0 | ||||
|  | ||||
|     def addClient(self, c): | ||||
|         self.clients.append(c) | ||||
|         self.start() | ||||
|         if self.hasActiveClients(): | ||||
|             self.start() | ||||
|  | ||||
|     def removeClient(self, c): | ||||
|         try: | ||||
|             self.clients.remove(c) | ||||
|         except ValueError: | ||||
|             pass | ||||
|         if not self.clients: | ||||
|         if not self.hasActiveClients(): | ||||
|             self.stop() | ||||
|  | ||||
|     def addSpectrumClient(self, c): | ||||
| @@ -478,6 +483,9 @@ class SpectrumThread(csdr.output): | ||||
|             c.cancel() | ||||
|         self.subscriptions = [] | ||||
|  | ||||
|     def isActive(self): | ||||
|         return True | ||||
|  | ||||
|     def onSdrAvailable(self): | ||||
|         self.dsp.start() | ||||
|  | ||||
| @@ -606,6 +614,9 @@ class DspManager(csdr.output): | ||||
|     def setProperty(self, prop, value): | ||||
|         self.localProps.getProperty(prop).setValue(value) | ||||
|  | ||||
|     def isActive(self): | ||||
|         return True | ||||
|  | ||||
|     def onSdrAvailable(self): | ||||
|         logger.debug("received onSdrAvailable, attempting DspSource restart") | ||||
|         self.dsp.start() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Jakob Ketterl
					Jakob Ketterl