Spaces:
Sleeping
Sleeping
import cv2 | |
import numpy as np | |
import os | |
from ultralytics import YOLO | |
import time | |
from typing import Tuple, Set | |
def detect_objects_in_video(path: str) -> Tuple[Set[str], str]: | |
""" | |
Detects and tracks objects in a video using a YOLOv8 model, saving an annotated output video. | |
Args: | |
path (str): Path to the input video file. | |
Returns: | |
Tuple[Set[str], str]: | |
- Set of unique detected object labels (e.g., {'Gun', 'Knife'}) | |
- Path to the output annotated video with detection boxes and tracking IDs | |
""" | |
if not os.path.exists(path): | |
raise FileNotFoundError(f"Video file not found: {path}") | |
# Load YOLOv8 model (adjust path if necessary) | |
model = YOLO("yolo/best.pt") # Make sure this path is correct | |
class_names = model.names | |
# Output setup | |
input_video_name = os.path.basename(path) | |
base_name = os.path.splitext(input_video_name)[0] | |
temp_output_name = f"{base_name}_output_temp.mp4" | |
output_dir = "results" | |
os.makedirs(output_dir, exist_ok=True) | |
temp_output_path = os.path.join(output_dir, temp_output_name) | |
# Video I/O setup | |
cap = cv2.VideoCapture(path) | |
if not cap.isOpened(): | |
raise ValueError(f"Failed to open video file: {path}") | |
frame_width, frame_height = 640, 640 | |
out = cv2.VideoWriter( | |
temp_output_path, | |
cv2.VideoWriter_fourcc(*'mp4v'), | |
30.0, | |
(frame_width, frame_height) | |
) | |
detected_labels = set() | |
start = time.time() | |
print(f"[INFO] Processing started at {start:.2f} seconds") | |
while True: | |
ret, frame = cap.read() | |
if not ret: | |
break | |
frame = cv2.resize(frame, (frame_width, frame_height)) | |
# Run detection and tracking | |
results = model.track( | |
source=frame, | |
conf=0.7, | |
persist=True | |
) | |
if results and hasattr(results[0], "plot"): | |
annotated_frame = results[0].plot() | |
out.write(annotated_frame) | |
# Extract class labels | |
if hasattr(results[0], "boxes"): | |
for box in results[0].boxes: | |
cls = int(box.cls) | |
detected_labels.add(class_names[cls]) | |
else: | |
out.write(frame) | |
end = time.time() | |
cap.release() | |
out.release() | |
# Create final output filename | |
crimes_str = "_".join(sorted(detected_labels)).replace(" ", "_")[:50] | |
final_output_name = f"{base_name}_{crimes_str}_output.mp4" | |
final_output_path = os.path.join(output_dir, final_output_name) | |
os.rename(temp_output_path, final_output_path) | |
print(f"[INFO] Processing finished at {end:.2f} seconds") | |
print(f"[INFO] Total execution time: {end - start:.2f} seconds") | |
print(f"[INFO] Detected crimes: {detected_labels}") | |
print(f"[INFO] Annotated video saved at: {final_output_path}") | |
return detected_labels, final_output_path | |
# Example usage (uncomment to use as standalone script) | |
# if __name__ == "__main__": | |
# video_path = input("Enter the path to the video file: ").strip('"') | |
# print(f"[INFO] Loading video: {video_path}") | |
# detect_objects_in_video(video_path) | |