Spaces: Runtime error
Update objec_detect_yolo.py
objec_detect_yolo.py  +170 -54  CHANGED
@@ -1,101 +1,217 @@
(old version, left pane of the diff; removed lines that the diff view rendered blank or truncated are omitted)
 import cv2
 import numpy as np
 import os
 from ultralytics import YOLO
 import time
-from typing import Tuple, Set
-def
     """
-    Detects and tracks objects in a video using
     Args:
-        path (str): Path to the input video file.
     Returns:
-        Tuple[Set[str], str]:
             - Set of unique detected object labels (e.g., {'Gun', 'Knife'})
             - Path to the output annotated video with detection boxes and tracking IDs
     """
     if not os.path.exists(path):
         raise FileNotFoundError(f"Video file not found: {path}")
-    #
     input_video_name = os.path.basename(path)
     base_name = os.path.splitext(input_video_name)[0]
     temp_output_path = os.path.join(output_dir, temp_output_name)
-    # Video
     cap = cv2.VideoCapture(path)
     if not cap.isOpened():
         raise ValueError(f"Failed to open video file: {path}")
     frame_width, frame_height = 640, 640
     start = time.time()
     while True:
         ret, frame = cap.read()
-        if not ret:
             break
         out.write(annotated_frame)
-        else:
-            out.write(frame)
     end = time.time()
     cap.release()
     out.release()
-    #
     final_output_path = os.path.join(output_dir, final_output_name)
-    print(f"[INFO] Processing finished at {end:.2f} seconds")
-    print(f"[INFO] Total execution time: {end - start:.2f} seconds")
-    print(f"[INFO] Detected crimes: {detected_labels}")
-    print(f"[INFO] Annotated video saved at: {final_output_path}")
-    return
-# Example usage (
 # if __name__ == "__main__":
-#
-#
-#
(new version, right pane of the diff)
--- START OF FILE objec_detect_yolo.py ---

import cv2
import numpy as np
import os
from ultralytics import YOLO
import time
from typing import Tuple, Set, List

def detection(path: str) -> Tuple[Set[str], str]:
    """
    Detects and tracks objects in a video using YOLOv8 model, saving an annotated output video.

    Args:
        path (str): Path to the input video file. Supports common video formats (mp4, avi, etc.)

    Returns:
        Tuple[Set[str], str]:
            - Set of unique detected object labels (e.g., {'Gun', 'Knife'})
            - Path to the output annotated video with detection boxes and tracking IDs

    Raises:
        FileNotFoundError: If input video doesn't exist
        ValueError: If video cannot be opened/processed or output dir cannot be created
    """

    # Validate input file exists
    if not os.path.exists(path):
        raise FileNotFoundError(f"Video file not found: {path}")

    # --- Model Loading ---
    # Construct path relative to this script file
    model_path = os.path.join(os.path.dirname(__file__), "yolo", "best.pt")
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"YOLO model file not found at: {model_path}")
    try:
        model = YOLO(model_path)
        class_names = model.names  # Get class label mappings
        print(f"[INFO] YOLO model loaded from {model_path}. Class names: {class_names}")
    except Exception as e:
        raise ValueError(f"Failed to load YOLO model: {e}")

    # --- Output Path Setup ---
    input_video_name = os.path.basename(path)
    base_name = os.path.splitext(input_video_name)[0]
    # Sanitize basename to prevent issues with weird characters in filenames
    safe_base_name = "".join(c if c.isalnum() or c in ('-', '_') else '_' for c in base_name)

    # Define output directory relative to this script
    # In HF Spaces, this will be inside the container's file system
    output_dir = os.path.join(os.path.dirname(__file__), "results")
    temp_output_name = f"{safe_base_name}_output_temp.mp4"

    try:
        os.makedirs(output_dir, exist_ok=True)  # Create output dir if needed
        if not os.path.isdir(output_dir):
            raise ValueError(f"Path exists but is not a directory: {output_dir}")
    except OSError as e:
        raise ValueError(f"Failed to create or access output directory '{output_dir}': {e}")

    temp_output_path = os.path.join(output_dir, temp_output_name)
    print(f"[INFO] Temporary output will be saved to: {temp_output_path}")

    # --- Video Processing Setup ---
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        raise ValueError(f"Failed to open video file: {path}")

    # Get video properties for output writer
    # Use source FPS if available and reasonable, otherwise default to 30
    source_fps = cap.get(cv2.CAP_PROP_FPS)
    output_fps = source_fps if 10 <= source_fps <= 60 else 30.0

    # Process at a fixed resolution for consistency or use source resolution
    # Using fixed 640x640 as potentially used during training/fine-tuning
    frame_width, frame_height = 640, 640
    # OR use source resolution (might require adjusting YOLO parameters if model expects specific size)
    # frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    # frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    try:
        out = cv2.VideoWriter(
            temp_output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),  # Use MP4 codec
            output_fps,
            (frame_width, frame_height)
        )
        if not out.isOpened():
            # Attempt alternative codec if mp4v fails (less common)
            print("[WARNING] mp4v codec failed, trying avc1...")
            out = cv2.VideoWriter(
                temp_output_path,
                cv2.VideoWriter_fourcc(*'avc1'),
                output_fps,
                (frame_width, frame_height)
            )
            if not out.isOpened():
                raise ValueError("Failed to initialize VideoWriter with mp4v or avc1 codec.")

    except Exception as e:
        cap.release()  # Release capture device before raising
        raise ValueError(f"Failed to create VideoWriter: {e}")

    # --- Main Processing Loop ---
    detected_classes: List[str] = []  # Track detected object class names
    start = time.time()
    frame_count = 0
    print(f"[INFO] Video processing started...")

    while True:
        ret, frame = cap.read()
        if not ret:  # End of video or read error
            break

        frame_count += 1
        # Resize frame BEFORE passing to model
        resized_frame = cv2.resize(frame, (frame_width, frame_height))

        try:
            # Run YOLOv8 detection and tracking on the resized frame
            results = model.track(
                source=resized_frame,  # Use resized frame
                conf=0.7,              # Confidence threshold
                persist=True,          # Maintain track IDs across frames
                verbose=False          # Suppress Ultralytics console output per frame
            )

            # Check if results are valid and contain boxes
            if results and results[0] and results[0].boxes:
                # Annotate the RESIZED frame with bounding boxes and track IDs
                annotated_frame = results[0].plot()  # plot() draws on the source image

                # Record detected class names for this frame
                for box in results[0].boxes:
                    if box.cls is not None:  # Check if class ID is present
                        cls_id = int(box.cls[0])  # Get class index
                        if 0 <= cls_id < len(class_names):
                            detected_classes.append(class_names[cls_id])
                        else:
                            print(f"[WARNING] Detected unknown class ID: {cls_id}")
            else:
                # If no detections, use the original resized frame for the output video
                annotated_frame = resized_frame

            # Write the (potentially annotated) frame to the output video
            out.write(annotated_frame)

        except Exception as e:
            print(f"[ERROR] Error processing frame {frame_count}: {e}")
            # Write the unannotated frame to keep video timing consistent
            out.write(resized_frame)

    # --- Clean Up ---
    end = time.time()
    print(f"[INFO] Video processing finished. Processed {frame_count} frames.")
    print(f"[INFO] Total processing time: {end - start:.2f} seconds")
    cap.release()
    out.release()
    cv2.destroyAllWindows()  # Close any OpenCV windows if they were opened

    # --- Final Output Renaming ---
    unique_detected_labels = set(detected_classes)
    # Create a short string from labels for the filename
    labels_str = "_".join(sorted(list(unique_detected_labels))).replace(" ", "_")
    # Limit length to avoid overly long filenames
    max_label_len = 50
    if len(labels_str) > max_label_len:
        labels_str = labels_str[:max_label_len] + "_etc"
    if not labels_str:  # Handle case where nothing was detected
        labels_str = "no_detections"

    final_output_name = f"{safe_base_name}_{labels_str}_output.mp4"
    final_output_path = os.path.join(output_dir, final_output_name)

    # Ensure final path doesn't already exist (rename might fail otherwise)
    if os.path.exists(final_output_path):
        os.remove(final_output_path)

    try:
        # Rename the temporary file to the final name
        os.rename(temp_output_path, final_output_path)
        print(f"[INFO] Detected object labels: {unique_detected_labels}")
        print(f"[INFO] Annotated video saved successfully at: {final_output_path}")
    except OSError as e:
        print(f"[ERROR] Failed to rename {temp_output_path} to {final_output_path}: {e}")
        # Fallback: return the temp path if rename fails but file exists
        if os.path.exists(temp_output_path):
            print(f"[WARNING] Returning path to temporary file: {temp_output_path}")
            return unique_detected_labels, temp_output_path
        else:
            raise ValueError(f"Output video generation failed. No output file found.")

    return unique_detected_labels, final_output_path


# # Example usage (commented out for library use)
# if __name__ == "__main__":
#     test_video = input("Enter the local path to the video file: ").strip('"')
#     if os.path.exists(test_video):
#         try:
#             print(f"[INFO] Processing video: {test_video}")
#             labels, out_path = detection(test_video)
#             print(f"\nDetection Complete.")
#             print(f"Detected unique labels: {labels}")
#             print(f"Output video saved to: {out_path}")
#         except (FileNotFoundError, ValueError, Exception) as e:
#             print(f"\nAn error occurred: {e}")
#     else:
#         print(f"Error: Input video file not found - {test_video}")

--- END OF FILE objec_detect_yolo.py ---
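A note on usage: as committed, the module begins and ends with literal "--- START OF FILE ---" / "--- END OF FILE ---" marker lines, which are not valid Python; if the Space's app imports this module, that would raise a SyntaxError (possibly the runtime error shown above). Assuming those marker lines are removed, the following is a minimal sketch of how the Space's app could call detection() from a Gradio interface. The Gradio wiring, component choices, and the analyze() helper are illustrative assumptions, not part of this commit.

# Hypothetical Gradio wiring for the Space (illustration only; not part of this commit)
import gradio as gr
from objec_detect_yolo import detection  # assumes the file's marker lines are removed so it parses

def analyze(video_path):
    # detection() returns (set of detected labels, path to the annotated output video)
    labels, annotated_path = detection(video_path)
    label_text = ", ".join(sorted(labels)) if labels else "no detections"
    return label_text, annotated_path

demo = gr.Interface(
    fn=analyze,
    inputs=gr.Video(label="Input video"),
    outputs=[gr.Textbox(label="Detected labels"), gr.Video(label="Annotated video")],
)

if __name__ == "__main__":
    demo.launch()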