use a single connection to avoid the managing overhead

This commit is contained in:
Jakob Ketterl 2021-04-11 21:04:13 +02:00
parent cb3cb50cbd
commit 4993a56235
4 changed files with 83 additions and 96 deletions

View File

@ -22,8 +22,7 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber):
if mode is None or not isinstance(mode, AudioChopperMode):
raise ValueError("Mode {} is not an audio chopper mode".format(mode_str))
self.profile_source = mode.get_profile_source()
self.writersChangedOut = None
self.writersChangedIn = None
(self.outputReader, self.outputWriter) = Pipe()
super().__init__()
def stop_writers(self):
@ -34,11 +33,10 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber):
self.stop_writers()
sorted_profiles = sorted(self.profile_source.getProfiles(), key=lambda p: p.getInterval())
groups = {interval: list(group) for interval, group in groupby(sorted_profiles, key=lambda p: p.getInterval())}
writers = [AudioWriter(self.dsp, interval, profiles) for interval, profiles in groups.items()]
writers = [AudioWriter(self.dsp, self.outputWriter, interval, profiles) for interval, profiles in groups.items()]
for w in writers:
w.start()
self.writers = writers
self.writersChangedOut.send(None)
def supports_type(self, t):
return t == "audio"
@ -49,7 +47,6 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber):
def run(self) -> None:
logger.debug("Audio chopper starting up")
self.writersChangedIn, self.writersChangedOut = Pipe()
self.setup_writers()
self.profile_source.subscribe(self)
while self.doRun:
@ -67,20 +64,25 @@ class AudioChopper(threading.Thread, Output, ProfileSourceSubscriber):
logger.debug("Audio chopper shutting down")
self.profile_source.unsubscribe(self)
self.stop_writers()
self.writersChangedOut.close()
self.writersChangedIn.close()
self.outputWriter.close()
self.outputWriter = None
# drain messages left in the queue so that the queue can be successfully closed
# this is necessary since python keeps the file descriptors open otherwise
try:
while True:
self.outputReader.recv()
except EOFError:
pass
self.outputReader.close()
self.outputReader = None
def onProfilesChanged(self):
logger.debug("profile change received, resetting writers...")
self.setup_writers()
def read(self):
while True:
try:
readers = wait([w.outputReader for w in self.writers] + [self.writersChangedIn])
received = [(r, r.recv()) for r in readers]
data = [d for r, d in received if r is not self.writersChangedIn]
if data:
return data
except (EOFError, OSError):
return None
try:
return self.outputReader.recv()
except (EOFError, OSError):
return None

View File

@ -38,14 +38,14 @@ class WaveFile(object):
class AudioWriter(object):
def __init__(self, active_dsp, interval, profiles: List[AudioChopperProfile]):
def __init__(self, active_dsp, outputWriter, interval, profiles: List[AudioChopperProfile]):
self.dsp = active_dsp
self.outputWriter = outputWriter
self.interval = interval
self.profiles = profiles
self.wavefile = None
self.switchingLock = threading.Lock()
self.timer = None
(self.outputReader, self.outputWriter) = Pipe()
def getWaveFile(self):
tmp_dir = CoreConfig().get_temporary_directory()
@ -114,19 +114,6 @@ class AudioWriter(object):
self.wavefile.writeframes(data)
def stop(self):
self.outputWriter.close()
self.outputWriter = None
# drain messages left in the queue so that the queue can be successfully closed
# this is necessary since python keeps the file descriptors open otherwise
try:
while True:
self.outputReader.recv()
except EOFError:
pass
self.outputReader.close()
self.outputReader = None
self.cancelTimer()
try:
self.wavefile.close()

View File

@ -84,40 +84,39 @@ class Js8TurboProfile(Js8Profile):
class Js8Parser(Parser):
decoderRegex = re.compile(" ?<Decode(Started|Debug|Finished)>")
def parse(self, messages):
for raw in messages:
try:
profile, freq, raw_msg = raw
self.setDialFrequency(freq)
msg = raw_msg.decode().rstrip()
if Js8Parser.decoderRegex.match(msg):
return
if msg.startswith(" EOF on input file"):
return
def parse(self, raw):
try:
profile, freq, raw_msg = raw
self.setDialFrequency(freq)
msg = raw_msg.decode().rstrip()
if Js8Parser.decoderRegex.match(msg):
return
if msg.startswith(" EOF on input file"):
return
frame = Js8().parse_message(msg)
self.handler.write_js8_message(frame, self.dial_freq)
frame = Js8().parse_message(msg)
self.handler.write_js8_message(frame, self.dial_freq)
self.pushDecode()
self.pushDecode()
if (isinstance(frame, Js8FrameHeartbeat) or isinstance(frame, Js8FrameCompound)) and frame.grid:
Map.getSharedInstance().updateLocation(
frame.callsign, LocatorLocation(frame.grid), "JS8", self.band
)
ReportingEngine.getSharedInstance().spot(
{
"callsign": frame.callsign,
"mode": "JS8",
"locator": frame.grid,
"freq": self.dial_freq + frame.freq,
"db": frame.db,
"timestamp": frame.timestamp,
"msg": str(frame),
}
)
if (isinstance(frame, Js8FrameHeartbeat) or isinstance(frame, Js8FrameCompound)) and frame.grid:
Map.getSharedInstance().updateLocation(
frame.callsign, LocatorLocation(frame.grid), "JS8", self.band
)
ReportingEngine.getSharedInstance().spot(
{
"callsign": frame.callsign,
"mode": "JS8",
"locator": frame.grid,
"freq": self.dial_freq + frame.freq,
"db": frame.db,
"timestamp": frame.timestamp,
"msg": str(frame),
}
)
except Exception:
logger.exception("error while parsing js8 message")
except Exception:
logger.exception("error while parsing js8 message")
def pushDecode(self):
metrics = Metrics.getSharedInstance()

View File

@ -246,44 +246,43 @@ class Q65Profile(WsjtProfile):
class WsjtParser(Parser):
def parse(self, messages):
for data in messages:
try:
profile, freq, raw_msg = data
self.setDialFrequency(freq)
msg = raw_msg.decode().rstrip()
# known debug messages we know to skip
if msg.startswith("<DecodeFinished>"):
return
if msg.startswith(" EOF on input file"):
return
def parse(self, data):
try:
profile, freq, raw_msg = data
self.setDialFrequency(freq)
msg = raw_msg.decode().rstrip()
# known debug messages we know to skip
if msg.startswith("<DecodeFinished>"):
return
if msg.startswith(" EOF on input file"):
return
mode = profile.getMode()
if mode in ["WSPR", "FST4W"]:
messageParser = BeaconMessageParser()
else:
messageParser = QsoMessageParser()
if mode == "WSPR":
decoder = WsprDecoder(profile, messageParser)
else:
decoder = Jt9Decoder(profile, messageParser)
out = decoder.parse(msg, freq)
if isinstance(profile, Q65Profile) and not out["msg"]:
# all efforts in vain, it's just a potential signal indicator
return
out["mode"] = mode
out["interval"] = profile.getInterval()
mode = profile.getMode()
if mode in ["WSPR", "FST4W"]:
messageParser = BeaconMessageParser()
else:
messageParser = QsoMessageParser()
if mode == "WSPR":
decoder = WsprDecoder(profile, messageParser)
else:
decoder = Jt9Decoder(profile, messageParser)
out = decoder.parse(msg, freq)
if isinstance(profile, Q65Profile) and not out["msg"]:
# all efforts in vain, it's just a potential signal indicator
return
out["mode"] = mode
out["interval"] = profile.getInterval()
self.pushDecode(mode)
if "callsign" in out and "locator" in out:
Map.getSharedInstance().updateLocation(
out["callsign"], LocatorLocation(out["locator"]), mode, self.band
)
ReportingEngine.getSharedInstance().spot(out)
self.pushDecode(mode)
if "callsign" in out and "locator" in out:
Map.getSharedInstance().updateLocation(
out["callsign"], LocatorLocation(out["locator"]), mode, self.band
)
ReportingEngine.getSharedInstance().spot(out)
self.handler.write_wsjt_message(out)
except Exception:
logger.exception("Exception while parsing wsjt message")
self.handler.write_wsjt_message(out)
except Exception:
logger.exception("Exception while parsing wsjt message")
def pushDecode(self, mode):
metrics = Metrics.getSharedInstance()