from ultralytics import YOLO
import cv2
import numpy as np
import tempfile
import os

# Initialize YOLO model
YOLO_MODEL = YOLO('./best_yolov11.pt')
def detect_people_and_machinery(media_path):
    """Detect people and machinery using YOLOv11 for both images and videos"""
    try:
        # Running maximum counts observed across sampled frames
        max_people_count = 0
        max_machine_types = {
            "Tower Crane": 0,
            "Mobile Crane": 0,
            "Compactor/Roller": 0,
            "Bulldozer": 0,
            "Excavator": 0,
            "Dump Truck": 0,
            "Concrete Mixer": 0,
            "Loader": 0,
            "Pump Truck": 0,
            "Pile Driver": 0,
            "Grader": 0,
            "Other Vehicle": 0
        }
        # Check if the input is a video file path
        if isinstance(media_path, str) and is_video(media_path):
            cap = cv2.VideoCapture(media_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            sample_rate = max(1, int(fps))  # Sample one frame per second
            frame_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                # Process every nth frame based on the sample rate
                if frame_count % sample_rate == 0:
                    results = YOLO_MODEL(frame)
                    people, _, machine_types = process_yolo_results(results)
                    # Update maximum counts
                    max_people_count = max(max_people_count, people)
                    for k, v in machine_types.items():
                        max_machine_types[k] = max(max_machine_types[k], v)
                frame_count += 1
            cap.release()
        else:
            # Handle a single image (either a file path or a PIL Image)
            if isinstance(media_path, str):
                img = cv2.imread(media_path)
            else:
                # PIL images are RGB; OpenCV expects BGR
                img = cv2.cvtColor(np.array(media_path), cv2.COLOR_RGB2BGR)
            results = YOLO_MODEL(img)
            max_people_count, _, max_machine_types = process_yolo_results(results)

        # Filter out machinery types with zero count
        max_machine_types = {k: v for k, v in max_machine_types.items() if v > 0}
        total_machinery_count = sum(max_machine_types.values())
        return max_people_count, total_machinery_count, max_machine_types
    except Exception as e:
        print(f"Error in YOLO detection: {str(e)}")
        return 0, 0, {}
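
# Example usage (a sketch; assumes the weights above are present and that a
# local image "site.jpg" exists -- the filename is illustrative only):
#
#   people, machinery, types = detect_people_and_machinery("site.jpg")
#   print(f"People: {people}, machinery: {machinery}, breakdown: {types}")
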
def process_yolo_results(results):
    """Process YOLO detection results and count people and machinery"""
    people_count = 0
    machine_types = {
        "Tower Crane": 0,
        "Mobile Crane": 0,
        "Compactor/Roller": 0,
        "Bulldozer": 0,
        "Excavator": 0,
        "Dump Truck": 0,
        "Concrete Mixer": 0,
        "Loader": 0,
        "Pump Truck": 0,
        "Pile Driver": 0,
        "Grader": 0,
        "Other Vehicle": 0
    }
    # Map YOLO class names to machinery types. Matching is done by substring
    # containment, so order matters: specific names ('concrete_mixer_truck',
    # 'pump_truck') must come before generic substrings ('truck'), or every
    # truck-like class would be counted as a Dump Truck.
    machinery_mapping = {
        'tower_crane': "Tower Crane",
        'mobile_crane': "Mobile Crane",
        'compactor': "Compactor/Roller",
        'roller': "Compactor/Roller",
        'bulldozer': "Bulldozer",
        'dozer': "Bulldozer",
        'excavator': "Excavator",
        'concrete_mixer_truck': "Concrete Mixer",
        'pump_truck': "Pump Truck",
        'dump_truck': "Dump Truck",
        'truck': "Dump Truck",
        'loader': "Loader",
        'pile_driver': "Pile Driver",
        'grader': "Grader",
        'other_vehicle': "Other Vehicle"
    }

    # Process detection results
    for r in results:
        boxes = r.boxes
        for box in boxes:
            cls = int(box.cls[0])
            conf = float(box.conf[0])
            class_name = YOLO_MODEL.names[cls]
            # Count people (Worker class)
            if class_name.lower() == 'worker' and conf > 0.5:
                people_count += 1
            # Count machinery
            if conf > 0.5:
                class_lower = class_name.lower()
                for key, value in machinery_mapping.items():
                    if key in class_lower:
                        machine_types[value] += 1
                        break

    total_machinery = sum(machine_types.values())
    return people_count, total_machinery, machine_types
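
# Example (a sketch; the image path and returned numbers are hypothetical and
# depend entirely on the model and scene):
#
#   results = YOLO_MODEL("site.jpg")
#   people, total, types = process_yolo_results(results)
#   # -> e.g. (3, 2, {"Excavator": 1, "Dump Truck": 1, ...zero entries...})
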
def annotate_video_with_bboxes(video_path):
    """
    Reads the entire video frame-by-frame, runs YOLO, draws bounding boxes,
    writes a per-frame summary of detected classes on the frame, and saves
    it as a new annotated video. Returns: annotated_video_path
    """
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or fps <= 0:
        fps = 30  # Fall back to a sane default when FPS metadata is missing
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Create a temp file for the output video
    out_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    annotated_video_path = out_file.name
    out_file.close()

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(annotated_video_path, fourcc, fps, (w, h))
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        results = YOLO_MODEL(frame)

        # Per-frame counts of each detected class
        frame_counts = {}
        for r in results:
            boxes = r.boxes
            for box in boxes:
                cls_id = int(box.cls[0])
                conf = float(box.conf[0])
                if conf < 0.5:
                    continue  # Skip low-confidence detections
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                class_name = YOLO_MODEL.names[cls_id]
                # Draw bounding box and label
                color = (0, 255, 0)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                label_text = f"{class_name} {conf:.2f}"
                cv2.putText(frame, label_text, (x1, y1 - 6),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
                # Increment per-frame class count
                frame_counts[class_name] = frame_counts.get(class_name, 0) + 1

        # Build a summary line, e.g. "Worker: 2, Excavator: 1"
        summary_str = ", ".join(f"{cls_name}: {count}"
                                for cls_name, count in frame_counts.items())
        # Draw the summary text in the top-left corner
        cv2.putText(frame, summary_str, (15, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 0), 2)
        writer.write(frame)

    cap.release()
    writer.release()
    return annotated_video_path
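
# Example usage (a sketch; the video filename is hypothetical):
#
#   out_path = annotate_video_with_bboxes("site_walkthrough.mp4")
#   print(f"Annotated video written to {out_path}")
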
def process_video_unified(media_path):
    """
    Single-pass YOLO processing for video.
    Detects people/machinery, tracks maximum counts, and generates an annotated video.
    Returns: max_people_count, total_machinery_count, max_machine_types, annotated_video_path
    """
    max_people_count = 0
    max_machine_types = {
        "Tower Crane": 0, "Mobile Crane": 0, "Compactor/Roller": 0, "Bulldozer": 0,
        "Excavator": 0, "Dump Truck": 0, "Concrete Mixer": 0, "Loader": 0,
        "Pump Truck": 0, "Pile Driver": 0, "Grader": 0, "Other Vehicle": 0
    }
    annotated_video_path = None
    try:
        cap = cv2.VideoCapture(media_path)
        if not cap.isOpened():
            print(f"Error: Could not open video file {media_path}")
            return 0, 0, {}, None

        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            fps = 30  # Fall back to a sane default when FPS metadata is missing
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        sample_rate = max(1, int(fps))  # Sample one frame per second
        frame_count = 0

        # Create a temp file for the output annotated video
        out_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        annotated_video_path = out_file.name
        out_file.close()

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(annotated_video_path, fourcc, fps, (w, h))
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Run detection on every nth frame; annotate only those frames
            if frame_count % sample_rate == 0:
                results = YOLO_MODEL(frame)

                # --- Update maximum counts ---
                people, _, machine_types = process_yolo_results(results)
                max_people_count = max(max_people_count, people)
                for k, v in machine_types.items():
                    if k in max_machine_types:  # Ensure the key exists
                        max_machine_types[k] = max(max_machine_types[k], v)

                # --- Annotate the frame (reusing the same results) ---
                frame_counts = {}  # Per-frame counts for the summary text
                annotated_frame = frame.copy()  # Annotate a copy, not the original
                for r in results:
                    boxes = r.boxes
                    for box in boxes:
                        cls_id = int(box.cls[0])
                        conf = float(box.conf[0])
                        if conf < 0.5:
                            continue  # Skip low-confidence detections
                        x1, y1, x2, y2 = map(int, box.xyxy[0])
                        class_name = YOLO_MODEL.names[cls_id]
                        # Draw bounding box and label
                        color = (0, 255, 0)
                        cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
                        label_text = f"{class_name} {conf:.2f}"
                        cv2.putText(annotated_frame, label_text, (x1, y1 - 6),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
                        # Increment per-frame class count for the summary
                        frame_counts[class_name] = frame_counts.get(class_name, 0) + 1

                # Build and draw the summary string for this frame
                summary_str = ", ".join(f"{cls}: {cnt}" for cls, cnt in frame_counts.items())
                cv2.putText(annotated_frame, summary_str, (15, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 0), 2)
                writer.write(annotated_frame)
            else:
                # Write the original frame unmodified so the output keeps the
                # same frame count and duration as the input
                writer.write(frame)
            frame_count += 1
        cap.release()
        writer.release()

        # Filter out zero counts from max_machine_types
        max_machine_types = {k: v for k, v in max_machine_types.items() if v > 0}
        total_machinery_count = sum(max_machine_types.values())
        print(f"Unified processing complete. People: {max_people_count}, "
              f"Machinery: {total_machinery_count}, Types: {max_machine_types}")
        return max_people_count, total_machinery_count, max_machine_types, annotated_video_path
    except Exception as e:
        print(f"Error in unified YOLO video processing: {str(e)}")
        # Clean up the temp file if it was created before the error
        if annotated_video_path and os.path.exists(annotated_video_path):
            try:
                os.remove(annotated_video_path)
            except OSError:
                pass  # Ignore errors during cleanup
        return 0, 0, {}, None
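
# Example usage (a sketch; the path is hypothetical):
#
#   people, machinery, types, annotated = process_video_unified("site.mp4")
#   # 'annotated' is a temp .mp4 path; the caller is responsible for deleting it
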
# File type validation
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
VIDEO_EXTENSIONS = {'.mp4', '.mkv', '.mov', '.avi', '.flv', '.wmv', '.webm', '.m4v'}

def get_file_extension(filename):
    """Return the lowercased file extension, including the leading dot."""
    return os.path.splitext(filename)[1].lower()

def is_image(filename):
    """True if the filename has a recognized image extension."""
    return get_file_extension(filename) in IMAGE_EXTENSIONS

def is_video(filename):
    """True if the filename has a recognized video extension."""
    return get_file_extension(filename) in VIDEO_EXTENSIONS
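
# Minimal smoke test (a sketch, not part of the app's pipeline; the media path
# below is an assumption -- replace it with a real image or video on disk):
if __name__ == "__main__":
    sample = "sample_site_photo.jpg"  # hypothetical file
    if os.path.exists(sample):
        if is_video(sample):
            people, machinery, types, annotated = process_video_unified(sample)
            print(f"People: {people}, machinery: {machinery}, types: {types}")
            print(f"Annotated video: {annotated}")
        else:
            people, machinery, types = detect_people_and_machinery(sample)
            print(f"People: {people}, machinery: {machinery}, types: {types}")
    else:
        print(f"No sample media found at {sample}; skipping smoke test.")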