diff --git a/owrx/kiss.py b/owrx/kiss.py index bc3300b..6a85842 100644 --- a/owrx/kiss.py +++ b/owrx/kiss.py @@ -79,14 +79,14 @@ class KissClient(object): yield l[i:i + n] information = ax25frame[control_pid+2:] - # TODO how can we tell if this is an APRS frame at all? - aprsData = self.parseAprsData(information) data = { "destination": self.extractCallsign(ax25frame[0:7]), "source": self.extractCallsign(ax25frame[7:14]), "path": [self.extractCallsign(c) for c in chunks(ax25frame[14:control_pid], 7)] } + # TODO how can we tell if this is an APRS frame at all? + aprsData = self.parseAprsData(data["destination"], information) data.update(aprsData) logger.debug(data) @@ -95,43 +95,94 @@ class KissClient(object): Map.getSharedInstance().updateLocation(data["source"], loc, "APRS") return data - def parseAprsData(self, data): - #hexdump(data) + def hasCompressedCoordinatesx(self, raw): + return raw[0] == "/" or raw[0] == "\\" - # TODO detect MIC-E frame and decode accordingly + def parseUncompressedCoordinates(self, raw): + # TODO parse N/S and E/W + return { + "lat": int(raw[0:2]) + float(raw[2:7]) / 60, + "lon": int(raw[9:12]) + float(raw[12:17]) / 60, + "symbol": raw[18] + } - data = data.decode() - logger.debug(data) - - def parseCoordinates(raw): - # TODO parse N/S and E/W - return { - "lat": int(raw[0:2]) + float(raw[2:7]) / 60, - "lon": int(raw[9:12]) + float(raw[12:17]) / 60 - } - - if data[0] == "!": - # fixed - coords = parseCoordinates(data[1:19]) - coords["comment"] = data[20:] - return coords - elif data[0] == "/": - # APRS TNC - coords = parseCoordinates(data[8:26]) - coords["comment"] = data[27:] - return coords - elif data[0] == "@": - # MOBILE - # TODO CSE, SPD, BRG, 90Q, comments - if data[26] == "\\": - # DF - return parseCoordinates(data[8:26]) - coords = parseCoordinates(data[8:26]) - coords["symbol"] = data[26] - coords["comment"] = data[27:] - return coords + def parseCompressedCoordinates(self, raw): + # TODO parse compressed coordinate formats return {} + def parseMicEFrame(self, destination, information): + # TODO decode MIC-E Frame + + def extractNumber(input): + n = ord(input) + if n >= ord("P"): + return n - ord("P") + if n >= ord("A"): + return n - ord("A") + return n - ord("0") + + def listToNumber(input): + base = listToNumber(input[:-1]) * 10 if len(input) > 1 else 0 + return base + input[-1] + + logger.debug(destination) + rawLatitude = [extractNumber(c) for c in destination[0:6]] + logger.debug(rawLatitude) + lat = listToNumber(rawLatitude[0:2]) + listToNumber(rawLatitude[2:6]) / 6000 + if ord(destination[3]) <= ord("9"): + lat *= -1 + + logger.debug(lat) + + logger.debug(information) + lon = information[1] - 28 + if ord(destination[4]) >= ord("P"): + lon += 100 + if 180 <= lon <= 189: + lon -= 80 + if 190 <= lon <= 199: + lon -= 190 + + minutes = information[2] - 28 + if minutes >= 60: + minutes -= 60 + + lon += minutes / 60 + (information[3] - 28) / 6000 + + if ord(destination[5]) >= ord("P"): + lon *= -1 + + return { + "lat": lat, + "lon": lon, + "comment": information[9:].decode() + } + + def parseAprsData(self, destination, information): + if information[0] == 0x1c or information[0] == 0x60: + return self.parseMicEFrame(destination, information) + + information = information.decode() + logger.debug(information) + + if information[0] == "!" or information[0] == "=": + # position without timestamp + information = information[1:] + elif information[0] == "/" or information[0] == "@": + # position with timestamp + # TODO parse timestamp + information = information[8:] + else: + return {} + + if self.hasCompressedCoordinatesx(information): + coords = self.parseCompressedCoordinates(information[0:9]) + coords["comment"] = information[9:] + else: + coords = self.parseUncompressedCoordinates(information[0:19]) + coords["comment"] = information[19:] + return coords + def extractCallsign(self, input): cs = bytes([b >> 1 for b in input[0:6]]).decode().strip() ssid = (input[6] & 0b00011110) >> 1