Spaces:
Sleeping
Sleeping
import cv2 | |
import torch | |
import gradio as gr | |
import numpy as np | |
from ultralytics import YOLO, __version__ as ultralytics_version | |
import time | |
# Debug: Check environment | |
print(f"Torch version: {torch.__version__}") | |
print(f"Gradio version: {gr.__version__}") | |
print(f"Ultralytics version: {ultralytics_version}") | |
print(f"CUDA available: {torch.cuda.is_available()}") | |
# Load custom YOLO model | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
print(f"Using device: {device}") | |
model = YOLO('./data/best.pt').to(device) | |
print(f"Model classes: {model.names}") # Print classes (should include cracks, potholes) | |
def process_video(video, resize_width=640, resize_height=480, frame_skip=1): | |
if video is None: | |
return "Error: No video uploaded" | |
# Start timer | |
start_time = time.time() | |
# Open input video | |
cap = cv2.VideoCapture(video) | |
if not cap.isOpened(): | |
return "Error: Could not open video file" | |
# Get input video properties | |
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
fps = cap.get(cv2.CAP_PROP_FPS) | |
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
expected_duration = total_frames / fps | |
print(f"Input video: {frame_width}x{frame_height}, {fps} FPS, {total_frames} frames, {expected_duration:.2f} seconds") | |
# Set output resolution | |
out_width, out_height = resize_width, resize_height | |
print(f"Output resolution: {out_width}x{out_height}") | |
# Set up video writer | |
output_path = "processed_output.mp4" | |
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Use 'H264' if mp4v fails | |
out = cv2.VideoWriter(output_path, fourcc, fps, (out_width, out_height)) | |
frame_count = 0 | |
processed_frames = 0 | |
while True: | |
ret, frame = cap.read() | |
if not ret: | |
break | |
frame_count += 1 | |
# Skip frames if frame_skip > 1 | |
if frame_count % frame_skip != 0: | |
continue | |
processed_frames += 1 | |
print(f"Processing frame {frame_count}/{total_frames}") | |
# Resize frame for faster inference | |
frame = cv2.resize(frame, (out_width, out_height)) | |
# Run YOLO inference (detect cracks and potholes) | |
results = model(frame, verbose=False, conf=0.5) # Confidence threshold 0.5 | |
annotated_frame = results[0].plot() | |
# Log detections | |
for detection in results[0].boxes: | |
cls = int(detection.cls) | |
conf = float(detection.conf) | |
print(f"Frame {frame_count}: Detected {model.names[cls]} with confidence {conf:.2f}") | |
# Write annotated frame | |
out.write(annotated_frame) | |
# Duplicate frames if skipping to maintain duration | |
if frame_skip > 1: | |
for _ in range(frame_skip - 1): | |
if frame_count + 1 <= total_frames: | |
out.write(annotated_frame) | |
frame_count += 1 | |
# Release resources | |
cap.release() | |
out.release() | |
# Verify output duration | |
cap = cv2.VideoCapture(output_path) | |
output_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
output_fps = cap.get(cv2.CAP_PROP_FPS) | |
output_duration = output_frames / output_fps | |
cap.release() | |
print(f"Output video: {output_frames} frames, {output_fps} FPS, {output_duration:.2f} seconds") | |
print(f"Processing time: {time.time() - start_time:.2f} seconds") | |
return output_path | |
# Gradio interface | |
iface = gr.Interface( | |
fn=process_video, | |
inputs=[ | |
gr.Video(label="Upload Video"), | |
gr.Slider(minimum=320, maximum=1280, value=640, label="Output Width", step=1), | |
gr.Slider(minimum=240, maximum=720, value=480, label="Output Height", step=1), | |
gr.Slider(minimum=1, maximum=5, value=1, label="Frame Skip (1 = process all frames)", step=1) | |
], | |
outputs=gr.Video(label="Processed Video"), | |
title="Crack and Pothole Detection with YOLO", | |
description="Upload a video to detect cracks and potholes. Adjust resolution and frame skip for faster processing." | |
) | |
if __name__ == "__main__": | |
iface.launch() |