diff --git a/fps_test.py b/fps_test.py new file mode 100644 index 0000000..10e3f60 --- /dev/null +++ b/fps_test.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +""" +fps_test.py - Misst Frame-Rate, Framegroesse und Durchsatz einer MJPEG-Quelle. + +Nuetzlich zum Diagnostizieren der ESP32-CAM (WLAN-/Bandbreiten-Engpass): +ein flüssiger Stream + zuverlaessige Zaehlung braucht ausreichend FPS. + +Beispiele: + python3 fps_test.py # misst die App-Pipeline (/webcam_feed) + python3 fps_test.py --cam # misst die Kamera direkt (CAMERA_URL aus .env) + python3 fps_test.py http://192.168.10.99:81/stream + python3 fps_test.py --app -t 20 # App-Stream, 20 Sekunden messen +""" +import argparse +import os +import sys +import time + +import requests + +try: + from dotenv import load_dotenv + load_dotenv() +except ImportError: + pass + +APP_URL = "http://localhost:8080/webcam_feed" +CAM_URL = os.environ.get("CAMERA_URL", "http://192.168.10.99:81/stream") + +SOI = b"\xff\xd8" # JPEG Start-of-Image +EOI = b"\xff\xd9" # JPEG End-of-Image + + +def measure(url: str, seconds: float) -> int: + print(f"Messe {seconds:.0f}s an: {url}\n", flush=True) + try: + resp = requests.get(url, stream=True, timeout=(5, 10)) + resp.raise_for_status() + except requests.RequestException as exc: + print(f"FEHLER: {exc}") + return 1 + + buf = b"" + frames = 0 + total_bytes = 0 + sizes = [] + t0 = time.time() + try: + for chunk in resp.iter_content(chunk_size=16384): + buf += chunk + total_bytes += len(chunk) + while True: + s = buf.find(SOI) + e = buf.find(EOI, s + 2) + if s == -1 or e == -1: + break + sizes.append(e + 2 - s) + buf = buf[e + 2:] + frames += 1 + if time.time() - t0 >= seconds: + break + except requests.RequestException as exc: + print(f"Stream-Abbruch nach {frames} Frames: {exc}") + finally: + resp.close() + + dt = time.time() - t0 + if not frames: + print("Keine vollstaendigen Frames empfangen.") + return 1 + + fps = frames / dt + avg_kb = sum(sizes) / len(sizes) / 1024 + min_kb = min(sizes) / 1024 + max_kb = max(sizes) / 1024 + kbs = total_bytes / dt / 1024 + mbit = total_bytes * 8 / dt / 1e6 + + print(f" Frames : {frames} in {dt:.1f}s") + print(f" FPS : {fps:.1f}") + print(f" Framegroesse : Ø {avg_kb:.0f} KB (min {min_kb:.0f} / max {max_kb:.0f})") + print(f" Durchsatz : {kbs:.0f} KB/s ({mbit:.2f} Mbit/s)") + + print("\n Einschaetzung :", end=" ") + if fps >= 10: + print("fluessig - gut fuer Zaehlung. ✓") + elif fps >= 5: + print("brauchbar, aber schnelle Fahrzeuge koennen rutschen.") + else: + print("zu niedrig - WLAN-Signal/Framegroesse pruefen (kleinere Quality, naeher zum AP).") + if mbit < 2 and avg_kb > 40: + print(" Hinweis : grosse Frames + wenig Bandbreite -> Quality-Zahl hoeher setzen ODER WLAN verbessern.") + return 0 + + +def main() -> int: + p = argparse.ArgumentParser(description="Misst FPS/Durchsatz einer MJPEG-Quelle.") + p.add_argument("url", nargs="?", help="MJPEG-URL (Default: App-Stream)") + p.add_argument("--app", action="store_true", help=f"App-Pipeline messen ({APP_URL})") + p.add_argument("--cam", action="store_true", help=f"Kamera direkt messen (CAMERA_URL: {CAM_URL})") + p.add_argument("-t", "--seconds", type=float, default=10, help="Messdauer in Sekunden (Default 10)") + args = p.parse_args() + + if args.url: + url = args.url + elif args.cam: + url = CAM_URL + else: + url = APP_URL + return measure(url, args.seconds) + + +if __name__ == "__main__": + sys.exit(main())