# V3Test / yolo_detection.py
# assentian1970's picture
# Update yolo_detection.py
# 390fae8 verified
from ultralytics import YOLO
import cv2
import numpy as np
import tempfile
import os
# Shared YOLO detector. It is created once, when this module is imported, and
# reused by every detection/annotation function in this file. The weights are
# referenced through a relative path ('./best_yolov11.pt').
YOLO_MODEL = YOLO('./best_yolov11.pt')
def detect_people_and_machinery(media_path):
    """Detect people and machinery using YOLOv11 for both images and videos.

    Args:
        media_path: Either a filesystem path (``str``) to an image or a
            video, or an in-memory RGB image that ``np.array`` can convert
            (for example a PIL image).

    Returns:
        A tuple ``(max_people_count, total_machinery_count, machine_types)``.
        For videos roughly one frame per second is analysed and the largest
        per-frame count of people and of each machinery type is kept;
        ``total_machinery_count`` is the sum of those per-type maxima.
        ``machine_types`` only contains types with a non-zero count.
        On any failure the error is printed and ``(0, 0, {})`` is returned.
    """
    cap = None
    try:
        # Initialize counters with maximum values
        max_people_count = 0
        max_machine_types = {
            "Tower Crane": 0,
            "Mobile Crane": 0,
            "Compactor/Roller": 0,
            "Bulldozer": 0,
            "Excavator": 0,
            "Dump Truck": 0,
            "Concrete Mixer": 0,
            "Loader": 0,
            "Pump Truck": 0,
            "Pile Driver": 0,
            "Grader": 0,
            "Other Vehicle": 0
        }

        if isinstance(media_path, str) and is_video(media_path):
            cap = cv2.VideoCapture(media_path)
            fps = cap.get(cv2.CAP_PROP_FPS)
            sample_rate = max(1, int(fps))  # Sample 1 frame per second
            frame_count = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Only every ``sample_rate``-th frame is sent to the model.
                if frame_count % sample_rate == 0:
                    results = YOLO_MODEL(frame)
                    people, _, machine_types = process_yolo_results(results)

                    max_people_count = max(max_people_count, people)
                    for k, v in machine_types.items():
                        max_machine_types[k] = max(max_machine_types.get(k, 0), v)

                frame_count += 1
        else:
            if isinstance(media_path, str):
                img = cv2.imread(media_path)
                if img is None:
                    # cv2.imread signals an unreadable file by returning
                    # None. Passing None on to the model must be avoided:
                    # Ultralytics then falls back to its bundled sample
                    # images and would report counts unrelated to the input.
                    raise ValueError(f"Could not read image file {media_path}")
            else:
                # In-memory image: convert RGB -> BGR, the order OpenCV uses.
                img = cv2.cvtColor(np.array(media_path), cv2.COLOR_RGB2BGR)

            results = YOLO_MODEL(img)
            max_people_count, _, max_machine_types = process_yolo_results(results)

        # Filter out machinery types with zero count
        max_machine_types = {k: v for k, v in max_machine_types.items() if v > 0}
        total_machinery_count = sum(max_machine_types.values())

        return max_people_count, total_machinery_count, max_machine_types

    except Exception as e:
        print(f"Error in YOLO detection: {str(e)}")
        return 0, 0, {}
    finally:
        # Release the capture even when inference raised part-way through.
        if cap is not None:
            cap.release()
def process_yolo_results(results, names=None):
    """Process YOLO detection results and count people and machinery.

    Args:
        results: Iterable of result objects. Each must expose ``boxes``, an
            iterable of boxes whose ``cls`` and ``conf`` can be indexed with
            ``[0]`` to obtain the class id and the confidence.
        names: Optional mapping (or sequence) from class id to class name.
            Defaults to ``YOLO_MODEL.names``.

    Returns:
        A tuple ``(people_count, total_machinery, machine_types)`` where
        ``machine_types`` maps every known machinery category to its count
        (zero counts included) and ``total_machinery`` is their sum.
        Only detections with confidence strictly above 0.5 are counted.
    """
    if names is None:
        names = YOLO_MODEL.names

    people_count = 0
    machine_types = {
        "Tower Crane": 0,
        "Mobile Crane": 0,
        "Compactor/Roller": 0,
        "Bulldozer": 0,
        "Excavator": 0,
        "Dump Truck": 0,
        "Concrete Mixer": 0,
        "Loader": 0,
        "Pump Truck": 0,
        "Pile Driver": 0,
        "Grader": 0,
        "Other Vehicle": 0
    }

    # Map YOLO class-name fragments to machinery types (built once per call,
    # not once per box).
    machinery_mapping = {
        'tower_crane': "Tower Crane",
        'mobile_crane': "Mobile Crane",
        'compactor': "Compactor/Roller",
        'roller': "Compactor/Roller",
        'bulldozer': "Bulldozer",
        'dozer': "Bulldozer",
        'excavator': "Excavator",
        'dump_truck': "Dump Truck",
        'truck': "Dump Truck",
        'concrete_mixer_truck': "Concrete Mixer",
        'loader': "Loader",
        'pump_truck': "Pump Truck",
        'pile_driver': "Pile Driver",
        'grader': "Grader",
        'other_vehicle': "Other Vehicle"
    }
    # Try the longest fragments first. Otherwise the generic 'truck' fragment
    # would match 'concrete_mixer_truck' and 'pump_truck' before their own
    # entries and those machines would be counted as dump trucks.
    keys_by_specificity = sorted(machinery_mapping, key=len, reverse=True)

    for r in results:
        for box in r.boxes:
            conf = float(box.conf[0])
            if conf <= 0.5:
                continue  # Low-confidence detections are never counted

            class_lower = names[int(box.cls[0])].lower()

            # Count people (Worker class)
            if class_lower == 'worker':
                people_count += 1
                continue

            # Count machinery: the first (most specific) matching fragment wins
            for key in keys_by_specificity:
                if key in class_lower:
                    machine_types[machinery_mapping[key]] += 1
                    break

    total_machinery = sum(machine_types.values())
    return people_count, total_machinery, machine_types
def annotate_video_with_bboxes(video_path):
    """
    Reads the entire video frame-by-frame, runs YOLO on every frame, draws a
    bounding box and a "<class> <confidence>" label for each detection with
    confidence of at least 0.5, writes a per-frame summary of detected
    classes in the top-left corner, and saves the result as a new annotated
    video in a temporary ``.mp4`` file.

    Args:
        video_path: Path of the video to annotate.

    Returns:
        annotated_video_path: path of the temporary annotated video. If no
        frame can be read from ``video_path`` the file contains no frames.

    Raises:
        Any exception raised while reading, running inference or writing is
        propagated. The capture and writer are released and the partially
        written temporary file is removed before it propagates.
    """
    cap = cv2.VideoCapture(video_path)
    writer = None
    annotated_video_path = None
    completed = False
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Create a temp file for output; only its name is needed, the
        # VideoWriter reopens the path itself.
        out_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
        annotated_video_path = out_file.name
        out_file.close()

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(annotated_video_path, fourcc, fps, (w, h))

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            results = YOLO_MODEL(frame)

            # Dictionary to hold per-frame counts of each class
            frame_counts = {}

            for r in results:
                for box in r.boxes:
                    conf = float(box.conf[0])
                    if conf < 0.5:
                        continue  # Skip low-confidence

                    class_name = YOLO_MODEL.names[int(box.cls[0])]
                    x1, y1, x2, y2 = map(int, box.xyxy[0])

                    # Draw bounding box
                    color = (0, 255, 0)
                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

                    label_text = f"{class_name} {conf:.2f}"
                    cv2.putText(frame, label_text, (x1, y1 - 6),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

                    # Increment per-frame class count
                    frame_counts[class_name] = frame_counts.get(class_name, 0) + 1

            # Build a summary line, e.g. "Worker: 2, Excavator: 1, ..."
            summary_str = ", ".join(f"{cls_name}: {count}"
                                    for cls_name, count in frame_counts.items())

            # Put the summary text in the top-left
            cv2.putText(
                frame,
                summary_str,
                (15, 30),  # position
                cv2.FONT_HERSHEY_SIMPLEX,
                1.0,
                (255, 255, 0),
                2
            )

            writer.write(frame)

        completed = True
    finally:
        # Always free the native handles, also when inference raised.
        cap.release()
        if writer is not None:
            writer.release()
        # On failure the caller never learns the temp path, so remove the
        # partial file here instead of leaving it orphaned.
        if not completed and annotated_video_path and os.path.exists(annotated_video_path):
            try:
                os.remove(annotated_video_path)
            except OSError:
                pass  # Best-effort cleanup; the original error still propagates

    return annotated_video_path
def process_video_unified(media_path):
    """
    Single pass YOLO processing for video.
    Detects people/machinery, calculates max counts, and generates an annotated video.

    Roughly one frame per second is sent to the model. Those sampled frames
    are used for the statistics and are written to the output with bounding
    boxes, labels and a summary line; all other frames are written unchanged
    so the output keeps the length of the input.

    Args:
        media_path: Path of the video to process.

    Returns: max_people_count, total_machinery_count, max_machine_types, annotated_video_path
        ``max_machine_types`` only holds types with a non-zero count and
        ``total_machinery_count`` is the sum of its values. If the video
        cannot be opened or any error occurs, ``(0, 0, {}, None)`` is
        returned and the temporary output file (if any) is removed.
    """
    max_people_count = 0
    max_machine_types = {
        "Tower Crane": 0, "Mobile Crane": 0, "Compactor/Roller": 0, "Bulldozer": 0,
        "Excavator": 0, "Dump Truck": 0, "Concrete Mixer": 0, "Loader": 0,
        "Pump Truck": 0, "Pile Driver": 0, "Grader": 0, "Other Vehicle": 0
    }
    annotated_video_path = None
    cap = None
    writer = None

    try:
        try:
            cap = cv2.VideoCapture(media_path)
            if not cap.isOpened():
                print(f"Error: Could not open video file {media_path}")
                return 0, 0, {}, None

            fps = cap.get(cv2.CAP_PROP_FPS)
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            sample_rate = max(1, int(fps))  # Sample 1 frame per second
            frame_count = 0

            # Create a temp file for output annotated video
            out_file = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
            annotated_video_path = out_file.name
            out_file.close()

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            writer = cv2.VideoWriter(annotated_video_path, fourcc, fps, (w, h))

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % sample_rate == 0:
                    results = YOLO_MODEL(frame)  # Run detection

                    # --- Calculate Max Counts ---
                    people, _, machine_types = process_yolo_results(results)
                    max_people_count = max(max_people_count, people)
                    for k, v in machine_types.items():
                        if k in max_machine_types:  # Ignore unknown categories
                            max_machine_types[k] = max(max_machine_types[k], v)

                    # --- Annotate Frame (using the same results) ---
                    # The frame is not reused afterwards, so it is drawn on
                    # directly instead of on a copy.
                    frame_counts = {}  # For summary text on this frame
                    for r in results:
                        for box in r.boxes:
                            conf = float(box.conf[0])
                            if conf < 0.5:
                                continue

                            class_name = YOLO_MODEL.names[int(box.cls[0])]
                            x1, y1, x2, y2 = map(int, box.xyxy[0])

                            # Draw bounding box
                            color = (0, 255, 0)
                            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                            label_text = f"{class_name} {conf:.2f}"
                            cv2.putText(frame, label_text, (x1, y1 - 6),
                                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

                            # Increment per-frame class count for summary
                            frame_counts[class_name] = frame_counts.get(class_name, 0) + 1

                    # Build and draw summary string for the frame
                    summary_str = ", ".join(f"{cls}: {cnt}" for cls, cnt in frame_counts.items())
                    cv2.putText(frame, summary_str, (15, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 0), 2)

                # Sampled frames carry annotations; the others are written
                # as-is to keep the output in sync with the input.
                writer.write(frame)
                frame_count += 1
        finally:
            # Release native handles on every path (success, early return,
            # exception) and before any attempt to delete the output file.
            if cap is not None:
                cap.release()
            if writer is not None:
                writer.release()

        # Filter out zero counts from max_machine_types
        max_machine_types = {k: v for k, v in max_machine_types.items() if v > 0}
        total_machinery_count = sum(max_machine_types.values())

        print(f"Unified processing complete. People: {max_people_count}, Machinery: {total_machinery_count}, Types: {max_machine_types}")
        return max_people_count, total_machinery_count, max_machine_types, annotated_video_path

    except Exception as e:
        print(f"Error in unified YOLO video processing: {str(e)}")
        # Clean up potentially created temp file on error
        if annotated_video_path and os.path.exists(annotated_video_path):
            try:
                os.remove(annotated_video_path)
            except OSError:
                pass  # Ignore error during cleanup
        return 0, 0, {}, None
# File type validation.
# Both sets hold lower-case extensions including the leading dot, which is
# the form produced by get_file_extension(); is_image() and is_video() test
# membership in them.
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
VIDEO_EXTENSIONS = {'.mp4', '.mkv', '.mov', '.avi', '.flv', '.wmv', '.webm', '.m4v'}
def get_file_extension(filename):
    """Return the extension of *filename* in lower case, leading dot included.

    The result is an empty string when the name has no extension.
    """
    _stem, extension = os.path.splitext(filename)
    return extension.lower()
def is_image(filename):
    """Return True when the extension of *filename* is listed in IMAGE_EXTENSIONS."""
    extension = get_file_extension(filename)
    return extension in IMAGE_EXTENSIONS
def is_video(filename):
    """Return True when the extension of *filename* is listed in VIDEO_EXTENSIONS."""
    extension = get_file_extension(filename)
    return extension in VIDEO_EXTENSIONS