Implementiere Fahrzeugzählung mit Linienüberquerung
- Linienschnitt-Algorithmus für präzise Fahrzeugzählung - Interaktive Linienauswahl im Browser (Canvas-basiert) - Session-Management für benutzerdefinierte Zähllinien - Typ-spezifische Zähler (Autos, LKW, Busse, Motorräder) - REST-API für Linienkonfiguration und Zähler-Reset - Gestrichelte Zähllinie als Video-Overlay - Detailliertes Zähler-Display im Video Features: - Linienüberquerung-Erkennung (beide Richtungen) - Keine Mehrfachzählung durch Track-ID-Management - Funktioniert für Webcam und Video-Upload - Benutzerfreundliche UI mit Echtzeit-Feedback 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
304
app.py
Normal file
304
app.py
Normal file
@@ -0,0 +1,304 @@
|
||||
import os
|
||||
import cv2
|
||||
import numpy as np
|
||||
from flask import Flask, render_template, Response, request, redirect, url_for, send_from_directory, session, jsonify
|
||||
from ultralytics import YOLO
|
||||
|
||||
app = Flask(__name__)
|
||||
app.secret_key = 'vehicle_counting_secret_key_2024' # Required for session management
|
||||
|
||||
# Load the YOLOv8 model
|
||||
model = YOLO("yolo11s.pt")
|
||||
names = model.model.names
|
||||
|
||||
# Vehicle classes to count
|
||||
VEHICLE_CLASSES = {'car', 'truck', 'bus', 'motorcycle'}
|
||||
|
||||
# Helper function to check if two line segments intersect
|
||||
def line_intersect(p1, p2, p3, p4):
|
||||
"""
|
||||
Check if line segment p1-p2 intersects with line segment p3-p4
|
||||
Returns True if they intersect, False otherwise
|
||||
"""
|
||||
x1, y1 = p1
|
||||
x2, y2 = p2
|
||||
x3, y3 = p3
|
||||
x4, y4 = p4
|
||||
|
||||
denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
|
||||
if abs(denom) < 1e-10:
|
||||
return False
|
||||
|
||||
t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom
|
||||
u = -((x1 - x2) * (y1 - y3) - (y1 - y2) * (x1 - x3)) / denom
|
||||
|
||||
return 0 <= t <= 1 and 0 <= u <= 1
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
return render_template('index.html')
|
||||
|
||||
@app.route('/start_webcam')
|
||||
def start_webcam():
|
||||
# Initialize default counting line in session if not set
|
||||
if 'counting_line' not in session:
|
||||
session['counting_line'] = {'x1': 0, 'y1': 300, 'x2': 1020, 'y2': 300}
|
||||
return render_template('webcam.html')
|
||||
|
||||
@app.route('/api/set_line', methods=['POST'])
|
||||
def set_counting_line():
|
||||
"""API endpoint to set the counting line coordinates"""
|
||||
data = request.json
|
||||
session['counting_line'] = {
|
||||
'x1': int(data['x1']),
|
||||
'y1': int(data['y1']),
|
||||
'x2': int(data['x2']),
|
||||
'y2': int(data['y2'])
|
||||
}
|
||||
return jsonify({'status': 'success', 'line': session['counting_line']})
|
||||
|
||||
@app.route('/api/get_line', methods=['GET'])
|
||||
def get_counting_line():
|
||||
"""API endpoint to get the current counting line coordinates"""
|
||||
if 'counting_line' not in session:
|
||||
session['counting_line'] = {'x1': 0, 'y1': 300, 'x2': 1020, 'y2': 300}
|
||||
return jsonify(session['counting_line'])
|
||||
|
||||
@app.route('/api/reset_count', methods=['POST'])
|
||||
def reset_count():
|
||||
"""API endpoint to reset the vehicle count"""
|
||||
session['reset_count'] = True
|
||||
return jsonify({'status': 'success'})
|
||||
|
||||
def detect_objects_from_webcam():
|
||||
count = 0
|
||||
cap = cv2.VideoCapture(0) # 0 for the default webcam
|
||||
|
||||
# Track vehicle positions and counted IDs
|
||||
track_positions = {} # {track_id: (center_x, center_y)}
|
||||
counted_ids = set()
|
||||
vehicle_count = 0
|
||||
vehicle_type_counts = {'car': 0, 'truck': 0, 'bus': 0, 'motorcycle': 0}
|
||||
|
||||
while True:
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
count += 1
|
||||
if count % 2 != 0:
|
||||
continue
|
||||
# Resize the frame to (1020, 600)
|
||||
frame = cv2.resize(frame, (1020, 600))
|
||||
|
||||
# Get counting line from session (default to horizontal middle line)
|
||||
line_data = session.get('counting_line', {'x1': 0, 'y1': 300, 'x2': 1020, 'y2': 300})
|
||||
line_start = (line_data['x1'], line_data['y1'])
|
||||
line_end = (line_data['x2'], line_data['y2'])
|
||||
|
||||
# Check if count should be reset
|
||||
if session.get('reset_count', False):
|
||||
counted_ids.clear()
|
||||
vehicle_count = 0
|
||||
vehicle_type_counts = {'car': 0, 'truck': 0, 'bus': 0, 'motorcycle': 0}
|
||||
session['reset_count'] = False
|
||||
|
||||
# Draw counting line (dashed yellow line with black gaps)
|
||||
cv2.line(frame, line_start, line_end, (0, 255, 255), 2, cv2.LINE_AA)
|
||||
# Draw dashed effect
|
||||
line_length = int(np.sqrt((line_end[0] - line_start[0])**2 + (line_end[1] - line_start[1])**2))
|
||||
dash_length = 20
|
||||
for i in range(0, line_length, dash_length * 2):
|
||||
t1 = i / line_length
|
||||
t2 = min((i + dash_length) / line_length, 1.0)
|
||||
x1 = int(line_start[0] + t1 * (line_end[0] - line_start[0]))
|
||||
y1 = int(line_start[1] + t1 * (line_end[1] - line_start[1]))
|
||||
x2 = int(line_start[0] + t2 * (line_end[0] - line_start[0]))
|
||||
y2 = int(line_start[1] + t2 * (line_end[1] - line_start[1]))
|
||||
cv2.line(frame, (x1, y1), (x2, y2), (0, 0, 0), 2)
|
||||
|
||||
# Run YOLOv8 tracking on the frame
|
||||
results = model.track(frame, persist=True)
|
||||
|
||||
if results[0].boxes is not None and results[0].boxes.id is not None:
|
||||
boxes = results[0].boxes.xyxy.int().cpu().tolist()
|
||||
class_ids = results[0].boxes.cls.int().cpu().tolist()
|
||||
track_ids = results[0].boxes.id.int().cpu().tolist()
|
||||
|
||||
for box, class_id, track_id in zip(boxes, class_ids, track_ids):
|
||||
c = names[class_id]
|
||||
x1, y1, x2, y2 = box
|
||||
|
||||
# Calculate center point of bounding box
|
||||
center_x = (x1 + x2) // 2
|
||||
center_y = (y1 + y2) // 2
|
||||
|
||||
# Check if this is a vehicle we want to count
|
||||
if c in VEHICLE_CLASSES:
|
||||
# If we have a previous position for this track
|
||||
if track_id in track_positions and track_id not in counted_ids:
|
||||
prev_x, prev_y = track_positions[track_id]
|
||||
# Check if the vehicle crossed the counting line
|
||||
if line_intersect((prev_x, prev_y), (center_x, center_y), line_start, line_end):
|
||||
counted_ids.add(track_id)
|
||||
vehicle_count += 1
|
||||
vehicle_type_counts[c] += 1
|
||||
|
||||
# Update position
|
||||
track_positions[track_id] = (center_x, center_y)
|
||||
|
||||
# Draw bounding box and label
|
||||
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
||||
cv2.putText(frame, f'{track_id} - {c}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 1)
|
||||
|
||||
# Display vehicle count with type breakdown
|
||||
cv2.rectangle(frame, (10, 10), (350, 140), (0, 0, 0), -1)
|
||||
cv2.putText(frame, f'Gesamt: {vehicle_count}', (20, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
||||
cv2.putText(frame, f'Autos: {vehicle_type_counts["car"]}', (20, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)
|
||||
cv2.putText(frame, f'LKW: {vehicle_type_counts["truck"]}', (20, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)
|
||||
cv2.putText(frame, f'Busse: {vehicle_type_counts["bus"]}', (20, 115), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)
|
||||
cv2.putText(frame, f'Motorraeder: {vehicle_type_counts["motorcycle"]}', (20, 135), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1)
|
||||
|
||||
_, buffer = cv2.imencode('.jpg', frame)
|
||||
frame = buffer.tobytes()
|
||||
|
||||
yield (b'--frame\r\n'
|
||||
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
|
||||
|
||||
@app.route('/webcam_feed')
|
||||
def webcam_feed():
|
||||
return Response(detect_objects_from_webcam(),
|
||||
mimetype='multipart/x-mixed-replace; boundary=frame')
|
||||
|
||||
@app.route('/upload', methods=['POST'])
|
||||
def upload_video():
|
||||
if 'file' not in request.files:
|
||||
return redirect(request.url)
|
||||
|
||||
file = request.files['file']
|
||||
if file.filename == '':
|
||||
return redirect(request.url)
|
||||
|
||||
# Save the uploaded file to the uploads folder
|
||||
if not os.path.exists('uploads'):
|
||||
os.makedirs('uploads')
|
||||
|
||||
file_path = os.path.join('uploads', file.filename)
|
||||
file.save(file_path)
|
||||
|
||||
# Redirect to the video playback page after upload
|
||||
return redirect(url_for('play_video', filename=file.filename))
|
||||
|
||||
@app.route('/uploads/<filename>')
|
||||
def play_video(filename):
|
||||
# Initialize default counting line in session if not set
|
||||
if 'counting_line' not in session:
|
||||
session['counting_line'] = {'x1': 0, 'y1': 300, 'x2': 1020, 'y2': 300}
|
||||
return render_template('play_video.html', filename=filename)
|
||||
|
||||
@app.route('/video/<path:filename>')
|
||||
def send_video(filename):
|
||||
return send_from_directory('uploads', filename)
|
||||
|
||||
def detect_objects_from_video(video_path):
|
||||
cap = cv2.VideoCapture(video_path)
|
||||
count = 0
|
||||
|
||||
# Track vehicle positions and counted IDs
|
||||
track_positions = {} # {track_id: (center_x, center_y)}
|
||||
counted_ids = set()
|
||||
vehicle_count = 0
|
||||
vehicle_type_counts = {'car': 0, 'truck': 0, 'bus': 0, 'motorcycle': 0}
|
||||
|
||||
while cap.isOpened():
|
||||
ret, frame = cap.read()
|
||||
if not ret:
|
||||
break
|
||||
count += 1
|
||||
if count % 2 != 0:
|
||||
continue
|
||||
|
||||
# Resize the frame to (1020, 600)
|
||||
frame = cv2.resize(frame, (1020, 600))
|
||||
|
||||
# Get counting line from session (default to horizontal middle line)
|
||||
line_data = session.get('counting_line', {'x1': 0, 'y1': 300, 'x2': 1020, 'y2': 300})
|
||||
line_start = (line_data['x1'], line_data['y1'])
|
||||
line_end = (line_data['x2'], line_data['y2'])
|
||||
|
||||
# Check if count should be reset
|
||||
if session.get('reset_count', False):
|
||||
counted_ids.clear()
|
||||
vehicle_count = 0
|
||||
vehicle_type_counts = {'car': 0, 'truck': 0, 'bus': 0, 'motorcycle': 0}
|
||||
session['reset_count'] = False
|
||||
|
||||
# Draw counting line (dashed yellow line with black gaps)
|
||||
cv2.line(frame, line_start, line_end, (0, 255, 255), 2, cv2.LINE_AA)
|
||||
# Draw dashed effect
|
||||
line_length = int(np.sqrt((line_end[0] - line_start[0])**2 + (line_end[1] - line_start[1])**2))
|
||||
dash_length = 20
|
||||
for i in range(0, line_length, dash_length * 2):
|
||||
t1 = i / line_length
|
||||
t2 = min((i + dash_length) / line_length, 1.0)
|
||||
x1 = int(line_start[0] + t1 * (line_end[0] - line_start[0]))
|
||||
y1 = int(line_start[1] + t1 * (line_end[1] - line_start[1]))
|
||||
x2 = int(line_start[0] + t2 * (line_end[0] - line_start[0]))
|
||||
y2 = int(line_start[1] + t2 * (line_end[1] - line_start[1]))
|
||||
cv2.line(frame, (x1, y1), (x2, y2), (0, 0, 0), 2)
|
||||
|
||||
# Run YOLOv8 tracking on the frame
|
||||
results = model.track(frame, persist=True)
|
||||
|
||||
if results[0].boxes is not None and results[0].boxes.id is not None:
|
||||
boxes = results[0].boxes.xyxy.int().cpu().tolist()
|
||||
class_ids = results[0].boxes.cls.int().cpu().tolist()
|
||||
track_ids = results[0].boxes.id.int().cpu().tolist()
|
||||
|
||||
for box, class_id, track_id in zip(boxes, class_ids, track_ids):
|
||||
c = names[class_id]
|
||||
x1, y1, x2, y2 = box
|
||||
|
||||
# Calculate center point of bounding box
|
||||
center_x = (x1 + x2) // 2
|
||||
center_y = (y1 + y2) // 2
|
||||
|
||||
# Check if this is a vehicle we want to count
|
||||
if c in VEHICLE_CLASSES:
|
||||
# If we have a previous position for this track
|
||||
if track_id in track_positions and track_id not in counted_ids:
|
||||
prev_x, prev_y = track_positions[track_id]
|
||||
# Check if the vehicle crossed the counting line
|
||||
if line_intersect((prev_x, prev_y), (center_x, center_y), line_start, line_end):
|
||||
counted_ids.add(track_id)
|
||||
vehicle_count += 1
|
||||
vehicle_type_counts[c] += 1
|
||||
|
||||
# Update position
|
||||
track_positions[track_id] = (center_x, center_y)
|
||||
|
||||
# Draw bounding box and label
|
||||
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
||||
cv2.putText(frame, f'{track_id} - {c}', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 1)
|
||||
|
||||
# Display vehicle count with type breakdown
|
||||
cv2.rectangle(frame, (10, 10), (350, 140), (0, 0, 0), -1)
|
||||
cv2.putText(frame, f'Gesamt: {vehicle_count}', (20, 35), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
|
||||
cv2.putText(frame, f'Autos: {vehicle_type_counts["car"]}', (20, 65), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)
|
||||
cv2.putText(frame, f'LKW: {vehicle_type_counts["truck"]}', (20, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)
|
||||
cv2.putText(frame, f'Busse: {vehicle_type_counts["bus"]}', (20, 115), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)
|
||||
cv2.putText(frame, f'Motorraeder: {vehicle_type_counts["motorcycle"]}', (20, 135), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1)
|
||||
|
||||
_, buffer = cv2.imencode('.jpg', frame)
|
||||
frame = buffer.tobytes()
|
||||
|
||||
yield (b'--frame\r\n'
|
||||
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
|
||||
|
||||
@app.route('/video_feed/<filename>')
|
||||
def video_feed(filename):
|
||||
video_path = os.path.join('uploads', filename)
|
||||
return Response(detect_objects_from_video(video_path),
|
||||
mimetype='multipart/x-mixed-replace; boundary=frame')
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run('0.0.0.0',debug=False, port=8080)
|
||||
Reference in New Issue
Block a user