openwebrx-clone/csdr/chain/digimodes.py

69 lines
2.4 KiB
Python
Raw Normal View History

2021-09-23 16:43:41 +00:00
from csdr.chain.demodulator import ServiceDemodulator, SecondaryDemodulator, DialFrequencyReceiver, SecondarySelectorChain
2021-09-27 22:27:01 +00:00
from owrx.audio.chopper import AudioChopper, AudioChopperParser
2021-09-06 13:05:33 +00:00
from owrx.aprs.kiss import KissDeframer
from owrx.aprs import Ax25Parser, AprsParser
2021-09-23 16:43:41 +00:00
from pycsdr.modules import Convert, FmDemod, Agc, TimingRecovery, DBPskDecoder, VaricodeDecoder
2021-08-31 14:54:37 +00:00
from pycsdr.types import Format
2021-09-06 13:05:33 +00:00
from owrx.aprs.module import DirewolfModule
2021-08-31 14:54:37 +00:00
2021-09-23 16:43:41 +00:00
class AudioChopperDemodulator(ServiceDemodulator, DialFrequencyReceiver):
    """Service chain that feeds format-converted audio into an AudioChopper.

    The chain consists of two stages: a FLOAT -> SHORT sample format
    conversion followed by the AudioChopper created for the given mode
    and parser.
    """

    def __init__(self, mode: str, parser: AudioChopperParser):
        """Build the chain.

        Args:
            mode: passed through unchanged to ``AudioChopper``.
            parser: passed through unchanged to ``AudioChopper``.
        """
        # The chopper is kept on the instance so that dial frequency
        # updates can be forwarded to it later on.
        self.chopper = AudioChopper(mode, parser)
        stages = [
            Convert(Format.FLOAT, Format.SHORT),
            self.chopper,
        ]
        super().__init__(stages)

    def getFixedAudioRate(self) -> int:
        """Return the fixed audio rate of this chain (presumably in Hz)."""
        return 12000

    def setDialFrequency(self, frequency: int) -> None:
        """Forward the dial frequency to the underlying chopper."""
        self.chopper.setDialFrequency(frequency)
2021-09-06 13:05:33 +00:00
2021-09-23 16:43:41 +00:00
class PacketDemodulator(ServiceDemodulator, DialFrequencyReceiver):
    """Service chain for packet decoding.

    Stages, in order: ``FmDemod`` -> FLOAT to SHORT conversion ->
    ``DirewolfModule`` -> ``KissDeframer`` -> ``Ax25Parser`` -> ``AprsParser``.
    """

    def __init__(self, service: bool = False):
        """Build the chain.

        Args:
            service: passed through as the ``service`` keyword argument of
                ``DirewolfModule``.
        """
        # The APRS parser is kept on the instance so that dial frequency
        # updates can be forwarded to it later on.
        self.parser = AprsParser()
        stages = [
            FmDemod(),
            Convert(Format.FLOAT, Format.SHORT),
            DirewolfModule(service=service),
            KissDeframer(),
            Ax25Parser(),
            self.parser,
        ]
        super().__init__(stages)

    def supportsSquelch(self) -> bool:
        """Report that this chain does not support squelch."""
        return False

    def getFixedAudioRate(self) -> int:
        """Return the fixed audio rate of this chain (presumably in Hz)."""
        return 48000

    def setDialFrequency(self, frequency: int) -> None:
        """Forward the dial frequency to the APRS parser."""
        self.parser.setDialFrequency(frequency)
2021-09-23 16:43:41 +00:00
class PskDemodulator(SecondaryDemodulator, SecondarySelectorChain):
    """Secondary chain for PSK decoding.

    Stages, in order: ``Agc`` (COMPLEX_FLOAT) -> ``TimingRecovery`` ->
    ``DBPskDecoder`` -> ``VaricodeDecoder``.
    """

    def __init__(self, baudRate: float):
        """Build the chain for the given baud rate.

        Args:
            baudRate: symbol rate; used to derive the samples-per-bit
                setting of the timing recovery stage and reported as the
                bandwidth by ``getBandwidth``.
        """
        self.baudRate = baudRate
        # this is an assumption, we will adjust in setSampleRate
        self.sampleRate = 12000
        stages = [
            Agc(Format.COMPLEX_FLOAT),
            self._buildTimingRecovery(),
            DBPskDecoder(),
            VaricodeDecoder(),
        ]
        super().__init__(stages)

    def _buildTimingRecovery(self):
        """Create a TimingRecovery matching the current sample and baud rate."""
        samplesPerBit = int(round(self.sampleRate / self.baudRate))
        # "& ~3" clears the two lowest bits, i.e. rounds down to a
        # multiple of four.
        samplesPerBit &= ~3
        return TimingRecovery(samplesPerBit, 0.5, 2, useQ=True)

    def getBandwidth(self) -> float:
        """Return the bandwidth of this mode, which equals the baud rate."""
        return self.baudRate

    def setSampleRate(self, sampleRate: int) -> None:
        """Adopt a new sample rate and swap in a matching timing recovery.

        Nothing happens when the rate is unchanged.
        """
        if sampleRate != self.sampleRate:
            self.sampleRate = sampleRate
            # Position 1 is where __init__ placed the TimingRecovery stage.
            self.replace(1, self._buildTimingRecovery())