ft8 message parsing
This commit is contained in:
		
							
								
								
									
										1
									
								
								csdr.py
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								csdr.py
									
									
									
									
									
								
							@@ -263,6 +263,7 @@ class dsp(object):
 | 
			
		||||
        if self.get_secondary_demodulator() == "ft8":
 | 
			
		||||
            chopper = Ft8Chopper(self.secondary_process_demod.stdout)
 | 
			
		||||
            chopper.start()
 | 
			
		||||
            self.output.add_output("wsjt_demod", chopper.read)
 | 
			
		||||
        else:
 | 
			
		||||
            self.output.add_output("secondary_demod", partial(self.secondary_process_demod.stdout.read, 1))
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -159,6 +159,9 @@ class OpenWebRxReceiverClient(Client):
 | 
			
		||||
    def write_metadata(self, metadata):
 | 
			
		||||
        self.protected_send({"type":"metadata","value":metadata})
 | 
			
		||||
 | 
			
		||||
    def write_wsjt_message(self, message):
 | 
			
		||||
        self.protected_send({"type": "wsjt_message", "value": message})
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MapConnection(Client):
 | 
			
		||||
    def __init__(self, conn):
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										64
									
								
								owrx/wsjt.py
									
									
									
									
									
								
							
							
						
						
									
										64
									
								
								owrx/wsjt.py
									
									
									
									
									
								
							@@ -1,9 +1,11 @@
 | 
			
		||||
import threading
 | 
			
		||||
import wave
 | 
			
		||||
from datetime import datetime, timedelta
 | 
			
		||||
from datetime import datetime, timedelta, date
 | 
			
		||||
import time
 | 
			
		||||
import sched
 | 
			
		||||
import subprocess
 | 
			
		||||
import os
 | 
			
		||||
from multiprocessing.connection import Pipe
 | 
			
		||||
 | 
			
		||||
import logging
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
@@ -14,7 +16,8 @@ class Ft8Chopper(threading.Thread):
 | 
			
		||||
        self.source = source
 | 
			
		||||
        (self.wavefilename, self.wavefile) = self.getWaveFile()
 | 
			
		||||
        self.scheduler = sched.scheduler(time.time, time.sleep)
 | 
			
		||||
        self.queue = []
 | 
			
		||||
        self.fileQueue = []
 | 
			
		||||
        (self.outputReader, self.outputWriter) = Pipe()
 | 
			
		||||
        self.doRun = True
 | 
			
		||||
        super().__init__()
 | 
			
		||||
 | 
			
		||||
@@ -53,15 +56,28 @@ class Ft8Chopper(threading.Thread):
 | 
			
		||||
        (self.wavefilename, self.wavefile) = self.getWaveFile()
 | 
			
		||||
 | 
			
		||||
        file.close()
 | 
			
		||||
        self.queue.append(filename)
 | 
			
		||||
        self.fileQueue.append(filename)
 | 
			
		||||
        self._scheduleNextSwitch()
 | 
			
		||||
 | 
			
		||||
    def decode(self):
 | 
			
		||||
        if self.queue:
 | 
			
		||||
            file = self.queue.pop()
 | 
			
		||||
            logger.debug("processing file {0}".format(file))
 | 
			
		||||
        def decode_and_unlink(file):
 | 
			
		||||
            #TODO expose decoding quality parameters through config
 | 
			
		||||
            self.decoder = subprocess.Popen(["jt9", "--ft8", "-d", "3", file])
 | 
			
		||||
            decoder = subprocess.Popen(["jt9", "--ft8", "-d", "3", file], stdout=subprocess.PIPE)
 | 
			
		||||
            while True:
 | 
			
		||||
                line = decoder.stdout.readline()
 | 
			
		||||
                if line is None or (isinstance(line, bytes) and len(line) == 0):
 | 
			
		||||
                    break
 | 
			
		||||
                self.outputWriter.send(line)
 | 
			
		||||
            rc = decoder.wait()
 | 
			
		||||
            logger.debug("decoder return code: %i", rc)
 | 
			
		||||
            os.unlink(file)
 | 
			
		||||
 | 
			
		||||
            self.decoder = decoder
 | 
			
		||||
 | 
			
		||||
        if self.fileQueue:
 | 
			
		||||
            file = self.fileQueue.pop()
 | 
			
		||||
            logger.debug("processing file {0}".format(file))
 | 
			
		||||
            threading.Thread(target=decode_and_unlink, args=[file]).start()
 | 
			
		||||
 | 
			
		||||
    def run(self) -> None:
 | 
			
		||||
        logger.debug("FT8 chopper starting up")
 | 
			
		||||
@@ -76,4 +92,38 @@ class Ft8Chopper(threading.Thread):
 | 
			
		||||
 | 
			
		||||
            self.decode()
 | 
			
		||||
        logger.debug("FT8 chopper shutting down")
 | 
			
		||||
        self.outputReader.close()
 | 
			
		||||
        self.outputWriter.close()
 | 
			
		||||
        self.emptyScheduler()
 | 
			
		||||
 | 
			
		||||
    def read(self):
 | 
			
		||||
        try:
 | 
			
		||||
            return self.outputReader.recv()
 | 
			
		||||
        except EOFError:
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class WsjtParser(object):
 | 
			
		||||
    def __init__(self, handler):
 | 
			
		||||
        self.handler = handler
 | 
			
		||||
 | 
			
		||||
    def parse(self, data):
 | 
			
		||||
        try:
 | 
			
		||||
            msg = data.decode().rstrip()
 | 
			
		||||
            # known debug messages we know to skip
 | 
			
		||||
            if msg.startswith("<DecodeFinished>"):
 | 
			
		||||
                return
 | 
			
		||||
            if msg.startswith(" EOF on input file"):
 | 
			
		||||
                return
 | 
			
		||||
 | 
			
		||||
            out = {}
 | 
			
		||||
            time = datetime.strptime(msg[0:6], "%H%M%S")
 | 
			
		||||
            out["timestamp"] = datetime.combine(date.today(), time.time()).timestamp()
 | 
			
		||||
            out["db"] = float(msg[7:10])
 | 
			
		||||
            out["dt"] = float(msg[11:15])
 | 
			
		||||
            out["freq"] = int(msg[16:20])
 | 
			
		||||
            out["msg"] = msg[24:]
 | 
			
		||||
 | 
			
		||||
            self.handler.write_wsjt_message(out)
 | 
			
		||||
        except ValueError:
 | 
			
		||||
            logger.exception("error while parsing wsjt message")
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user