diff --git a/owrx/audio/__init__.py b/owrx/audio/__init__.py index 3c6697a..bbdad6c 100644 --- a/owrx/audio/__init__.py +++ b/owrx/audio/__init__.py @@ -9,6 +9,7 @@ import os from multiprocessing.connection import Pipe, wait from datetime import datetime, timedelta from queue import Queue, Full, Empty +from itertools import groupby import logging @@ -17,13 +18,36 @@ logger.setLevel(logging.INFO) class QueueJob(object): - def __init__(self, decoder, file, freq): - self.decoder = decoder + def __init__(self, profile, writer, file, freq): + self.profile = profile + self.writer = writer self.file = file self.freq = freq def run(self): - self.decoder.decode(self) + logger.debug("processing file %s", self.file) + tmp_dir = CoreConfig().get_temporary_directory() + decoder = subprocess.Popen( + ["nice", "-n", "10"] + self.profile.decoder_commandline(self.file), + stdout=subprocess.PIPE, + cwd=tmp_dir, + close_fds=True, + ) + try: + for line in decoder.stdout: + self.writer.send((self.profile, self.freq, line)) + except (OSError, AttributeError): + decoder.stdout.flush() + # TODO uncouple parsing from the output so that decodes can still go to the map and the spotters + logger.debug("output has gone away while decoding job.") + try: + rc = decoder.wait(timeout=10) + if rc != 0: + raise RuntimeError("decoder return code: {0}".format(rc)) + except subprocess.TimeoutExpired: + logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid) + decoder.kill() + raise def unlink(self): try: @@ -167,35 +191,52 @@ class AudioChopperProfile(ABC): pass +class WaveFile(object): + def __init__(self, filename): + self.filename = filename + self.waveFile = wave.open(filename, "wb") + self.waveFile.setnchannels(1) + self.waveFile.setsampwidth(2) + self.waveFile.setframerate(12000) + + def close(self): + self.waveFile.close() + + def getFileName(self): + return self.filename + + def writeframes(self, data): + return self.waveFile.writeframes(data) + + def unlink(self): + os.unlink(self.filename) + self.waveFile = None + + class AudioWriter(object): - def __init__(self, active_dsp, profile: AudioChopperProfile): + def __init__(self, active_dsp, interval, profiles: list[AudioChopperProfile]): self.dsp = active_dsp - self.profile = profile - self.tmp_dir = CoreConfig().get_temporary_directory() + self.interval = interval + self.profiles = profiles self.wavefile = None - self.wavefilename = None self.switchingLock = threading.Lock() self.timer = None (self.outputReader, self.outputWriter) = Pipe() def getWaveFile(self): - filename = "{tmp_dir}/openwebrx-audiochopper-{id}-{timestamp}.wav".format( - tmp_dir=self.tmp_dir, + tmp_dir = CoreConfig().get_temporary_directory() + filename = "{tmp_dir}/openwebrx-audiochopper-master-{id}-{timestamp}.wav".format( + tmp_dir=tmp_dir, id=id(self), - timestamp=datetime.utcnow().strftime(self.profile.getFileTimestampFormat()), + timestamp=datetime.utcnow().strftime("%y%m%d_%H%M%S"), ) - wavefile = wave.open(filename, "wb") - wavefile.setnchannels(1) - wavefile.setsampwidth(2) - wavefile.setframerate(12000) - return filename, wavefile + return WaveFile(filename) def getNextDecodingTime(self): t = datetime.utcnow() zeroed = t.replace(minute=0, second=0, microsecond=0) delta = t - zeroed - interval = self.profile.getInterval() - seconds = (int(delta.total_seconds() / interval) + 1) * interval + seconds = (int(delta.total_seconds() / self.interval) + 1) * self.interval t = zeroed + timedelta(seconds=seconds) logger.debug("scheduling: {0}".format(t)) return t @@ -214,44 +255,34 @@ class AudioWriter(object): def switchFiles(self): with self.switchingLock: file = self.wavefile - filename = self.wavefilename - (self.wavefilename, self.wavefile) = self.getWaveFile() + self.wavefile = self.getWaveFile() file.close() - job = QueueJob(self, filename, self.dsp.get_operating_freq()) - try: - DecoderQueue.getSharedInstance().put(job) - except Full: - logger.warning("decoding queue overflow; dropping one file") - job.unlink() + for profile in self.profiles: + tmp_dir = CoreConfig().get_temporary_directory() + + # create hardlinks for the individual profiles + filename = "{tmp_dir}/openwebrx-audiochopper-{pid}-{timestamp}.wav".format( + tmp_dir=tmp_dir, + pid=id(profile), + timestamp=datetime.utcnow().strftime(profile.getFileTimestampFormat()), + ) + os.link(file.getFileName(), filename) + + job = QueueJob(profile, self.outputWriter, filename, self.dsp.get_operating_freq()) + try: + DecoderQueue.getSharedInstance().put(job) + except Full: + logger.warning("decoding queue overflow; dropping one file") + job.unlink() + + # our master can be deleted now, the profiles will delete their hardlinked copies after processing + file.unlink() + self._scheduleNextSwitch() - def decode(self, job: QueueJob): - logger.debug("processing file %s", job.file) - decoder = subprocess.Popen( - ["nice", "-n", "10"] + self.profile.decoder_commandline(job.file), - stdout=subprocess.PIPE, - cwd=self.tmp_dir, - close_fds=True, - ) - try: - for line in decoder.stdout: - self.outputWriter.send((self.profile, job.freq, line)) - except (OSError, AttributeError): - decoder.stdout.flush() - # TODO uncouple parsing from the output so that decodes can still go to the map and the spotters - logger.debug("output has gone away while decoding job.") - try: - rc = decoder.wait(timeout=10) - if rc != 0: - raise RuntimeError("decoder return code: {0}".format(rc)) - except subprocess.TimeoutExpired: - logger.warning("subprocess (pid=%i}) did not terminate correctly; sending kill signal.", decoder.pid) - decoder.kill() - raise - def start(self): - (self.wavefilename, self.wavefile) = self.getWaveFile() + self.wavefile = self.getWaveFile() self._scheduleNextSwitch() def write(self, data): @@ -279,17 +310,18 @@ class AudioWriter(object): logger.exception("error closing wave file") try: with self.switchingLock: - os.unlink(self.wavefilename) + self.wavefile.unlink() except Exception: logger.exception("error removing undecoded file") self.wavefile = None - self.wavefilename = None class AudioChopper(threading.Thread, metaclass=ABCMeta): def __init__(self, active_dsp, readfn: callable, *profiles: AudioChopperProfile): + sorted_profiles = sorted(profiles, key=lambda p: p.getInterval()) + groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())} self.readfn = readfn - self.writers = [AudioWriter(active_dsp, p) for p in profiles] + self.writers = [AudioWriter(active_dsp, interval, profiles) for interval, profiles in groups.items()] self.doRun = True super().__init__()