openwebrx-clone/csdr/chain/digiham.py

106 lines
3.5 KiB
Python
Raw Permalink Normal View History

from csdr.chain.demodulator import BaseDemodulatorChain, FixedAudioRateChain, FixedIfSampleRateChain, DialFrequencyReceiver, MetaProvider, SlotFilterChain
from pycsdr.modules import FmDemod, Agc, Writer, Buffer
from pycsdr.types import Format
from digiham.modules import DstarDecoder, DcBlock, FskDemodulator, GfskDemodulator, DigitalVoiceFilter, MbeSynthesizer, NarrowRrcFilter, NxdnDecoder, DmrDecoder, WideRrcFilter, YsfDecoder
from digiham.ambe import Modes
from owrx.meta import MetaParser
2021-07-29 22:06:21 +00:00
class DigihamChain(BaseDemodulatorChain, FixedIfSampleRateChain, FixedAudioRateChain, DialFrequencyReceiver, MetaProvider):
def __init__(self, fskDemodulator, decoder, mbeMode, filter=None, codecserver: str = ""):
self.decoder = decoder
if codecserver is None:
codecserver = ""
2021-08-06 18:02:59 +00:00
agc = Agc(Format.SHORT)
agc.setMaxGain(30)
agc.setInitialGain(3)
workers = [FmDemod(), DcBlock()]
if filter is not None:
workers += [filter]
workers += [
fskDemodulator,
decoder,
MbeSynthesizer(mbeMode, codecserver),
DigitalVoiceFilter(),
2021-08-06 18:02:59 +00:00
agc
2021-07-29 22:06:21 +00:00
]
self.metaParser = None
2021-09-09 20:24:41 +00:00
self.dialFrequency = None
2021-08-23 12:25:28 +00:00
super().__init__(workers)
def getFixedIfSampleRate(self):
return 48000
def getFixedAudioRate(self):
return 8000
2021-07-29 22:06:21 +00:00
def setMetaWriter(self, writer: Writer) -> None:
if self.metaParser is None:
self.metaParser = MetaParser()
buffer = Buffer(Format.CHAR)
self.decoder.setMetaWriter(buffer)
self.metaParser.setReader(buffer.getReader())
2021-09-09 20:24:41 +00:00
if self.dialFrequency is not None:
self.metaParser.setDialFrequency(self.dialFrequency)
self.metaParser.setWriter(writer)
2021-07-29 22:06:21 +00:00
2021-08-23 12:25:28 +00:00
def supportsSquelch(self):
return False
def setDialFrequency(self, frequency: int) -> None:
2021-09-09 20:24:41 +00:00
self.dialFrequency = frequency
if self.metaParser is None:
return
self.metaParser.setDialFrequency(frequency)
def stop(self):
if self.metaParser is not None:
self.metaParser.stop()
super().stop()
2021-07-31 22:49:20 +00:00
class Dstar(DigihamChain):
2021-07-29 22:06:21 +00:00
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=FskDemodulator(samplesPerSymbol=10),
decoder=DstarDecoder(),
mbeMode=Modes.DStarMode,
codecserver=codecserver
)
2021-07-31 22:49:20 +00:00
class Nxdn(DigihamChain):
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=GfskDemodulator(samplesPerSymbol=20),
decoder=NxdnDecoder(),
mbeMode=Modes.NxdnMode,
filter=NarrowRrcFilter(),
codecserver=codecserver
)
2021-08-06 18:02:59 +00:00
2021-08-06 22:09:40 +00:00
class Dmr(DigihamChain, SlotFilterChain):
2021-08-06 18:02:59 +00:00
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=GfskDemodulator(samplesPerSymbol=10),
decoder=DmrDecoder(),
mbeMode=Modes.DmrMode,
filter=WideRrcFilter(),
codecserver=codecserver,
)
def setSlotFilter(self, slotFilter: int) -> None:
self.decoder.setSlotFilter(slotFilter)
2021-08-06 22:09:40 +00:00
class Ysf(DigihamChain):
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=GfskDemodulator(samplesPerSymbol=10),
decoder=YsfDecoder(),
mbeMode=Modes.YsfMode,
filter=WideRrcFilter(),
codecserver=codecserver
)