Fix: schnelle Fahrzeuge wurden an der Zaehllinie nicht gezaehlt

Bisher musste von DERSELBEN Track-ID ein Punkt vor UND hinter der Linie
erfasst werden. Bei schnellen Fahrzeugen gibt es dafuer oft zu wenige
Samples oder die Track-ID wechselt -> nichts wird gezaehlt.

- Band um die Linie (COUNT_BAND_PX): ein einzelnes Sample nahe der Linie
  reicht jetzt zum Zaehlen
- Entprellung (COUNT_DEDUP_PX/FRAMES): verhindert Doppelzaehlung bei
  Track-ID-Wechseln nahe der Linie
- klassischer Segment-Schnitt bleibt zusaetzlich erhalten
- point_to_segment_dist + Tests fuer langsam/schnell/ID-Wechsel

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-01 15:03:11 +02:00
parent 5545d7d837
commit 6fc40ba4ee
2 changed files with 72 additions and 4 deletions

View File

@@ -24,6 +24,12 @@ export MOTION_PIXELS=500
export GRABBER_ALWAYS_ON=0 export GRABBER_ALWAYS_ON=0
# FP16-Inferenz. Leer lassen = automatisch (an bei CUDA-GPU, sonst aus). # FP16-Inferenz. Leer lassen = automatisch (an bei CUDA-GPU, sonst aus).
#export YOLO_HALF=1 #export YOLO_HALF=1
# Zaehl-Band um die Linie (Pixel): faengt schnelle Fahrzeuge ab, die zwischen
# zwei Frames weit springen. 0 = aus (nur klassischer Linien-Schnitt).
export COUNT_BAND_PX=24
# Entprellung gegen Doppelzaehlung bei Track-ID-Wechseln (Pixel-Radius / Frames).
export COUNT_DEDUP_PX=60
export COUNT_DEDUP_FRAMES=12
# --- MQTT (optional) ------------------------------------------------------- # --- MQTT (optional) -------------------------------------------------------
# Komplett abschaltbar: "false" -> App laeuft ohne Broker, sendet keine Events. # Komplett abschaltbar: "false" -> App laeuft ohne Broker, sendet keine Events.

70
app.py
View File

@@ -65,6 +65,19 @@ YOLO_HALF = os.environ.get("YOLO_HALF", "1" if _CUDA else "0") == "1"
# COCO-Klassen: 2=car, 3=motorcycle, 5=bus, 7=truck. Filtert Person/Couch etc. # COCO-Klassen: 2=car, 3=motorcycle, 5=bus, 7=truck. Filtert Person/Couch etc.
VEHICLE_CLASS_IDS = [2, 3, 5, 7] VEHICLE_CLASS_IDS = [2, 3, 5, 7]
# --- Zaehl-Robustheit fuer schnelle Fahrzeuge ------------------------------
# Band um die Zaehllinie (in Pixeln, bezogen auf das 1020x600-Bild):
# Ein Fahrzeug wird schon gezaehlt, wenn sein Mittelpunkt naeher als
# COUNT_BAND_PX an der Linie liegt - es muss NICHT mehr ein Punkt davor UND
# einer dahinter erfasst werden. Faengt schnelle Fahrzeuge ab, die zwischen
# zwei Frames weit springen. 0 = aus (nur klassischer Segment-Schnitt).
COUNT_BAND_PX = int(os.environ.get("COUNT_BAND_PX", "24"))
# Entprellung gegen Doppelzaehlung bei Track-ID-Wechseln: ein neuer Zaehl-
# punkt wird verworfen, wenn er naeher als COUNT_DEDUP_PX an einem in den
# letzten COUNT_DEDUP_FRAMES Frames gezaehlten Punkt liegt.
COUNT_DEDUP_PX = int(os.environ.get("COUNT_DEDUP_PX", "60"))
COUNT_DEDUP_FRAMES = int(os.environ.get("COUNT_DEDUP_FRAMES", "12"))
# --- MQTT: ein Event pro Linienueberquerung (fuer n8n -> NocoDB) ----------- # --- MQTT: ein Event pro Linienueberquerung (fuer n8n -> NocoDB) -----------
# Komplett abschaltbar: MQTT_ENABLED=false -> App laeuft ohne Broker/Events. # Komplett abschaltbar: MQTT_ENABLED=false -> App laeuft ohne Broker/Events.
MQTT_ENABLED = os.environ.get("MQTT_ENABLED", "true").strip().lower() in ( MQTT_ENABLED = os.environ.get("MQTT_ENABLED", "true").strip().lower() in (
@@ -215,6 +228,8 @@ def new_state() -> dict:
"counted_ids": set(), "counted_ids": set(),
"count": 0, "count": 0,
"types": fresh_vehicle_counts(), "types": fresh_vehicle_counts(),
"frame_idx": 0,
"recent_counts": [], # [(cx, cy, frame_idx)] fuer die Entprellung
} }
@@ -223,6 +238,8 @@ def reset_state(state: dict) -> None:
state["counted_ids"].clear() state["counted_ids"].clear()
state["count"] = 0 state["count"] = 0
state["types"] = fresh_vehicle_counts() state["types"] = fresh_vehicle_counts()
state["frame_idx"] = 0
state["recent_counts"].clear()
def line_intersect(p1, p2, p3, p4) -> bool: def line_intersect(p1, p2, p3, p4) -> bool:
@@ -245,6 +262,25 @@ def crossed_line(prev_pos, curr_pos, line_start, line_end) -> bool:
return line_intersect(prev_pos, curr_pos, line_start, line_end) return line_intersect(prev_pos, curr_pos, line_start, line_end)
def point_to_segment_dist(px, py, ax, ay, bx, by) -> float:
"""Kuerzester Abstand des Punkts (px,py) zur Strecke (ax,ay)-(bx,by)."""
dx, dy = bx - ax, by - ay
if dx == 0 and dy == 0:
return ((px - ax) ** 2 + (py - ay) ** 2) ** 0.5
t = ((px - ax) * dx + (py - ay) * dy) / (dx * dx + dy * dy)
t = max(0.0, min(1.0, t))
cx, cy = ax + t * dx, ay + t * dy
return ((px - cx) ** 2 + (py - cy) ** 2) ** 0.5
def _recently_counted(recent_counts, cx, cy) -> bool:
"""True, wenn nahe (cx,cy) kuerzlich schon gezaehlt wurde (ID-Wechsel)."""
for rx, ry, _ in recent_counts:
if (cx - rx) ** 2 + (cy - ry) ** 2 <= COUNT_DEDUP_PX ** 2:
return True
return False
def draw_overlay(frame, line_start, line_end, state): def draw_overlay(frame, line_start, line_end, state):
"""Zeichnet Zaehllinie (gestrichelt) + Zaehler-Box. Kein YOLO.""" """Zeichnet Zaehllinie (gestrichelt) + Zaehler-Box. Kein YOLO."""
types = state["types"] types = state["types"]
@@ -296,6 +332,14 @@ def process_frame(frame, det_model, det_names, line_start, line_end, state, sour
track_positions = state["track_positions"] track_positions = state["track_positions"]
counted_ids = state["counted_ids"] counted_ids = state["counted_ids"]
types = state["types"] types = state["types"]
recent_counts = state["recent_counts"]
state["frame_idx"] += 1
frame_idx = state["frame_idx"]
# Entprell-Historie auf das Zeitfenster eindampfen.
recent_counts[:] = [
c for c in recent_counts if frame_idx - c[2] <= COUNT_DEDUP_FRAMES
]
if results and results[0].boxes is not None and results[0].boxes.id is not None: if results and results[0].boxes is not None and results[0].boxes.id is not None:
boxes = results[0].boxes.xyxy.int().cpu().tolist() boxes = results[0].boxes.xyxy.int().cpu().tolist()
@@ -309,12 +353,29 @@ def process_frame(frame, det_model, det_names, line_start, line_end, state, sour
center_y = (y1 + y2) // 2 center_y = (y1 + y2) // 2
if label_name in VEHICLE_CLASSES: if label_name in VEHICLE_CLASSES:
if track_id in track_positions and track_id not in counted_ids: if track_id not in counted_ids:
prev_x, prev_y = track_positions[track_id] crossed = False
cv2.line(frame, (prev_x, prev_y), (center_x, center_y), (255, 100, 0), 2)
if crossed_line((prev_x, prev_y), (center_x, center_y), line_start, line_end): # (a) Klassischer Segment-Schnitt: Punkt davor UND dahinter.
if track_id in track_positions:
prev_x, prev_y = track_positions[track_id]
cv2.line(frame, (prev_x, prev_y), (center_x, center_y), (255, 100, 0), 2)
if crossed_line((prev_x, prev_y), (center_x, center_y), line_start, line_end):
crossed = True
# (b) Band um die Linie: ein einzelnes Sample nah an der
# Linie reicht -> faengt schnelle Fahrzeuge ab.
if not crossed and COUNT_BAND_PX > 0:
dist = point_to_segment_dist(
center_x, center_y,
line_start[0], line_start[1], line_end[0], line_end[1],
)
if dist <= COUNT_BAND_PX:
crossed = True
if crossed and not _recently_counted(recent_counts, center_x, center_y):
counted_ids.add(track_id) counted_ids.add(track_id)
recent_counts.append((center_x, center_y, frame_idx))
state["count"] += 1 state["count"] += 1
types[label_name] += 1 types[label_name] += 1
publish_crossing(label_name, track_id, source) publish_crossing(label_name, track_id, source)
@@ -463,6 +524,7 @@ class WebcamGrabber:
# Frische Verbindung -> Track-IDs neu, Gesamtzaehler bleibt. # Frische Verbindung -> Track-IDs neu, Gesamtzaehler bleibt.
state["track_positions"].clear() state["track_positions"].clear()
state["counted_ids"].clear() state["counted_ids"].clear()
state["recent_counts"].clear()
self._prev_gray = None self._prev_gray = None
buf = b"" buf = b""