#!/usr/bin/env python3
"""
fps_test.py - Measure frame rate, frame size and throughput of an MJPEG source.

Useful for diagnosing the ESP32-CAM (Wi-Fi / bandwidth bottleneck): a smooth
stream plus reliable counting needs a sufficient frame rate.

Examples:
    python3 fps_test.py                  # measure the app pipeline (/webcam_feed)
    python3 fps_test.py --cam            # measure the camera directly (CAMERA_URL from .env)
    python3 fps_test.py http://192.168.10.99:81/stream
    python3 fps_test.py --app -t 20      # app stream, measure for 20 seconds
"""

import argparse
import os
import sys
import time
from typing import List

import requests

try:
    from dotenv import load_dotenv

    load_dotenv()
except ImportError:
    # python-dotenv is optional; without it only the real environment is used.
    pass

APP_URL = "http://localhost:8080/webcam_feed"
CAM_URL = os.environ.get("CAMERA_URL", "http://192.168.10.99:81/stream")

SOI = b"\xff\xd8"  # JPEG Start-of-Image
EOI = b"\xff\xd9"  # JPEG End-of-Image


def _positive_seconds(text: str) -> float:
    """argparse type: parse *text* as a float and require it to be > 0."""
    try:
        value = float(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid float value: {text!r}") from None
    # The comparison is written so that NaN is rejected as well.
    if not value > 0:
        raise argparse.ArgumentTypeError(f"must be greater than 0: {text!r}")
    return value


def _drain_frames(buf: bytearray) -> List[int]:
    """Remove every complete JPEG frame from *buf* and return the frame sizes.

    A frame is the span from an SOI marker up to and including the next EOI
    marker. *buf* is modified in place: consumed frames and any bytes in front
    of the first pending SOI are dropped, so the buffer never holds more than
    one partial frame (plus at most one dangling 0xFF byte).
    """
    sizes = []
    while True:
        start = buf.find(SOI)
        if start == -1:
            # No frame start yet. Everything can go except a trailing 0xFF,
            # which may be the first half of an SOI split across two chunks.
            keep = 1 if buf.endswith(b"\xff") else 0
            del buf[: len(buf) - keep]
            break
        end = buf.find(EOI, start + 2)
        if end == -1:
            # Frame is still incomplete: keep it, drop the junk in front of it.
            del buf[:start]
            break
        sizes.append(end + 2 - start)
        del buf[: end + 2]
    return sizes


def _open_stream(url: str):
    """Open *url* as a streaming HTTP response.

    Returns the open response, or None after printing an error message when
    the connection fails or the server answers with an HTTP error status. On
    an HTTP error the response is closed before returning so the connection
    is not leaked.
    """
    try:
        resp = requests.get(url, stream=True, timeout=(5, 10))
    except requests.RequestException as exc:
        print(f"FEHLER: {exc}")
        return None
    try:
        resp.raise_for_status()
    except requests.RequestException as exc:
        resp.close()
        print(f"FEHLER: {exc}")
        return None
    return resp


def measure(url: str, seconds: float) -> int:
    """Read the MJPEG stream at *url* for about *seconds* and print statistics.

    Frames are detected by scanning the byte stream for JPEG SOI/EOI markers.
    Prints the frame count, FPS, average/min/max frame size and throughput,
    followed by a short assessment.

    Args:
        url: URL of the MJPEG stream.
        seconds: Measurement duration. The deadline is only checked after a
            chunk has been received, so the real duration can be longer.

    Returns:
        0 on success, 1 if the stream could not be opened or no complete
        frame was received.
    """
    print(f"Messe {seconds:.0f}s an: {url}\n", flush=True)
    resp = _open_stream(url)
    if resp is None:
        return 1

    buf = bytearray()
    sizes = []
    total_bytes = 0
    # Monotonic clock: immune to wall-clock adjustments during the measurement.
    t0 = time.monotonic()
    try:
        for chunk in resp.iter_content(chunk_size=16384):
            buf += chunk
            total_bytes += len(chunk)
            sizes.extend(_drain_frames(buf))
            if time.monotonic() - t0 >= seconds:
                break
    except requests.RequestException as exc:
        # Keep whatever was measured so far; the summary below still runs.
        print(f"Stream-Abbruch nach {len(sizes)} Frames: {exc}")
    finally:
        resp.close()
    # Clamp so a coarse clock can never cause a division by zero below.
    dt = max(time.monotonic() - t0, 1e-9)

    frames = len(sizes)
    if not frames:
        print("Keine vollstaendigen Frames empfangen.")
        return 1

    fps = frames / dt
    avg_kb = sum(sizes) / len(sizes) / 1024
    min_kb = min(sizes) / 1024
    max_kb = max(sizes) / 1024
    # Throughput counts every received byte, including multipart headers.
    kbs = total_bytes / dt / 1024
    mbit = total_bytes * 8 / dt / 1e6

    print(f" Frames : {frames} in {dt:.1f}s")
    print(f" FPS : {fps:.1f}")
    print(f" Framegroesse : Ø {avg_kb:.0f} KB (min {min_kb:.0f} / max {max_kb:.0f})")
    print(f" Durchsatz : {kbs:.0f} KB/s ({mbit:.2f} Mbit/s)")

    print("\n Einschaetzung :", end=" ")
    if fps >= 10:
        print("fluessig - gut fuer Zaehlung. ✓")
    elif fps >= 5:
        print("brauchbar, aber schnelle Fahrzeuge koennen rutschen.")
    else:
        print("zu niedrig - WLAN-Signal/Framegroesse pruefen (kleinere Quality, naeher zum AP).")
    if mbit < 2 and avg_kb > 40:
        print(" Hinweis : grosse Frames + wenig Bandbreite -> Quality-Zahl hoeher setzen ODER WLAN verbessern.")
    return 0


def main() -> int:
    """Parse the command line and run the measurement.

    Source precedence: explicit URL, then --cam, otherwise the app stream.

    Returns:
        The exit status produced by measure().
    """
    p = argparse.ArgumentParser(description="Misst FPS/Durchsatz einer MJPEG-Quelle.")
    p.add_argument("url", nargs="?", help="MJPEG-URL (Default: App-Stream)")
    # --app selects what is already the default; it is accepted so the choice
    # can be spelled out explicitly on the command line.
    p.add_argument("--app", action="store_true", help=f"App-Pipeline messen ({APP_URL})")
    p.add_argument("--cam", action="store_true", help=f"Kamera direkt messen (CAMERA_URL: {CAM_URL})")
    p.add_argument("-t", "--seconds", type=_positive_seconds, default=10,
                   help="Messdauer in Sekunden (Default 10)")
    args = p.parse_args()

    if args.url:
        url = args.url
    elif args.cam:
        url = CAM_URL
    else:
        url = APP_URL
    return measure(url, args.seconds)


if __name__ == "__main__":
    sys.exit(main())