Yago Bolivar
feat: enhance YouTube video processing with improved error handling and logging
baa65ee
import logging
import os
import re
import shutil
import tempfile
import time
from typing import Any, Dict, Optional

import cv2
import numpy as np
import yt_dlp
from smolagents.tools import Tool
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
# Set up logging: configure the root logger at INFO level and create the
# module-level logger used by everything below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class VideoProcessingTool(Tool):
    """
    Analyzes video content from a local file or a YouTube URL.

    Supported tasks are metadata extraction, frame extraction, YouTube
    transcript retrieval and (when a Darknet model is supplied) object
    detection. Every public method reports failures by returning a dictionary
    with an ``"error"`` key instead of raising.
    Has limitations with YouTube content due to platform restrictions.
    """

    name = "video_processor"
    description = "Analyzes video content from a file path or YouTube URL. Can extract frames, detect objects, get transcripts, and provide video metadata. Note: Has limitations with YouTube content due to platform restrictions."
    inputs = {
        "file_path": {"type": "string", "description": "Path to the video file or YouTube URL.", "nullable": True},
        "task": {"type": "string", "description": "Specific task to perform (e.g., 'extract_frames', 'get_transcript', 'detect_objects', 'get_metadata').", "nullable": True},
        "task_parameters": {"type": "object", "description": "Parameters for the specific task (e.g., frame extraction interval, object detection confidence).", "nullable": True},
    }
    outputs = {"result": {"type": "object", "description": "The result of the video processing task, e.g., list of frame paths, transcript text, object detection results, or metadata dictionary."}}
    output_type = "object"

    # Tasks understood by forward().
    _SUPPORTED_TASKS = ("get_metadata", "extract_frames", "get_transcript", "detect_objects")
    # Substrings of a (lower-cased) yt-dlp error message that are treated as
    # "YouTube is refusing automated access".
    _ACCESS_RESTRICTION_TERMS = ("forbidden", "403", "blocked", "bot", "captcha", "cookie")

    def __init__(self, model_cfg_path=None, model_weights_path=None, class_names_path=None, temp_dir_base=None, *args, **kwargs):
        """
        Initializes the VideoProcessingTool.

        Creates a private temporary directory for downloads and extracted
        frames, and loads the optional Darknet object detection model. A
        missing or unloadable model is not fatal: object detection is simply
        reported as unavailable later.

        Args:
            model_cfg_path (str, optional): Path to the object detection model's configuration file.
            model_weights_path (str, optional): Path to the object detection model's weights file.
            class_names_path (str, optional): Path to the file containing class names for the model.
            temp_dir_base (str, optional): Base directory for temporary files. Defaults to system temp.
        """
        super().__init__(*args, **kwargs)
        self.is_initialized = False  # Will be set to True after successful setup
        # mkdtemp(dir=None) falls back to the system temp directory.
        self.temp_dir = tempfile.mkdtemp(dir=temp_dir_base or None)
        self.object_detection_model = None
        self.class_names = []

        model_paths = (model_cfg_path, model_weights_path, class_names_path)
        if not all(model_paths):
            logger.info("CV model paths not provided. Object detection will not be available.")
        elif not all(os.path.exists(path) for path in model_paths):
            logger.warning("Warning: One or more CV model paths are invalid. Object detection will not be available.")
        else:
            try:
                model = cv2.dnn.readNetFromDarknet(model_cfg_path, model_weights_path)
                model.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
                model.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)
                with open(class_names_path, "r", encoding="utf-8") as names_file:
                    self.class_names = [line.strip() for line in names_file]
                self.object_detection_model = model
                logger.info("CV Model loaded successfully.")
            except Exception as e:
                # Best effort: the tool stays usable for every non-detection task.
                logger.warning("Error loading CV model: %s. Object detection will not be available.", e)
                self.object_detection_model = None
                self.class_names = []
        self.is_initialized = True

    def forward(self, file_path: Optional[str] = None, task: Optional[str] = "get_metadata", task_parameters: Optional[dict] = None) -> Dict[str, Any]:
        """
        Main entry point for video processing tasks.

        Requests are validated before any network access, so an unsupported
        task or a missing detection model never triggers a video download.

        Args:
            file_path: Path to a local video file or a YouTube URL.
            task: One of 'get_metadata', 'extract_frames', 'get_transcript'
                or 'detect_objects'. ``None`` selects 'get_metadata'.
            task_parameters: Optional task settings. Keys read here are
                'resolution', 'interval_seconds', 'max_frames',
                'confidence_threshold' and 'frames_to_process'.

        Returns:
            dict: The task result, or ``{"error": ...}`` (optionally with an
            ``"alternatives"`` list) when the request cannot be fulfilled.
        """
        if not self.is_initialized:
            return {"error": "Tool not initialized properly."}
        task = task or "get_metadata"
        if task_parameters is None:
            task_parameters = {}
        elif not isinstance(task_parameters, dict):
            return {"error": "task_parameters must be a dictionary."}
        if not file_path:
            return {"error": "File path is required for this task."}
        if task not in self._SUPPORTED_TASKS:
            return {"error": f"Unsupported task: {task}"}

        is_youtube_url = "youtube.com/" in file_path or "youtu.be/" in file_path

        if task == "get_transcript":
            # Transcripts come straight from the transcript API; the video
            # itself is never needed, so nothing is downloaded for this task.
            if not is_youtube_url:
                return {"error": "Transcript extraction requires a YouTube URL."}
            return self._transcript_response(file_path)

        if task == "detect_objects" and not self.object_detection_model:
            return {"error": "Object detection model not loaded."}

        if is_youtube_url:
            video_source_path, failure = self._fetch_youtube_video(file_path, task_parameters)
            if failure is not None:
                return failure
        elif not os.path.exists(file_path):
            return {"error": f"Video file not found: {file_path}"}
        else:
            video_source_path = file_path

        # Execute the appropriate task based on the request
        if task == "get_metadata":
            return self.get_video_metadata(video_source_path)
        if task == "extract_frames":
            interval_seconds = task_parameters.get("interval_seconds", 5)
            max_frames = task_parameters.get("max_frames")
            return self.extract_frames_from_video(video_source_path, interval_seconds=interval_seconds, max_frames=max_frames)
        # Only 'detect_objects' is left at this point.
        confidence_threshold = task_parameters.get("confidence_threshold", 0.5)
        frames_to_process = task_parameters.get("frames_to_process", 5)  # Process N frames
        return self.detect_objects_in_video(video_source_path, confidence_threshold=confidence_threshold, num_frames_to_sample=frames_to_process)

    def _is_access_restricted(self, error_text):
        """Return True when an error message looks like YouTube blocking automated access."""
        lowered = str(error_text).lower()
        return any(term in lowered for term in self._ACCESS_RESTRICTION_TERMS)

    def _transcript_response(self, youtube_url):
        """Fetch a YouTube transcript, adding suggestions when transcripts are disabled."""
        transcript_result = self.get_youtube_transcript(youtube_url)
        if "Transcripts are disabled" in (transcript_result.get("error") or ""):
            return {
                "error": "This YouTube video has disabled transcripts. Consider these alternatives:",
                "alternatives": [
                    "Please provide a different video with transcripts enabled",
                    "Upload a local video file that you have permission to use",
                    "Provide a text summary of the video content manually",
                ],
            }
        return transcript_result

    def _fetch_youtube_video(self, youtube_url, task_parameters):
        """
        Probe and download a YouTube video.

        Returns:
            tuple: ``(local_path, None)`` on success or ``(None, error_dict)``
            on failure.
        """
        logger.info("YouTube URL detected: %s. Attempting to access content...", youtube_url)
        # Probe the video first (title only): if YouTube is blocking access this
        # fails quickly and gives a clearer error than a failed download.
        try:
            with yt_dlp.YoutubeDL({"quiet": True, "no_warnings": True}) as ydl:
                info = ydl.extract_info(youtube_url, download=False)
            logger.info("Video title: %s", (info or {}).get("title", "Unknown"))
        except Exception as e:
            if self._is_access_restricted(e):
                return None, {
                    "error": "YouTube access restricted. This agent cannot access this content due to platform restrictions.",
                    "alternatives": [
                        "Please upload a local video file instead",
                        "For transcripts, try providing a text summary manually",
                        "For visual analysis, consider uploading screenshots from the video",
                    ],
                }
            return None, {"error": f"Failed to access video info: {str(e)}"}

        download_resolution = task_parameters.get("resolution", "360p")
        download_result = self.download_video(youtube_url, resolution=download_resolution)
        if download_result.get("error"):
            if self._is_access_restricted(download_result["error"]):
                return None, {
                    "error": "YouTube download restricted. This agent cannot download this content due to platform restrictions.",
                    "alternatives": [
                        "Please upload a local video file instead",
                        "For transcripts, try obtaining them separately or summarizing manually",
                        "For visual analysis, consider uploading key frames as images",
                    ],
                }
            return None, download_result

        video_source_path = download_result.get("file_path")
        if not video_source_path or not os.path.exists(video_source_path):
            return None, {"error": f"Failed to download or locate video from URL: {youtube_url}"}
        return video_source_path, None

    def _extract_video_id(self, youtube_url):
        """
        Extract the 11-character YouTube video ID from a URL.

        Returns None when the input is not a string or contains no ID. The
        trailing negative lookahead requires the ID to be exactly 11
        characters, so a longer path segment or host name is not truncated
        into a bogus ID.
        """
        if not isinstance(youtube_url, str):
            return None
        match = re.search(
            r"(?:v=|\/|embed\/|watch\?v=|youtu\.be\/)([0-9A-Za-z_-]{11})(?![0-9A-Za-z_-])",
            youtube_url,
        )
        return match.group(1) if match else None

    @staticmethod
    def _parse_max_height(resolution, default=360):
        """Turn a resolution such as '360p', '720' or 480 into an integer pixel height."""
        digits = re.search(r"\d+", str(resolution))
        return int(digits.group()) if digits else default

    def download_video(self, youtube_url, resolution="360p"):
        """
        Download a YouTube video into the tool's temporary directory.

        Args:
            youtube_url (str): URL of the video.
            resolution (str | int): Maximum height, e.g. "360p", "720" or 480.

        Returns:
            dict: {"success": True, "file_path": ...} (plus a "message" when a
            previously downloaded file is reused) or {"error": "..."}.
        """
        video_id = self._extract_video_id(youtube_url)
        if not video_id:
            return {"error": "Invalid YouTube URL or could not extract video ID."}
        output_file_path = os.path.join(self.temp_dir, f"{video_id}.mp4")
        if os.path.exists(output_file_path):  # Avoid re-downloading
            return {"success": True, "file_path": output_file_path, "message": "Video already downloaded."}

        max_height = self._parse_max_height(resolution)
        ydl_opts = {
            "format": f"bestvideo[height<={max_height}][ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best",
            "outtmpl": output_file_path,
            "noplaylist": True,
            "quiet": True,
            "no_warnings": True,
        }
        try:
            logger.info("Attempting to download YouTube video %s at %s...", video_id, resolution)
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([youtube_url])

            if not os.path.exists(output_file_path):
                # Fallback for some formats if mp4 direct is not available
                logger.info("Primary download method failed, trying alternative format...")
                ydl_opts["format"] = f"best[height<={max_height}]"  # more generic
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    ydl.download([youtube_url])
                # yt-dlp might save with a different extension, so look for any
                # finished file for this video. Sorting keeps the choice
                # deterministic; partial-download files are ignored.
                candidates = sorted(
                    entry for entry in os.listdir(self.temp_dir)
                    if entry.startswith(video_id) and not entry.endswith((".part", ".ytdl"))
                )
                usable = [entry for entry in candidates if entry.endswith((".mp4", ".mkv", ".webm", ".flv"))]
                if usable:
                    usable.sort(key=lambda entry: not entry.endswith(".mp4"))  # prefer mp4
                    output_file_path = os.path.join(self.temp_dir, usable[0])
                elif candidates:
                    return {"error": f"Downloaded video is not in a directly usable format: {candidates[0]}"}

            if os.path.exists(output_file_path):
                return {"success": True, "file_path": output_file_path}
            return {"error": "Video download failed, file not found after attempt."}
        except yt_dlp.utils.DownloadError as e:
            error_msg = str(e)
            if "Sign in to confirm your age" in error_msg:
                return {"error": "Age-restricted video. Cannot download due to platform restrictions."}
            if "This video is private" in error_msg:
                return {"error": "This video is private and cannot be accessed."}
            if any(term in error_msg.lower() for term in ("captcha", "bot", "cookie", "forbidden")):
                return {"error": "YouTube access restricted due to bot detection. Consider uploading a local video file instead."}
            return {"error": f"yt-dlp download error: {error_msg}"}
        except Exception as e:
            return {"error": f"Failed to download video: {str(e)}"}

    def get_video_metadata(self, video_path):
        """
        Extract metadata from the video file.

        Returns:
            dict: {"success": True, "metadata": {...}} with frame_count, fps,
            width, height and duration (seconds), or {"error": "..."}.
            ``duration`` is None when the container reports no usable frame
            rate.
        """
        if not os.path.exists(video_path):
            return {"error": f"Video file not found: {video_path}"}
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return {"error": "Could not open video file."}
            frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
            fps = cap.get(cv2.CAP_PROP_FPS)
            metadata = {
                "frame_count": int(frame_count),
                "fps": fps,
                "width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                "height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
                # Guard against a zero/NaN frame rate instead of dividing by it.
                "duration": frame_count / fps if fps > 0 else None,
            }
        finally:
            cap.release()
        return {"success": True, "metadata": metadata}

    def extract_frames_from_video(self, video_path, interval_seconds=5, max_frames=None):
        """
        Extracts frames from the video at specified intervals.

        Frames are written to the tool's temporary directory as
        ``frame_NNNN.jpg``; a later call overwrites files with the same index.

        Args:
            video_path (str): Path to the video file.
            interval_seconds (int | float): Interval in seconds between frames.
            max_frames (int, optional): Maximum number of frames to extract.

        Returns:
            dict: {"success": True, "extracted_frame_paths": [...] } or {"error": "..."}
        """
        if not os.path.exists(video_path):
            return {"error": f"Video file not found: {video_path}"}
        if isinstance(interval_seconds, bool) or not isinstance(interval_seconds, (int, float)) or interval_seconds <= 0:
            return {"error": "interval_seconds must be a positive number."}
        cap = cv2.VideoCapture(video_path)
        extracted_frame_paths = []
        try:
            if not cap.isOpened():
                return {"error": "Could not open video file."}
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not (fps > 0):  # also rejects NaN
                return {"error": "Could not determine the video frame rate."}
            # At least one frame between samples, otherwise the modulo below
            # would divide by zero for very small intervals.
            frame_interval = max(1, int(fps * interval_seconds))
            frame_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    frame_id = frame_count // frame_interval
                    frame_file_path = os.path.join(self.temp_dir, f"frame_{frame_id:04d}.jpg")
                    if cv2.imwrite(frame_file_path, frame):
                        extracted_frame_paths.append(frame_file_path)
                    else:
                        logger.warning("Could not write frame to %s", frame_file_path)
                    if max_frames and len(extracted_frame_paths) >= max_frames:
                        break
                frame_count += 1
        finally:
            cap.release()
        return {"success": True, "extracted_frame_paths": extracted_frame_paths}

    @staticmethod
    def _list_transcripts(video_id):
        """
        List the transcripts available for a video.

        Uses the class-level ``list_transcripts`` when the installed
        youtube-transcript-api still provides it and otherwise falls back to
        the instance-based ``list`` of newer releases.
        """
        legacy_list = getattr(YouTubeTranscriptApi, "list_transcripts", None)
        if legacy_list is not None:
            return legacy_list(video_id)
        return YouTubeTranscriptApi().list(video_id)

    @staticmethod
    def _fetch_transcript_with_retry(transcript, max_attempts=3, delay_seconds=1):
        """Call ``transcript.fetch()`` up to ``max_attempts`` times, re-raising the last failure."""
        for attempt in range(1, max_attempts + 1):
            try:
                return transcript.fetch()
            except Exception as fetch_error:
                if attempt == max_attempts:
                    raise
                logger.warning("Transcript fetch attempt %d/%d failed: %s", attempt, max_attempts, fetch_error)
                time.sleep(delay_seconds)  # Wait before retrying

    def get_youtube_transcript(self, youtube_url, languages=None):
        """
        Get the transcript/captions of a YouTube video.

        A manually created transcript is preferred; an auto-generated one is
        used when no manual transcript exists for the requested languages.

        Args:
            youtube_url (str): URL of the video.
            languages (list[str], optional): Language codes in order of
                preference. Defaults to ['en', 'en-US'].

        Returns:
            dict: {"success": True, "transcript": str, "transcript_entries": ...}
            or {"error": "..."}.
        """
        if languages is None:
            languages = ['en', 'en-US']  # Default to English
        video_id = self._extract_video_id(youtube_url)
        if not video_id:
            return {"error": "Invalid YouTube URL or could not extract video ID."}
        try:
            transcript_list_obj = self._list_transcripts(video_id)
            try:
                transcript = transcript_list_obj.find_manually_created_transcript(languages)
            except NoTranscriptFound:
                # If no manual transcript, try to find a generated one.
                # This raises NoTranscriptFound if it also fails, which is caught below.
                transcript = transcript_list_obj.find_generated_transcript(languages)

            fetched_transcript_entries = self._fetch_transcript_with_retry(transcript)
            # Entries are plain dicts in older library versions and objects
            # with a ``text`` attribute in newer ones; accept both.
            full_transcript_text = " ".join(
                entry["text"] if isinstance(entry, dict) else entry.text
                for entry in fetched_transcript_entries
            )
            return {
                "success": True,
                "transcript": full_transcript_text,
                "transcript_entries": fetched_transcript_entries,
            }
        except TranscriptsDisabled:
            return {"error": "Transcripts are disabled for this video."}
        except NoTranscriptFound:  # Neither manual nor generated transcript exists for the languages
            return {"error": f"No transcript found for the video in languages: {languages}."}
        except Exception as e:
            # Catches other exceptions from YouTubeTranscriptApi calls or re-raised from fetch
            return {"error": f"Failed to get transcript: {str(e)}"}

    def detect_objects_in_video(self, video_path, confidence_threshold=0.5, num_frames_to_sample=5, target_fps=1):
        """
        Detects objects in sampled frames and returns per-class detection counts.

        Frames are sampled evenly across the video. Counts are raw detections
        above the threshold summed over all sampled frames; overlapping boxes
        are not merged, so one object can be counted more than once.

        Args:
            video_path (str): Path to the video file.
            confidence_threshold (float): Minimum confidence for an object to be counted.
            num_frames_to_sample (int): Number of frames to sample for object detection.
            target_fps (int): Currently unused; kept for backward compatibility.

        Returns:
            dict: {"success": True, "object_counts": {...}} or {"error": "..."}
        """
        if not self.object_detection_model or not self.class_names:
            return {"error": "Object detection model not loaded or class names missing."}
        if not os.path.exists(video_path):
            return {"error": f"Video file not found: {video_path}"}
        try:
            num_frames_to_sample = int(num_frames_to_sample)
        except (TypeError, ValueError):
            num_frames_to_sample = 0
        if num_frames_to_sample < 1:
            return {"error": "num_frames_to_sample must be a positive integer."}

        net = self.object_detection_model
        # The output layers do not change between frames, so resolve them once.
        # getUnconnectedOutLayers() returns a flat or an Nx1 array depending on
        # the OpenCV version; flattening handles both.
        layer_names = net.getLayerNames()
        output_layer_names = [
            layer_names[int(index) - 1]
            for index in np.asarray(net.getUnconnectedOutLayers()).flatten()
        ]

        cap = cv2.VideoCapture(video_path)
        object_counts = {cls: 0 for cls in self.class_names}
        try:
            if not cap.isOpened():
                return {"error": "Could not open video file."}
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            sample_interval = max(1, total_frames // num_frames_to_sample)
            frame_count = 0
            frames_processed = 0
            while frames_processed < num_frames_to_sample:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % sample_interval == 0:
                    blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
                    net.setInput(blob)
                    detections = net.forward(output_layer_names)
                    for detection_set in detections:  # Detections can come from multiple output layers
                        for detection in detection_set:
                            scores = detection[5:]
                            if len(scores) == 0:
                                continue
                            class_id = int(np.argmax(scores))
                            # Skip ids that have no entry in the names file.
                            if scores[class_id] > confidence_threshold and class_id < len(self.class_names):
                                object_counts[self.class_names[class_id]] += 1
                    frames_processed += 1
                frame_count += 1
        finally:
            cap.release()
        return {"success": True, "object_counts": object_counts}

    def cleanup(self):
        """Remove temporary files and directory."""
        # getattr keeps cleanup safe even if __init__ failed before temp_dir was set.
        temp_dir = getattr(self, "temp_dir", None)
        if temp_dir and os.path.exists(temp_dir):
            shutil.rmtree(temp_dir, ignore_errors=True)
# Example usage (for manual testing; assumes model files are in ./models/cv/).
# The tests below access YouTube and therefore need network access.
if __name__ == '__main__':
    # Create dummy model files for local testing if they don't exist
    os.makedirs("./models/cv", exist_ok=True)
    dummy_cfg = "./models/cv/dummy-yolov3-tiny.cfg"
    dummy_weights = "./models/cv/dummy-yolov3-tiny.weights"
    dummy_names = "./models/cv/dummy-coco.names"
    placeholder_contents = {
        dummy_cfg: "# Dummy YOLOv3 tiny config",
        dummy_weights: "dummy weights",  # Actual weights file is binary
        dummy_names: "bird\ncat\ndog\nperson",
    }
    for placeholder_path, placeholder_text in placeholder_contents.items():
        if not os.path.exists(placeholder_path):
            with open(placeholder_path, "w", encoding="utf-8") as placeholder_file:
                placeholder_file.write(placeholder_text)

    # Initialize tool
    # Note: For real object detection, provide paths to actual .cfg, .weights, and .names files.
    # For example, from: https://pjreddie.com/darknet/yolo/
    video_tool = VideoProcessingTool(
        model_cfg_path=dummy_cfg,  # Replace with actual path to YOLOv3-tiny.cfg or similar
        model_weights_path=dummy_weights,  # Replace with actual path to YOLOv3-tiny.weights
        class_names_path=dummy_names,  # Replace with actual path to coco.names
    )
    try:
        # Test 1: Get Transcript
        # Replace with a video that has transcripts
        transcript_test_url = "https://www.youtube.com/watch?v=1htKBjuUWec"  # Stargate SG-1 clip
        print(f"--- Testing Transcript for: {transcript_test_url} ---")
        transcript_info = video_tool.forward(transcript_test_url, "get_transcript")
        if transcript_info.get("success"):
            print("Transcript (first 100 chars):", transcript_info.get("transcript", "")[:100])
        else:
            print("Transcript Error:", transcript_info.get("error"))
        print("\n")

        # Test 2: Video metadata (downloads the video at low resolution)
        metadata_test_url = "https://www.youtube.com/watch?v=1htKBjuUWec"  # Stargate SG-1 clip
        print(f"--- Testing Metadata for: {metadata_test_url} ---")
        metadata_info = video_tool.forward(metadata_test_url, "get_metadata", {"resolution": "360p"})
        if metadata_info.get("success"):
            print("Metadata:", metadata_info.get("metadata"))
        else:
            print("Metadata Error:", metadata_info.get("error"))
        print("\n")

        # Test 3: Object detection
        # The dummy model files are not a loadable network, so this test is
        # normally skipped unless real model files are supplied above.
        object_count_test_url = "https://www.youtube.com/watch?v=L1vXCYZAYYM"  # Birds video
        print(f"--- Testing Object Counting for: {object_count_test_url} ---")
        if video_tool.object_detection_model:
            count_info = video_tool.forward(
                object_count_test_url,
                "detect_objects",
                {"frames_to_process": 5, "resolution": "360p"},
            )
            if count_info.get("success"):
                print("Object Counts:", count_info)
            else:
                print("Object Counting Error:", count_info.get("error"))
        else:
            print("Object detection model not loaded, skipping object count test.")
    finally:
        # Always remove the tool's temporary directory. The dummy model files
        # are left in place on purpose, because real model files could share
        # the same names.
        video_tool.cleanup()
    print("\nAll tests finished.")