Hilfs-Scripts nach scripts/ verschoben (test.py -> grab_frame.py)
- fps_test.py -> scripts/fps_test.py - test.py -> scripts/grab_frame.py (umbenannt + nutzbar gemacht: CAMERA_URL aus .env oder URL-Arg, --save, sauberes Error-Handling) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
114
scripts/fps_test.py
Normal file
114
scripts/fps_test.py
Normal file
@@ -0,0 +1,114 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
fps_test.py - Misst Frame-Rate, Framegroesse und Durchsatz einer MJPEG-Quelle.
|
||||
|
||||
Nuetzlich zum Diagnostizieren der ESP32-CAM (WLAN-/Bandbreiten-Engpass):
|
||||
ein flüssiger Stream + zuverlaessige Zaehlung braucht ausreichend FPS.
|
||||
|
||||
Beispiele:
|
||||
python3 fps_test.py # misst die App-Pipeline (/webcam_feed)
|
||||
python3 fps_test.py --cam # misst die Kamera direkt (CAMERA_URL aus .env)
|
||||
python3 fps_test.py http://192.168.10.99:81/stream
|
||||
python3 fps_test.py --app -t 20 # App-Stream, 20 Sekunden messen
|
||||
"""
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
try:
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
APP_URL = "http://localhost:8080/webcam_feed"
|
||||
CAM_URL = os.environ.get("CAMERA_URL", "http://192.168.10.99:81/stream")
|
||||
|
||||
SOI = b"\xff\xd8" # JPEG Start-of-Image
|
||||
EOI = b"\xff\xd9" # JPEG End-of-Image
|
||||
|
||||
|
||||
def measure(url: str, seconds: float) -> int:
|
||||
print(f"Messe {seconds:.0f}s an: {url}\n", flush=True)
|
||||
try:
|
||||
resp = requests.get(url, stream=True, timeout=(5, 10))
|
||||
resp.raise_for_status()
|
||||
except requests.RequestException as exc:
|
||||
print(f"FEHLER: {exc}")
|
||||
return 1
|
||||
|
||||
buf = b""
|
||||
frames = 0
|
||||
total_bytes = 0
|
||||
sizes = []
|
||||
t0 = time.time()
|
||||
try:
|
||||
for chunk in resp.iter_content(chunk_size=16384):
|
||||
buf += chunk
|
||||
total_bytes += len(chunk)
|
||||
while True:
|
||||
s = buf.find(SOI)
|
||||
e = buf.find(EOI, s + 2)
|
||||
if s == -1 or e == -1:
|
||||
break
|
||||
sizes.append(e + 2 - s)
|
||||
buf = buf[e + 2:]
|
||||
frames += 1
|
||||
if time.time() - t0 >= seconds:
|
||||
break
|
||||
except requests.RequestException as exc:
|
||||
print(f"Stream-Abbruch nach {frames} Frames: {exc}")
|
||||
finally:
|
||||
resp.close()
|
||||
|
||||
dt = time.time() - t0
|
||||
if not frames:
|
||||
print("Keine vollstaendigen Frames empfangen.")
|
||||
return 1
|
||||
|
||||
fps = frames / dt
|
||||
avg_kb = sum(sizes) / len(sizes) / 1024
|
||||
min_kb = min(sizes) / 1024
|
||||
max_kb = max(sizes) / 1024
|
||||
kbs = total_bytes / dt / 1024
|
||||
mbit = total_bytes * 8 / dt / 1e6
|
||||
|
||||
print(f" Frames : {frames} in {dt:.1f}s")
|
||||
print(f" FPS : {fps:.1f}")
|
||||
print(f" Framegroesse : Ø {avg_kb:.0f} KB (min {min_kb:.0f} / max {max_kb:.0f})")
|
||||
print(f" Durchsatz : {kbs:.0f} KB/s ({mbit:.2f} Mbit/s)")
|
||||
|
||||
print("\n Einschaetzung :", end=" ")
|
||||
if fps >= 10:
|
||||
print("fluessig - gut fuer Zaehlung. ✓")
|
||||
elif fps >= 5:
|
||||
print("brauchbar, aber schnelle Fahrzeuge koennen rutschen.")
|
||||
else:
|
||||
print("zu niedrig - WLAN-Signal/Framegroesse pruefen (kleinere Quality, naeher zum AP).")
|
||||
if mbit < 2 and avg_kb > 40:
|
||||
print(" Hinweis : grosse Frames + wenig Bandbreite -> Quality-Zahl hoeher setzen ODER WLAN verbessern.")
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
p = argparse.ArgumentParser(description="Misst FPS/Durchsatz einer MJPEG-Quelle.")
|
||||
p.add_argument("url", nargs="?", help="MJPEG-URL (Default: App-Stream)")
|
||||
p.add_argument("--app", action="store_true", help=f"App-Pipeline messen ({APP_URL})")
|
||||
p.add_argument("--cam", action="store_true", help=f"Kamera direkt messen (CAMERA_URL: {CAM_URL})")
|
||||
p.add_argument("-t", "--seconds", type=float, default=10, help="Messdauer in Sekunden (Default 10)")
|
||||
args = p.parse_args()
|
||||
|
||||
if args.url:
|
||||
url = args.url
|
||||
elif args.cam:
|
||||
url = CAM_URL
|
||||
else:
|
||||
url = APP_URL
|
||||
return measure(url, args.seconds)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
68
scripts/grab_frame.py
Normal file
68
scripts/grab_frame.py
Normal file
@@ -0,0 +1,68 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
grab_frame.py - Holt EIN Bild aus einem MJPEG-Stream und gibt die Groesse aus.
|
||||
Schneller Check, ob die Kamera ueberhaupt ein dekodierbares Frame liefert.
|
||||
|
||||
python3 scripts/grab_frame.py # CAMERA_URL aus .env
|
||||
python3 scripts/grab_frame.py http://192.168.10.99:81/stream
|
||||
python3 scripts/grab_frame.py --save bild.jpg # Frame zusaetzlich speichern
|
||||
"""
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import requests
|
||||
|
||||
try:
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
def main() -> int:
|
||||
p = argparse.ArgumentParser(description="Ein Frame aus einem MJPEG-Stream holen.")
|
||||
p.add_argument("url", nargs="?", default=os.environ.get("CAMERA_URL"),
|
||||
help="MJPEG-URL (Default: CAMERA_URL aus .env)")
|
||||
p.add_argument("--save", metavar="DATEI", help="Frame als JPEG speichern")
|
||||
args = p.parse_args()
|
||||
|
||||
if not args.url:
|
||||
print("Keine URL und keine CAMERA_URL in .env gesetzt.")
|
||||
return 1
|
||||
|
||||
try:
|
||||
r = requests.get(args.url, stream=True, timeout=(5, 10))
|
||||
r.raise_for_status()
|
||||
except requests.RequestException as exc:
|
||||
print(f"FEHLER: {exc}")
|
||||
return 1
|
||||
|
||||
buf = b""
|
||||
try:
|
||||
for chunk in r.iter_content(4096):
|
||||
buf += chunk
|
||||
a = buf.find(b"\xff\xd8")
|
||||
b = buf.find(b"\xff\xd9", a + 2)
|
||||
if a != -1 and b != -1:
|
||||
img = cv2.imdecode(np.frombuffer(buf[a:b + 2], np.uint8), cv2.IMREAD_COLOR)
|
||||
if img is None:
|
||||
print("Frame empfangen, aber nicht dekodierbar.")
|
||||
return 1
|
||||
print(f"Frame OK: {img.shape[1]}x{img.shape[0]} px ({b + 2 - a} Bytes JPEG)")
|
||||
if args.save:
|
||||
cv2.imwrite(args.save, img)
|
||||
print(f"gespeichert -> {args.save}")
|
||||
return 0
|
||||
if len(buf) > 5_000_000:
|
||||
break
|
||||
finally:
|
||||
r.close()
|
||||
print("Kein vollstaendiges Frame empfangen.")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user