openwebrx-clone/csdr/chain/digiham.py

112 lines
3.8 KiB
Python
Raw Permalink Normal View History

2021-12-13 12:26:47 +00:00
from csdr.chain.demodulator import BaseDemodulatorChain, FixedAudioRateChain, FixedIfSampleRateChain, DialFrequencyReceiver, MetaProvider, SlotFilterChain, DemodulatorError
from pycsdr.modules import FmDemod, Agc, Writer, Buffer
from pycsdr.types import Format
from digiham.modules import DstarDecoder, DcBlock, FskDemodulator, GfskDemodulator, DigitalVoiceFilter, MbeSynthesizer, NarrowRrcFilter, NxdnDecoder, DmrDecoder, WideRrcFilter, YsfDecoder
2021-12-13 12:26:47 +00:00
from digiham.ambe import Modes, ServerError
from owrx.meta import MetaParser
2021-07-29 22:06:21 +00:00
class DigihamChain(BaseDemodulatorChain, FixedIfSampleRateChain, FixedAudioRateChain, DialFrequencyReceiver, MetaProvider):
def __init__(self, fskDemodulator, decoder, mbeMode, filter=None, codecserver: str = ""):
self.decoder = decoder
if codecserver is None:
codecserver = ""
2021-08-06 18:02:59 +00:00
agc = Agc(Format.SHORT)
agc.setMaxGain(30)
agc.setInitialGain(3)
workers = [FmDemod(), DcBlock()]
if filter is not None:
workers += [filter]
2021-12-13 12:26:47 +00:00
try:
mbeSynthesizer = MbeSynthesizer(mbeMode, codecserver)
except ConnectionError as ce:
raise DemodulatorError("Connection to codecserver failed: {}".format(ce))
except ServerError as se:
raise DemodulatorError("Codecserver error: {}".format(se))
workers += [
fskDemodulator,
decoder,
2021-12-13 12:26:47 +00:00
mbeSynthesizer,
DigitalVoiceFilter(),
2021-08-06 18:02:59 +00:00
agc
2021-07-29 22:06:21 +00:00
]
self.metaParser = None
2021-09-09 20:24:41 +00:00
self.dialFrequency = None
2021-08-23 12:25:28 +00:00
super().__init__(workers)
def getFixedIfSampleRate(self):
return 48000
def getFixedAudioRate(self):
return 8000
2021-07-29 22:06:21 +00:00
def setMetaWriter(self, writer: Writer) -> None:
if self.metaParser is None:
self.metaParser = MetaParser()
buffer = Buffer(Format.CHAR)
self.decoder.setMetaWriter(buffer)
self.metaParser.setReader(buffer.getReader())
2021-09-09 20:24:41 +00:00
if self.dialFrequency is not None:
self.metaParser.setDialFrequency(self.dialFrequency)
self.metaParser.setWriter(writer)
2021-07-29 22:06:21 +00:00
2021-08-23 12:25:28 +00:00
def supportsSquelch(self):
return False
def setDialFrequency(self, frequency: int) -> None:
2021-09-09 20:24:41 +00:00
self.dialFrequency = frequency
if self.metaParser is None:
return
self.metaParser.setDialFrequency(frequency)
def stop(self):
if self.metaParser is not None:
self.metaParser.stop()
super().stop()
2021-07-31 22:49:20 +00:00
class Dstar(DigihamChain):
2021-07-29 22:06:21 +00:00
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=FskDemodulator(samplesPerSymbol=10),
decoder=DstarDecoder(),
mbeMode=Modes.DStarMode,
codecserver=codecserver
)
2021-07-31 22:49:20 +00:00
class Nxdn(DigihamChain):
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=GfskDemodulator(samplesPerSymbol=20),
decoder=NxdnDecoder(),
mbeMode=Modes.NxdnMode,
filter=NarrowRrcFilter(),
codecserver=codecserver
)
2021-08-06 18:02:59 +00:00
2021-08-06 22:09:40 +00:00
class Dmr(DigihamChain, SlotFilterChain):
2021-08-06 18:02:59 +00:00
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=GfskDemodulator(samplesPerSymbol=10),
decoder=DmrDecoder(),
mbeMode=Modes.DmrMode,
filter=WideRrcFilter(),
codecserver=codecserver,
)
def setSlotFilter(self, slotFilter: int) -> None:
self.decoder.setSlotFilter(slotFilter)
2021-08-06 22:09:40 +00:00
class Ysf(DigihamChain):
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=GfskDemodulator(samplesPerSymbol=10),
decoder=YsfDecoder(),
mbeMode=Modes.YsfMode,
filter=WideRrcFilter(),
codecserver=codecserver
)