Spaces:
Sleeping
Sleeping
import cv2 | |
import numpy as np | |
import os | |
from ultralytics import YOLO | |
import time | |
from typing import Tuple, Set, List | |
def _sanitize_for_filename(text: str) -> str:
    """Return *text* with every character other than alphanumerics, '-' and '_' replaced by '_'."""
    return "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in text)


def _open_video_writer(output_path: str, fps: float, frame_size: Tuple[int, int]):
    """Open a cv2.VideoWriter for *output_path*, trying the mp4v codec first and avc1 second.

    Raises:
        ValueError: If neither codec yields an opened writer.
    """
    for attempt, codec in enumerate(("mp4v", "avc1")):
        if attempt:
            print("[WARNING] mp4v codec failed, trying avc1...")
        writer = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*codec),
            fps,
            frame_size,
        )
        if writer.isOpened():
            return writer
        # Release the failed writer before retrying so its handle is not leaked.
        writer.release()
    raise ValueError("Failed to initialize VideoWriter with mp4v or avc1 codec.")


def _build_labels_suffix(labels: Set[str], max_len: int = 50) -> str:
    """Build the filename fragment that summarises the detected labels."""
    # Sanitise with the same rule as the video basename so a label containing
    # a path separator or other unusual character cannot break the final rename.
    labels_str = _sanitize_for_filename("_".join(sorted(labels)))
    if len(labels_str) > max_len:
        labels_str = labels_str[:max_len] + "_etc"
    return labels_str or "no_detections"


def detection(path: str) -> Tuple[Set[str], str]:
    """
    Detect and track objects in a video with a YOLO model and save an annotated copy.

    The model weights are loaded from ``best.pt`` next to this script. Every frame is
    resized to 640x640, passed to ``model.track`` with a 0.7 confidence threshold, and
    written (annotated when there are detections) to ``results/`` next to this script.

    Args:
        path (str): Path to the input video file.

    Returns:
        Tuple[Set[str], str]:
            - Set of unique detected object labels (e.g., {'Gun', 'Knife'})
            - Path to the output video. This is normally
              ``<name>_<labels>_output.mp4``; if the final rename fails but the
              temporary file exists, the temporary file path is returned instead.

    Raises:
        FileNotFoundError: If the input video or the model file does not exist.
        ValueError: If the model cannot be loaded, the output directory cannot be
            created, the video cannot be opened, the video writer cannot be
            created, or no output file exists after processing.
    """
    # Validate input file exists
    if not os.path.exists(path):
        raise FileNotFoundError(f"Video file not found: {path}")

    # --- Model Loading ---
    # Construct path relative to this script file
    script_dir = os.path.dirname(__file__)
    model_path = os.path.join(script_dir, "best.pt")
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"YOLO model file not found at: {model_path}")
    try:
        model = YOLO(model_path)
        class_names = model.names  # Get class label mappings
        print(f"[INFO] YOLO model loaded from {model_path}. Class names: {class_names}")
    except Exception as e:
        raise ValueError(f"Failed to load YOLO model: {e}") from e

    # --- Output Path Setup ---
    base_name = os.path.splitext(os.path.basename(path))[0]
    # Sanitize basename to prevent issues with weird characters in filenames
    safe_base_name = _sanitize_for_filename(base_name)
    # Output directory lives next to this script
    output_dir = os.path.join(script_dir, "results")
    try:
        os.makedirs(output_dir, exist_ok=True)  # Create output dir if needed
        if not os.path.isdir(output_dir):
            raise ValueError(f"Path exists but is not a directory: {output_dir}")
    except OSError as e:
        raise ValueError(
            f"Failed to create or access output directory '{output_dir}': {e}"
        ) from e
    temp_output_path = os.path.join(output_dir, f"{safe_base_name}_output_temp.mp4")
    print(f"[INFO] Temporary output will be saved to: {temp_output_path}")

    # --- Video Processing Setup ---
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        cap.release()
        raise ValueError(f"Failed to open video file: {path}")

    # Use source FPS if it is within 10..60, otherwise default to 30.
    # (A NaN or zero FPS fails the range check and also falls back to 30.)
    source_fps = cap.get(cv2.CAP_PROP_FPS)
    output_fps = source_fps if 10 <= source_fps <= 60 else 30.0
    # Frames are processed and written at a fixed 640x640 resolution.
    frame_width, frame_height = 640, 640

    detected_labels: Set[str] = set()  # Unique class names seen so far
    frame_count = 0
    out = None
    try:
        try:
            out = _open_video_writer(
                temp_output_path, output_fps, (frame_width, frame_height)
            )
        except Exception as e:
            raise ValueError(f"Failed to create VideoWriter: {e}") from e

        # --- Main Processing Loop ---
        start = time.time()
        print("[INFO] Video processing started...")
        while True:
            ret, frame = cap.read()
            if not ret:  # End of video or read error
                break
            frame_count += 1
            # Resize frame BEFORE passing to model
            resized_frame = cv2.resize(frame, (frame_width, frame_height))
            # Default to the plain frame; replaced below when detections exist.
            frame_to_write = resized_frame
            try:
                results = model.track(
                    source=resized_frame,
                    conf=0.7,       # Confidence threshold
                    persist=True,   # Maintain track IDs across frames
                    verbose=False,  # Suppress per-frame console output
                )
                # Check if results are valid and contain boxes
                if results and results[0] and results[0].boxes:
                    annotated_frame = results[0].plot()
                    for box in results[0].boxes:
                        if box.cls is None:
                            continue
                        cls_id = int(box.cls[0])  # Get class index
                        if 0 <= cls_id < len(class_names):
                            detected_labels.add(class_names[cls_id])
                        else:
                            print(f"[WARNING] Detected unknown class ID: {cls_id}")
                    frame_to_write = annotated_frame
            except Exception as e:
                # Best effort: log and fall back to the unannotated frame so the
                # output keeps one frame per input frame.
                print(f"[ERROR] Error processing frame {frame_count}: {e}")
                frame_to_write = resized_frame
            # Write exactly once per input frame, outside the try, so a failure
            # while processing can never lead to a duplicate write.
            out.write(frame_to_write)
        elapsed = time.time() - start
    finally:
        # --- Clean Up --- always release handles, even when an error propagates.
        cap.release()
        if out is not None:
            out.release()

    print(f"[INFO] Video processing finished. Processed {frame_count} frames.")
    print(f"[INFO] Total processing time: {elapsed:.2f} seconds")
    try:
        cv2.destroyAllWindows()  # Close any OpenCV windows if they were opened
    except cv2.error:
        # OpenCV builds without a GUI backend may raise here; this function
        # opens no windows itself, so there is nothing to clean up.
        pass

    # --- Final Output Renaming ---
    labels_str = _build_labels_suffix(detected_labels)
    final_output_path = os.path.join(
        output_dir, f"{safe_base_name}_{labels_str}_output.mp4"
    )
    try:
        # os.replace overwrites an existing destination in one step, so no
        # separate remove-then-rename is needed.
        os.replace(temp_output_path, final_output_path)
        print(f"[INFO] Detected object labels: {detected_labels}")
        print(f"[INFO] Annotated video saved successfully at: {final_output_path}")
    except OSError as e:
        print(f"[ERROR] Failed to rename {temp_output_path} to {final_output_path}: {e}")
        # Fallback: return the temp path if rename fails but file exists
        if os.path.exists(temp_output_path):
            print(f"[WARNING] Returning path to temporary file: {temp_output_path}")
            return detected_labels, temp_output_path
        raise ValueError("Output video generation failed. No output file found.") from e
    return detected_labels, final_output_path
# # Example usage (commented out for library use) | |
# if __name__ == "__main__": | |
# test_video = input("Enter the local path to the video file: ").strip('"') | |
# if os.path.exists(test_video): | |
# try: | |
# print(f"[INFO] Processing video: {test_video}") | |
# labels, out_path = detection(test_video) | |
# print(f"\nDetection Complete.") | |
# print(f"Detected unique labels: {labels}") | |
# print(f"Output video saved to: {out_path}") | |
# except (FileNotFoundError, ValueError, Exception) as e: | |
# print(f"\nAn error occurred: {e}") | |
# else: | |
# print(f"Error: Input video file not found - {test_video}") | |