import os
import json
import uuid
import time
import threading
from datetime import datetime
from zoneinfo import ZoneInfo
from threading import Event, Lock, Condition
from urllib.parse import urlparse

import cv2
import numpy as np
import requests
import paho.mqtt.client as mqtt
from flask import (
    Flask,
    render_template,
    Response,
    request,
    redirect,
    url_for,
    send_from_directory,
    session,
    jsonify,
    abort,
)
from werkzeug.utils import secure_filename
from ultralytics import YOLO

# Load .env BEFORE os.environ is read (broker / camera / MQTT configuration).
# load_dotenv also understands "export VAR=..." lines and inline comments.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    print("[config] python-dotenv nicht installiert - .env wird ignoriert", flush=True)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Camera URL is set via the environment (.env). ESP32-CAM: port 81, path /stream.
CAMERA_URL = os.getenv("CAMERA_URL", "http://CAMERA-IP:81/stream")

# --- Inference tuning (everything can be overridden via env) ---------------
# The webcam uses the light nano model; video upload stays on yolo11s.
WEBCAM_MODEL = os.getenv("WEBCAM_MODEL", "yolo11n.pt")
# Smaller imgsz = less work per frame (the YOLO default would be 640).
YOLO_IMGSZ = int(os.getenv("YOLO_IMGSZ", "480"))
# Motion gate: once this many pixels changed (320x180 grayscale image) the
# scene counts as "moving" and YOLO runs. Otherwise no inference call is made.
MOTION_PIXELS = int(os.getenv("MOTION_PIXELS", "500"))

# Always-on: the grabber keeps running regardless of viewers (24/7 counter).
# 0 = only count/stream while a browser is watching (frees the ESP32 slot).
# 1 = stay connected and keep counting, whether or not anybody is watching.
GRABBER_ALWAYS_ON = os.getenv("GRABBER_ALWAYS_ON", "0") == "1"

# Grace period: after the last viewer left, keep streaming for this many
# seconds before the camera connection is released. Prevents a brief viewer
# drop-out (e.g. a reload triggered by the offline detection) from causing
# constant reconnects together with a tracker / counting-state reset.
VIEWER_GRACE_SEC = float(os.getenv("VIEWER_GRACE_SEC", "15"))

# FP16 only makes sense on CUDA -> detect automatically, can be forced via env.
try:
    import torch
    _CUDA = torch.cuda.is_available()
except Exception:
    _CUDA = False
YOLO_HALF = os.getenv("YOLO_HALF", "1" if _CUDA else "0") == "1"

# COCO classes: 2=car, 3=motorcycle, 5=bus, 7=truck. Filters out person/couch etc.
VEHICLE_CLASS_IDS = [2, 3, 5, 7]

# --- Counting robustness for fast vehicles ---------------------------------
# Band around the counting line (in pixels, relative to the 1020x600 image):
# a vehicle is already counted when its centre is closer than COUNT_BAND_PX
# to the line - it is NOT required any more to capture one point before AND
# one point behind the line. Catches fast vehicles that jump far between two
# frames. 0 = off (classic segment intersection only).
COUNT_BAND_PX = int(os.getenv("COUNT_BAND_PX", "45"))
# Debounce against double counting on track-ID switches: a new counting
# point is discarded when it is closer than COUNT_DEDUP_PX to a point that
# was counted within the last COUNT_DEDUP_FRAMES frames.
COUNT_DEDUP_PX = int(os.getenv("COUNT_DEDUP_PX", "60"))
COUNT_DEDUP_FRAMES = int(os.getenv("COUNT_DEDUP_FRAMES", "12"))
# 24/7 operation: track IDs that have not been seen for this many frames are
# removed from track_positions AND counted_ids. Prevents unbounded growth of
# the state (memory leak) and wrong suppression when a track ID is reused
# later. Value must exceed the time a vehicle stays in view.
COUNT_FORGET_FRAMES = int(os.environ.get("COUNT_FORGET_FRAMES", "150"))

# Diagnostics: print one log line per processed frame that contains vehicles
# (detections, distance to the line, counted or not). Enable with COUNT_DEBUG=1.
COUNT_DEBUG = os.environ.get("COUNT_DEBUG", "0") == "1"

# --- MQTT: one event per line crossing (for n8n -> NocoDB) ------------------
# Can be switched off completely: MQTT_ENABLED=false -> app runs without
# broker / events.
MQTT_ENABLED = os.environ.get("MQTT_ENABLED", "true").strip().lower() in (
    "1",
    "true",
    "yes",
    "on",
)
MQTT_HOST = os.environ.get("MQTT_HOST", "127.0.0.1")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_USER = os.environ.get("MQTT_USER")
MQTT_PASS = os.environ.get("MQTT_PASS")
MQTT_TOPIC = os.environ.get("MQTT_TOPIC", "vehiclecounter/cam1")
CAMERA_ID = os.environ.get("CAMERA_ID", "cam1")
# Availability / status topic: "online" on connect (birth message),
# "offline" automatically via Last Will (LWT) if the connection drops.
STATUS_TOPIC = f"{MQTT_TOPIC}/status"
# Time zone for the event timestamp (DST-aware). Default Europe/Berlin.
LOCAL_TZ = ZoneInfo(os.environ.get("TZ_NAME", "Europe/Berlin"))


def _on_mqtt_connect(client, userdata, flags, *args):
    """Birth message: publish 'online' (retained) after every successful (re)connect.

    The trailing ``*args`` absorb the differing callback signatures of
    paho-mqtt 1.x (``rc``) and 2.x (``reason_code, properties``). The first
    extra argument is the connect result; paho invokes this callback for
    refused connections too, so the result is checked before announcing
    the client as online.
    """
    reason = args[0] if args else 0
    # paho 2.x passes a ReasonCode object (has ``is_failure``), 1.x a plain int.
    is_failure = getattr(reason, "is_failure", None)
    failed = bool(is_failure) if is_failure is not None else reason != 0
    if failed:
        print(f"[mqtt] connect refused: {reason}", flush=True)
        return
    client.publish(STATUS_TOPIC, "online", qos=1, retain=True)
    print(f"[mqtt] connected -> {STATUS_TOPIC} online", flush=True)


_mqtt = None
if not MQTT_ENABLED:
    print("[mqtt] deaktiviert (MQTT_ENABLED=false) - keine Events", flush=True)
else:
    # paho-mqtt 2.x requires the CallbackAPIVersion, 1.x does not know it.
    # getattr instead of direct access -> no type-checker error with 1.x stubs.
    _cb_api = getattr(mqtt, "CallbackAPIVersion", None)
    if _cb_api is not None:
        _mqtt = mqtt.Client(_cb_api.VERSION2)
    else:
        _mqtt = mqtt.Client()
    if MQTT_USER:
        _mqtt.username_pw_set(MQTT_USER, MQTT_PASS)
    _mqtt.on_connect = _on_mqtt_connect
    # Last Will: the broker publishes this as soon as the connection breaks
    # uncleanly.
    _mqtt.will_set(STATUS_TOPIC, "offline", qos=1, retain=True)
    try:
        # async + loop_start -> does not block app start-up when the broker is down
        _mqtt.connect_async(MQTT_HOST, MQTT_PORT, keepalive=60)
        _mqtt.loop_start()
    except Exception as exc:
        print(f"[mqtt] init failed: {exc}", flush=True)


def publish_crossing(vehicle_type, track_id, source):
    """Publish a crossing event on {MQTT_TOPIC}/crossing (QoS 1, not retained).

    Does nothing when MQTT is disabled. Publish errors are logged and
    swallowed so that counting never fails because of the broker.
    """
    if _mqtt is None:
        return
    payload = {
        "event": "crossing",
        "camera": CAMERA_ID,
        "source": source,
        "type": vehicle_type,
        "track_id": int(track_id),
        "ts": datetime.now(LOCAL_TZ).isoformat(),
    }
    try:
        _mqtt.publish(f"{MQTT_TOPIC}/crossing", json.dumps(payload), qos=1, retain=False)
    except Exception as exc:
        print(f"[mqtt] publish failed: {exc}", flush=True)


app = Flask(__name__)
# SECURITY: without SECRET_KEY the app falls back to a hard-coded development
# secret, which lets anyone forge session cookies. Flag it loudly.
_secret_key = os.environ.get("SECRET_KEY")
if not _secret_key:
    print("[config] SECRET_KEY not set - using insecure development secret", flush=True)
    _secret_key = "vehicle_dev_secret"
app.secret_key = _secret_key
app.config["MAX_CONTENT_LENGTH"] = 200 * 1024 * 1024  # 200 MB upload limit

# Global model for the video-upload path (per request).
model = YOLO("yolo11s.pt")
names = model.names

VEHICLE_CLASSES = {"car", "truck", "bus", "motorcycle"}
ALLOWED_EXTENSIONS = {"mp4", "mov", "avi", "mkv"}
UPLOAD_DIR = "uploads"
DEFAULT_LINE = {"x1": 0, "y1": 300, "x2": 1020, "y2": 300}
FRAME_SIZE = (1020, 600)

# Persistent counting line: stored as JSON and loaded at start-up so that it
# survives a restart (path can be overridden via env).
LINE_FILE = os.environ.get("LINE_FILE", "counting_line.json")


def _valid_line(d):
    """Validate/normalise a line dict to int coordinates, or return None.

    Returns a new dict with the keys x1, y1, x2, y2 converted via ``int``.
    Any missing key, non-mapping input or non-convertible value (including
    non-finite floats such as ``inf``) yields None.
    """
    try:
        return {k: int(d[k]) for k in ("x1", "y1", "x2", "y2")}
    except (KeyError, ValueError, TypeError, OverflowError):
        return None


def load_saved_line() -> dict:
    """Load the stored counting line or fall back to a copy of DEFAULT_LINE.

    A missing, unreadable, undecodable or invalid line file never raises;
    the default line is used instead.
    """
    try:
        with open(LINE_FILE) as fh:
            line = _valid_line(json.load(fh))
        if line:
            return line
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        pass
    return dict(DEFAULT_LINE)


def save_line(line: dict) -> None:
    """Store the line atomically as JSON (survives an app restart).

    Writes to a temporary file first and then swaps it in with os.replace.
    I/O errors are logged, not raised.
    """
    try:
        tmp = f"{LINE_FILE}.tmp"
        with open(tmp, "w") as fh:
            json.dump(line, fh)
        os.replace(tmp, LINE_FILE)
    except OSError as exc:
        print(f"[line] save failed: {exc}", flush=True)


# Loaded once at start-up -> default for the session AND the webcam grabber.
SAVED_LINE = load_saved_line()


# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------
def allowed_file(filename: str) -> bool:
    """Return True when the file name has one of the ALLOWED_EXTENSIONS."""
    return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS


def ensure_upload_dir() -> None:
    """Create UPLOAD_DIR if it does not exist yet."""
    # exist_ok avoids the check-then-create race between parallel uploads.
    os.makedirs(UPLOAD_DIR, exist_ok=True)


def get_line_from_session():
    """Return the session's counting line, seeding it from SAVED_LINE if unset."""
    if "counting_line" not in session:
        session["counting_line"] = dict(SAVED_LINE)
    return session["counting_line"]


def fresh_vehicle_counts() -> dict[str, int]:
    """Return a per-class counter dict with every vehicle class at 0."""
    return {vehicle: 0 for vehicle in VEHICLE_CLASSES}


def new_state() -> dict:
    """Return a fresh counting/tracking state for one stream."""
    return {
        "track_positions": {},
        "counted_ids": set(),
        "count": 0,
        "types": fresh_vehicle_counts(),
        "frame_idx": 0,
        "recent_counts": [],  # [(cx, cy, frame_idx)] used for debouncing
    }


def reset_state(state: dict) -> None:
    """Reset a state dict in place (counters, tracks and debounce history)."""
    state["track_positions"].clear()
    state["counted_ids"].clear()
    state["count"] = 0
    state["types"] = fresh_vehicle_counts()
    state["frame_idx"] = 0
    state["recent_counts"].clear()


def line_intersect(p1, p2, p3, p4) -> bool:
    """Return True when the segments p1-p2 and p3-p4 intersect.

    (Nearly) parallel segments are reported as not intersecting.
    """
    x1, y1 = p1
    x2, y2 = p2
    x3, y3 = p3
    x4, y4 = p4
    denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
    if abs(denom) < 1e-10:
        return False
    t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom
    u = -((x1 - x2) * (y1 - y3) - (y1 - y2) * (x1 - x3)) / denom
    return 0 <= t <= 1 and 0 <= u <= 1


def crossed_line(prev_pos, curr_pos, line_start, line_end) -> bool:
    """Return True when the movement prev_pos -> curr_pos cuts the counting line."""
    return line_intersect(prev_pos, curr_pos, line_start, line_end)


def point_to_segment_dist(px, py, ax, ay, bx, by) -> float:
    """Return the shortest distance from point (px,py) to segment (ax,ay)-(bx,by)."""
    dx, dy = bx - ax, by - ay
    if dx == 0 and dy == 0:
        # Degenerate segment: distance to the single point.
        return ((px - ax) ** 2 + (py - ay) ** 2) ** 0.5
    # Projection parameter of the point onto the line, clamped to the segment.
    t = ((px - ax) * dx + (py - ay) * dy) / (dx * dx + dy * dy)
    t = max(0.0, min(1.0, t))
    cx, cy = ax + t * dx, ay + t * dy
    return ((px - cx) ** 2 + (py - cy) ** 2) ** 0.5


def _recently_counted(recent_counts, cx, cy) -> bool:
    """Return True when a count happened recently near (cx,cy) (ID switch)."""
    for rx, ry, _ in recent_counts:
        if (cx - rx) ** 2 + (cy - ry) ** 2 <= COUNT_DEDUP_PX ** 2:
            return True
    return False


def draw_overlay(frame, line_start, line_end, state):
    """Draw the counting line (dashed) and the counter box. No YOLO involved.

    Draws onto ``frame`` in place and returns it.
    """
    types = state["types"]

    cv2.line(frame, line_start, line_end, (0, 255, 255), 3, cv2.LINE_AA)

    line_length = int(np.sqrt((line_end[0] - line_start[0]) ** 2 +
                              (line_end[1] - line_start[1]) ** 2))
    dash_length = 20
    # Black dashes on top of the yellow line; guards keep a zero-length line safe.
    for i in range(0, max(line_length, 1), dash_length * 2):
        t1 = i / line_length if line_length else 0
        t2 = min((i + dash_length) / line_length, 1.0) if line_length else 0
        x1_dash = int(line_start[0] + t1 * (line_end[0] - line_start[0]))
        y1_dash = int(line_start[1] + t1 * (line_end[1] - line_start[1]))
        x2_dash = int(line_start[0] + t2 * (line_end[0] - line_start[0]))
        y2_dash = int(line_start[1] + t2 * (line_end[1] - line_start[1]))
        cv2.line(frame, (x1_dash, y1_dash), (x2_dash, y2_dash), (0, 0, 0), 3)

    cv2.rectangle(frame, (10, 10), (350, 140), (0, 0, 0), -1)
    cv2.putText(frame, f"Gesamt: {state['count']}", (20, 35),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    cv2.putText(frame, f"Autos: {types.get('car', 0)}", (20, 65),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)
    cv2.putText(frame, f"LKW: {types.get('truck', 0)}", (20, 90),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)
    cv2.putText(frame, f"Busse: {types.get('bus', 0)}", (20, 115),
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)
    cv2.putText(frame, f"Motorraeder: {types.get('motorcycle', 0)}", (20, 135),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1)
    return frame


def render_static(img, line_start, line_end, state):
    """Overlay only, on the resized raw frame.

    Used when nothing is moving (motion gate); no YOLO call is made here.
    """
    frame = cv2.resize(img, FRAME_SIZE)
    return draw_overlay(frame, line_start, line_end, state)


def process_frame(frame, det_model, det_names, line_start, line_end, state, source="video"):
    """Resize the frame, run YOLO tracking, count line crossings, draw overlays.

    Mutates ``state`` in place and returns the annotated frame. Used by the
    webcam grabber AND the video path. ``source`` is forwarded to the MQTT
    crossing event.
    """
    frame = cv2.resize(frame, FRAME_SIZE)

    results = det_model.track(
        frame,
        persist=True,
        imgsz=YOLO_IMGSZ,
        half=YOLO_HALF,
        classes=VEHICLE_CLASS_IDS,
        verbose=False,
    )

    track_positions = state["track_positions"]
    counted_ids = state["counted_ids"]
    types = state["types"]
    recent_counts = state["recent_counts"]

    state["frame_idx"] += 1
    frame_idx = state["frame_idx"]
    # Trim the debounce history to its time window.
    recent_counts[:] = [
        c for c in recent_counts if frame_idx - c[2] <= COUNT_DEDUP_FRAMES
    ]
    # Garbage collection: forget track IDs that vanished long ago so that
    # track_positions/counted_ids do not grow without bound in 24/7 operation.
    stale = [
        tid for tid, pos in track_positions.items()
        if frame_idx - pos[2] > COUNT_FORGET_FRAMES
    ]
    for tid in stale:
        track_positions.pop(tid, None)
        counted_ids.discard(tid)

    dbg_vehicles = 0
    dbg_counted = 0
    dbg_dists = []

    if results and results[0].boxes is not None and results[0].boxes.id is not None:
        boxes = results[0].boxes.xyxy.int().cpu().tolist()
        class_ids = results[0].boxes.cls.int().cpu().tolist()
        track_ids = results[0].boxes.id.int().cpu().tolist()

        for box, class_id, track_id in zip(boxes, class_ids, track_ids):
            label_name = det_names[class_id]
            x1, y1, x2, y2 = box
            center_x = (x1 + x2) // 2
            center_y = (y1 + y2) // 2

            if label_name in VEHICLE_CLASSES:
                if COUNT_DEBUG:
                    dbg_vehicles += 1
                    dbg_dists.append(int(point_to_segment_dist(
                        center_x, center_y,
                        line_start[0], line_start[1], line_end[0], line_end[1])))

                if track_id not in counted_ids:
                    crossed = False

                    # (a) Classic segment intersection: one point before AND
                    #     one point behind the line.
                    if track_id in track_positions:
                        prev_x, prev_y, _ = track_positions[track_id]
                        cv2.line(frame, (prev_x, prev_y), (center_x, center_y),
                                 (255, 100, 0), 2)
                        if crossed_line((prev_x, prev_y), (center_x, center_y),
                                        line_start, line_end):
                            crossed = True

                    # (b) Band around the line: a single sample close to the
                    #     line is enough -> catches fast vehicles.
                    if not crossed and COUNT_BAND_PX > 0:
                        dist = point_to_segment_dist(
                            center_x, center_y,
                            line_start[0], line_start[1],
                            line_end[0], line_end[1],
                        )
                        if dist <= COUNT_BAND_PX:
                            crossed = True

                    if crossed and not _recently_counted(recent_counts, center_x, center_y):
                        counted_ids.add(track_id)
                        recent_counts.append((center_x, center_y, frame_idx))
                        state["count"] += 1
                        types[label_name] += 1
                        publish_crossing(label_name, track_id, source)
                        cv2.circle(frame, (center_x, center_y), 25, (0, 255, 0), 5)
                        dbg_counted += 1

                # Remember the last position and the frame in which the track
                # was seen (the frame index drives the garbage collection above).
                track_positions[track_id] = (center_x, center_y, frame_idx)

            box_color = (0, 255, 0) if label_name in VEHICLE_CLASSES else (255, 0, 0)
            cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)

            label = f"{track_id} - {label_name}"
            if track_id in counted_ids:
                label += " \u2713"
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 1)
            cv2.circle(frame, (center_x, center_y), 3, (0, 255, 255), -1)

    if COUNT_DEBUG and dbg_vehicles:
        print(
            f"[count-debug] frame#{frame_idx} fahrzeuge={dbg_vehicles} "
            f"abstand_zur_linie={sorted(dbg_dists)} band={COUNT_BAND_PX} "
            f"jetzt_gezaehlt={dbg_counted} gesamt={state['count']}",
            flush=True,
        )

    return draw_overlay(frame, line_start, line_end, state)


# ---------------------------------------------------------------------------
# Webcam grabber: ONE connection to the (ESP32) cam, fan-out to many viewers
# ---------------------------------------------------------------------------
class WebcamGrabber:
    """
    Holds exactly one connection to the network camera. A background thread
    reads frames, runs YOLO on them and puts the newest annotated JPEG into a
    shared slot. Every /webcam_feed request only consumes that slot ->
    any number of viewers, but only ONE camera connection.

    Reads the MJPEG stream MANUALLY via requests (SOI/EOI parsing) instead of
    cv2.VideoCapture -> bypasses the FFmpeg demuxer, which fails on ESP32-CAM.

    Motion gate: YOLO only runs when enough changes in the image. While the
    scene is static (e.g. only parked cars) no inference call is made and the
    GPU stays idle.
    """

    MAX_BUFFER = 4 * 1024 * 1024  # guard against unbounded buffer growth

    def __init__(self, url: str):
        self.url = url
        self.lock = Lock()
        self.frame_cond = Condition()
        self.latest_jpeg: bytes | None = None
        self.frame_seq = 0
        self.last_frame_ts = 0.0  # time.time() of the last published frame
        self.viewers = 0
        self._last_active_ts = 0.0  # last moment with a viewer (grace period)
        self.paused = False  # manually released -> frees the ESP32 slot
        self.reset_flag = Event()
        self.line = dict(SAVED_LINE)
        # Own model -> isolated tracker, separate from the video path.
        # Lazy: only loaded when the first stream becomes active.
        self.model = None
        self.names = None
        # Motion-gate reference image (grayscale, downscaled).
        self._prev_gray = None
        self.thread = None

    # -- Control ------------------------------------------------------------
    def start(self):
        """Start the background grabber thread (only once)."""
        if self.thread is None:
            self.thread = threading.Thread(target=self._run, daemon=True)
            self.thread.start()

    def set_line(self, line: dict):
        """Replace the counting line with a copy of ``line``."""
        with self.lock:
            self.line = dict(line)

    def reset(self):
        """Request a counter reset; applied by the grabber thread on the next frame."""
        self.reset_flag.set()

    def add_viewer(self):
        """Register one more viewer."""
        with self.lock:
            self.viewers += 1

    def remove_viewer(self):
        """Unregister a viewer (never drops below zero)."""
        with self.lock:
            self.viewers = max(0, self.viewers - 1)

    def _viewer_count(self) -> int:
        with self.lock:
            return self.viewers

    def _wants_stream(self) -> bool:
        """Should the grabber currently serve the camera?

        Includes a grace period so that a brief viewer drop-out (e.g. a
        reload) does not cut the connection.
        """
        if self.paused:
            return False  # manually released -> free the ESP32 slot
        if GRABBER_ALWAYS_ON:
            return True
        if self._viewer_count() > 0:
            self._last_active_ts = time.time()
            return True
        return (time.time() - self._last_active_ts) < VIEWER_GRACE_SEC

    def set_paused(self, paused: bool) -> bool:
        """Pause the grabber manually (release the camera) or resume it.

        Returns the new paused state.
        """
        self.paused = bool(paused)
        print(f"[webcam-grabber] {'pausiert (Slot frei)' if self.paused else 'aktiv'}",
              flush=True)
        return self.paused

    def _current_line(self):
        """Return the current line as ((x1, y1), (x2, y2))."""
        with self.lock:
            line = dict(self.line)
        return (line["x1"], line["y1"]), (line["x2"], line["y2"])

    @staticmethod
    def _extract_latest(buf: bytes):
        """
        Pull the LAST complete JPEG out of the buffer and drop older ones ->
        keeps latency low (the backlog is skipped).
        Returns (jpeg_or_None, rest_buffer).
        """
        latest = None
        while True:
            start = buf.find(b"\xff\xd8")  # SOI
            if start == -1:
                # No frame start in the buffer. Keep a trailing 0xFF: it may
                # be the first half of an SOI marker split across two chunks;
                # dropping it would lose the following frame.
                buf = buf[-1:] if buf.endswith(b"\xff") else b""
                break
            end = buf.find(b"\xff\xd9", start + 2)  # EOI
            if end == -1:
                buf = buf[start:]  # incomplete -> keep the tail
                break
            latest = buf[start:end + 2]
            buf = buf[end + 2:]
        return latest, buf

    def _ensure_model(self):
        """Load the webcam YOLO model on first use."""
        if self.model is None:
            self.model = YOLO(WEBCAM_MODEL)
            self.names = self.model.names

    def _publish(self, jpeg: bytes):
        """Put a new JPEG into the shared slot and wake all waiting viewers."""
        with self.frame_cond:
            self.latest_jpeg = jpeg
            self.frame_seq += 1
            self.last_frame_ts = time.time()
            self.frame_cond.notify_all()

    def _clear(self):
        """Empty the shared slot and wake all waiting viewers."""
        with self.frame_cond:
            self.latest_jpeg = None
            self.frame_seq += 1
            self.frame_cond.notify_all()

    def is_online(self) -> bool:
        """True when the last frame arrived less than 5 s ago (camera delivers)."""
        return self.latest_jpeg is not None and (time.time() - self.last_frame_ts) < 5.0

    def _reset_tracker(self):
        """Clear the persistent YOLO tracker -> fresh track IDs after a reconnect.

        Without a reset the tracker carries old/lost tracks along after a long
        pause (e.g. camera off overnight); new vehicles then get an ID late or
        not at all -> they are not counted. Best effort: failures are logged.
        """
        try:
            predictor = getattr(self.model, "predictor", None)
            trackers = getattr(predictor, "trackers", None) if predictor else None
            for tr in trackers or []:
                tr.reset()
        except Exception as exc:
            print(f"[webcam-grabber] tracker reset skipped: {exc}", flush=True)

    # -- Background thread (runs for the whole process lifetime) -------------
    def _run(self):
        """Connect/reconnect loop: read MJPEG, gate on motion, publish frames."""
        state = new_state()
        while True:
            # Without always-on: only connect while somebody is watching
            # (including the grace period).
            if not self._wants_stream():
                if self.latest_jpeg is not None:
                    self._clear()
                time.sleep(0.3)
                continue

            stream_ended = False
            resp = None
            try:
                self._ensure_model()
                resp = requests.get(self.url, stream=True, timeout=(5, 10))
                resp.raise_for_status()
                # Fresh connection -> new track IDs, the total count is kept.
                state["track_positions"].clear()
                state["counted_ids"].clear()
                state["recent_counts"].clear()
                state["frame_idx"] = 0
                self._reset_tracker()  # discard stale tracker state after a pause
                self._prev_gray = None
                print("[webcam-grabber] verbunden -> Tracker/Zaehlzustand frisch", flush=True)

                buf = b""
                for chunk in resp.iter_content(chunk_size=8192):
                    if not self._wants_stream():
                        break  # viewers (incl. grace period) gone -> release connection
                    if not chunk:
                        continue
                    buf += chunk
                    if len(buf) > self.MAX_BUFFER:
                        buf = buf[-self.MAX_BUFFER:]

                    jpeg, buf = self._extract_latest(buf)
                    if jpeg is None:
                        continue

                    img = cv2.imdecode(np.frombuffer(jpeg, np.uint8), cv2.IMREAD_COLOR)
                    if img is None:
                        continue

                    if self.reset_flag.is_set():
                        reset_state(state)
                        self.reset_flag.clear()

                    # --- Motion gate: YOLO only on motion (saves GPU) ---
                    gray = cv2.resize(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), (320, 180))
                    moving = True  # first frame after connect always runs YOLO
                    if self._prev_gray is not None:
                        diff = cv2.absdiff(gray, self._prev_gray)
                        _, mask = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
                        moving = cv2.countNonZero(mask) > MOTION_PIXELS
                    self._prev_gray = gray

                    line_start, line_end = self._current_line()
                    if moving:
                        frame = process_frame(img, self.model, self.names,
                                              line_start, line_end, state, source="webcam")
                    else:
                        frame = render_static(img, line_start, line_end, state)

                    ok, out = cv2.imencode(".jpg", frame)
                    if ok:
                        self._publish(out.tobytes())
                else:
                    # Loop finished without break: the camera closed the
                    # stream without raising an error.
                    stream_ended = True
            except Exception as exc:
                # Timeout / connection drop / HTTP or model error -> visible + backoff
                print(f"[webcam-grabber] {type(exc).__name__}: {exc}", flush=True)
                time.sleep(1.0)
            finally:
                if resp is not None:
                    resp.close()

            if stream_ended:
                # Back off briefly instead of reconnecting in a tight loop.
                print("[webcam-grabber] stream ended -> reconnecting", flush=True)
                time.sleep(1.0)

    # -- Per-viewer generator -------------------------------------------------
    def frames(self):
        """Yield multipart MJPEG parts for one viewer until the camera goes away.

        Registers the viewer for the lifetime of the generator. Ends when
        frames had been delivered and then none arrived for 20 s.
        """
        self.add_viewer()
        last_seq = -1
        got_any = False
        try:
            while True:
                with self.frame_cond:
                    ok = self.frame_cond.wait_for(
                        lambda: self.latest_jpeg is not None and self.frame_seq != last_seq,
                        timeout=20,
                    )
                    jpeg = self.latest_jpeg
                    seq = self.frame_seq
                if not ok:
                    if got_any:
                        break  # had frames, now nothing for 20 s -> camera gone
                    continue  # never got a frame (model loading / connecting) -> keep waiting
                if jpeg is None:
                    continue
                got_any = True
                last_seq = seq
                yield (
                    b"--frame\r\n"
                    b"Content-Type: image/jpeg\r\n\r\n" + jpeg + b"\r\n"
                )
        finally:
            self.remove_viewer()


webcam = WebcamGrabber(CAMERA_URL)
webcam.start()


# ---------------------------------------------------------------------------
# Reset events for the video path (per stream ID)
# ---------------------------------------------------------------------------
reset_events: dict[str, Event] = {}
reset_lock = Lock()


def get_reset_event(stream_id: str) -> Event:
    """Return the reset Event registered for ``stream_id``, creating it if absent."""
    with reset_lock:
        if reset_events.get(stream_id) is None:
            reset_events[stream_id] = Event()
        return reset_events[stream_id]


def release_reset_event(stream_id: str) -> None:
    """Drop the reset Event of ``stream_id`` (no-op when none is registered)."""
    with reset_lock:
        reset_events.pop(stream_id, None)


def get_webcam_stream_id() -> str:
    """Return the session's webcam stream ID, generating one on first use."""
    known = session.get("webcam_stream_id")
    if known:
        return known
    created = f"webcam-{uuid.uuid4().hex}"
    session["webcam_stream_id"] = created
    return created


def get_video_stream_id(filename: str) -> str:
    """Return the session's stream ID for an uploaded video, generating it if needed."""
    per_file = session.get("video_stream_ids", {})
    known = per_file.get(filename)
    if known:
        return known
    created = f"video-{uuid.uuid4().hex}"
    per_file[filename] = created
    # Re-assign so the session stores the updated mapping.
    session["video_stream_ids"] = per_file
    return created


# ---------------------------------------------------------------------------
# Video path (per request)
# ---------------------------------------------------------------------------
def generate_frames(capture, line_data, stream_id: str):
    """Yield annotated multipart JPEG parts for a video capture.

    Only every second frame read is processed. A pending reset event for
    ``stream_id`` clears the counting state before the next processed frame.
    The capture and the reset event are released when the generator ends.
    """
    counters = new_state()
    start_pt = (line_data["x1"], line_data["y1"])
    end_pt = (line_data["x2"], line_data["y2"])
    reset_event = get_reset_event(stream_id)
    frames_read = 0

    try:
        while True:
            grabbed, raw = capture.read()
            if not grabbed:
                break

            frames_read += 1
            if frames_read % 2 == 1:
                continue  # odd frames are skipped

            if reset_event.is_set():
                reset_state(counters)
                reset_event.clear()

            annotated = process_frame(raw, model, names, start_pt, end_pt,
                                      counters, source="video")

            encoded_ok, jpeg_buf = cv2.imencode(".jpg", annotated)
            if encoded_ok:
                yield (
                    b"--frame\r\n"
                    b"Content-Type: image/jpeg\r\n\r\n" + jpeg_buf.tobytes() + b"\r\n"
                )
    finally:
        capture.release()
        release_reset_event(stream_id)


def detect_objects_from_video(video_path, line_data, stream_id):
    """Open the video and return the frame generator.

    Raises:
        RuntimeError: when the video cannot be opened.
    """
    capture = cv2.VideoCapture(video_path)
    if capture.isOpened():
        return generate_frames(capture, line_data, stream_id)
    capture.release()
    raise RuntimeError("Video konnte nicht geoeffnet werden")


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route("/")
def index():
    """Render the landing page."""
    return render_template("index.html")


@app.route("/start_webcam")
def start_webcam():
    """Render the webcam page; makes sure the session has a line and a stream ID."""
    get_line_from_session()
    return render_template("webcam.html", stream_id=get_webcam_stream_id())


@app.route("/webcam_feed")
def webcam_feed():
    """Stream the grabber's annotated frames as multipart MJPEG."""
    # No dedicated camera connection per request -> fan-out from the grabber.
    mjpeg_parts = webcam.frames()
    return Response(
        mjpeg_parts,
        mimetype="multipart/x-mixed-replace; boundary=frame",
    )


@app.route("/api/set_line", methods=["POST"])
def set_counting_line():
    """Set the counting line (applies to the video session AND the webcam grabber)."""
    global SAVED_LINE
    payload = request.get_json(silent=True) or {}
    new_line = _valid_line(payload)
    if new_line is None:
        abort(400, description="Ungueltige Linienkoordinaten")
    SAVED_LINE = new_line  # default for future sessions
    save_line(new_line)  # persistent -> survives a restart
    session["counting_line"] = new_line
    webcam.set_line(new_line)  # the webcam uses one global line (one camera)
    return jsonify({"status": "success", "line": new_line})


@app.route("/api/get_line", methods=["GET"])
def get_counting_line():
    """Return the session's counting line as JSON."""
    current = get_line_from_session()
    return jsonify(current)


@app.route("/api/webcam_status", methods=["GET"])
def webcam_status():
    """Report whether the webcam grabber currently receives frames (camera online)."""
    status = {"online": webcam.is_online(), "paused": webcam.paused}
    return jsonify(status)


@app.route("/api/grabber_toggle", methods=["POST"])
def grabber_toggle():
    """Pause/resume the grabber: paused releases the ESP32 slot (counting off),
    active reconnects to the camera (counting on)."""
    now_paused = webcam.set_paused(not webcam.paused)
    return jsonify({"paused": now_paused})


# Allowed framesize values of the ESP32-CAM (OV2640): value -> label.
FRAMESIZES = {10: "VGA 640x480", 11: "SVGA 800x600"}


@app.route("/api/cam_framesize", methods=["POST"])
def cam_framesize():
    """Set the resolution of the ESP32-CAM via its /control endpoint.

    The server proxies the call (the browser needs no direct access).
    Expects JSON ``{"val": 10 | 11}``. Responds 400 on invalid input and
    502 when the camera request fails.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}  # non-object JSON bodies are treated like a missing val
    try:
        val = int(data.get("val"))
    except (TypeError, ValueError, OverflowError):
        abort(400, description="val (int) erforderlich")
    if val not in FRAMESIZES:
        abort(400, description="nur 10 (VGA) oder 11 (SVGA)")

    host = urlparse(CAMERA_URL).hostname
    if not host:
        abort(500, description="CAMERA_URL ohne gueltigen Host")
    # The control endpoint lives on port 80 (the stream runs separately on :81).
    ctrl_url = f"http://{host}/control?var=framesize&val={val}"
    try:
        r = requests.get(ctrl_url, timeout=5)
        r.raise_for_status()
    except requests.RequestException as exc:
        print(f"[cam] framesize set failed: {exc}", flush=True)
        return jsonify({"ok": False, "error": str(exc)}), 502
    print(f"[cam] framesize -> {FRAMESIZES[val]}", flush=True)
    return jsonify({"ok": True, "val": val, "label": FRAMESIZES[val]})


@app.route("/api/reset_count", methods=["POST"])
def reset_count():
    """Reset the counter of a stream that belongs to the current session.

    Expects JSON ``{"stream_id": "..."}``. Responds 400 without a stream ID
    and 403 when the stream is not owned by this session.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}  # non-object JSON bodies are treated like a missing stream_id
    stream_id = str(data.get("stream_id") or "")
    if not stream_id:
        abort(400, description="stream_id ist erforderlich")

    valid_ids = {session.get("webcam_stream_id")}
    valid_ids.update(session.get("video_stream_ids", {}).values())
    valid_ids.discard(None)
    if stream_id not in valid_ids:
        abort(403, description="Stream gehoert nicht zur aktuellen Sitzung")

    if stream_id == session.get("webcam_stream_id"):
        webcam.reset()
    else:
        get_reset_event(stream_id).set()
    return jsonify({"status": "success", "message": "Zaehler wird zurueckgesetzt"})


@app.route("/upload", methods=["POST"])
def upload_video():
    """Store an uploaded video under a unique name and redirect to its player page."""
    if "file" not in request.files:
        abort(400, description="Keine Datei erhalten")
    file = request.files["file"]
    if not file or not file.filename:
        abort(400, description="Keine Datei ausgewaehlt")

    filename = secure_filename(file.filename or "")
    if not filename:
        abort(400, description="Ungueltiger Dateiname")
    if not allowed_file(filename):
        abort(400, description="Ungueltiger Dateityp")

    ensure_upload_dir()
    name, ext = os.path.splitext(filename)
    # Random suffix -> uploads with the same name never overwrite each other.
    stored_filename = f"{name}_{uuid.uuid4().hex}{ext.lower()}"
    file_path = os.path.join(UPLOAD_DIR, stored_filename)
    file.save(file_path)
    return redirect(url_for("play_video", filename=stored_filename))


# The URL variable <filename> is required: the view takes ``filename`` and
# upload_video builds the redirect with url_for(..., filename=...).
@app.route("/uploads/<filename>")
def play_video(filename):
    """Render the player page for an uploaded video (404 when it does not exist)."""
    safe_filename = os.path.basename(filename)
    if safe_filename != filename:
        abort(400, description="Ungueltiger Dateiname")
    file_path = os.path.join(UPLOAD_DIR, safe_filename)
    if not os.path.isfile(file_path):
        abort(404)
    get_line_from_session()
    stream_id = get_video_stream_id(safe_filename)
    return render_template("play_video.html", filename=safe_filename, stream_id=stream_id)


@app.route("/video/<filename>")
def send_video(filename):
    """Serve the raw uploaded video file from UPLOAD_DIR."""
    return send_from_directory(UPLOAD_DIR, filename)


@app.route("/video_feed/<filename>")
def video_feed(filename):
    """Stream the annotated frames of an uploaded video as multipart MJPEG.

    Responds 400 for path-like names, 404 for unknown files and 503 when
    the video cannot be opened.
    """
    safe_filename = os.path.basename(filename)
    if safe_filename != filename:
        abort(400, description="Ungueltiger Dateiname")
    video_path = os.path.join(UPLOAD_DIR, safe_filename)
    if not os.path.isfile(video_path):
        abort(404)

    line_data = get_line_from_session()
    stream_id = get_video_stream_id(safe_filename)
    try:
        generator = detect_objects_from_video(video_path, line_data, stream_id)
    except RuntimeError as exc:
        abort(503, description=str(exc))
    return Response(generator, mimetype="multipart/x-mixed-replace; boundary=frame")


if __name__ == "__main__":
    app.run("0.0.0.0", debug=False, port=8080)