- fps_test.py -> scripts/fps_test.py - test.py -> scripts/grab_frame.py (umbenannt + nutzbar gemacht: CAMERA_URL aus .env oder URL-Arg, --save, sauberes Error-Handling) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
69 lines
2.1 KiB
Python
69 lines
2.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
grab_frame.py - Holt EIN Bild aus einem MJPEG-Stream und gibt die Groesse aus.
|
|
Schneller Check, ob die Kamera ueberhaupt ein dekodierbares Frame liefert.
|
|
|
|
python3 scripts/grab_frame.py # CAMERA_URL aus .env
|
|
python3 scripts/grab_frame.py http://192.168.10.99:81/stream
|
|
python3 scripts/grab_frame.py --save bild.jpg # Frame zusaetzlich speichern
|
|
"""
|
|
import argparse
|
|
import os
|
|
import sys
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import requests
|
|
|
|
try:
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
except ImportError:
|
|
pass
|
|
|
|
|
|
def main() -> int:
|
|
p = argparse.ArgumentParser(description="Ein Frame aus einem MJPEG-Stream holen.")
|
|
p.add_argument("url", nargs="?", default=os.environ.get("CAMERA_URL"),
|
|
help="MJPEG-URL (Default: CAMERA_URL aus .env)")
|
|
p.add_argument("--save", metavar="DATEI", help="Frame als JPEG speichern")
|
|
args = p.parse_args()
|
|
|
|
if not args.url:
|
|
print("Keine URL und keine CAMERA_URL in .env gesetzt.")
|
|
return 1
|
|
|
|
try:
|
|
r = requests.get(args.url, stream=True, timeout=(5, 10))
|
|
r.raise_for_status()
|
|
except requests.RequestException as exc:
|
|
print(f"FEHLER: {exc}")
|
|
return 1
|
|
|
|
buf = b""
|
|
try:
|
|
for chunk in r.iter_content(4096):
|
|
buf += chunk
|
|
a = buf.find(b"\xff\xd8")
|
|
b = buf.find(b"\xff\xd9", a + 2)
|
|
if a != -1 and b != -1:
|
|
img = cv2.imdecode(np.frombuffer(buf[a:b + 2], np.uint8), cv2.IMREAD_COLOR)
|
|
if img is None:
|
|
print("Frame empfangen, aber nicht dekodierbar.")
|
|
return 1
|
|
print(f"Frame OK: {img.shape[1]}x{img.shape[0]} px ({b + 2 - a} Bytes JPEG)")
|
|
if args.save:
|
|
cv2.imwrite(args.save, img)
|
|
print(f"gespeichert -> {args.save}")
|
|
return 0
|
|
if len(buf) > 5_000_000:
|
|
break
|
|
finally:
|
|
r.close()
|
|
print("Kein vollstaendiges Frame empfangen.")
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|