"""Flask app that counts vehicles crossing a configurable line.

Two sources are supported: a shared network camera (one background grabber
fanned out to any number of viewers) and uploaded video files (processed per
request). Detection and tracking use an Ultralytics YOLO model.
"""

import logging
import os
import threading
import time
import uuid
from threading import Condition, Event, Lock

import cv2
import numpy as np
from flask import (
    Flask,
    Response,
    abort,
    jsonify,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)
from ultralytics import YOLO
from werkzeug.utils import secure_filename

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Camera URL can be overridden via the environment
# (ESP32-CAM default: port 81, /stream).
CAMERA_URL = os.environ.get("CAMERA_URL", "http://CAMERA-IP:81/stream")

# FFmpeg options for the network stream: socket timeout plus auto-reconnect.
# Must be set BEFORE the first VideoCapture is created.
os.environ.setdefault(
    "OPENCV_FFMPEG_CAPTURE_OPTIONS",
    "timeout;5000000|reconnect;1|reconnect_streamed;1|reconnect_delay_max;2",
)

app = Flask(__name__)
# NOTE(review): the fallback is a publicly visible development secret. Set
# SECRET_KEY in any real deployment, otherwise session cookies can be forged.
app.secret_key = os.environ.get("SECRET_KEY", "vehicle_dev_secret")
app.config["MAX_CONTENT_LENGTH"] = 200 * 1024 * 1024  # 200 MB upload limit

# Global model for the video-upload path (used per request).
model = YOLO("yolo11s.pt")
names = model.model.names

VEHICLE_CLASSES = {"car", "truck", "bus", "motorcycle"}
ALLOWED_EXTENSIONS = {"mp4", "mov", "avi", "mkv"}
UPLOAD_DIR = "uploads"
DEFAULT_LINE = {"x1": 0, "y1": 300, "x2": 1020, "y2": 300}
FRAME_SIZE = (1020, 600)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def allowed_file(filename: str) -> bool:
    """Return True if *filename* has one of the allowed video extensions."""
    return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS


def ensure_upload_dir() -> None:
    """Create the upload directory if it does not exist yet."""
    # exist_ok avoids the check-then-create race between concurrent uploads.
    os.makedirs(UPLOAD_DIR, exist_ok=True)


def get_line_from_session():
    """Return the session's counting line, initialising it to the default."""
    if "counting_line" not in session:
        session["counting_line"] = dict(DEFAULT_LINE)
    return session["counting_line"]


def fresh_vehicle_counts() -> dict[str, int]:
    """Return a per-class counter dict with every vehicle class at zero."""
    return {vehicle: 0 for vehicle in VEHICLE_CLASSES}


def new_state() -> dict:
    """Return a fresh counting/tracking state for one stream."""
    return {
        "track_positions": {},
        "counted_ids": set(),
        "count": 0,
        "types": fresh_vehicle_counts(),
    }


def reset_state(state: dict) -> None:
    """Reset *state* in place: forget all tracks and zero all counters."""
    state["track_positions"].clear()
    state["counted_ids"].clear()
    state["count"] = 0
    state["types"] = fresh_vehicle_counts()


def line_intersect(p1, p2, p3, p4) -> bool:
    """Return True if the segments p1-p2 and p3-p4 intersect.

    Parallel (including collinear) segments are reported as not intersecting.
    """
    x1, y1 = p1
    x2, y2 = p2
    x3, y3 = p3
    x4, y4 = p4

    denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
    if abs(denom) < 1e-10:
        return False

    # t and u are the positions of the intersection point along each segment.
    t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom
    u = -((x1 - x2) * (y1 - y3) - (y1 - y2) * (x1 - x3)) / denom
    return 0 <= t <= 1 and 0 <= u <= 1


def crossed_line(prev_pos, curr_pos, line_start, line_end) -> bool:
    """Return True if the move prev_pos -> curr_pos crosses the counting line."""
    return line_intersect(prev_pos, curr_pos, line_start, line_end)


def _draw_counting_line(frame, line_start, line_end) -> None:
    """Draw the counting line as a solid stroke overlaid with black dashes."""
    cv2.line(frame, line_start, line_end, (0, 255, 255), 3, cv2.LINE_AA)

    dx = line_end[0] - line_start[0]
    dy = line_end[1] - line_start[1]
    line_length = int(np.sqrt(dx ** 2 + dy ** 2))
    dash_length = 20

    # max(..., 1) keeps one iteration for a zero-length line; the guards below
    # then avoid dividing by zero and draw a single dot at line_start.
    for offset in range(0, max(line_length, 1), dash_length * 2):
        t1 = offset / line_length if line_length else 0
        t2 = min((offset + dash_length) / line_length, 1.0) if line_length else 0
        dash_start = (int(line_start[0] + t1 * dx), int(line_start[1] + t1 * dy))
        dash_end = (int(line_start[0] + t2 * dx), int(line_start[1] + t2 * dy))
        cv2.line(frame, dash_start, dash_end, (0, 0, 0), 3)


def _draw_counter_box(frame, state) -> None:
    """Draw the filled box with the total and the per-class counts."""
    types = state["types"]
    font = cv2.FONT_HERSHEY_SIMPLEX

    cv2.rectangle(frame, (10, 10), (350, 140), (0, 0, 0), -1)
    cv2.putText(frame, f"Gesamt: {state['count']}", (20, 35), font, 0.7,
                (255, 255, 255), 2)

    rows = (
        (f"Autos: {types.get('car', 0)}", 65, 0.6),
        (f"LKW: {types.get('truck', 0)}", 90, 0.6),
        (f"Busse: {types.get('bus', 0)}", 115, 0.6),
        (f"Motorraeder: {types.get('motorcycle', 0)}", 135, 0.5),
    )
    for text, y, scale in rows:
        cv2.putText(frame, text, (20, y), font, scale, (200, 200, 200), 1)


def process_frame(frame, det_model, det_names, line_start, line_end, state):
    """Track objects in one frame, count line crossings and draw overlays.

    The frame is resized to FRAME_SIZE, run through ``det_model.track`` and
    annotated. Used by both the webcam grabber and the video path.

    Args:
        frame: Input image as returned by ``cv2.VideoCapture.read``.
        det_model: Model object exposing ``track(frame, persist=True)``.
        det_names: Mapping from class id to class name.
        line_start: ``(x, y)`` start point of the counting line.
        line_end: ``(x, y)`` end point of the counting line.
        state: Dict as produced by ``new_state``; mutated in place.

    Returns:
        The resized, annotated frame.
    """
    frame = cv2.resize(frame, FRAME_SIZE)
    results = det_model.track(frame, persist=True)

    track_positions = state["track_positions"]
    counted_ids = state["counted_ids"]
    types = state["types"]

    if results and results[0].boxes is not None and results[0].boxes.id is not None:
        detections = results[0].boxes
        boxes = detections.xyxy.int().cpu().tolist()
        class_ids = detections.cls.int().cpu().tolist()
        track_ids = detections.id.int().cpu().tolist()

        for box, class_id, track_id in zip(boxes, class_ids, track_ids):
            label_name = det_names[class_id]
            x1, y1, x2, y2 = box
            center = ((x1 + x2) // 2, (y1 + y2) // 2)
            is_vehicle = label_name in VEHICLE_CLASSES

            if is_vehicle:
                # Each track is counted at most once, on the first frame where
                # its centre movement crosses the counting line.
                if track_id in track_positions and track_id not in counted_ids:
                    previous = track_positions[track_id]
                    cv2.line(frame, previous, center, (255, 100, 0), 2)
                    if crossed_line(previous, center, line_start, line_end):
                        counted_ids.add(track_id)
                        state["count"] += 1
                        types[label_name] += 1
                        cv2.circle(frame, center, 25, (0, 255, 0), 5)
                track_positions[track_id] = center

            box_color = (0, 255, 0) if is_vehicle else (255, 0, 0)
            cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)

            label = f"{track_id} - {label_name}"
            if track_id in counted_ids:
                # ASCII marker: the Hershey fonts used by cv2.putText cannot
                # render U+2713 and would draw question marks instead.
                label += " OK"
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX,
                        0.5, (255, 0, 255), 1)
            cv2.circle(frame, center, 3, (0, 255, 255), -1)

    _draw_counting_line(frame, line_start, line_end)
    _draw_counter_box(frame, state)
    return frame


# ---------------------------------------------------------------------------
# Webcam grabber: ONE connection to the (ESP32) camera, fan-out to many viewers
# ---------------------------------------------------------------------------

class WebcamGrabber:
    """Hold a single connection to the network camera and fan frames out.

    A background thread reads frames, runs YOLO tracking on them and stores
    the newest annotated JPEG in a shared slot. Every /webcam_feed request
    only consumes that slot, so any number of viewers share ONE camera
    connection.
    """

    def __init__(self, url: str):
        self.url = url
        self.lock = Lock()              # guards `viewers` and `line`
        self.frame_cond = Condition()   # guards `latest_jpeg` and `frame_seq`
        self.latest_jpeg: bytes | None = None
        self.frame_seq = 0
        self.viewers = 0
        self.reset_flag = Event()
        self.line = dict(DEFAULT_LINE)
        # Own model -> isolated tracker, separate from the video path.
        # Lazy: only loaded once the first stream becomes active.
        self.model = None
        self.names = None
        self.thread = None

    # -- Control -------------------------------------------------------------

    def start(self):
        """Start the background grabber thread (no-op if already started)."""
        if self.thread is None:
            self.thread = threading.Thread(target=self._run, daemon=True)
            self.thread.start()

    def set_line(self, line: dict):
        """Replace the counting line used for the camera stream."""
        with self.lock:
            self.line = dict(line)

    def reset(self):
        """Ask the grabber thread to reset its counters on the next frame."""
        self.reset_flag.set()

    def add_viewer(self):
        """Register one more active viewer."""
        with self.lock:
            self.viewers += 1

    def remove_viewer(self):
        """Deregister a viewer; the count never drops below zero."""
        with self.lock:
            self.viewers = max(0, self.viewers - 1)

    def _viewer_count(self) -> int:
        with self.lock:
            return self.viewers

    def _current_line(self):
        """Return the counting line as ``((x1, y1), (x2, y2))``."""
        with self.lock:
            line = dict(self.line)
        return (line["x1"], line["y1"]), (line["x2"], line["y2"])

    def _open(self):
        """Open the camera stream and apply timeouts on a best-effort basis."""
        cap = cv2.VideoCapture(self.url, cv2.CAP_FFMPEG)
        # Timeout properties depend on the OpenCV version/backend, so each one
        # is looked up and applied independently; failures are not fatal.
        for prop_name in ("CAP_PROP_OPEN_TIMEOUT_MSEC", "CAP_PROP_READ_TIMEOUT_MSEC"):
            prop = getattr(cv2, prop_name, None)
            if prop is None:
                continue
            try:
                cap.set(prop, 5000)
            except Exception:
                logger.debug("Could not set %s on capture", prop_name, exc_info=True)
        return cap

    def _ensure_model(self):
        """Load the grabber's own YOLO model on first use."""
        if self.model is None:
            self.model = YOLO("yolo11s.pt")
            self.names = self.model.model.names

    def _publish(self, jpeg: bytes):
        """Store a new JPEG in the shared slot and wake all waiting viewers."""
        with self.frame_cond:
            self.latest_jpeg = jpeg
            self.frame_seq += 1
            self.frame_cond.notify_all()

    def _clear(self):
        """Empty the shared slot so no stale frame is served after idling."""
        with self.frame_cond:
            if self.latest_jpeg is None:
                # Already empty: do not bump the sequence, otherwise the idle
                # loop would wake waiters on every tick.
                return
            self.latest_jpeg = None
            self.frame_seq += 1
            self.frame_cond.notify_all()

    # -- Background thread (runs for the whole process lifetime) -------------

    def _run(self):
        """Grabber loop: connect on demand, process frames, publish JPEGs."""
        cap = None
        state = new_state()
        frame_idx = 0

        while True:
            # Nobody is watching -> release the camera connection and idle.
            if self._viewer_count() == 0:
                if cap is not None:
                    cap.release()
                    cap = None
                self._clear()
                time.sleep(0.3)
                continue

            self._ensure_model()

            # (Re-)connect with back-off.
            if cap is None or not cap.isOpened():
                if cap is not None:
                    cap.release()
                cap = self._open()
                if not cap.isOpened():
                    cap = None
                    time.sleep(2.0)
                    continue
                # Fresh connection -> track ids start over, totals are kept.
                state["track_positions"].clear()
                state["counted_ids"].clear()

            ret, frame = cap.read()
            if not ret:
                cap.release()
                cap = None
                time.sleep(1.0)
                continue

            # Only every second frame is processed.
            frame_idx += 1
            if frame_idx % 2 != 0:
                continue

            if self.reset_flag.is_set():
                reset_state(state)
                self.reset_flag.clear()

            line_start, line_end = self._current_line()
            try:
                frame = process_frame(
                    frame, self.model, self.names, line_start, line_end, state
                )
                ok, buffer = cv2.imencode(".jpg", frame)
            except Exception:
                # A single bad frame must not kill the only grabber thread;
                # start() never spawns a second one.
                logger.exception("Processing a camera frame failed; skipping it")
                time.sleep(0.5)
                continue

            if not ok:
                continue
            self._publish(buffer.tobytes())

    # -- Per-viewer generator -------------------------------------------------

    def frames(self):
        """Yield multipart JPEG chunks for one viewer until the stream stalls.

        The viewer is registered when iteration starts and deregistered when
        the generator finishes or is closed. The stream ends once no new frame
        has arrived for 15 seconds.
        """
        self.add_viewer()
        last_seq = -1
        try:
            while True:
                with self.frame_cond:
                    got = self.frame_cond.wait_for(
                        lambda: self.frame_seq != last_seq, timeout=15
                    )
                    if not got:
                        # No new frame for 15 s -> camera presumably dead.
                        break
                    jpeg = self.latest_jpeg
                    last_seq = self.frame_seq
                if jpeg is None:
                    # Slot is empty (grabber was idle or is still connecting).
                    # Keep waiting; breaking here would end the very first
                    # viewer before the camera was ever opened.
                    continue
                yield (
                    b"--frame\r\n"
                    b"Content-Type: image/jpeg\r\n\r\n" + jpeg + b"\r\n"
                )
        finally:
            self.remove_viewer()


webcam = WebcamGrabber(CAMERA_URL)
webcam.start()


# ---------------------------------------------------------------------------
# Reset events for the video path (one per stream id)
# ---------------------------------------------------------------------------

reset_events: dict[str, Event] = {}
reset_lock = Lock()


def get_reset_event(stream_id: str) -> Event:
    """Return the reset Event for *stream_id*, creating it if needed."""
    with reset_lock:
        event = reset_events.get(stream_id)
        if event is None:
            event = Event()
            reset_events[stream_id] = event
        return event


def release_reset_event(stream_id: str) -> None:
    """Drop the reset Event registered for *stream_id*, if any."""
    with reset_lock:
        reset_events.pop(stream_id, None)


def get_webcam_stream_id() -> str:
    """Return this session's webcam stream id, creating one on first use."""
    stream_id = session.get("webcam_stream_id")
    if not stream_id:
        stream_id = f"webcam-{uuid.uuid4().hex}"
        session["webcam_stream_id"] = stream_id
    return stream_id


def get_video_stream_id(filename: str) -> str:
    """Return this session's stream id for *filename*, creating one if needed."""
    video_streams = session.get("video_stream_ids", {})
    stream_id = video_streams.get(filename)
    if not stream_id:
        stream_id = f"video-{uuid.uuid4().hex}"
        video_streams[filename] = stream_id
        # Re-assign so the session registers the nested change.
        session["video_stream_ids"] = video_streams
    return stream_id


# ---------------------------------------------------------------------------
# Video path (per request)
# ---------------------------------------------------------------------------

def generate_frames(capture, line_data, stream_id: str):
    """Yield annotated multipart JPEG chunks for an opened video capture.

    Only every second frame is processed. The capture is released and the
    stream's reset event is dropped when the generator finishes.
    """
    state = new_state()
    line_start = (line_data["x1"], line_data["y1"])
    line_end = (line_data["x2"], line_data["y2"])
    frame_idx = 0
    reset_event = get_reset_event(stream_id)

    try:
        while True:
            ret, frame = capture.read()
            if not ret:
                break

            frame_idx += 1
            if frame_idx % 2 != 0:
                continue

            if reset_event.is_set():
                reset_state(state)
                reset_event.clear()

            frame = process_frame(frame, model, names, line_start, line_end, state)
            ok, buffer = cv2.imencode(".jpg", frame)
            if not ok:
                continue

            yield (
                b"--frame\r\n"
                b"Content-Type: image/jpeg\r\n\r\n" + buffer.tobytes() + b"\r\n"
            )
    finally:
        capture.release()
        release_reset_event(stream_id)


def detect_objects_from_video(video_path, line_data, stream_id):
    """Open *video_path* and return a frame generator for it.

    Raises:
        RuntimeError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        raise RuntimeError("Video konnte nicht geoeffnet werden")
    return generate_frames(cap, line_data, stream_id)


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@app.route("/")
def index():
    """Render the landing page."""
    return render_template("index.html")


@app.route("/start_webcam")
def start_webcam():
    """Render the webcam page and make sure the session has line and stream id."""
    get_line_from_session()
    stream_id = get_webcam_stream_id()
    return render_template("webcam.html", stream_id=stream_id)


@app.route("/webcam_feed")
def webcam_feed():
    """Stream the shared camera feed as multipart JPEG."""
    # No camera connection per request: frames are fanned out by the grabber.
    return Response(
        webcam.frames(),
        mimetype="multipart/x-mixed-replace; boundary=frame",
    )


@app.route("/api/set_line", methods=["POST"])
def set_counting_line():
    """Set the counting line for the video session AND the webcam grabber."""
    data = request.get_json(silent=True) or {}
    try:
        line = {
            "x1": int(data["x1"]),
            "y1": int(data["y1"]),
            "x2": int(data["x2"]),
            "y2": int(data["y2"]),
        }
    except (KeyError, ValueError, TypeError):
        abort(400, description="Ungueltige Linienkoordinaten")

    session["counting_line"] = line
    webcam.set_line(line)  # the webcam uses one global line (one camera)
    return jsonify({"status": "success", "line": line})


@app.route("/api/get_line", methods=["GET"])
def get_counting_line():
    """Return the session's counting line as JSON."""
    return jsonify(get_line_from_session())


@app.route("/api/reset_count", methods=["POST"])
def reset_count():
    """Request a counter reset for a stream owned by the current session."""
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON list or scalar has no .get(); treat it as a missing stream_id.
        data = {}

    stream_id = data.get("stream_id")
    if not stream_id:
        abort(400, description="stream_id ist erforderlich")

    valid_ids = {session.get("webcam_stream_id")}
    valid_ids.update(session.get("video_stream_ids", {}).values())
    valid_ids.discard(None)
    if stream_id not in valid_ids:
        abort(403, description="Stream gehoert nicht zur aktuellen Sitzung")

    if stream_id == session.get("webcam_stream_id"):
        webcam.reset()
    else:
        get_reset_event(stream_id).set()
    return jsonify({"status": "success", "message": "Zaehler wird zurueckgesetzt"})


@app.route("/upload", methods=["POST"])
def upload_video():
    """Store an uploaded video under a unique name and redirect to its player."""
    if "file" not in request.files:
        abort(400, description="Keine Datei erhalten")

    file = request.files["file"]
    if not file or file.filename == "":
        abort(400, description="Keine Datei ausgewaehlt")

    filename = secure_filename(file.filename)
    if not filename:
        abort(400, description="Ungueltiger Dateiname")
    if not allowed_file(filename):
        abort(400, description="Ungueltiger Dateityp")

    ensure_upload_dir()
    name, ext = os.path.splitext(filename)
    # The random suffix keeps uploads with the same name from overwriting.
    stored_filename = f"{name}_{uuid.uuid4().hex}{ext.lower()}"
    file_path = os.path.join(UPLOAD_DIR, stored_filename)
    file.save(file_path)
    return redirect(url_for("play_video", filename=stored_filename))


@app.route("/uploads/<filename>")
def play_video(filename):
    """Render the player page for an uploaded video."""
    safe_filename = os.path.basename(filename)
    if safe_filename != filename:
        abort(400, description="Ungueltiger Dateiname")

    file_path = os.path.join(UPLOAD_DIR, safe_filename)
    if not os.path.isfile(file_path):
        abort(404)

    get_line_from_session()
    stream_id = get_video_stream_id(safe_filename)
    return render_template("play_video.html", filename=safe_filename, stream_id=stream_id)


@app.route("/video/<filename>")
def send_video(filename):
    """Serve the raw uploaded video file."""
    # Absolute path: send_from_directory would otherwise resolve UPLOAD_DIR
    # against the app root instead of the working directory uploads go to.
    return send_from_directory(os.path.abspath(UPLOAD_DIR), filename)


@app.route("/video_feed/<filename>")
def video_feed(filename):
    """Stream an uploaded video with detection overlays as multipart JPEG."""
    safe_filename = os.path.basename(filename)
    if safe_filename != filename:
        abort(400, description="Ungueltiger Dateiname")

    video_path = os.path.join(UPLOAD_DIR, safe_filename)
    if not os.path.isfile(video_path):
        abort(404)

    line_data = get_line_from_session()
    stream_id = get_video_stream_id(safe_filename)
    try:
        generator = detect_objects_from_video(video_path, line_data, stream_id)
    except RuntimeError as exc:
        abort(503, description=str(exc))
    return Response(generator, mimetype="multipart/x-mixed-replace; boundary=frame")


if __name__ == "__main__":
    app.run("0.0.0.0", debug=False, port=8080)