diff --git a/owrx/service.py b/owrx/service/__init__.py similarity index 67% rename from owrx/service.py rename to owrx/service/__init__.py index 5936f8f..3e1a337 100644 --- a/owrx/service.py +++ b/owrx/service/__init__.py @@ -1,5 +1,4 @@ import threading -from datetime import datetime, timezone, timedelta from owrx.source import SdrSource from owrx.sdr import SdrService from owrx.bands import Bandplan @@ -9,16 +8,19 @@ from owrx.aprs import AprsParser from owrx.config import PropertyManager from owrx.source.resampler import Resampler from owrx.feature import FeatureDetector +from abc import ABCMeta, abstractmethod +from .schedule import ServiceScheduler import logging logger = logging.getLogger(__name__) -class ServiceOutput(output): +class ServiceOutput(output, metaclass=ABCMeta): def __init__(self, frequency): self.frequency = frequency + @abstractmethod def getParser(self): # abstract method; implement in subclasses pass @@ -46,134 +48,6 @@ class AprsServiceOutput(ServiceOutput): return t == "packet_demod" -class ScheduleEntry(object): - def __init__(self, startTime, endTime, profile): - self.startTime = startTime - self.endTime = endTime - self.profile = profile - - def isCurrent(self, time): - if self.startTime < self.endTime: - return self.startTime <= time < self.endTime - else: - return self.startTime <= time or time < self.endTime - - def getProfile(self): - return self.profile - - def getScheduledEnd(self): - now = datetime.utcnow() - end = now.combine(date=now.date(), time=self.endTime) - while end < now: - end += timedelta(days=1) - return end - - def getNextActivation(self): - now = datetime.utcnow() - start = now.combine(date=now.date(), time=self.startTime) - while start < now: - start += timedelta(days=1) - return start - - -class Schedule(object): - @staticmethod - def parse(scheduleDict): - entries = [] - for time, profile in scheduleDict.items(): - if len(time) != 9: - logger.warning("invalid schedule spec: %s", time) - continue - - startTime = datetime.strptime(time[0:4], "%H%M").replace(tzinfo=timezone.utc).time() - endTime = datetime.strptime(time[5:9], "%H%M").replace(tzinfo=timezone.utc).time() - entries.append(ScheduleEntry(startTime, endTime, profile)) - return Schedule(entries) - - def __init__(self, entries): - self.entries = entries - - def getCurrentEntry(self): - current = [p for p in self.entries if p.isCurrent(datetime.utcnow().time())] - if current: - return current[0] - return None - - def getNextEntry(self): - s = sorted(self.entries, key=lambda e: e.getNextActivation()) - if s: - return s[0] - return None - - -class ServiceScheduler(object): - def __init__(self, source, schedule): - self.source = source - self.selectionTimer = None - self.schedule = Schedule.parse(schedule) - self.source.addClient(self) - self.source.getProps().collect("center_freq", "samp_rate").wire(self.onFrequencyChange) - self.scheduleSelection() - - def shutdown(self): - self.cancelTimer() - self.source.removeClient(self) - - def scheduleSelection(self, time=None): - if self.source.getState() == SdrSource.STATE_FAILED: - return - seconds = 10 - if time is not None: - delta = time - datetime.utcnow() - seconds = delta.total_seconds() - self.cancelTimer() - self.selectionTimer = threading.Timer(seconds, self.selectProfile) - self.selectionTimer.start() - - def cancelTimer(self): - if self.selectionTimer: - self.selectionTimer.cancel() - - def getClientClass(self): - return SdrSource.CLIENT_BACKGROUND - - def onStateChange(self, state): - if state == SdrSource.STATE_STOPPING: - self.scheduleSelection() - elif state == SdrSource.STATE_FAILED: - self.cancelTimer() - - def onBusyStateChange(self, state): - if state == SdrSource.BUSYSTATE_IDLE: - self.scheduleSelection() - - def onFrequencyChange(self, name, value): - self.scheduleSelection() - - def selectProfile(self): - if self.source.hasClients(SdrSource.CLIENT_USER): - logger.debug("source has active users; not touching") - return - logger.debug("source seems to be idle, selecting profile for background services") - entry = self.schedule.getCurrentEntry() - - if entry is None: - logger.debug("schedule did not return a profile. checking next entry...") - nextEntry = self.schedule.getNextEntry() - if nextEntry is not None: - self.scheduleSelection(nextEntry.getNextActivation()) - return - - logger.debug("scheduling end for current profile: %s", entry.getScheduledEnd()) - self.scheduleSelection(entry.getScheduledEnd()) - - try: - self.source.activateProfile(entry.getProfile()) - self.source.start() - except KeyError: - pass - - class ServiceHandler(object): def __init__(self, source): self.lock = threading.Lock() @@ -186,8 +60,8 @@ class ServiceHandler(object): if self.source.isAvailable(): self.scheduleServiceStartup() self.scheduler = None - if "schedule" in props: - self.scheduler = ServiceScheduler(self.source, props["schedule"]) + if "schedule" in props or "scheduler" in props: + self.scheduler = ServiceScheduler(self.source) def getClientClass(self): return SdrSource.CLIENT_INACTIVE @@ -390,7 +264,7 @@ class Services(object): return for source in SdrService.getSources().values(): props = source.getProps() - if "services" not in props or props["services"] != False: + if "services" not in props or props["services"] is not False: Services.handlers.append(ServiceHandler(source)) @staticmethod @@ -398,11 +272,3 @@ class Services(object): for handler in Services.handlers: handler.shutdown() Services.handlers = [] - - -class Service(object): - pass - - -class WsjtService(Service): - pass diff --git a/owrx/service/schedule.py b/owrx/service/schedule.py new file mode 100644 index 0000000..282133c --- /dev/null +++ b/owrx/service/schedule.py @@ -0,0 +1,174 @@ +from datetime import datetime, timezone, timedelta +from owrx.source import SdrSource +import threading +from abc import ABC, ABCMeta, abstractmethod + +import logging + +logger = logging.getLogger(__name__) + + +class ScheduleEntry(object): + def __init__(self, startTime, endTime, profile): + self.startTime = startTime + self.endTime = endTime + self.profile = profile + + def isCurrent(self, time): + if self.startTime < self.endTime: + return self.startTime <= time < self.endTime + else: + return self.startTime <= time or time < self.endTime + + def getProfile(self): + return self.profile + + def getScheduledEnd(self): + now = datetime.utcnow() + end = now.combine(date=now.date(), time=self.endTime) + while end < now: + end += timedelta(days=1) + return end + + def getNextActivation(self): + now = datetime.utcnow() + start = now.combine(date=now.date(), time=self.startTime) + while start < now: + start += timedelta(days=1) + return start + + +class Schedule(ABC): + @staticmethod + def parse(props): + # downwards compatibility + if "schedule" in props: + return StaticSchedule(props["schedule"]) + elif "scheduler" in props: + sc = props["scheduler"] + t = sc["type"] if "type" in sc else "static" + if t == "static": + return StaticSchedule(sc["schedule"]) + elif t == "sunlight": + return SunlightSchedule(sc["schedule"]) + else: + logger.warning("Invalid scheduler type: %s", t) + + @abstractmethod + def getCurrentEntry(self): + pass + + @abstractmethod + def getNextEntry(self): + pass + + +class TimerangeSchedule(Schedule, metaclass=ABCMeta): + @abstractmethod + def getEntries(self): + pass + + def getCurrentEntry(self): + current = [p for p in self.getEntries() if p.isCurrent(datetime.utcnow().time())] + if current: + return current[0] + return None + + def getNextEntry(self): + s = sorted(self.getEntries(), key=lambda e: e.getNextActivation()) + if s: + return s[0] + return None + + +class StaticSchedule(TimerangeSchedule): + def __init__(self, scheduleDict): + self.entries = [] + for time, profile in scheduleDict.items(): + if len(time) != 9: + logger.warning("invalid schedule spec: %s", time) + continue + + startTime = datetime.strptime(time[0:4], "%H%M").replace(tzinfo=timezone.utc).time() + endTime = datetime.strptime(time[5:9], "%H%M").replace(tzinfo=timezone.utc).time() + self.entries.append(ScheduleEntry(startTime, endTime, profile)) + + def getEntries(self): + return self.entries + + +class SunlightSchedule(TimerangeSchedule): + def __init__(self, scheduleDict): + self.schedule = scheduleDict + + def getEntries(self): + return [] + + +class ServiceScheduler(object): + def __init__(self, source): + self.source = source + self.selectionTimer = None + self.source.addClient(self) + props = self.source.getProps() + self.schedule = Schedule.parse(props) + props.collect("center_freq", "samp_rate").wire(self.onFrequencyChange) + self.scheduleSelection() + + def shutdown(self): + self.cancelTimer() + self.source.removeClient(self) + + def scheduleSelection(self, time=None): + if self.source.getState() == SdrSource.STATE_FAILED: + return + seconds = 10 + if time is not None: + delta = time - datetime.utcnow() + seconds = delta.total_seconds() + self.cancelTimer() + self.selectionTimer = threading.Timer(seconds, self.selectProfile) + self.selectionTimer.start() + + def cancelTimer(self): + if self.selectionTimer: + self.selectionTimer.cancel() + + def getClientClass(self): + return SdrSource.CLIENT_BACKGROUND + + def onStateChange(self, state): + if state == SdrSource.STATE_STOPPING: + self.scheduleSelection() + elif state == SdrSource.STATE_FAILED: + self.cancelTimer() + + def onBusyStateChange(self, state): + if state == SdrSource.BUSYSTATE_IDLE: + self.scheduleSelection() + + def onFrequencyChange(self, name, value): + self.scheduleSelection() + + def selectProfile(self): + if self.source.hasClients(SdrSource.CLIENT_USER): + logger.debug("source has active users; not touching") + return + logger.debug("source seems to be idle, selecting profile for background services") + entry = self.schedule.getCurrentEntry() + + if entry is None: + logger.debug("schedule did not return a profile. checking next entry...") + nextEntry = self.schedule.getNextEntry() + if nextEntry is not None: + self.scheduleSelection(nextEntry.getNextActivation()) + return + + logger.debug("scheduling end for current profile: %s", entry.getScheduledEnd()) + self.scheduleSelection(entry.getScheduledEnd()) + + try: + self.source.activateProfile(entry.getProfile()) + self.source.start() + except KeyError: + pass diff --git a/setup.py b/setup.py index 03c0a30..dd4fe70 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ except ImportError: setup( name="OpenWebRX", version=str(strictversion), - packages=find_namespace_packages(include=["owrx", "owrx.source", "csdr", "htdocs"]), + packages=find_namespace_packages(include=["owrx", "owrx.source", "owrx.service", "csdr", "htdocs"]), package_data={"htdocs": [f[len("htdocs/") :] for f in glob("htdocs/**/*", recursive=True)]}, entry_points={"console_scripts": ["openwebrx=owrx.__main__:main"]}, # use the github page for now