diff --git a/csdr/csdr.py b/csdr/csdr.py index a720a2b..ccd9776 100644 --- a/csdr/csdr.py +++ b/csdr/csdr.py @@ -26,8 +26,6 @@ import os import signal import threading import math -import time -import select from functools import partial from owrx.kiss import KissClient, DirewolfConfig @@ -35,6 +33,8 @@ from owrx.wsjt import Ft8Profile, WsprProfile, Jt9Profile, Jt65Profile, Ft4Profi from owrx.js8 import Js8Profiles from owrx.audio import AudioChopper +from csdr.pipe import Pipe + import logging logger = logging.getLogger(__name__) @@ -72,120 +72,6 @@ class output(object): return True -class Pipe(object): - READ = "r" - WRITE = "w" - NONE = None - - @staticmethod - def create(path, t, encoding=None): - if t == Pipe.READ: - return ReadingPipe(path, encoding=encoding) - elif t == Pipe.WRITE: - return WritingPipe(path, encoding=encoding) - elif t == Pipe.NONE: - return Pipe(path, None, encoding=encoding) - - def __init__(self, path, direction, encoding=None): - self.doOpen = True - self.path = "{base}_{myid}".format(base=path, myid=id(self)) - self.direction = direction - self.encoding = encoding - self.file = None - os.mkfifo(self.path) - - def open(self): - retries = 0 - - def opener(path, flags): - fd = os.open(path, flags | os.O_NONBLOCK) - os.set_blocking(fd, True) - return fd - - while self.file is None and self.doOpen and retries < 10: - try: - self.file = open(self.path, self.direction, encoding=self.encoding, opener=opener) - except OSError as error: - # ENXIO = FIFO has not been opened for reading - if error.errno == 6: - time.sleep(.1) - retries += 1 - else: - raise - - # if doOpen is false, opening has been canceled, so no warning in that case. - if self.file is None and self.doOpen: - logger.warning("could not open FIFO %s", self.path) - - def close(self): - self.doOpen = False - try: - if self.file is not None: - self.file.close() - os.unlink(self.path) - except FileNotFoundError: - # it seems like we keep calling this twice. no idea why, but we don't need the resulting error. - pass - except Exception: - logger.exception("Pipe.close()") - - def __str__(self): - return self.path - - -class WritingPipe(Pipe): - def __init__(self, path, encoding=None): - self.queue = [] - self.queueLock = threading.Lock() - super().__init__(path, "w", encoding=encoding) - self.open() - - def open_and_dequeue(self): - super().open() - - if self.file is None: - return - - with self.queueLock: - for i in self.queue: - self.file.write(i) - self.file.flush() - self.queue = None - - def open(self): - threading.Thread(target=self.open_and_dequeue).start() - - def write(self, data): - if self.file is None: - with self.queueLock: - self.queue.append(data) - return - r = self.file.write(data) - self.file.flush() - return r - - -class ReadingPipe(Pipe): - def __init__(self, path, encoding=None): - super().__init__(path, "r", encoding=encoding) - - def open(self): - if not self.doOpen: - return - super().open() - select.select([self.file], [], [], 10) - - def read(self): - if self.file is None: - self.open() - return self.file.read() - - def readline(self): - if self.file is None: - self.open() - return self.file.readline() - - class dsp(object): def __init__(self, output): self.samp_rate = 250000 diff --git a/csdr/pipe.py b/csdr/pipe.py new file mode 100644 index 0000000..3cc354a --- /dev/null +++ b/csdr/pipe.py @@ -0,0 +1,122 @@ +import os +import select +import time +import threading + +import logging + +logger = logging.getLogger(__name__) + + +class Pipe(object): + READ = "r" + WRITE = "w" + NONE = None + + @staticmethod + def create(path, t, encoding=None): + if t == Pipe.READ: + return ReadingPipe(path, encoding=encoding) + elif t == Pipe.WRITE: + return WritingPipe(path, encoding=encoding) + elif t == Pipe.NONE: + return Pipe(path, None, encoding=encoding) + + def __init__(self, path, direction, encoding=None): + self.doOpen = True + self.path = "{base}_{myid}".format(base=path, myid=id(self)) + self.direction = direction + self.encoding = encoding + self.file = None + os.mkfifo(self.path) + + def open(self): + retries = 0 + + def opener(path, flags): + fd = os.open(path, flags | os.O_NONBLOCK) + os.set_blocking(fd, True) + return fd + + while self.file is None and self.doOpen and retries < 10: + try: + self.file = open(self.path, self.direction, encoding=self.encoding, opener=opener) + except OSError as error: + # ENXIO = FIFO has not been opened for reading + if error.errno == 6: + time.sleep(.1) + retries += 1 + else: + raise + + # if doOpen is false, opening has been canceled, so no warning in that case. + if self.file is None and self.doOpen: + logger.warning("could not open FIFO %s", self.path) + + def close(self): + self.doOpen = False + try: + if self.file is not None: + self.file.close() + os.unlink(self.path) + except FileNotFoundError: + # it seems like we keep calling this twice. no idea why, but we don't need the resulting error. + pass + except Exception: + logger.exception("Pipe.close()") + + def __str__(self): + return self.path + + +class WritingPipe(Pipe): + def __init__(self, path, encoding=None): + self.queue = [] + self.queueLock = threading.Lock() + super().__init__(path, "w", encoding=encoding) + self.open() + + def open_and_dequeue(self): + super().open() + + if self.file is None: + return + + with self.queueLock: + for i in self.queue: + self.file.write(i) + self.file.flush() + self.queue = None + + def open(self): + threading.Thread(target=self.open_and_dequeue).start() + + def write(self, data): + if self.file is None: + with self.queueLock: + self.queue.append(data) + return + r = self.file.write(data) + self.file.flush() + return r + + +class ReadingPipe(Pipe): + def __init__(self, path, encoding=None): + super().__init__(path, "r", encoding=encoding) + + def open(self): + if not self.doOpen: + return + super().open() + select.select([self.file], [], [], 10) + + def read(self): + if self.file is None: + self.open() + return self.file.read() + + def readline(self): + if self.file is None: + self.open() + return self.file.readline()