from ultralytics import YOLO
import cv2
import numpy as np
import tempfile
import os

# Initialize YOLO model once at import time; shared by every function below.
YOLO_MODEL = YOLO('./best_yolov11.pt')

# Minimum detection confidence used for both counting and annotation.
CONF_THRESHOLD = 0.5

# Canonical machinery display names, in stable display order.
_MACHINE_TYPE_NAMES = (
    "Tower Crane", "Mobile Crane", "Compactor/Roller", "Bulldozer",
    "Excavator", "Dump Truck", "Concrete Mixer", "Loader",
    "Pump Truck", "Pile Driver", "Grader", "Other Vehicle",
)

# Map substrings of YOLO class names to display names. Matching is
# substring-based (see _classify_machinery), so generic keys like 'truck'
# must NOT shadow more specific ones like 'concrete_mixer_truck'.
MACHINERY_MAPPING = {
    'tower_crane': "Tower Crane",
    'mobile_crane': "Mobile Crane",
    'compactor': "Compactor/Roller",
    'roller': "Compactor/Roller",
    'bulldozer': "Bulldozer",
    'dozer': "Bulldozer",
    'excavator': "Excavator",
    'dump_truck': "Dump Truck",
    'truck': "Dump Truck",
    'concrete_mixer_truck': "Concrete Mixer",
    'loader': "Loader",
    'pump_truck': "Pump Truck",
    'pile_driver': "Pile Driver",
    'grader': "Grader",
    'other_vehicle': "Other Vehicle",
}

# BUG FIX: test the longest (most specific) keys first so e.g. a
# 'pump_truck' or 'concrete_mixer_truck' class is not miscounted as
# "Dump Truck" by the generic 'truck' substring.
_MAPPING_KEYS_BY_SPECIFICITY = sorted(MACHINERY_MAPPING, key=len, reverse=True)


def _empty_machine_counts():
    """Return a fresh {display_name: 0} dict covering all machinery types."""
    return {name: 0 for name in _MACHINE_TYPE_NAMES}


def _classify_machinery(class_name):
    """Map a YOLO class name to a machinery display name, or None if unmapped."""
    class_lower = class_name.lower()
    for key in _MAPPING_KEYS_BY_SPECIFICITY:
        if key in class_lower:
            return MACHINERY_MAPPING[key]
    return None


def process_yolo_results(results):
    """Count people and machinery in a set of YOLO detection results.

    People are detections whose class is 'Worker' (case-insensitive);
    machinery classes are mapped via MACHINERY_MAPPING. Only detections
    with confidence above CONF_THRESHOLD are counted.

    Args:
        results: iterable of ultralytics result objects (each with .boxes).

    Returns:
        (people_count, total_machinery, machine_types) where machine_types
        contains every machinery type, zero counts included.
    """
    people_count = 0
    machine_types = _empty_machine_counts()

    for r in results:
        for box in r.boxes:
            conf = float(box.conf[0])
            if conf <= CONF_THRESHOLD:
                continue
            class_name = YOLO_MODEL.names[int(box.cls[0])]

            if class_name.lower() == 'worker':
                people_count += 1
                continue  # 'worker' matches no machinery key

            machinery_name = _classify_machinery(class_name)
            if machinery_name is not None:
                machine_types[machinery_name] += 1

    return people_count, sum(machine_types.values()), machine_types


def detect_people_and_machinery(media_path):
    """Detect people and machinery using YOLOv11 for an image or a video.

    Videos are sampled at roughly one frame per second and the per-frame
    maxima are reported (peak simultaneous presence, not totals).

    Args:
        media_path: filesystem path (str) to an image/video, or a
            PIL-style RGB image object convertible via np.array.

    Returns:
        (max_people_count, total_machinery_count, machine_types) where
        machine_types contains only types with a non-zero count.
        Returns (0, 0, {}) on any error.
    """
    try:
        max_people_count = 0
        max_machine_types = _empty_machine_counts()

        if isinstance(media_path, str) and is_video(media_path):
            cap = cv2.VideoCapture(media_path)
            try:
                fps = cap.get(cv2.CAP_PROP_FPS)
                sample_rate = max(1, int(fps))  # ~1 analysed frame per second
                frame_count = 0
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                    # Only every nth frame is run through the model.
                    if frame_count % sample_rate == 0:
                        results = YOLO_MODEL(frame)
                        people, _, machine_types = process_yolo_results(results)
                        max_people_count = max(max_people_count, people)
                        for k, v in machine_types.items():
                            max_machine_types[k] = max(max_machine_types[k], v)
                    frame_count += 1
            finally:
                cap.release()  # always release the capture, even on error
        else:
            if isinstance(media_path, str):
                img = cv2.imread(media_path)
            else:
                # PIL image: convert RGB -> BGR for OpenCV/YOLO.
                img = cv2.cvtColor(np.array(media_path), cv2.COLOR_RGB2BGR)
            results = YOLO_MODEL(img)
            max_people_count, _, max_machine_types = process_yolo_results(results)

        # Drop machinery types that were never seen.
        max_machine_types = {k: v for k, v in max_machine_types.items() if v > 0}
        total_machinery_count = sum(max_machine_types.values())
        return max_people_count, total_machinery_count, max_machine_types

    except Exception as e:
        print(f"Error in YOLO detection: {str(e)}")
        return 0, 0, {}


def _draw_detections(image, results):
    """Draw boxes + labels for detections at/above CONF_THRESHOLD.

    Mutates `image` in place. Returns {class_name: count} for the frame.
    """
    frame_counts = {}
    for r in results:
        for box in r.boxes:
            conf = float(box.conf[0])
            if conf < CONF_THRESHOLD:
                continue  # skip low-confidence detections
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            class_name = YOLO_MODEL.names[int(box.cls[0])]
            cv2.rectangle(image, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(image, f"{class_name} {conf:.2f}", (x1, y1 - 6),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
            frame_counts[class_name] = frame_counts.get(class_name, 0) + 1
    return frame_counts


def _draw_summary(image, frame_counts):
    """Write a 'Class: N, ...' summary line in the image's top-left corner."""
    summary_str = ", ".join(f"{cls_name}: {count}"
                            for cls_name, count in frame_counts.items())
    cv2.putText(image, summary_str, (15, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 0), 2)


def _make_temp_video_writer(fps, w, h):
    """Create a temp .mp4 path and an opened VideoWriter for it.

    Returns (path, writer). The caller owns (and must delete) the file.
    """
    out_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    path = out_file.name
    out_file.close()
    writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
    return path, writer


def annotate_video_with_bboxes(video_path):
    """Run YOLO on every frame, draw boxes and a per-frame class summary,
    and save the result as a new annotated video.

    Args:
        video_path: path to the input video.

    Returns:
        annotated_video_path: path to the annotated .mp4 temp file
        (the caller is responsible for deleting it).
    """
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    annotated_video_path, writer = _make_temp_video_writer(fps, w, h)
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            results = YOLO_MODEL(frame)
            frame_counts = _draw_detections(frame, results)
            _draw_summary(frame, frame_counts)
            writer.write(frame)
    finally:
        # Release handles even if YOLO/drawing raises mid-video.
        cap.release()
        writer.release()
    return annotated_video_path


def process_video_unified(media_path):
    """Single-pass YOLO processing for a video.

    Samples roughly one frame per second for people/machinery statistics,
    annotates the sampled frames, and writes every frame (annotated or
    original) so the output video keeps the source length and timing.

    Args:
        media_path: path to the input video.

    Returns:
        (max_people_count, total_machinery_count, max_machine_types,
         annotated_video_path); (0, 0, {}, None) on failure.
    """
    max_people_count = 0
    max_machine_types = _empty_machine_counts()
    annotated_video_path = None
    try:
        cap = cv2.VideoCapture(media_path)
        if not cap.isOpened():
            print(f"Error: Could not open video file {media_path}")
            return 0, 0, {}, None

        fps = cap.get(cv2.CAP_PROP_FPS)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        sample_rate = max(1, int(fps))  # ~1 analysed frame per second

        annotated_video_path, writer = _make_temp_video_writer(fps, w, h)
        try:
            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % sample_rate == 0:
                    # Run detection once, use it for both stats and drawing.
                    results = YOLO_MODEL(frame)
                    people, _, machine_types = process_yolo_results(results)
                    max_people_count = max(max_people_count, people)
                    for k, v in machine_types.items():
                        if k in max_machine_types:
                            max_machine_types[k] = max(max_machine_types[k], v)

                    # Annotate a copy so the source frame stays untouched.
                    annotated_frame = frame.copy()
                    frame_counts = _draw_detections(annotated_frame, results)
                    _draw_summary(annotated_frame, frame_counts)
                    writer.write(annotated_frame)
                else:
                    # Pass non-sampled frames through unannotated to keep
                    # the output length and A/V sync correct.
                    writer.write(frame)
                frame_count += 1
        finally:
            cap.release()
            writer.release()

        # Drop machinery types that were never seen.
        max_machine_types = {k: v for k, v in max_machine_types.items() if v > 0}
        total_machinery_count = sum(max_machine_types.values())
        print(f"Unified processing complete. People: {max_people_count}, Machinery: {total_machinery_count}, Types: {max_machine_types}")
        return max_people_count, total_machinery_count, max_machine_types, annotated_video_path

    except Exception as e:
        print(f"Error in unified YOLO video processing: {str(e)}")
        # Clean up the partially-written temp video, if any.
        if annotated_video_path and os.path.exists(annotated_video_path):
            try:
                os.remove(annotated_video_path)
            except OSError:
                pass  # best-effort cleanup
        return 0, 0, {}, None


# File type validation
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
VIDEO_EXTENSIONS = {'.mp4', '.mkv', '.mov', '.avi', '.flv', '.wmv', '.webm', '.m4v'}


def get_file_extension(filename):
    """Return the lowercased extension of filename, including the dot."""
    return os.path.splitext(filename)[1].lower()


def is_image(filename):
    """True if filename has a recognised image extension."""
    return get_file_extension(filename) in IMAGE_EXTENSIONS


def is_video(filename):
    """True if filename has a recognised video extension."""
    return get_file_extension(filename) in VIDEO_EXTENSIONS