Files
vehicle-counter/app.py
Joachim Hummel 264b2b3e3b Fix: Offline-Erkennung loeste Reconnect-Resets aus (Zaehlung gestoert)
Bei GRABBER_ALWAYS_ON=0 kappte kurzes Zuschauer-Aus (<img>-Reload der
Auto-Recovery) die Kameraverbindung -> Reconnect -> Tracker/Zaehlzustand
wurde zurueckgesetzt. Bei ~1 FPS riss das die Zaehlung auseinander.

- Grabber: Karenzzeit (VIEWER_GRACE_SEC, Default 15s) bevor die Kamera
  bei fehlenden Zuschauern freigegeben wird -> kein Reconnect-Churn
- Frontend: Overlay/Reload erst nach ~6s echtem Ausfall (3 Polls),
  nicht bei einzelnen langsamen Frames -> kein Verbindungs-Churn

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-02 10:53:46 +02:00

904 lines
33 KiB
Python

import os
import json
import uuid
import time
import threading
from datetime import datetime
from zoneinfo import ZoneInfo
from threading import Event, Lock, Condition
import cv2
import numpy as np
import requests
import paho.mqtt.client as mqtt
from flask import (
Flask,
render_template,
Response,
request,
redirect,
url_for,
send_from_directory,
session,
jsonify,
abort,
)
from werkzeug.utils import secure_filename
from ultralytics import YOLO
# Load .env BEFORE os.environ is read below (broker/camera/MQTT config).
# load_dotenv also understands "export VAR=..." lines and inline comments.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    # python-dotenv is optional: without it only the real process
    # environment is consulted and any .env file is ignored.
    print("[config] python-dotenv nicht installiert - .env wird ignoriert", flush=True)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Camera URL is set via the environment (.env). ESP32-CAM: port 81, path /stream.
CAMERA_URL = os.environ.get("CAMERA_URL", "http://CAMERA-IP:81/stream")
# --- Inference tuning (everything can be overridden via env) ----------------
# The webcam path uses the light nano model; the video upload stays on yolo11s.
WEBCAM_MODEL = os.environ.get("WEBCAM_MODEL", "yolo11n.pt")
# Smaller imgsz = less work per frame (the YOLO default would be 640).
YOLO_IMGSZ = int(os.environ.get("YOLO_IMGSZ", "480"))
# Motion gate: once more than this many pixels changed (320x180 grayscale
# image) the scene counts as "moving" and YOLO runs. Otherwise no inference.
MOTION_PIXELS = int(os.environ.get("MOTION_PIXELS", "500"))
# Always-on: the grabber runs independently of viewers (24/7 counter).
# 0 = only count/stream while a browser is watching (frees the ESP32 slot).
# 1 = stay connected and keep counting whether or not anyone is watching.
GRABBER_ALWAYS_ON = os.environ.get("GRABBER_ALWAYS_ON", "0") == "1"
# Grace period: keep streaming for this many seconds after the last viewer
# left before the camera connection is released. Prevents short viewer gaps
# (e.g. the <img> reload triggered by the offline detection) from constantly
# causing reconnects together with a tracker/counting-state reset.
VIEWER_GRACE_SEC = float(os.environ.get("VIEWER_GRACE_SEC", "15"))
# FP16 only makes sense on CUDA -> auto-detect, can be forced via env.
try:
    import torch
    _CUDA = torch.cuda.is_available()
except Exception:
    # Any failure while probing torch/CUDA is treated as "no CUDA".
    _CUDA = False
YOLO_HALF = os.environ.get("YOLO_HALF", "1" if _CUDA else "0") == "1"
# COCO classes: 2=car, 3=motorcycle, 5=bus, 7=truck. Filters out person/couch etc.
VEHICLE_CLASS_IDS = [2, 3, 5, 7]
# --- Counting robustness for fast vehicles ----------------------------------
# Band around the counting line (in pixels, relative to the 1020x600 image):
# a vehicle is already counted when its center is closer than COUNT_BAND_PX
# to the line - it is NOT required to observe one point before AND one point
# behind the line. Catches fast vehicles that jump far between two frames.
# 0 = off (classic segment intersection only).
COUNT_BAND_PX = int(os.environ.get("COUNT_BAND_PX", "45"))
# Debounce against double counting on track-ID switches: a new counting point
# is discarded when it lies closer than COUNT_DEDUP_PX to a point that was
# counted within the last COUNT_DEDUP_FRAMES frames.
COUNT_DEDUP_PX = int(os.environ.get("COUNT_DEDUP_PX", "60"))
COUNT_DEDUP_FRAMES = int(os.environ.get("COUNT_DEDUP_FRAMES", "12"))
# 24/7 operation: track IDs not seen for this many frames are removed from
# track_positions AND counted_ids. Prevents unbounded growth of the state
# (memory leak) and wrong suppression when a track ID is reused later.
# The value should exceed the time a vehicle stays in view.
COUNT_FORGET_FRAMES = int(os.environ.get("COUNT_FORGET_FRAMES", "150"))
# Diagnostics: print one log line per processed frame that contains vehicles
# (detections, distance to the line, counted?). Enable with COUNT_DEBUG=1.
COUNT_DEBUG = os.environ.get("COUNT_DEBUG", "0") == "1"
# --- MQTT: one event per line crossing (for n8n -> NocoDB) -------------------
# Can be switched off entirely: MQTT_ENABLED=false -> app runs without broker/events.
MQTT_ENABLED = os.environ.get("MQTT_ENABLED", "true").strip().lower() in (
    "1", "true", "yes", "on",
)
MQTT_HOST = os.environ.get("MQTT_HOST", "127.0.0.1")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_USER = os.environ.get("MQTT_USER")
MQTT_PASS = os.environ.get("MQTT_PASS")
MQTT_TOPIC = os.environ.get("MQTT_TOPIC", "vehiclecounter/cam1")
CAMERA_ID = os.environ.get("CAMERA_ID", "cam1")
# Availability/status topic: "online" on connect (birth message),
# "offline" automatically via Last Will (LWT) if the connection drops.
STATUS_TOPIC = f"{MQTT_TOPIC}/status"
# Time zone used for event timestamps (DST-aware). Default Europe/Berlin.
LOCAL_TZ = ZoneInfo(os.environ.get("TZ_NAME", "Europe/Berlin"))
def _on_mqtt_connect(client, userdata, flags, *args):
    """Birth message: publish a retained 'online' status after every (re)connect.

    The trailing ``*args`` absorbs the extra callback arguments so the same
    function can be registered regardless of how many positional arguments
    the installed paho-mqtt version passes.
    """
    topic = STATUS_TOPIC
    client.publish(topic, "online", qos=1, retain=True)
    print(f"[mqtt] connected -> {topic} online", flush=True)
# Module-wide MQTT client; stays None when MQTT is disabled, which makes
# publish_crossing() a no-op.
_mqtt = None
if not MQTT_ENABLED:
    print("[mqtt] deaktiviert (MQTT_ENABLED=false) - keine Events", flush=True)
else:
    # paho-mqtt 2.x requires the CallbackAPIVersion, 1.x does not know it.
    try:
        _mqtt = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    except AttributeError:
        _mqtt = mqtt.Client()
    if MQTT_USER:
        _mqtt.username_pw_set(MQTT_USER, MQTT_PASS)
    _mqtt.on_connect = _on_mqtt_connect
    # Last Will: the broker publishes this once the connection drops uncleanly.
    _mqtt.will_set(STATUS_TOPIC, "offline", qos=1, retain=True)
    try:
        # async + loop_start -> does not block app start-up when the broker is away
        _mqtt.connect_async(MQTT_HOST, MQTT_PORT, keepalive=60)
        _mqtt.loop_start()
    except Exception as exc:
        # Best effort: the app keeps running without a broker connection.
        print(f"[mqtt] init failed: {exc}", flush=True)
def publish_crossing(vehicle_type, track_id, source):
    """Publish one crossing event on ``{MQTT_TOPIC}/crossing`` (QoS 1, not retained).

    Does nothing when no MQTT client was created. Publish errors are logged
    and swallowed so that counting never fails because of the broker.

    Args:
        vehicle_type: Class label of the counted vehicle.
        track_id: Tracker ID of the vehicle; converted with ``int()``.
        source: Origin of the event, e.g. "webcam" or "video".
    """
    if _mqtt is None:
        return
    event = dict(
        event="crossing",
        camera=CAMERA_ID,
        source=source,
        type=vehicle_type,
        track_id=int(track_id),
        ts=datetime.now(LOCAL_TZ).isoformat(),
    )
    topic = f"{MQTT_TOPIC}/crossing"
    try:
        _mqtt.publish(topic, json.dumps(event), qos=1, retain=False)
    except Exception as exc:
        print(f"[mqtt] publish failed: {exc}", flush=True)
app = Flask(__name__)
# NOTE(review): the fallback secret is a fixed, publicly visible string. Session
# cookies (which carry the stream IDs checked in /api/reset_count) are signed
# with it, so SECRET_KEY should always be set in production.
app.secret_key = os.environ.get("SECRET_KEY", "vehicle_dev_secret")
app.config["MAX_CONTENT_LENGTH"] = 200 * 1024 * 1024 # 200 MB upload limit
# Global model for the video-upload path (used per request).
model = YOLO("yolo11s.pt")
names = model.model.names
# Class labels that are counted, and the accepted upload file extensions.
VEHICLE_CLASSES = {"car", "truck", "bus", "motorcycle"}
ALLOWED_EXTENSIONS = {"mp4", "mov", "avi", "mkv"}
UPLOAD_DIR = "uploads"
# Default counting line and the frame size (width, height) every frame is resized to.
DEFAULT_LINE = {"x1": 0, "y1": 300, "x2": 1020, "y2": 300}
FRAME_SIZE = (1020, 600)
# Persistent counting line: stored as JSON and loaded on start-up so that it
# survives a restart (path can be overridden via env).
LINE_FILE = os.environ.get("LINE_FILE", "counting_line.json")
def _valid_line(d):
"""Validiert/normalisiert ein Linien-Dict zu int-Koordinaten oder None."""
try:
return {k: int(d[k]) for k in ("x1", "y1", "x2", "y2")}
except (KeyError, ValueError, TypeError):
return None
def load_saved_line() -> dict:
    """Load the persisted counting line, falling back to DEFAULT_LINE.

    Any problem with the file (missing, unreadable, not valid JSON, not
    decodable as text, or containing values that cannot be turned into
    integers) results in the default line instead of an exception, so a
    damaged file can never prevent the application from starting.

    Returns:
        A dict with the integer keys "x1", "y1", "x2", "y2".
    """
    try:
        with open(LINE_FILE) as fh:
            line = _valid_line(json.load(fh))
        if line:
            return line
    except (OSError, ValueError, OverflowError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # OverflowError covers non-finite coordinate values.
        pass
    return dict(DEFAULT_LINE)
def save_line(line: dict) -> None:
    """Persist the counting line as JSON so it survives an app restart.

    The data is written to a sibling ``.tmp`` file first and then moved over
    the real file with ``os.replace``. I/O errors are logged, not raised.
    """
    tmp_path = f"{LINE_FILE}.tmp"
    try:
        with open(tmp_path, "w") as out:
            json.dump(line, out)
        os.replace(tmp_path, LINE_FILE)
    except OSError as exc:
        print(f"[line] save failed: {exc}", flush=True)
# Loaded once at start-up -> default for new sessions AND for the webcam grabber.
SAVED_LINE = load_saved_line()
# ---------------------------------------------------------------------------
# Hilfsfunktionen
# ---------------------------------------------------------------------------
def allowed_file(filename: str) -> bool:
    """Return True when the name has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    """
    _stem, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def ensure_upload_dir() -> None:
    """Create the upload directory if it does not exist yet.

    Uses ``exist_ok=True`` instead of a separate existence check, so two
    concurrent uploads cannot race between the check and the creation.
    """
    os.makedirs(UPLOAD_DIR, exist_ok=True)
def get_line_from_session():
    """Return the session's counting line, seeding it from SAVED_LINE if absent."""
    try:
        return session["counting_line"]
    except KeyError:
        # First access in this session: start from a copy of the saved line.
        session["counting_line"] = dict(SAVED_LINE)
        return session["counting_line"]
def fresh_vehicle_counts() -> dict[str, int]:
    """Return a new per-class counter with every vehicle class set to 0."""
    return dict.fromkeys(VEHICLE_CLASSES, 0)
def new_state() -> dict:
    """Build a fresh counting/tracking state for one stream.

    Keys:
        track_positions: track ID -> last seen (x, y, frame_idx).
        counted_ids: track IDs that have already been counted.
        count: total number of counted vehicles.
        types: per-class counters.
        frame_idx: number of frames processed so far.
        recent_counts: list of (cx, cy, frame_idx) used for debouncing.
    """
    return dict(
        track_positions={},
        counted_ids=set(),
        count=0,
        types=fresh_vehicle_counts(),
        frame_idx=0,
        recent_counts=[],
    )
def reset_state(state: dict) -> None:
    """Reset a state dict in place to the values of a freshly created state.

    The containers are cleared rather than replaced, so references held
    elsewhere stay valid; only the per-class counter dict is swapped out.
    """
    for container_key in ("track_positions", "counted_ids", "recent_counts"):
        state[container_key].clear()
    state.update(count=0, types=fresh_vehicle_counts(), frame_idx=0)
def line_intersect(p1, p2, p3, p4) -> bool:
    """Return True when segment p1-p2 and segment p3-p4 intersect.

    Each point is an (x, y) pair. Parallel (including collinear) segments
    are reported as not intersecting; touching end points count as a hit.
    """
    (ax, ay), (bx, by) = p1, p2
    (cx, cy), (dx, dy) = p3, p4

    # Component differences shared by the denominator and both parameters.
    ab_x, ab_y = ax - bx, ay - by
    cd_x, cd_y = cx - dx, cy - dy
    ac_x, ac_y = ax - cx, ay - cy

    denom = ab_x * cd_y - ab_y * cd_x
    if abs(denom) < 1e-10:
        return False  # parallel -> no unique intersection point

    # t is the position along p1-p2, u the position along p3-p4.
    t = (ac_x * cd_y - ac_y * cd_x) / denom
    if not 0 <= t <= 1:
        return False
    u = -(ab_x * ac_y - ab_y * ac_x) / denom
    return 0 <= u <= 1
def crossed_line(prev_pos, curr_pos, line_start, line_end) -> bool:
    """Return True when the move prev_pos -> curr_pos intersects the counting line."""
    movement = (prev_pos, curr_pos)
    counting_line = (line_start, line_end)
    return line_intersect(*movement, *counting_line)
def point_to_segment_dist(px, py, ax, ay, bx, by) -> float:
    """Return the shortest distance from point (px, py) to segment (ax, ay)-(bx, by)."""
    seg_x = bx - ax
    seg_y = by - ay
    if seg_x == 0 and seg_y == 0:
        # Degenerate segment: both end points coincide.
        near_x, near_y = ax, ay
    else:
        # Projection parameter of the point onto the line, clamped to the segment.
        t = ((px - ax) * seg_x + (py - ay) * seg_y) / (seg_x * seg_x + seg_y * seg_y)
        t = max(0.0, min(1.0, t))
        near_x, near_y = ax + t * seg_x, ay + t * seg_y
    return ((px - near_x) ** 2 + (py - near_y) ** 2) ** 0.5
def _recently_counted(recent_counts, cx, cy) -> bool:
    """Return True when a count was already registered close to (cx, cy).

    ``recent_counts`` holds (x, y, frame_idx) entries; an entry matches when
    it lies within COUNT_DEDUP_PX pixels (used to absorb track-ID switches).
    """
    limit_sq = COUNT_DEDUP_PX ** 2
    return any(
        (cx - rx) ** 2 + (cy - ry) ** 2 <= limit_sq
        for rx, ry, _ in recent_counts
    )
def draw_overlay(frame, line_start, line_end, state):
    """Draw the dashed counting line and the counter box onto ``frame``.

    No detection is run here. The frame is modified in place and returned.
    """
    types = state["types"]
    sx, sy = line_start[0], line_start[1]
    dx, dy = line_end[0] - sx, line_end[1] - sy

    def at(t):
        # Integer pixel position at fraction t along the counting line.
        return int(sx + t * dx), int(sy + t * dy)

    # Solid yellow base line, then black dashes on top of it.
    cv2.line(frame, line_start, line_end, (0, 255, 255), 3, cv2.LINE_AA)
    length = int(np.sqrt(dx ** 2 + dy ** 2))
    dash = 20
    for offset in range(0, max(length, 1), dash * 2):
        if length:
            t_from = offset / length
            t_to = min((offset + dash) / length, 1.0)
        else:
            # Zero-length line: avoid dividing by zero.
            t_from = t_to = 0
        cv2.line(frame, at(t_from), at(t_to), (0, 0, 0), 3)

    # Counter box: (text, origin, font scale, color, thickness) per row.
    cv2.rectangle(frame, (10, 10), (350, 140), (0, 0, 0), -1)
    grey = (200, 200, 200)
    rows = (
        (f"Gesamt: {state['count']}", (20, 35), 0.7, (255, 255, 255), 2),
        (f"Autos: {types.get('car', 0)}", (20, 65), 0.6, grey, 1),
        (f"LKW: {types.get('truck', 0)}", (20, 90), 0.6, grey, 1),
        (f"Busse: {types.get('bus', 0)}", (20, 115), 0.6, grey, 1),
        (f"Motorraeder: {types.get('motorcycle', 0)}", (20, 135), 0.5, grey, 1),
    )
    for text, origin, scale, color, thickness in rows:
        cv2.putText(frame, text, origin, cv2.FONT_HERSHEY_SIMPLEX, scale, color, thickness)
    return frame
def render_static(img, line_start, line_end, state):
    """Resize the raw frame and draw only the overlay on it.

    Used when the motion gate reports no movement, so no YOLO call is made
    for this frame.
    """
    resized = cv2.resize(img, FRAME_SIZE)
    annotated = draw_overlay(resized, line_start, line_end, state)
    return annotated
def process_frame(frame, det_model, det_names, line_start, line_end, state, source="video"):
    """
    Resize the frame, run YOLO tracking, count line crossings and draw all
    overlays. Mutates `state` in place and returns the annotated frame.
    Used by both the webcam grabber and the video path.

    Args:
        frame: Input image; resized to FRAME_SIZE before detection.
        det_model: Model object whose ``track()`` method is called.
        det_names: Lookup from class ID to class label.
        line_start: (x, y) start of the counting line.
        line_end: (x, y) end of the counting line.
        state: State dict as created by ``new_state()``.
        source: Tag passed on to ``publish_crossing`` for each counted vehicle.

    Returns:
        The resized frame with boxes, labels, line and counter box drawn on it.
    """
    frame = cv2.resize(frame, FRAME_SIZE)
    results = det_model.track(
        frame,
        persist=True,
        imgsz=YOLO_IMGSZ,
        half=YOLO_HALF,
        classes=VEHICLE_CLASS_IDS,
        verbose=False,
    )
    track_positions = state["track_positions"]
    counted_ids = state["counted_ids"]
    types = state["types"]
    recent_counts = state["recent_counts"]
    state["frame_idx"] += 1
    frame_idx = state["frame_idx"]
    # Trim the debounce history to the configured frame window (in place,
    # so the list object stored in `state` stays the same).
    recent_counts[:] = [
        c for c in recent_counts if frame_idx - c[2] <= COUNT_DEDUP_FRAMES
    ]
    # Garbage collection: forget track IDs that vanished long ago so that
    # track_positions/counted_ids do not grow without bound in 24/7 operation.
    stale = [
        tid for tid, pos in track_positions.items()
        if frame_idx - pos[2] > COUNT_FORGET_FRAMES
    ]
    for tid in stale:
        track_positions.pop(tid, None)
        counted_ids.discard(tid)
    # Per-frame diagnostics, only reported when COUNT_DEBUG is enabled.
    dbg_vehicles = 0
    dbg_counted = 0
    dbg_dists = []
    # Only detections that carry a track ID are evaluated.
    if results and results[0].boxes is not None and results[0].boxes.id is not None:
        boxes = results[0].boxes.xyxy.int().cpu().tolist()
        class_ids = results[0].boxes.cls.int().cpu().tolist()
        track_ids = results[0].boxes.id.int().cpu().tolist()
        for box, class_id, track_id in zip(boxes, class_ids, track_ids):
            label_name = det_names[class_id]
            x1, y1, x2, y2 = box
            # The box center is the point tested against the counting line.
            center_x = (x1 + x2) // 2
            center_y = (y1 + y2) // 2
            if label_name in VEHICLE_CLASSES:
                if COUNT_DEBUG:
                    dbg_vehicles += 1
                    dbg_dists.append(int(point_to_segment_dist(
                        center_x, center_y,
                        line_start[0], line_start[1], line_end[0], line_end[1])))
                if track_id not in counted_ids:
                    crossed = False
                    # (a) Classic segment intersection: one point before AND one behind.
                    if track_id in track_positions:
                        prev_x, prev_y, _ = track_positions[track_id]
                        cv2.line(frame, (prev_x, prev_y), (center_x, center_y), (255, 100, 0), 2)
                        if crossed_line((prev_x, prev_y), (center_x, center_y), line_start, line_end):
                            crossed = True
                    # (b) Band around the line: a single sample close to the
                    #     line is enough -> catches fast vehicles.
                    if not crossed and COUNT_BAND_PX > 0:
                        dist = point_to_segment_dist(
                            center_x, center_y,
                            line_start[0], line_start[1], line_end[0], line_end[1],
                        )
                        if dist <= COUNT_BAND_PX:
                            crossed = True
                    # Count only if no nearby point was counted recently
                    # (debounce against track-ID switches).
                    if crossed and not _recently_counted(recent_counts, center_x, center_y):
                        counted_ids.add(track_id)
                        recent_counts.append((center_x, center_y, frame_idx))
                        state["count"] += 1
                        types[label_name] += 1
                        publish_crossing(label_name, track_id, source)
                        cv2.circle(frame, (center_x, center_y), 25, (0, 255, 0), 5)
                        dbg_counted += 1
                # Remember the latest position (and frame) for the next
                # segment test and for the stale-track garbage collection.
                track_positions[track_id] = (center_x, center_y, frame_idx)
            box_color = (0, 255, 0) if label_name in VEHICLE_CLASSES else (255, 0, 0)
            cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)
            label = f"{track_id} - {label_name}"
            if track_id in counted_ids:
                # Check mark for tracks that have already been counted.
                label += " \u2713"
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 1)
            cv2.circle(frame, (center_x, center_y), 3, (0, 255, 255), -1)
    if COUNT_DEBUG and dbg_vehicles:
        print(
            f"[count-debug] frame#{frame_idx} fahrzeuge={dbg_vehicles} "
            f"abstand_zur_linie={sorted(dbg_dists)} band={COUNT_BAND_PX} "
            f"jetzt_gezaehlt={dbg_counted} gesamt={state['count']}",
            flush=True,
        )
    return draw_overlay(frame, line_start, line_end, state)
# ---------------------------------------------------------------------------
# Webcam-Grabber: EINE Verbindung zur (ESP32-)Cam, Fan-out an viele Viewer
# ---------------------------------------------------------------------------
class WebcamGrabber:
    """
    Holds exactly one connection to the network camera. A background thread
    reads frames, runs YOLO on them and stores the most recent annotated JPEG
    in a shared slot. Every /webcam_feed request only consumes that slot ->
    any number of viewers, but only ONE camera connection.

    The MJPEG stream is read MANUALLY via requests (SOI/EOI parsing) instead
    of cv2.VideoCapture, which bypasses the FFmpeg demuxer that fails on the
    ESP32-CAM.

    Motion gate: YOLO only runs when enough of the image changes. While the
    scene is static (e.g. only parked cars) no inference call is made.
    """

    MAX_BUFFER = 4 * 1024 * 1024  # guard against unbounded buffer growth

    def __init__(self, url: str):
        """Prepare the grabber for ``url``; nothing is started or loaded yet."""
        self.url = url
        self.lock = Lock()  # protects `viewers` and `line`
        self.frame_cond = Condition()  # protects/announces the JPEG slot
        self.latest_jpeg: bytes | None = None
        self.frame_seq = 0
        self.last_frame_ts = 0.0  # time.time() of the last published frame
        self.viewers = 0
        self._last_active_ts = 0.0  # last moment with a viewer (grace period)
        self.reset_flag = Event()
        self.line = dict(SAVED_LINE)
        # Own model -> isolated tracker, separate from the video path.
        # Lazy: only loaded when the first stream becomes active.
        self.model = None
        self.names = None
        # Motion-gate reference image (grayscale, downscaled).
        self._prev_gray = None
        self.thread = None

    # -- Control ---------------------------------------------------------------
    def start(self):
        """Start the background thread once; further calls are no-ops."""
        if self.thread is None:
            self.thread = threading.Thread(target=self._run, daemon=True)
            self.thread.start()

    def set_line(self, line: dict):
        """Replace the counting line with a copy of ``line``."""
        with self.lock:
            self.line = dict(line)

    def reset(self):
        """Request a counter reset; applied by the thread on the next frame."""
        self.reset_flag.set()

    def add_viewer(self):
        """Register one more active viewer."""
        with self.lock:
            self.viewers += 1

    def remove_viewer(self):
        """Unregister a viewer; the count never drops below zero."""
        with self.lock:
            self.viewers = max(0, self.viewers - 1)

    def _viewer_count(self) -> int:
        """Return the current number of viewers."""
        with self.lock:
            return self.viewers

    def _wants_stream(self) -> bool:
        """Should the grabber currently serve the camera?

        Includes a grace period so that a short viewer gap (e.g. an <img>
        reload) does not cut the connection.
        """
        if GRABBER_ALWAYS_ON:
            return True
        if self._viewer_count() > 0:
            self._last_active_ts = time.time()
            return True
        return (time.time() - self._last_active_ts) < VIEWER_GRACE_SEC

    def _current_line(self):
        """Return the counting line as ((x1, y1), (x2, y2))."""
        with self.lock:
            line = dict(self.line)
        return (line["x1"], line["y1"]), (line["x2"], line["y2"])

    @staticmethod
    def _extract_latest(buf: bytes):
        """
        Pull the LAST complete JPEG out of the buffer and drop older ones,
        which keeps latency low (a backlog is skipped).

        Returns:
            (jpeg_or_None, rest_buffer). The rest is either an incomplete
            frame starting at its SOI marker, a single trailing 0xFF byte
            (possible first half of an SOI marker split across two chunks),
            or empty.
        """
        latest = None
        while True:
            start = buf.find(b"\xff\xd8")  # SOI
            if start == -1:
                # No frame start in sight. Keep a trailing 0xFF: it may be the
                # first byte of an SOI whose second byte arrives with the next
                # chunk. Dropping it would lose that whole frame.
                buf = buf[-1:] if buf.endswith(b"\xff") else b""
                break
            end = buf.find(b"\xff\xd9", start + 2)  # EOI
            if end == -1:
                buf = buf[start:]  # incomplete -> keep the tail
                break
            latest = buf[start:end + 2]
            buf = buf[end + 2:]
        return latest, buf

    def _ensure_model(self):
        """Load the webcam model on first use."""
        if self.model is None:
            self.model = YOLO(WEBCAM_MODEL)
            self.names = self.model.model.names

    def _publish(self, jpeg: bytes):
        """Store a new JPEG in the slot and wake all waiting viewers."""
        with self.frame_cond:
            self.latest_jpeg = jpeg
            self.frame_seq += 1
            self.last_frame_ts = time.time()
            self.frame_cond.notify_all()

    def _clear(self):
        """Empty the slot (no current frame) and wake all waiting viewers."""
        with self.frame_cond:
            self.latest_jpeg = None
            self.frame_seq += 1
            self.frame_cond.notify_all()

    def is_online(self) -> bool:
        """True when a frame is available and was published less than 5 s ago."""
        return self.latest_jpeg is not None and (time.time() - self.last_frame_ts) < 5.0

    def _reset_tracker(self):
        """Clear the persistent YOLO tracker -> fresh track IDs after a reconnect.

        Rationale given by the original author: without a reset the tracker
        carries old/lost tracks across a long pause (e.g. camera off
        overnight), and new vehicles then get an ID late or not at all and
        are not counted. Failures are logged and ignored.
        """
        try:
            predictor = getattr(self.model, "predictor", None)
            trackers = getattr(predictor, "trackers", None) if predictor else None
            for tr in trackers or []:
                tr.reset()
        except Exception as exc:
            print(f"[webcam-grabber] tracker reset skipped: {exc}", flush=True)

    # -- Background thread (runs for the whole process lifetime) --------------
    def _run(self):
        """Connect, read frames, detect/count and publish, forever."""
        state = new_state()
        while True:
            # Without always-on: only connect while someone is watching
            # (including the grace period).
            if not self._wants_stream():
                if self.latest_jpeg is not None:
                    self._clear()
                time.sleep(0.3)
                continue
            resp = None
            try:
                self._ensure_model()
                resp = requests.get(self.url, stream=True, timeout=(5, 10))
                resp.raise_for_status()
                # Fresh connection -> new track IDs, the totals are kept.
                state["track_positions"].clear()
                state["counted_ids"].clear()
                state["recent_counts"].clear()
                state["frame_idx"] = 0
                self._reset_tracker()  # discard stale tracker state after a pause
                self._prev_gray = None
                print("[webcam-grabber] verbunden -> Tracker/Zaehlzustand frisch", flush=True)
                buf = b""
                for chunk in resp.iter_content(chunk_size=8192):
                    if not self._wants_stream():
                        break  # viewers (incl. grace period) gone -> release connection
                    if not chunk:
                        continue
                    buf += chunk
                    if len(buf) > self.MAX_BUFFER:
                        buf = buf[-self.MAX_BUFFER:]
                    jpeg, buf = self._extract_latest(buf)
                    if jpeg is None:
                        continue
                    img = cv2.imdecode(np.frombuffer(jpeg, np.uint8), cv2.IMREAD_COLOR)
                    if img is None:
                        continue
                    if self.reset_flag.is_set():
                        reset_state(state)
                        self.reset_flag.clear()
                    # --- Motion gate: run YOLO only when something moves ---
                    gray = cv2.resize(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), (320, 180))
                    moving = True  # first frame after connect always runs detection
                    if self._prev_gray is not None:
                        diff = cv2.absdiff(gray, self._prev_gray)
                        _, mask = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)
                        moving = cv2.countNonZero(mask) > MOTION_PIXELS
                    self._prev_gray = gray
                    line_start, line_end = self._current_line()
                    if moving:
                        frame = process_frame(img, self.model, self.names, line_start, line_end, state, source="webcam")
                    else:
                        frame = render_static(img, line_start, line_end, state)
                    ok, out = cv2.imencode(".jpg", frame)
                    if ok:
                        self._publish(out.tobytes())
            except Exception as exc:
                # Timeout / connection drop / HTTP or model error -> visible + backoff
                print(f"[webcam-grabber] {type(exc).__name__}: {exc}", flush=True)
                time.sleep(1.0)
            finally:
                if resp is not None:
                    resp.close()

    # -- Per-viewer generator ---------------------------------------------------
    def frames(self):
        """Yield multipart MJPEG parts for one viewer.

        The viewer is registered while the generator runs and unregistered
        when it ends or is closed. The generator stops after 20 s without a
        new frame, but only once at least one frame has been delivered.
        """
        self.add_viewer()
        last_seq = -1
        got_any = False
        try:
            while True:
                with self.frame_cond:
                    ok = self.frame_cond.wait_for(
                        lambda: self.latest_jpeg is not None and self.frame_seq != last_seq,
                        timeout=20,
                    )
                    jpeg = self.latest_jpeg
                    seq = self.frame_seq
                if not ok:
                    if got_any:
                        break  # had frames, now 20 s of silence -> camera gone
                    continue  # never had a frame (model loading / connecting) -> keep waiting
                if jpeg is None:
                    continue
                got_any = True
                last_seq = seq
                yield (
                    b"--frame\r\n"
                    b"Content-Type: image/jpeg\r\n\r\n" + jpeg + b"\r\n"
                )
        finally:
            self.remove_viewer()
# Single process-wide grabber; its background thread is started at import time.
webcam = WebcamGrabber(CAMERA_URL)
webcam.start()
# ---------------------------------------------------------------------------
# Reset-Events fuer den Video-Pfad (per Stream-ID)
# ---------------------------------------------------------------------------
# Registry of reset events keyed by stream ID; reset_lock guards all access to it.
reset_events: dict[str, Event] = {}
reset_lock = Lock()
def get_reset_event(stream_id: str) -> Event:
    """Return the reset event registered for ``stream_id``, creating it if needed."""
    with reset_lock:
        try:
            return reset_events[stream_id]
        except KeyError:
            created = reset_events[stream_id] = Event()
            return created
def release_reset_event(stream_id: str) -> None:
    """Remove the reset event of ``stream_id`` from the registry, if present."""
    with reset_lock:
        if stream_id in reset_events:
            del reset_events[stream_id]
def get_webcam_stream_id() -> str:
    """Return the session's webcam stream ID, generating and storing one if missing."""
    existing = session.get("webcam_stream_id")
    if existing:
        return existing
    generated = f"webcam-{uuid.uuid4().hex}"
    session["webcam_stream_id"] = generated
    return generated
def get_video_stream_id(filename: str) -> str:
    """Return the session's stream ID for ``filename``, creating one on first use.

    The per-file mapping lives in the session under "video_stream_ids" and
    is written back whenever a new ID is added.
    """
    id_by_file = session.get("video_stream_ids", {})
    known = id_by_file.get(filename)
    if known:
        return known
    generated = f"video-{uuid.uuid4().hex}"
    id_by_file[filename] = generated
    session["video_stream_ids"] = id_by_file
    return generated
# ---------------------------------------------------------------------------
# Video-Pfad (per Request, unveraendert in der Logik)
# ---------------------------------------------------------------------------
def generate_frames(capture, line_data, stream_id: str):
    """Yield annotated multipart JPEG parts for an uploaded video.

    Only every second decoded frame is processed. A pending reset event for
    ``stream_id`` clears the counting state before the next processed frame.
    When the generator finishes or is closed, the capture is released and
    the reset event is unregistered.
    """
    counting_state = new_state()
    start_pt = (line_data["x1"], line_data["y1"])
    end_pt = (line_data["x2"], line_data["y2"])
    reset_event = get_reset_event(stream_id)
    decoded = 0
    try:
        while True:
            has_frame, raw = capture.read()
            if not has_frame:
                break
            decoded += 1
            if decoded % 2:
                continue  # skip odd-numbered frames
            if reset_event.is_set():
                reset_state(counting_state)
                reset_event.clear()
            annotated = process_frame(raw, model, names, start_pt, end_pt, counting_state, source="video")
            encoded_ok, encoded = cv2.imencode(".jpg", annotated)
            if not encoded_ok:
                continue
            part_header = b"--frame\r\n" b"Content-Type: image/jpeg\r\n\r\n"
            yield part_header + encoded.tobytes() + b"\r\n"
    finally:
        capture.release()
        release_reset_event(stream_id)
def detect_objects_from_video(video_path, line_data, stream_id):
    """Open ``video_path`` and return the frame generator for it.

    Raises:
        RuntimeError: If the video cannot be opened.
    """
    capture = cv2.VideoCapture(video_path)
    if capture.isOpened():
        return generate_frames(capture, line_data, stream_id)
    capture.release()
    raise RuntimeError("Video konnte nicht geoeffnet werden")
# ---------------------------------------------------------------------------
# Routen
# ---------------------------------------------------------------------------
@app.route("/")
def index():
    """Render the landing page."""
    page = render_template("index.html")
    return page
@app.route("/start_webcam")
def start_webcam():
    """Render the webcam page after making sure the session has a line and a stream ID."""
    get_line_from_session()  # called for its side effect: seeds the session line
    return render_template("webcam.html", stream_id=get_webcam_stream_id())
@app.route("/webcam_feed")
def webcam_feed():
    """Stream the grabber's frames as MJPEG.

    No camera connection is opened per request; every viewer is served from
    the shared grabber.
    """
    viewer_stream = webcam.frames()
    content_type = "multipart/x-mixed-replace; boundary=frame"
    return Response(viewer_stream, mimetype=content_type)
@app.route("/api/set_line", methods=["POST"])
def set_counting_line():
    """Set the counting line for the video session AND the webcam grabber.

    Responds with 400 when the JSON body does not contain valid coordinates.
    """
    global SAVED_LINE
    payload = request.get_json(silent=True) or {}
    line = _valid_line(payload)
    if line is None:
        abort(400, description="Ungueltige Linienkoordinaten")
    SAVED_LINE = line  # default for future sessions
    save_line(line)  # persisted -> survives a restart
    session["counting_line"] = line
    webcam.set_line(line)  # the webcam uses one global line (one camera)
    body = {"status": "success", "line": line}
    return jsonify(body)
@app.route("/api/get_line", methods=["GET"])
def get_counting_line():
    """Return the session's counting line as JSON."""
    current_line = get_line_from_session()
    return jsonify(current_line)
@app.route("/api/webcam_status", methods=["GET"])
def webcam_status():
    """Report whether the webcam grabber is currently receiving frames."""
    status = {"online": webcam.is_online()}
    return jsonify(status)
@app.route("/api/reset_count", methods=["POST"])
def reset_count():
    """Reset the counter of a stream that belongs to the current session.

    Expects a JSON object with a string ``stream_id``.

    Responses:
        400: body is not a JSON object or ``stream_id`` is missing/not a string.
        403: the stream ID is not registered in this session.
        200: reset requested (webcam grabber or video reset event).
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # e.g. a JSON array or scalar: treat like an empty object instead of
        # failing on .get() with an AttributeError (HTTP 500).
        data = {}
    stream_id = data.get("stream_id")
    # Non-string IDs can never match and unhashable ones (list/dict) would
    # raise TypeError in the set membership test below.
    if not stream_id or not isinstance(stream_id, str):
        abort(400, description="stream_id ist erforderlich")
    valid_ids = {session.get("webcam_stream_id")}
    valid_ids.update(session.get("video_stream_ids", {}).values())
    valid_ids.discard(None)
    if stream_id not in valid_ids:
        abort(403, description="Stream gehoert nicht zur aktuellen Sitzung")
    if stream_id == session.get("webcam_stream_id"):
        webcam.reset()
    else:
        get_reset_event(stream_id).set()
    return jsonify({"status": "success", "message": "Zaehler wird zurueckgesetzt"})
@app.route("/upload", methods=["POST"])
def upload_video():
    """Store an uploaded video under a unique name and redirect to its player page.

    Responds with 400 for a missing file, an empty or unusable file name, or
    an extension that is not allowed.
    """
    upload = request.files["file"] if "file" in request.files else None
    if "file" not in request.files:
        abort(400, description="Keine Datei erhalten")
    if not upload or upload.filename == "":
        abort(400, description="Keine Datei ausgewaehlt")
    clean_name = secure_filename(upload.filename)
    if not clean_name:
        abort(400, description="Ungueltiger Dateiname")
    if not allowed_file(clean_name):
        abort(400, description="Ungueltiger Dateityp")
    ensure_upload_dir()
    # A random suffix keeps uploads with the same name from overwriting each other.
    stem, extension = os.path.splitext(clean_name)
    stored_filename = f"{stem}_{uuid.uuid4().hex}{extension.lower()}"
    upload.save(os.path.join(UPLOAD_DIR, stored_filename))
    return redirect(url_for("play_video", filename=stored_filename))
@app.route("/uploads/<filename>")
def play_video(filename):
    """Render the player page for an uploaded video.

    Responds with 400 when the name contains path components and with 404
    when no such file exists in the upload directory.
    """
    if os.path.basename(filename) != filename:
        abort(400, description="Ungueltiger Dateiname")
    if not os.path.isfile(os.path.join(UPLOAD_DIR, filename)):
        abort(404)
    get_line_from_session()  # called for its side effect: seeds the session line
    return render_template(
        "play_video.html",
        filename=filename,
        stream_id=get_video_stream_id(filename),
    )
@app.route("/video/<path:filename>")
def send_video(filename):
    """Serve a raw file from the upload directory."""
    file_response = send_from_directory(UPLOAD_DIR, filename)
    return file_response
@app.route("/video_feed/<filename>")
def video_feed(filename):
    """Stream an uploaded video with detection overlays as MJPEG.

    Responds with 400 for names containing path components, 404 when the
    file does not exist and 503 when the video cannot be opened.
    """
    if os.path.basename(filename) != filename:
        abort(400, description="Ungueltiger Dateiname")
    video_path = os.path.join(UPLOAD_DIR, filename)
    if not os.path.isfile(video_path):
        abort(404)
    line_data = get_line_from_session()
    stream_id = get_video_stream_id(filename)
    try:
        frame_stream = detect_objects_from_video(video_path, line_data, stream_id)
    except RuntimeError as exc:
        abort(503, description=str(exc))
    content_type = "multipart/x-mixed-replace; boundary=frame"
    return Response(frame_stream, mimetype=content_type)
if __name__ == "__main__":
    # Development entry point: listen on all interfaces, port 8080, no debugger.
    app.run(host="0.0.0.0", port=8080, debug=False)