openwebrx-clone/csdr/chain/digiham.py

134 lines
4.4 KiB
Python
Raw Permalink Normal View History

from csdr.chain.demodulator import BaseDemodulatorChain, FixedAudioRateChain, FixedIfSampleRateChain, DialFrequencyReceiver, MetaProvider, SlotFilterChain, DemodulatorError, ServiceDemodulator
from pycsdr.modules import FmDemod, Agc, Writer, Buffer
from pycsdr.types import Format
from digiham.modules import DstarDecoder, DcBlock, FskDemodulator, GfskDemodulator, DigitalVoiceFilter, MbeSynthesizer, NarrowRrcFilter, NxdnDecoder, DmrDecoder, WideRrcFilter, YsfDecoder, PocsagDecoder
2021-12-13 12:26:47 +00:00
from digiham.ambe import Modes, ServerError
from owrx.meta import MetaParser
from owrx.pocsag import PocsagParser
2021-07-29 22:06:21 +00:00
class DigihamChain(BaseDemodulatorChain, FixedIfSampleRateChain, FixedAudioRateChain, DialFrequencyReceiver, MetaProvider):
def __init__(self, fskDemodulator, decoder, mbeMode, filter=None, codecserver: str = ""):
self.decoder = decoder
if codecserver is None:
codecserver = ""
2021-08-06 18:02:59 +00:00
agc = Agc(Format.SHORT)
agc.setMaxGain(30)
agc.setInitialGain(3)
workers = [FmDemod(), DcBlock()]
if filter is not None:
workers += [filter]
2021-12-13 12:26:47 +00:00
try:
mbeSynthesizer = MbeSynthesizer(mbeMode, codecserver)
except ConnectionError as ce:
raise DemodulatorError("Connection to codecserver failed: {}".format(ce))
except ServerError as se:
raise DemodulatorError("Codecserver error: {}".format(se))
workers += [
fskDemodulator,
decoder,
2021-12-13 12:26:47 +00:00
mbeSynthesizer,
DigitalVoiceFilter(),
2021-08-06 18:02:59 +00:00
agc
2021-07-29 22:06:21 +00:00
]
self.metaParser = None
2021-09-09 20:24:41 +00:00
self.dialFrequency = None
2021-08-23 12:25:28 +00:00
super().__init__(workers)
def getFixedIfSampleRate(self):
return 48000
def getFixedAudioRate(self):
return 8000
2021-07-29 22:06:21 +00:00
def setMetaWriter(self, writer: Writer) -> None:
if self.metaParser is None:
self.metaParser = MetaParser()
buffer = Buffer(Format.CHAR)
self.decoder.setMetaWriter(buffer)
self.metaParser.setReader(buffer.getReader())
2021-09-09 20:24:41 +00:00
if self.dialFrequency is not None:
self.metaParser.setDialFrequency(self.dialFrequency)
self.metaParser.setWriter(writer)
2021-07-29 22:06:21 +00:00
2021-08-23 12:25:28 +00:00
def supportsSquelch(self):
return False
def setDialFrequency(self, frequency: int) -> None:
2021-09-09 20:24:41 +00:00
self.dialFrequency = frequency
if self.metaParser is None:
return
self.metaParser.setDialFrequency(frequency)
def stop(self):
if self.metaParser is not None:
self.metaParser.stop()
super().stop()
2021-07-31 22:49:20 +00:00
class Dstar(DigihamChain):
2021-07-29 22:06:21 +00:00
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=FskDemodulator(samplesPerSymbol=10),
decoder=DstarDecoder(),
mbeMode=Modes.DStarMode,
codecserver=codecserver
)
2021-07-31 22:49:20 +00:00
class Nxdn(DigihamChain):
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=GfskDemodulator(samplesPerSymbol=20),
decoder=NxdnDecoder(),
mbeMode=Modes.NxdnMode,
filter=NarrowRrcFilter(),
codecserver=codecserver
)
2021-08-06 18:02:59 +00:00
2021-08-06 22:09:40 +00:00
class Dmr(DigihamChain, SlotFilterChain):
2021-08-06 18:02:59 +00:00
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=GfskDemodulator(samplesPerSymbol=10),
decoder=DmrDecoder(),
mbeMode=Modes.DmrMode,
filter=WideRrcFilter(),
codecserver=codecserver,
)
def setSlotFilter(self, slotFilter: int) -> None:
self.decoder.setSlotFilter(slotFilter)
2021-08-06 22:09:40 +00:00
class Ysf(DigihamChain):
def __init__(self, codecserver: str = ""):
super().__init__(
fskDemodulator=GfskDemodulator(samplesPerSymbol=10),
decoder=YsfDecoder(),
mbeMode=Modes.YsfMode,
filter=WideRrcFilter(),
codecserver=codecserver
)
class PocsagDemodulator(ServiceDemodulator, DialFrequencyReceiver):
def __init__(self):
self.parser = PocsagParser()
workers = [
FmDemod(),
FskDemodulator(samplesPerSymbol=40, invert=True),
PocsagDecoder(),
self.parser,
]
super().__init__(workers)
def supportsSquelch(self) -> bool:
return False
def getFixedAudioRate(self) -> int:
return 48000
def setDialFrequency(self, frequency: int) -> None:
self.parser.setDialFrequency(frequency)