import os
import json
import uuid
import time
import threading
from datetime import datetime
from zoneinfo import ZoneInfo
from threading import Event, Lock, Condition

import cv2
import numpy as np
import requests
import paho.mqtt.client as mqtt
from flask import (
    Flask,
    render_template,
    Response,
    request,
    redirect,
    url_for,
    send_from_directory,
    session,
    jsonify,
    abort,
)
from werkzeug.utils import secure_filename
from ultralytics import YOLO

# Load .env BEFORE any os.environ lookup below (broker/camera/MQTT config).
# load_dotenv also understands "export VAR=..." lines and inline comments.
try:
    from dotenv import load_dotenv

    load_dotenv()
except ImportError:
    print("[config] python-dotenv nicht installiert - .env wird ignoriert", flush=True)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Camera URL is set via the environment (.env). ESP32-CAM: port 81, path /stream.
CAMERA_URL = os.getenv("CAMERA_URL", "http://CAMERA-IP:81/stream")

# --- Inference tuning (everything can be overridden via env) ---------------
# The webcam uses the light nano model; the video upload stays on yolo11s.
WEBCAM_MODEL = os.getenv("WEBCAM_MODEL", "yolo11n.pt")

# Smaller imgsz = less compute per frame (the YOLO default would be 640).
YOLO_IMGSZ = int(os.getenv("YOLO_IMGSZ", "480"))

# Motion gate: once this many pixels changed (on the 320x180 grayscale image)
# the scene counts as "moving" and YOLO runs. Otherwise no inference call.
MOTION_PIXELS = int(os.getenv("MOTION_PIXELS", "500"))

# Always-on: the grabber keeps running regardless of viewers (24/7 counter).
#   0 = only count/stream while a browser is watching (frees the ESP32 slot).
#   1 = stay connected and keep counting whether or not anyone is watching.
GRABBER_ALWAYS_ON = os.getenv("GRABBER_ALWAYS_ON", "0") == "1"

# FP16 only pays off on CUDA -> detected automatically, can be forced via env.
try:
    import torch

    _CUDA = torch.cuda.is_available()
except Exception:
    # torch missing or broken -> treat as "no CUDA" and fall back to FP32.
    _CUDA = False
YOLO_HALF = os.getenv("YOLO_HALF", "1" if _CUDA else "0") == "1"

# COCO classes: 2=car, 3=motorcycle, 5=bus, 7=truck. Filters out person/couch etc.
VEHICLE_CLASS_IDS = [2, 3, 5, 7]

# --- Counting robustness for fast vehicles ---------------------------------
# Band around the counting line (in pixels, relative to the 1020x600 image):
# a vehicle is counted as soon as its centre is closer than COUNT_BAND_PX to
# the line - it is NOT required to capture one point before AND one behind the
# line. Catches fast vehicles that jump far between two frames.
# 0 = off (classic segment intersection only).
COUNT_BAND_PX = int(os.getenv("COUNT_BAND_PX", "24"))

# Debounce against double counting on track-ID switches: a new count point is
# dropped if it lies closer than COUNT_DEDUP_PX to a point that was counted
# within the last COUNT_DEDUP_FRAMES frames.
COUNT_DEDUP_PX = int(os.getenv("COUNT_DEDUP_PX", "60"))
COUNT_DEDUP_FRAMES = int(os.getenv("COUNT_DEDUP_FRAMES", "12"))

# --- MQTT: one event per line crossing (for n8n -> NocoDB) ------------------
# Can be switched off entirely: MQTT_ENABLED=false -> app runs without
# broker/events.
MQTT_ENABLED = os.getenv("MQTT_ENABLED", "true").strip().lower() in {
    "1",
    "true",
    "yes",
    "on",
}
MQTT_HOST = os.getenv("MQTT_HOST", "127.0.0.1")
MQTT_PORT = int(os.getenv("MQTT_PORT", "1883"))
MQTT_USER = os.getenv("MQTT_USER")
MQTT_PASS = os.getenv("MQTT_PASS")
MQTT_TOPIC = os.getenv("MQTT_TOPIC", "vehiclecounter/cam1")
CAMERA_ID = os.getenv("CAMERA_ID", "cam1")

# Availability/status topic: "online" on connect (birth message), "offline"
# automatically via Last Will (LWT) if the connection drops.
STATUS_TOPIC = f"{MQTT_TOPIC}/status"

# Time zone used for the event timestamp (DST-aware). Defaults to Europe/Berlin.
LOCAL_TZ = ZoneInfo(os.environ.get("TZ_NAME", "Europe/Berlin"))


def _on_mqtt_connect(client, userdata, flags, *args):
    """Birth message: publish 'online' (retained) after every successful (re)connect.

    The variadic tail absorbs the signature difference between paho-mqtt
    callback API versions; its first element, when present, is the connect
    result (an int or a reason-code object). The callback also fires when the
    broker refuses the connection, so the result is checked before anything
    is announced.
    """
    result = args[0] if args else 0
    failed = getattr(result, "is_failure", None)
    if failed is None:
        # Plain int result code (or an object comparable to int): 0 == accepted.
        failed = result != 0
    if failed:
        print(f"[mqtt] connect refused: {result}", flush=True)
        return
    client.publish(STATUS_TOPIC, "online", qos=1, retain=True)
    print(f"[mqtt] connected -> {STATUS_TOPIC} online", flush=True)


_mqtt = None
if not MQTT_ENABLED:
    print("[mqtt] deaktiviert (MQTT_ENABLED=false) - keine Events", flush=True)
else:
    # paho-mqtt 2.x requires the CallbackAPIVersion, 1.x does not know it.
    try:
        _mqtt = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    except AttributeError:
        _mqtt = mqtt.Client()
    if MQTT_USER:
        _mqtt.username_pw_set(MQTT_USER, MQTT_PASS)
    _mqtt.on_connect = _on_mqtt_connect
    # Last Will: the broker publishes this once the connection drops uncleanly.
    _mqtt.will_set(STATUS_TOPIC, "offline", qos=1, retain=True)
    try:
        # async + loop_start -> does not block app start-up if the broker is down
        _mqtt.connect_async(MQTT_HOST, MQTT_PORT, keepalive=60)
        _mqtt.loop_start()
    except Exception as exc:
        print(f"[mqtt] init failed: {exc}", flush=True)


def publish_crossing(vehicle_type, track_id, source):
    """Publish one crossing event on {MQTT_TOPIC}/crossing (QoS 1, not retained).

    Does nothing when MQTT is disabled (no client was created). Publish errors
    are logged and swallowed so a broker problem never interrupts counting.

    Args:
        vehicle_type: Class label of the counted vehicle (sent as "type").
        track_id: Tracker ID of the vehicle; converted with int().
        source: Origin of the event (sent as "source").
    """
    if _mqtt is None:
        return
    payload = {
        "event": "crossing",
        "camera": CAMERA_ID,
        "source": source,
        "type": vehicle_type,
        "track_id": int(track_id),
        "ts": datetime.now(LOCAL_TZ).isoformat(),
    }
    try:
        _mqtt.publish(f"{MQTT_TOPIC}/crossing", json.dumps(payload), qos=1, retain=False)
    except Exception as exc:
        print(f"[mqtt] publish failed: {exc}", flush=True)


app = Flask(__name__)
# NOTE(review): the fallback key is a fixed, publicly visible string. Sessions
# signed with it can be forged - set SECRET_KEY in the environment for any
# deployment that is reachable by others.
app.secret_key = os.environ.get("SECRET_KEY", "vehicle_dev_secret")
app.config["MAX_CONTENT_LENGTH"] = 200 * 1024 * 1024  # 200 MB upload limit

# Global model for the video-upload path (used per request).
model = YOLO("yolo11s.pt")
names = model.model.names

VEHICLE_CLASSES = {"car", "truck", "bus", "motorcycle"}
ALLOWED_EXTENSIONS = {"mp4", "mov", "avi", "mkv"}
UPLOAD_DIR = "uploads"
DEFAULT_LINE = {"x1": 0, "y1": 300, "x2": 1020, "y2": 300}
FRAME_SIZE = (1020, 600)

# Persistent counting line: stored as JSON and loaded at start-up so that it
# survives a restart (path can be overridden via env).
LINE_FILE = os.environ.get("LINE_FILE", "counting_line.json")


def _valid_line(d):
    """Validate/normalise a line mapping to int coordinates.

    Returns:
        A new dict with the keys x1, y1, x2, y2 as ints, or None if a key is
        missing, `d` is not subscriptable by those keys, or a value cannot be
        converted with int() (including infinite floats, which raise
        OverflowError).
    """
    try:
        return {k: int(d[k]) for k in ("x1", "y1", "x2", "y2")}
    except (KeyError, ValueError, TypeError, OverflowError):
        return None


def load_saved_line() -> dict:
    """Load the stored counting line, falling back to a copy of DEFAULT_LINE.

    A missing file is the normal first-start case and stays silent; any other
    read/parse problem is logged before falling back.
    """
    try:
        with open(LINE_FILE, encoding="utf-8") as fh:
            line = _valid_line(json.load(fh))
        if line:
            return line
    except FileNotFoundError:
        pass
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"[line] load failed: {exc}", flush=True)
    return dict(DEFAULT_LINE)


def save_line(line: dict) -> None:
    """Store the line as JSON via temp file + os.replace (survives an app restart).

    Failures are logged and otherwise ignored.
    """
    try:
        tmp = f"{LINE_FILE}.tmp"
        with open(tmp, "w", encoding="utf-8") as fh:
            json.dump(line, fh)
        os.replace(tmp, LINE_FILE)
    except OSError as exc:
        print(f"[line] save failed: {exc}", flush=True)


# Loaded once at start-up -> default for the session AND the webcam grabber.
SAVED_LINE = load_saved_line()


# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------
def allowed_file(filename: str) -> bool:
    """Return True if `filename` has an extension listed in ALLOWED_EXTENSIONS."""
    return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS


def ensure_upload_dir() -> None:
    """Create UPLOAD_DIR if it does not exist yet."""
    # exist_ok avoids the check-then-create race between concurrent uploads.
    os.makedirs(UPLOAD_DIR, exist_ok=True)


def get_line_from_session():
    """Return the session's counting line, seeding it from SAVED_LINE if absent."""
    if "counting_line" not in session:
        session["counting_line"] = dict(SAVED_LINE)
    return session["counting_line"]


def fresh_vehicle_counts() -> dict[str, int]:
    """Return a per-class counter dict with every vehicle class set to 0."""
    return {vehicle: 0 for vehicle in VEHICLE_CLASSES}


def new_state() -> dict:
    """Return a fresh counting/tracking state for one stream."""
    return {
        "track_positions": {},
        "counted_ids": set(),
        "count": 0,
        "types": fresh_vehicle_counts(),
        "frame_idx": 0,
        "recent_counts": [],  # [(cx, cy, frame_idx)] used for debouncing
    }


def reset_state(state: dict) -> None:
    """Reset `state` in place to the same contents new_state() would produce."""
    state["track_positions"].clear()
    state["counted_ids"].clear()
    state["count"] = 0
    state["types"] = fresh_vehicle_counts()
    state["frame_idx"] = 0
    state["recent_counts"].clear()


def line_intersect(p1, p2, p3, p4) -> bool:
    """Return True if the segments p1-p2 and p3-p4 intersect.

    Parallel (including collinear) segments are reported as not intersecting.
    """
    x1, y1 = p1
    x2, y2 = p2
    x3, y3 = p3
    x4, y4 = p4

    denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
    if abs(denom) < 1e-10:
        return False

    # t / u are the intersection parameters along p1-p2 and p3-p4.
    t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom
    u = -((x1 - x2) * (y1 - y3) - (y1 - y2) * (x1 - x3)) / denom
    return 0 <= t <= 1 and 0 <= u <= 1


def crossed_line(prev_pos, curr_pos, line_start, line_end) -> bool:
    """Return True if the move prev_pos -> curr_pos intersects the counting line."""
    return line_intersect(prev_pos, curr_pos, line_start, line_end)


def point_to_segment_dist(px, py, ax, ay, bx, by) -> float:
    """Return the shortest distance from point (px, py) to segment (ax, ay)-(bx, by)."""
    dx, dy = bx - ax, by - ay
    if dx == 0 and dy == 0:
        # Degenerate segment: distance to the single point.
        return ((px - ax) ** 2 + (py - ay) ** 2) ** 0.5
    t = ((px - ax) * dx + (py - ay) * dy) / (dx * dx + dy * dy)
    t = max(0.0, min(1.0, t))  # clamp the projection onto the segment
    cx, cy = ax + t * dx, ay + t * dy
    return ((px - cx) ** 2 + (py - cy) ** 2) ** 0.5


def _recently_counted(recent_counts, cx, cy) -> bool:
    """Return True if a count was already recorded within COUNT_DEDUP_PX of (cx, cy).

    Used to suppress double counts after a track-ID switch. The caller is
    responsible for pruning `recent_counts` to the recent time window.
    """
    limit_sq = COUNT_DEDUP_PX ** 2
    return any(
        (cx - rx) ** 2 + (cy - ry) ** 2 <= limit_sq
        for rx, ry, _ in recent_counts
    )


def draw_overlay(frame, line_start, line_end, state):
    """Draw the counting line (dashed) plus the counter box. No YOLO involved.

    Draws directly onto `frame` and returns it.
    """
    types = state["types"]
    cv2.line(frame, line_start, line_end, (0, 255, 255), 3, cv2.LINE_AA)

    dx = line_end[0] - line_start[0]
    dy = line_end[1] - line_start[1]
    line_length = int(np.sqrt(dx ** 2 + dy ** 2))
    dash_length = 20
    # Black dashes on top of the solid line; a zero-length line still gets one
    # (degenerate) dash instead of a division by zero.
    for i in range(0, max(line_length, 1), dash_length * 2):
        t1 = i / line_length if line_length else 0
        t2 = min((i + dash_length) / line_length, 1.0) if line_length else 0
        x1_dash = int(line_start[0] + t1 * dx)
        y1_dash = int(line_start[1] + t1 * dy)
        x2_dash = int(line_start[0] + t2 * dx)
        y2_dash = int(line_start[1] + t2 * dy)
        cv2.line(frame, (x1_dash, y1_dash), (x2_dash, y2_dash), (0, 0, 0), 3)

    cv2.rectangle(frame, (10, 10), (350, 140), (0, 0, 0), -1)
    cv2.putText(frame, f"Gesamt: {state['count']}", (20, 35),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    # (label, class key, y position, font scale) for the per-class rows.
    rows = (
        ("Autos", "car", 65, 0.6),
        ("LKW", "truck", 90, 0.6),
        ("Busse", "bus", 115, 0.6),
        ("Motorraeder", "motorcycle", 135, 0.5),
    )
    for title, key, y_pos, scale in rows:
        cv2.putText(frame, f"{title}: {types.get(key, 0)}", (20, y_pos),
                    cv2.FONT_HERSHEY_SIMPLEX, scale, (200, 200, 200), 1)
    return frame


def render_static(img, line_start, line_end, state):
    """Resize the raw frame to FRAME_SIZE and draw only the overlay.

    Used when nothing is moving (motion gate); no YOLO call is made here.
    """
    frame = cv2.resize(img, FRAME_SIZE)
    return draw_overlay(frame, line_start, line_end, state)


def process_frame(frame, det_model, det_names, line_start, line_end, state, source="video"):
    """Run tracking on one frame, count line crossings and draw all overlays.

    Resizes the frame to FRAME_SIZE, calls `det_model.track`, updates `state`
    in place and returns the annotated frame. Used by the webcam grabber AND
    the video path.

    Args:
        frame: Input image; resized before inference.
        det_model: Model object providing `.track(...)`.
        det_names: Mapping from class ID to class label.
        line_start: (x, y) start point of the counting line.
        line_end: (x, y) end point of the counting line.
        state: Counting state as produced by new_state(); mutated in place.
        source: Value forwarded to publish_crossing for each counted vehicle.
    """
    frame = cv2.resize(frame, FRAME_SIZE)
    results = det_model.track(
        frame,
        persist=True,
        imgsz=YOLO_IMGSZ,
        half=YOLO_HALF,
        classes=VEHICLE_CLASS_IDS,
        verbose=False,
    )

    track_positions = state["track_positions"]
    counted_ids = state["counted_ids"]
    types = state["types"]
    recent_counts = state["recent_counts"]

    state["frame_idx"] += 1
    frame_idx = state["frame_idx"]

    # Prune the debounce history down to the time window (in place, so the
    # list object stored in `state` stays the same).
    recent_counts[:] = [
        c for c in recent_counts if frame_idx - c[2] <= COUNT_DEDUP_FRAMES
    ]

    if results and results[0].boxes is not None and results[0].boxes.id is not None:
        boxes = results[0].boxes.xyxy.int().cpu().tolist()
        class_ids = results[0].boxes.cls.int().cpu().tolist()
        track_ids = results[0].boxes.id.int().cpu().tolist()

        for box, class_id, track_id in zip(boxes, class_ids, track_ids):
            label_name = det_names[class_id]
            x1, y1, x2, y2 = box
            center_x = (x1 + x2) // 2
            center_y = (y1 + y2) // 2

            if label_name in VEHICLE_CLASSES:
                if track_id not in counted_ids:
                    crossed = False

                    # (a) Classic segment intersection: one point before AND
                    #     one point behind the line.
                    if track_id in track_positions:
                        prev_x, prev_y = track_positions[track_id]
                        cv2.line(frame, (prev_x, prev_y), (center_x, center_y),
                                 (255, 100, 0), 2)
                        if crossed_line((prev_x, prev_y), (center_x, center_y),
                                        line_start, line_end):
                            crossed = True

                    # (b) Band around the line: a single sample close to the
                    #     line is enough -> catches fast vehicles.
                    if not crossed and COUNT_BAND_PX > 0:
                        dist = point_to_segment_dist(
                            center_x, center_y,
                            line_start[0], line_start[1],
                            line_end[0], line_end[1],
                        )
                        if dist <= COUNT_BAND_PX:
                            crossed = True

                    # NOTE(review): a crossing suppressed by the debounce is
                    # not added to counted_ids, so the track is evaluated
                    # again on later frames - confirm this is intended.
                    if crossed and not _recently_counted(recent_counts, center_x, center_y):
                        counted_ids.add(track_id)
                        recent_counts.append((center_x, center_y, frame_idx))
                        state["count"] += 1
                        types[label_name] += 1
                        publish_crossing(label_name, track_id, source)
                        cv2.circle(frame, (center_x, center_y), 25, (0, 255, 0), 5)

                track_positions[track_id] = (center_x, center_y)

            box_color = (0, 255, 0) if label_name in VEHICLE_CLASSES else (255, 0, 0)
            cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)

            label = f"{track_id} - {label_name}"
            if track_id in counted_ids:
                label += " \u2713"
            cv2.putText(frame, label, (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 1)
            cv2.circle(frame, (center_x, center_y), 3, (0, 255, 255), -1)

    return draw_overlay(frame, line_start, line_end, state)


# ---------------------------------------------------------------------------
# Webcam grabber: ONE connection to the (ESP32) cam, fan-out to many viewers
# ---------------------------------------------------------------------------
class WebcamGrabber:
    """Hold exactly one connection to the network camera.

    A background thread reads frames, runs YOLO on them and puts the latest
    annotated JPEG into a shared slot. Every /webcam_feed request only
    consumes that slot -> any number of viewers, but only ONE cam connection.

    The MJPEG stream is read MANUALLY via requests (SOI/EOI parsing) instead
    of cv2.VideoCapture, which bypasses the FFmpeg demuxer that fails on the
    ESP32-CAM.

    Motion gate: YOLO only runs when enough of the image changes. While the
    scene is still (e.g. only parked cars) no inference call is made.
    """

    MAX_BUFFER = 4 * 1024 * 1024  # guard against unbounded buffer growth

    def __init__(self, url: str):
        self.url = url
        self.lock = Lock()  # guards `viewers` and `line`
        self.frame_cond = Condition()  # guards `latest_jpeg` / `frame_seq`
        self.latest_jpeg: bytes | None = None
        self.frame_seq = 0
        self.viewers = 0
        self.reset_flag = Event()
        self.line = dict(SAVED_LINE)

        # Own model -> isolated tracker, separate from the video path.
        # Lazy: only loaded when the first stream becomes active.
        self.model = None
        self.names = None

        # Motion-gate reference image (grayscale, downscaled).
        self._prev_gray = None

        self.thread = None

    # -- Control --------------------------------------------------------------
    def start(self):
        """Start the background grabber thread (no-op if already created)."""
        if self.thread is None:
            self.thread = threading.Thread(target=self._run, daemon=True)
            self.thread.start()

    def set_line(self, line: dict):
        """Replace the counting line with a copy of `line`."""
        with self.lock:
            self.line = dict(line)

    def reset(self):
        """Request a counter reset; applied by the grabber thread on its next frame."""
        self.reset_flag.set()

    def add_viewer(self):
        """Register one more viewer."""
        with self.lock:
            self.viewers += 1

    def remove_viewer(self):
        """Unregister one viewer (never drops below zero)."""
        with self.lock:
            self.viewers = max(0, self.viewers - 1)

    def _viewer_count(self) -> int:
        with self.lock:
            return self.viewers

    def _current_line(self):
        """Return the counting line as ((x1, y1), (x2, y2))."""
        with self.lock:
            line = dict(self.line)
        return (line["x1"], line["y1"]), (line["x2"], line["y2"])

    def _should_idle(self) -> bool:
        """True when the grabber may release the camera (no viewers, not always-on)."""
        return not GRABBER_ALWAYS_ON and self._viewer_count() == 0

    @staticmethod
    def _extract_latest(buf: bytes):
        """Pull the LAST complete JPEG out of the buffer and drop older ones.

        Skipping the backlog keeps latency low.

        Returns:
            (jpeg_or_None, rest_buffer). The rest is either empty, a single
            trailing 0xFF byte (possible first half of an SOI marker that was
            split across two chunks), or an incomplete frame starting at SOI.
        """
        latest = None
        while True:
            start = buf.find(b"\xff\xd8")  # SOI
            if start == -1:
                # No frame start. Keep a lone trailing 0xFF so that a marker
                # split across a chunk boundary is still recognised once the
                # next chunk arrives; everything else is inter-frame noise.
                buf = buf[-1:] if buf.endswith(b"\xff") else b""
                break
            end = buf.find(b"\xff\xd9", start + 2)  # EOI
            if end == -1:
                buf = buf[start:]  # incomplete -> keep the tail
                break
            latest = buf[start:end + 2]
            buf = buf[end + 2:]
        return latest, buf

    def _ensure_model(self):
        """Load the webcam model on first use."""
        if self.model is None:
            self.model = YOLO(WEBCAM_MODEL)
            self.names = self.model.model.names

    def _publish(self, jpeg: bytes):
        """Store `jpeg` as the latest frame and wake all waiting viewers."""
        with self.frame_cond:
            self.latest_jpeg = jpeg
            self.frame_seq += 1
            self.frame_cond.notify_all()

    def _clear(self):
        """Drop the latest frame (bumps the sequence and wakes waiting viewers)."""
        with self.frame_cond:
            self.latest_jpeg = None
            self.frame_seq += 1
            self.frame_cond.notify_all()

    def _motion_detected(self, img) -> bool:
        """Motion gate: compare `img` against the previous frame.

        Works on a 320x180 grayscale copy; the first frame after a (re)connect
        always counts as moving. Updates the stored reference image.
        """
        gray = cv2.resize(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), (320, 180))
        moving = True
        if self._prev_gray is not None:
            diff = cv2.absdiff(gray, self._prev_gray)
            _, mask = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
            moving = cv2.countNonZero(mask) > MOTION_PIXELS
        self._prev_gray = gray
        return moving

    # -- Background thread (runs for the whole process lifetime) -------------
    def _run(self):
        """Connect to the camera, process frames and publish annotated JPEGs forever."""
        state = new_state()
        while True:
            # Without always-on: only connect while someone is watching
            # (frees the ESP32 slot).
            if self._should_idle():
                if self.latest_jpeg is not None:
                    self._clear()
                time.sleep(0.3)
                continue

            resp = None
            try:
                self._ensure_model()
                resp = requests.get(self.url, stream=True, timeout=(5, 10))
                resp.raise_for_status()

                # Fresh connection -> new track IDs, the total counter is kept.
                state["track_positions"].clear()
                state["counted_ids"].clear()
                state["recent_counts"].clear()
                self._prev_gray = None

                buf = b""
                for chunk in resp.iter_content(chunk_size=8192):
                    if self._should_idle():
                        break  # last viewer gone -> release the connection
                    if not chunk:
                        continue

                    buf += chunk
                    if len(buf) > self.MAX_BUFFER:
                        buf = buf[-self.MAX_BUFFER:]

                    jpeg, buf = self._extract_latest(buf)
                    if jpeg is None:
                        continue

                    img = cv2.imdecode(np.frombuffer(jpeg, np.uint8), cv2.IMREAD_COLOR)
                    if img is None:
                        continue

                    if self.reset_flag.is_set():
                        reset_state(state)
                        self.reset_flag.clear()

                    # --- Motion gate: YOLO only while something moves ---
                    moving = self._motion_detected(img)

                    line_start, line_end = self._current_line()
                    if moving:
                        frame = process_frame(img, self.model, self.names,
                                              line_start, line_end, state,
                                              source=CAMERA_ID)
                    else:
                        frame = render_static(img, line_start, line_end, state)

                    ok, out = cv2.imencode(".jpg", frame)
                    if ok:
                        self._publish(out.tobytes())
            except Exception as exc:
                # Timeout / connection drop / HTTP or model error -> visible + backoff
                print(f"[webcam-grabber] {type(exc).__name__}: {exc}", flush=True)
                time.sleep(1.0)
            finally:
                if resp is not None:
                    resp.close()

    # -- Per-viewer generator -------------------------------------------------
    def frames(self):
        """Yield multipart MJPEG parts for one viewer.

        Registers the viewer for the lifetime of the generator. Ends when
        frames had been delivered and then none arrived for 20 seconds; before
        the first frame it keeps waiting.
        """
        self.add_viewer()
        last_seq = -1
        got_any = False
        try:
            while True:
                with self.frame_cond:
                    ok = self.frame_cond.wait_for(
                        lambda: self.latest_jpeg is not None and self.frame_seq != last_seq,
                        timeout=20,
                    )
                    jpeg = self.latest_jpeg
                    seq = self.frame_seq
                if not ok:
                    if got_any:
                        break  # had frames, now 20 s of nothing -> cam is gone
                    continue  # never had a frame (model loading / connecting) -> keep waiting
                if jpeg is None:
                    continue
                got_any = True
                last_seq = seq
                yield (
                    b"--frame\r\n"
                    b"Content-Type: image/jpeg\r\n\r\n" + jpeg + b"\r\n"
                )
        finally:
            self.remove_viewer()


webcam = WebcamGrabber(CAMERA_URL)
webcam.start()


# ---------------------------------------------------------------------------
# Reset events for the video path (per stream ID)
# ---------------------------------------------------------------------------
reset_events: dict[str, Event] = {}
reset_lock = Lock()


def get_reset_event(stream_id: str) -> Event:
    """Return the reset Event for `stream_id`, creating it on first access."""
    with reset_lock:
        event = reset_events.get(stream_id)
        if event is None:
            event = Event()
            reset_events[stream_id] = event
        return event


def release_reset_event(stream_id: str) -> None:
    """Forget the reset Event of `stream_id` (no-op if unknown)."""
    with reset_lock:
        reset_events.pop(stream_id, None)


def get_webcam_stream_id() -> str:
    """Return the session's webcam stream ID, generating one if missing."""
    stream_id = session.get("webcam_stream_id")
    if not stream_id:
        stream_id = f"webcam-{uuid.uuid4().hex}"
        session["webcam_stream_id"] = stream_id
    return stream_id


def get_video_stream_id(filename: str) -> str:
    """Return the session's stream ID for `filename`, generating one if missing."""
    video_streams = session.get("video_stream_ids", {})
    stream_id = video_streams.get(filename)
    if not stream_id:
        stream_id = f"video-{uuid.uuid4().hex}"
        video_streams[filename] = stream_id
        # Re-assign so the session picks up the change to the nested dict.
        session["video_stream_ids"] = video_streams
    return stream_id


# ---------------------------------------------------------------------------
# Video path (per request)
# ---------------------------------------------------------------------------
def generate_frames(capture, line_data, stream_id: str):
    """Yield annotated multipart MJPEG parts for an opened video capture.

    Only every second frame is processed. The capture and the stream's reset
    event are released when the generator finishes or is closed.
    """
    state = new_state()
    line_start = (line_data["x1"], line_data["y1"])
    line_end = (line_data["x2"], line_data["y2"])
    frame_idx = 0
    reset_event = get_reset_event(stream_id)

    try:
        while True:
            ret, frame = capture.read()
            if not ret:
                break

            frame_idx += 1
            if frame_idx % 2 != 0:
                continue  # skip odd frames to halve the inference load

            if reset_event.is_set():
                reset_state(state)
                reset_event.clear()

            frame = process_frame(frame, model, names, line_start, line_end,
                                  state, source="video")
            ok, buffer = cv2.imencode(".jpg", frame)
            if not ok:
                continue
            yield (
                b"--frame\r\n"
                b"Content-Type: image/jpeg\r\n\r\n" + buffer.tobytes() + b"\r\n"
            )
    finally:
        capture.release()
        release_reset_event(stream_id)


def detect_objects_from_video(video_path, line_data, stream_id):
    """Open `video_path` and return the frame generator for it.

    Raises:
        RuntimeError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError("Video konnte nicht geoeffnet werden")
    return generate_frames(cap, line_data, stream_id)


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route("/")
def index():
    """Render the landing page."""
    return render_template("index.html")


@app.route("/start_webcam")
def start_webcam():
    """Render the webcam page, making sure the session has a line and a stream ID."""
    get_line_from_session()
    stream_id = get_webcam_stream_id()
    return render_template("webcam.html", stream_id=stream_id)


@app.route("/webcam_feed")
def webcam_feed():
    """Stream the shared webcam frames as multipart MJPEG."""
    # No dedicated cam connection per request -> fan-out from the grabber.
    return Response(
        webcam.frames(),
        mimetype="multipart/x-mixed-replace; boundary=frame",
    )


@app.route("/api/set_line", methods=["POST"])
def set_counting_line():
    """Set the counting line (applies to the video session AND the webcam grabber)."""
    global SAVED_LINE

    data = request.get_json(silent=True) or {}
    line = _valid_line(data)
    if line is None:
        abort(400, description="Ungueltige Linienkoordinaten")

    SAVED_LINE = line  # default for future sessions
    save_line(line)  # persistent -> survives a restart
    session["counting_line"] = line
    webcam.set_line(line)  # the webcam uses one global line (one camera)
    return jsonify({"status": "success", "line": line})


@app.route("/api/get_line", methods=["GET"])
def get_counting_line():
    """Return the session's counting line as JSON."""
    return jsonify(get_line_from_session())


@app.route("/api/reset_count", methods=["POST"])
def reset_count():
    """Reset the counter of a stream that belongs to the current session."""
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}  # e.g. a JSON list/number body -> treat as "no stream_id"
    stream_id = data.get("stream_id")
    # Non-string IDs (lists, dicts, ...) are rejected up front; unhashable
    # values would otherwise crash the set lookup below.
    if not stream_id or not isinstance(stream_id, str):
        abort(400, description="stream_id ist erforderlich")

    valid_ids = {session.get("webcam_stream_id")}
    valid_ids.update(session.get("video_stream_ids", {}).values())
    valid_ids.discard(None)
    if stream_id not in valid_ids:
        abort(403, description="Stream gehoert nicht zur aktuellen Sitzung")

    if stream_id == session.get("webcam_stream_id"):
        webcam.reset()
    else:
        get_reset_event(stream_id).set()
    return jsonify({"status": "success", "message": "Zaehler wird zurueckgesetzt"})


@app.route("/upload", methods=["POST"])
def upload_video():
    """Store an uploaded video under a unique name and redirect to its player page."""
    if "file" not in request.files:
        abort(400, description="Keine Datei erhalten")

    file = request.files["file"]
    if not file or file.filename == "":
        abort(400, description="Keine Datei ausgewaehlt")

    filename = secure_filename(file.filename)
    if not filename:
        abort(400, description="Ungueltiger Dateiname")
    if not allowed_file(filename):
        abort(400, description="Ungueltiger Dateityp")

    ensure_upload_dir()
    name, ext = os.path.splitext(filename)
    # A random suffix keeps uploads with the same name from overwriting each other.
    stored_filename = f"{name}_{uuid.uuid4().hex}{ext.lower()}"
    file_path = os.path.join(UPLOAD_DIR, stored_filename)
    file.save(file_path)
    return redirect(url_for("play_video", filename=stored_filename))


@app.route("/uploads/<filename>")
def play_video(filename):
    """Render the player page for an uploaded video."""
    safe_filename = os.path.basename(filename)
    if safe_filename != filename:
        abort(400, description="Ungueltiger Dateiname")

    file_path = os.path.join(UPLOAD_DIR, safe_filename)
    if not os.path.isfile(file_path):
        abort(404)

    get_line_from_session()
    stream_id = get_video_stream_id(safe_filename)
    return render_template("play_video.html", filename=safe_filename, stream_id=stream_id)


@app.route("/video/<filename>")
def send_video(filename):
    """Serve the raw uploaded video file from UPLOAD_DIR."""
    return send_from_directory(UPLOAD_DIR, filename)


@app.route("/video_feed/<filename>")
def video_feed(filename):
    """Stream the annotated frames of an uploaded video as multipart MJPEG."""
    safe_filename = os.path.basename(filename)
    if safe_filename != filename:
        abort(400, description="Ungueltiger Dateiname")

    video_path = os.path.join(UPLOAD_DIR, safe_filename)
    if not os.path.isfile(video_path):
        abort(404)

    line_data = get_line_from_session()
    stream_id = get_video_stream_id(safe_filename)
    try:
        generator = detect_objects_from_video(video_path, line_data, stream_id)
    except RuntimeError as exc:
        abort(503, description=str(exc))
    return Response(generator, mimetype="multipart/x-mixed-replace; boundary=frame")


if __name__ == "__main__":
    app.run("0.0.0.0", debug=False, port=8080)