"""Flask app that streams YOLO-annotated frames and counts vehicles crossing a line.

Frames come either from the default webcam or from an uploaded video file.
Each stream is served as a multipart MJPEG response; the counting line is
stored per browser session and can be changed through a small JSON API.
"""
import os

import cv2
import numpy as np
from flask import (
    Flask,
    Response,
    jsonify,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)
from ultralytics import YOLO

app = Flask(__name__)
# Required for session management.
# NOTE(review): the secret key is hard-coded; consider loading it from the
# environment before deploying this anywhere public.
app.secret_key = 'vehicle_counting_secret_key_2024'

# Load the YOLO model once at import time; the same instance serves every stream.
model = YOLO("yolo11s.pt")
names = model.model.names

# Vehicle classes to count
VEHICLE_CLASSES = {'car', 'truck', 'bus', 'motorcycle'}

# Directory (relative to the working directory) where uploads are stored.
UPLOAD_FOLDER = 'uploads'

# Every frame is resized to this (width, height) before detection and drawing,
# so counting-line coordinates are expressed in this coordinate system.
FRAME_SIZE = (1020, 600)

# Horizontal line across the resized frame, used until the client sets its own.
DEFAULT_COUNTING_LINE = {'x1': 0, 'y1': 300, 'x2': 1020, 'y2': 300}

_LINE_KEYS = ('x1', 'y1', 'x2', 'y2')
_MJPEG_MIMETYPE = 'multipart/x-mixed-replace; boundary=frame'


def line_intersect(p1, p2, p3, p4):
    """Return True if segment p1-p2 intersects segment p3-p4.

    Each point is an ``(x, y)`` pair. Parallel segments (including collinear,
    overlapping ones) are reported as not intersecting because the
    denominator of the parametric solution is (near) zero.
    """
    x1, y1 = p1
    x2, y2 = p2
    x3, y3 = p3
    x4, y4 = p4

    denom = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
    if abs(denom) < 1e-10:
        return False

    # t and u are the positions of the intersection along each segment;
    # both must lie in [0, 1] for the segments themselves to touch.
    t = ((x1 - x3) * (y3 - y4) - (y1 - y3) * (x3 - x4)) / denom
    u = -((x1 - x2) * (y1 - y3) - (y1 - y2) * (x1 - x3)) / denom

    return 0 <= t <= 1 and 0 <= u <= 1


def ccw(A, B, C):
    """Return True if the points A, B, C are in counter-clockwise order.

    Not called anywhere in this module; kept for backward compatibility.
    """
    return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])


def crossed_line(prev_pos, curr_pos, line_start, line_end):
    """Return True if moving from prev_pos to curr_pos crossed the line.

    The movement is treated as a segment and tested against the counting
    line with ``line_intersect``, so a crossing is still detected when the
    object jumps over the line between two processed frames.
    """
    return line_intersect(prev_pos, curr_pos, line_start, line_end)


def _ensure_counting_line():
    """Return the session's counting line, storing the default if none is set."""
    if 'counting_line' not in session:
        # Store a copy so the module-level default can never be mutated.
        session['counting_line'] = dict(DEFAULT_COUNTING_LINE)
    return session['counting_line']


@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


@app.route('/start_webcam')
def start_webcam():
    """Render the webcam page, initialising the counting line if needed."""
    _ensure_counting_line()
    return render_template('webcam.html')


@app.route('/api/set_line', methods=['POST'])
def set_counting_line():
    """API endpoint to set the counting line coordinates.

    Expects a JSON object with integer-convertible ``x1``, ``y1``, ``x2``
    and ``y2``. Responds with HTTP 400 when the body is missing, is not a
    JSON object, or a coordinate is absent or not convertible to int.
    """
    data = request.get_json(silent=True)
    try:
        line = {key: int(data[key]) for key in _LINE_KEYS}
    except (KeyError, TypeError, ValueError):
        return jsonify({
            'status': 'error',
            'message': 'JSON body with integer x1, y1, x2, y2 is required',
        }), 400

    session['counting_line'] = line
    return jsonify({'status': 'success', 'line': line})


@app.route('/api/get_line', methods=['GET'])
def get_counting_line():
    """API endpoint to get the current counting line coordinates."""
    return jsonify(_ensure_counting_line())


@app.route('/api/reset_count', methods=['POST'])
def reset_count():
    """API endpoint to reset the vehicle count (requires page reload to take effect)."""
    # The count lives in the streaming generator's local variables, so it
    # can only be reset by restarting the stream (i.e. reloading the page).
    return jsonify({'status': 'success', 'message': 'Please reload the page to reset the count'})


def _draw_counting_line(frame, line_start, line_end):
    """Draw the counting line on frame as a yellow line with black dashes."""
    cv2.line(frame, line_start, line_end, (0, 255, 255), 3, cv2.LINE_AA)

    dx = line_end[0] - line_start[0]
    dy = line_end[1] - line_start[1]
    line_length = int(np.sqrt(dx ** 2 + dy ** 2))
    dash_length = 20

    # Paint a black dash over every other dash_length stretch of the line.
    # A zero-length line yields an empty range, so no division by zero occurs.
    for offset in range(0, line_length, dash_length * 2):
        t1 = offset / line_length
        t2 = min((offset + dash_length) / line_length, 1.0)
        dash_start = (int(line_start[0] + t1 * dx), int(line_start[1] + t1 * dy))
        dash_end = (int(line_start[0] + t2 * dx), int(line_start[1] + t2 * dy))
        cv2.line(frame, dash_start, dash_end, (0, 0, 0), 3)


def _draw_count_overlay(frame, vehicle_count, vehicle_type_counts):
    """Draw the total and per-type vehicle counts in the top-left corner."""
    font = cv2.FONT_HERSHEY_SIMPLEX
    cv2.rectangle(frame, (10, 10), (350, 140), (0, 0, 0), -1)
    cv2.putText(frame, f'Gesamt: {vehicle_count}', (20, 35),
                font, 0.7, (255, 255, 255), 2)
    cv2.putText(frame, f'Autos: {vehicle_type_counts["car"]}', (20, 65),
                font, 0.6, (200, 200, 200), 1)
    cv2.putText(frame, f'LKW: {vehicle_type_counts["truck"]}', (20, 90),
                font, 0.6, (200, 200, 200), 1)
    cv2.putText(frame, f'Busse: {vehicle_type_counts["bus"]}', (20, 115),
                font, 0.6, (200, 200, 200), 1)
    cv2.putText(frame, f'Motorraeder: {vehicle_type_counts["motorcycle"]}', (20, 135),
                font, 0.5, (200, 200, 200), 1)


def _generate_annotated_frames(source, line_data):
    """Yield multipart JPEG chunks of annotated frames read from source.

    Args:
        source: Value passed to ``cv2.VideoCapture`` (a device index or a
            file path).
        line_data: Mapping with ``x1``, ``y1``, ``x2``, ``y2`` giving the
            counting line in resized-frame coordinates.

    Every second frame is skipped. Each processed frame is resized to
    ``FRAME_SIZE``, run through the tracker, annotated, and JPEG-encoded.
    A track is counted at most once, the first time the movement of its
    bounding-box centre between two processed frames intersects the line.
    The capture is released when the stream ends or the consumer closes
    the generator.
    """
    cap = cv2.VideoCapture(source)

    # Track vehicle positions and counted IDs
    track_positions = {}  # {track_id: (center_x, center_y)}
    counted_ids = set()
    vehicle_count = 0
    vehicle_type_counts = {'car': 0, 'truck': 0, 'bus': 0, 'motorcycle': 0}

    line_start = (line_data['x1'], line_data['y1'])
    line_end = (line_data['x2'], line_data['y2'])

    frame_index = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Process only every second frame to reduce the inference load.
            frame_index += 1
            if frame_index % 2 != 0:
                continue

            frame = cv2.resize(frame, FRAME_SIZE)

            # Run tracking BEFORE drawing anything, so the overlays are not
            # part of the image the detector sees.
            detections = model.track(frame, persist=True)[0].boxes

            if detections is not None and detections.id is not None:
                boxes = detections.xyxy.int().cpu().tolist()
                class_ids = detections.cls.int().cpu().tolist()
                track_ids = detections.id.int().cpu().tolist()

                for box, class_id, track_id in zip(boxes, class_ids, track_ids):
                    class_name = names[class_id]
                    x1, y1, x2, y2 = box
                    center = ((x1 + x2) // 2, (y1 + y2) // 2)
                    is_vehicle = class_name in VEHICLE_CLASSES

                    if is_vehicle:
                        if track_id in track_positions and track_id not in counted_ids:
                            previous = track_positions[track_id]

                            # Draw the movement since the last processed frame.
                            cv2.line(frame, previous, center, (255, 100, 0), 2)

                            if crossed_line(previous, center, line_start, line_end):
                                counted_ids.add(track_id)
                                vehicle_count += 1
                                vehicle_type_counts[class_name] += 1
                                # Visual feedback: large green circle when counted.
                                cv2.circle(frame, center, 25, (0, 255, 0), 5)

                        # NOTE(review): positions are remembered for vehicle
                        # classes only; the original indentation was lost, so
                        # confirm this matches the intended behaviour.
                        track_positions[track_id] = center

                    # Bounding box: green for counted classes, blue otherwise.
                    box_color = (0, 255, 0) if is_vehicle else (255, 0, 0)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), box_color, 2)

                    label = f'{track_id} - {class_name}'
                    if track_id in counted_ids:
                        # ASCII marker: OpenCV's Hershey fonts cannot render
                        # non-ASCII symbols and draw question marks instead.
                        label += ' (counted)'
                    cv2.putText(frame, label, (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 1)

                    # Centre point (yellow)
                    cv2.circle(frame, center, 3, (0, 255, 255), -1)

            _draw_counting_line(frame, line_start, line_end)
            _draw_count_overlay(frame, vehicle_count, vehicle_type_counts)

            encoded, buffer = cv2.imencode('.jpg', frame)
            if not encoded:
                # Skip a frame that could not be encoded rather than emit garbage.
                continue

            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
    finally:
        # Runs on normal exhaustion and when the client disconnects
        # (generator close), so the device or file handle is always freed.
        cap.release()


def detect_objects_from_webcam(line_data):
    """Yield annotated MJPEG chunks from the default webcam (device 0)."""
    yield from _generate_annotated_frames(0, line_data)


@app.route('/webcam_feed')
def webcam_feed():
    """Stream annotated webcam frames as a multipart MJPEG response."""
    # Read the session here: it is not available once the generator runs.
    line_data = session.get('counting_line', dict(DEFAULT_COUNTING_LINE))
    return Response(detect_objects_from_webcam(line_data), mimetype=_MJPEG_MIMETYPE)


@app.route('/upload', methods=['POST'])
def upload_video():
    """Save an uploaded video and redirect to its playback page.

    Redirects back to the index page when no usable file was submitted.
    """
    uploaded = request.files.get('file')
    if uploaded is None:
        # This route only accepts POST, so redirecting to request.url would
        # end in "405 Method Not Allowed"; send the user back to the form.
        return redirect(url_for('index'))

    # Keep only the final path component of the client-supplied name so the
    # file cannot be written outside the uploads folder.
    filename = os.path.basename((uploaded.filename or '').replace('\\', '/'))
    if filename in ('', '.', '..'):
        return redirect(url_for('index'))

    os.makedirs(UPLOAD_FOLDER, exist_ok=True)
    uploaded.save(os.path.join(UPLOAD_FOLDER, filename))

    # Redirect to the video playback page after upload
    return redirect(url_for('play_video', filename=filename))


@app.route('/uploads/<filename>')
def play_video(filename):
    """Render the playback page for an uploaded video."""
    _ensure_counting_line()
    return render_template('play_video.html', filename=filename)


@app.route('/video/<filename>')
def send_video(filename):
    """Serve the raw uploaded video file from the uploads folder."""
    return send_from_directory(UPLOAD_FOLDER, filename)


def detect_objects_from_video(video_path, line_data):
    """Yield annotated MJPEG chunks from the video file at video_path."""
    yield from _generate_annotated_frames(video_path, line_data)


@app.route('/video_feed/<filename>')
def video_feed(filename):
    """Stream annotated frames of an uploaded video as a multipart MJPEG response."""
    video_path = os.path.join(UPLOAD_FOLDER, filename)
    # Read the session here: it is not available once the generator runs.
    line_data = session.get('counting_line', dict(DEFAULT_COUNTING_LINE))
    return Response(detect_objects_from_video(video_path, line_data),
                    mimetype=_MJPEG_MIMETYPE)


if __name__ == '__main__':
    app.run('0.0.0.0', debug=False, port=8080)