CAMERA_URL kann Credentials enthalten (z.B. rtsp://user:pass@host). Diese werden in Hilfetext, Lauf-Ausgabe und Fehlermeldung jetzt zu ***:*** maskiert. Default im Code auf neutralen Platzhalter gesetzt. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
130 lines
4.1 KiB
Python
130 lines
4.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
fps_test.py - Misst Frame-Rate, Framegroesse und Durchsatz einer MJPEG-Quelle.
|
|
|
|
Nuetzlich zum Diagnostizieren der ESP32-CAM (WLAN-/Bandbreiten-Engpass):
|
|
ein flüssiger Stream + zuverlaessige Zaehlung braucht ausreichend FPS.
|
|
|
|
Beispiele:
|
|
python3 fps_test.py # misst die App-Pipeline (/webcam_feed)
|
|
python3 fps_test.py --cam # misst die Kamera direkt (CAMERA_URL aus .env)
|
|
python3 fps_test.py http://192.168.10.99:81/stream
|
|
python3 fps_test.py --app -t 20 # App-Stream, 20 Sekunden messen
|
|
"""
|
|
import argparse
|
|
import os
|
|
import sys
|
|
import time
|
|
from urllib.parse import urlparse, urlunparse
|
|
|
|
import requests
|
|
|
|
try:
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
except ImportError:
|
|
pass
|
|
|
|
APP_URL = "http://localhost:8080/webcam_feed"
|
|
CAM_URL = os.environ.get("CAMERA_URL", "http://CAMERA-IP:81/stream")
|
|
|
|
SOI = b"\xff\xd8" # JPEG Start-of-Image
|
|
EOI = b"\xff\xd9" # JPEG End-of-Image
|
|
|
|
|
|
def redact(url: str) -> str:
|
|
"""Zugangsdaten (user:pass@) in einer URL maskieren -> nie im Klartext anzeigen."""
|
|
try:
|
|
p = urlparse(url)
|
|
if p.username or p.password:
|
|
host = p.hostname or ""
|
|
if p.port:
|
|
host += f":{p.port}"
|
|
return urlunparse(p._replace(netloc=f"***:***@{host}"))
|
|
except Exception:
|
|
pass
|
|
return url
|
|
|
|
|
|
def measure(url: str, seconds: float) -> int:
|
|
print(f"Messe {seconds:.0f}s an: {redact(url)}\n", flush=True)
|
|
try:
|
|
resp = requests.get(url, stream=True, timeout=(5, 10))
|
|
resp.raise_for_status()
|
|
except requests.RequestException as exc:
|
|
print(f"FEHLER: {str(exc).replace(url, redact(url))}")
|
|
return 1
|
|
|
|
buf = b""
|
|
frames = 0
|
|
total_bytes = 0
|
|
sizes = []
|
|
t0 = time.time()
|
|
try:
|
|
for chunk in resp.iter_content(chunk_size=16384):
|
|
buf += chunk
|
|
total_bytes += len(chunk)
|
|
while True:
|
|
s = buf.find(SOI)
|
|
e = buf.find(EOI, s + 2)
|
|
if s == -1 or e == -1:
|
|
break
|
|
sizes.append(e + 2 - s)
|
|
buf = buf[e + 2:]
|
|
frames += 1
|
|
if time.time() - t0 >= seconds:
|
|
break
|
|
except requests.RequestException as exc:
|
|
print(f"Stream-Abbruch nach {frames} Frames: {exc}")
|
|
finally:
|
|
resp.close()
|
|
|
|
dt = time.time() - t0
|
|
if not frames:
|
|
print("Keine vollstaendigen Frames empfangen.")
|
|
return 1
|
|
|
|
fps = frames / dt
|
|
avg_kb = sum(sizes) / len(sizes) / 1024
|
|
min_kb = min(sizes) / 1024
|
|
max_kb = max(sizes) / 1024
|
|
kbs = total_bytes / dt / 1024
|
|
mbit = total_bytes * 8 / dt / 1e6
|
|
|
|
print(f" Frames : {frames} in {dt:.1f}s")
|
|
print(f" FPS : {fps:.1f}")
|
|
print(f" Framegroesse : Ø {avg_kb:.0f} KB (min {min_kb:.0f} / max {max_kb:.0f})")
|
|
print(f" Durchsatz : {kbs:.0f} KB/s ({mbit:.2f} Mbit/s)")
|
|
|
|
print("\n Einschaetzung :", end=" ")
|
|
if fps >= 10:
|
|
print("fluessig - gut fuer Zaehlung. ✓")
|
|
elif fps >= 5:
|
|
print("brauchbar, aber schnelle Fahrzeuge koennen rutschen.")
|
|
else:
|
|
print("zu niedrig - WLAN-Signal/Framegroesse pruefen (kleinere Quality, naeher zum AP).")
|
|
if mbit < 2 and avg_kb > 40:
|
|
print(" Hinweis : grosse Frames + wenig Bandbreite -> Quality-Zahl hoeher setzen ODER WLAN verbessern.")
|
|
return 0
|
|
|
|
|
|
def main() -> int:
|
|
p = argparse.ArgumentParser(description="Misst FPS/Durchsatz einer MJPEG-Quelle.")
|
|
p.add_argument("url", nargs="?", help="MJPEG-URL (Default: App-Stream)")
|
|
p.add_argument("--app", action="store_true", help=f"App-Pipeline messen ({APP_URL})")
|
|
p.add_argument("--cam", action="store_true", help=f"Kamera direkt messen (CAMERA_URL: {redact(CAM_URL)})")
|
|
p.add_argument("-t", "--seconds", type=float, default=10, help="Messdauer in Sekunden (Default 10)")
|
|
args = p.parse_args()
|
|
|
|
if args.url:
|
|
url = args.url
|
|
elif args.cam:
|
|
url = CAM_URL
|
|
else:
|
|
url = APP_URL
|
|
return measure(url, args.seconds)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|