openwebrx-clone/csdr/module/__init__.py

137 lines
3.4 KiB
Python

from pycsdr.modules import Module as BaseModule
from pycsdr.modules import Reader, Writer
from pycsdr.types import Format
from abc import ABCMeta, abstractmethod
from threading import Thread
from io import BytesIO
from subprocess import Popen, PIPE
from functools import partial
import pickle
class Module(BaseModule, metaclass=ABCMeta):
def __init__(self):
self.reader = None
self.writer = None
super().__init__()
def setReader(self, reader: Reader) -> None:
self.reader = reader
def setWriter(self, writer: Writer) -> None:
self.writer = writer
@abstractmethod
def getInputFormat(self) -> Format:
pass
@abstractmethod
def getOutputFormat(self) -> Format:
pass
def pump(self, read, write):
def copy():
while True:
data = None
try:
data = read()
except ValueError:
pass
except BrokenPipeError:
break
if data is None or isinstance(data, bytes) and len(data) == 0:
break
write(data)
return copy
class AutoStartModule(Module, metaclass=ABCMeta):
def _checkStart(self) -> None:
if self.reader is not None and self.writer is not None:
self.start()
def setReader(self, reader: Reader) -> None:
super().setReader(reader)
self._checkStart()
def setWriter(self, writer: Writer) -> None:
super().setWriter(writer)
self._checkStart()
@abstractmethod
def start(self):
pass
class ThreadModule(AutoStartModule, Thread, metaclass=ABCMeta):
def __init__(self):
self.doRun = True
super().__init__()
Thread.__init__(self)
@abstractmethod
def run(self):
pass
def stop(self):
self.doRun = False
self.reader.stop()
def start(self):
Thread.start(self)
class PickleModule(ThreadModule):
def getInputFormat(self) -> Format:
return Format.CHAR
def getOutputFormat(self) -> Format:
return Format.CHAR
def run(self):
while self.doRun:
data = self.reader.read()
if data is None:
self.doRun = False
break
io = BytesIO(data.tobytes())
try:
while True:
output = self.process(pickle.load(io))
if output is not None:
self.writer.write(pickle.dumps(output))
except EOFError:
pass
@abstractmethod
def process(self, input):
pass
class PopenModule(AutoStartModule, metaclass=ABCMeta):
def __init__(self):
self.process = None
super().__init__()
@abstractmethod
def getCommand(self):
pass
def _getProcess(self):
return Popen(self.getCommand(), stdin=PIPE, stdout=PIPE)
def start(self):
self.process = self._getProcess()
# resume in case the reader has been stop()ed before
self.reader.resume()
Thread(target=self.pump(self.reader.read, self.process.stdin.write)).start()
Thread(target=self.pump(partial(self.process.stdout.read1, 1024), self.writer.write)).start()
def stop(self):
if self.process is not None:
self.process.terminate()
self.process.wait()
self.process = None
self.reader.stop()