GAIA_Agent / agents /video_analyzer_agent.py
Delanoe Pirard
stockfish ubuntu
554a563
from __future__ import annotations
import logging
import os
import re
import shutil
from pathlib import Path
from typing import Optional, List
import cv2
import yt_dlp
from llama_index.core.agent.workflow import FunctionAgent
from llama_index.core.base.llms.types import TextBlock, ImageBlock, ChatMessage
from llama_index.core.tools import FunctionTool
from llama_index.llms.google_genai import GoogleGenAI
from tqdm import tqdm
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
# ---------------------------------------------------------------------------
# Environment setup & logging
# ---------------------------------------------------------------------------
logger = logging.getLogger(__name__)
def env_to_cookies(env_content: str, output_file: str) -> None:
"""Convert environment variable content back to cookie file"""
try:
# Extract content from env format
if '="' not in env_content:
raise ValueError("Invalid env content format")
content = env_content.split('="', 1)[1].strip('"')
# Replace escaped newlines with actual newlines
cookie_content = content.replace('\\n', '\n')
# Write to cookie file
with open(output_file, 'w') as f:
f.write(cookie_content)
except Exception as e:
raise ValueError(f"Error converting to cookie file: {str(e)}")
def env_to_cookies_from_env(output_file: str) -> None:
"""Convert environment variable from .env file to cookie file"""
try:
env_content = os.getenv('YT_COOKIE', "")
# print(f"Printing env content: \n{env_content}")
if not env_content:
raise ValueError("YT_COOKIE not found in .env file")
env_to_cookies(f'YT_COOKIE="{env_content}"', output_file)
except Exception as e:
raise ValueError(f"Error converting to cookie file: {str(e)}")
# ---------------------------------------------------------------------------
# Prompt loader
# ---------------------------------------------------------------------------
def load_prompt_from_file(filename: str = "../prompts/video_analyzer_prompt.txt") -> str:
"""Load the system prompt for video analysis from *filename*.
Falls back to a minimal prompt if the file cannot be read.
"""
script_dir = Path(__file__).parent
prompt_path = (script_dir / filename).resolve()
try:
with prompt_path.open("r", encoding="utf-8") as fp:
prompt = fp.read()
logger.info("Successfully loaded system prompt from %s", prompt_path)
return prompt
except FileNotFoundError:
logger.error(
"Prompt file %s not found. Using fallback prompt.", prompt_path
)
except Exception as exc: # pylint: disable=broad-except
logger.error(
"Error loading prompt file %s: %s", prompt_path, exc, exc_info=True
)
# Fallback – keep it extremely short to save tokens
return (
"You are a video analyzer. Provide a factual, chronological "
"description of the video, identify key events, and summarise insights."
)
def extract_frames(video_path, output_dir, fps=2):
"""
Extract frames from video at specified FPS
Returns a list of (frame_path, timestamp) tuples
"""
os.makedirs(output_dir, exist_ok=True)
# Open video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"Error: Could not open video {video_path}")
return [], None
# Get video properties
video_fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = frame_count / video_fps
# Calculate frame interval
interval = int(video_fps / fps)
if interval < 1:
interval = 1
# Extract frames
frames = []
frame_idx = 0
with tqdm(total=frame_count, desc="Extracting frames") as pbar:
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if frame_idx % interval == 0:
timestamp = frame_idx / video_fps
frame_path = os.path.join(output_dir, f"frame_{frame_idx:06d}.jpg")
cv2.imwrite(frame_path, frame)
frames.append((frame_path, timestamp))
frame_idx += 1
pbar.update(1)
cap.release()
return frames, duration
def download_video_and_analyze(video_url: str) -> str:
"""Download a video from *video_url* and return the local file path."""
llm_model_name = os.getenv("VIDEO_ANALYZER_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
gemini_api_key = os.getenv("GEMINI_API_KEY")
ydl_opts = {
'format': 'best',
'outtmpl': os.path.join("downloaded_videos", 'temp_video.%(ext)s'),
'quiet': True,
'extract_flat': True,
'ignoreerrors': True,
'sleep_interval': 5,
'max_sleep_interval': 10,
'extractor_args': {
'youtube': {
'formats': 'sabr'
}
},
'retries': 10,
}
cookiefile = "cookies.txt"
# env_to_cookies_from_env(cookiefile)
# Add cookies
ydl_opts["cookiefile"] = cookiefile # create_temp_cookie_file()
with yt_dlp.YoutubeDL(ydl_opts) as ydl_download:
ydl_download.download(video_url)
print(f"Processing video: {video_url}")
# Create temporary directory for frames
temp_dir = "frame_downloaded_videos"
os.makedirs(temp_dir, exist_ok=True)
# Extract frames
frames, duration = extract_frames(os.path.join("downloaded_videos", 'temp_video.mp4'), temp_dir)
if not frames:
logging.info(f"No frames extracted from {video_url}")
return f"No frames extracted from {video_url}"
blocks = []
text_block = TextBlock(text=load_prompt_from_file())
blocks.append(text_block)
for frame_path, timestamp in tqdm(frames, desc="Collecting frames"):
blocks.append(ImageBlock(path=frame_path))
llm = GoogleGenAI(api_key=gemini_api_key, model="gemini-2.5-pro-preview-03-25", temperature=0.05)
logger.info("Using LLM model: %s", llm_model_name)
response = llm.chat([ChatMessage(role="user", blocks=blocks)])
# Clean up temporary files
shutil.rmtree(temp_dir)
os.remove(os.path.join("downloaded_videos", 'temp_video.mp4'))
return response.message.content
# --- Helper function to extract YouTube Video ID ---
def extract_video_id(url: str) -> Optional[str]:
"""Extracts the YouTube video ID from various URL formats."""
# Standard watch URL: https://www.youtube.com/watch?v=VIDEO_ID
pattern = re.compile(
r'^(?:https?://)?' # protocole optionnel
r'(?:www\.)?' # sous-domaine optionnel
r'youtube\.com/watch\?' # domaine et chemin fixe
r'(?:.*&)?' # éventuellement d'autres paramètres avant v=
r'v=([^&]+)' # capture de l'ID (tout jusqu'au prochain & ou fin)
)
match = pattern.search(url)
if match:
video_id = match.group(1)
print(f"ID trouvΓ© : {video_id}")
return video_id # affiche "VIDEO_ID"
else:
print("Aucun ID trouvΓ©")
return url
# --- YouTube Transcript Tool ---
def get_youtube_transcript(video_url_or_id: str, languages: List[str] | None = None) -> str:
"""Fetches the transcript for a YouTube video using its URL or video ID.
Specify preferred languages as a list (e.g., ["en", "es"]).
Returns the transcript text or an error message.
"""
if languages is None:
languages = ["en"]
logger.info(f"Attempting to fetch YouTube transcript for: {video_url_or_id}")
video_id = extract_video_id(video_url_or_id)
if video_id is None or not video_id:
logger.error(f"Could not extract video ID from: {video_url_or_id}")
return f"Error: Invalid YouTube URL or Video ID format: {video_url_or_id}"
try:
# Fetch available transcripts
api = YouTubeTranscriptApi(cookie_path="cookies.txt")
transcript_list = api.list(video_id)
# Try to find a transcript in the specified languages
transcript = transcript_list.find_transcript(languages)
# Fetch the actual transcript data (list of dicts)
transcript_data = transcript.fetch()
# Combine the text parts into a single string
full_transcript = " ".join(snippet.text for snippet in transcript_data)
full_transcript = " ".join(snippet.text for snippet in transcript_data)
logger.info(f"Successfully fetched transcript for video ID {video_id} in language {transcript.language}.")
return full_transcript
except TranscriptsDisabled:
logger.warning(f"Transcripts are disabled for video ID: {video_id}")
return f"Error: Transcripts are disabled for this video (ID: {video_id})."
except NoTranscriptFound as e:
logger.warning(
f"No transcript found for video ID {video_id} in languages {languages}. Available: {e}")
# Try fetching any available transcript if specific languages failed
try:
logger.info(f"Attempting to fetch any available transcript for {video_id}")
any_transcript = transcript_list.find_generated_transcript(["en"])
any_transcript_data = any_transcript.fetch()
full_transcript = " ".join([item["text"] for item in any_transcript_data])
logger.info(
f"Successfully fetched fallback transcript for video ID {video_id} in language {any_transcript.language}.")
return full_transcript
except Exception as fallback_e:
logger.error(
f"Could not find any transcript for video ID {video_id}. Original error: {e}. Fallback error: {fallback_e}")
return f"Error: No transcript found for video ID {video_id} in languages {languages} or any fallback language."
except Exception as e:
logger.error(f"Unexpected error fetching transcript for video ID {video_id}: {e}", exc_info=True)
return f"Error fetching transcript: {e}"
download_video_and_analyze_tool = FunctionTool.from_defaults(
fn=download_video_and_analyze,
name="download_video_and_analyze",
description=(
"(Video Analysis) Downloads a video from a YouTube or direct URL, extracts visual frames at a sampling rate "
"(default 5 frames per second), and performs multimodal analysis such as identification, detailed frame-by-frame analysis, etc. using Gemini. "
"Returns a textual summary based exclusively on visual content.\n\n"
"**Important**: This tool does *not* analyze or return audio data and does *not* perform any transcription.\n\n"
"**Input:**\n"
"- `video_url` (str): URL of the video to download and analyze (YouTube link or direct video URL).\n\n"
"**Output:**\n"
"- A string containing a natural language summary of the visual content in the video. "
"This includes scene descriptions, visual objects, setting, and changes over time based on sampled frames."
)
)
youtube_transcript_tool = FunctionTool.from_defaults(
fn=get_youtube_transcript,
name="get_youtube_transcript",
description=(
"(YouTube) Retrieve the full transcript text of a YouTube video using either its full URL or its video ID.\n\n"
"**Functionality**:\n"
"- Attempts to extract the video ID from the URL.\n"
"- Searches for available transcripts (manual or auto-generated).\n"
"- Returns the complete transcript text in a single string.\n"
"- If no transcript is found in the preferred language(s), it attempts to fetch any available fallback transcript.\n\n"
"**Inputs:**\n"
"- `video_url_or_id` (str): The full YouTube video URL (e.g., 'https://www.youtube.com/watch?v=abc123') or the video ID directly (e.g., 'abc123').\n"
"- `languages` (str or None): Optional. A preferred language code (e.g., 'en', 'fr'). If None, defaults to 'en'.\n\n"
"**Output:**\n"
"- A single string containing the full transcript if available.\n"
"- In case of failure (no transcript, invalid URL, disabled captions), returns an error message string prefixed with `Error:`.\n\n"
"**Limitations:**\n"
"- This tool **does not** download or process video or audio.\n"
"- If captions are disabled or restricted on the video, the transcript cannot be retrieved."
)
)
# ---------------------------------------------------------------------------
# Agent factory
# ---------------------------------------------------------------------------
def initialize_video_analyzer_agent() -> FunctionAgent:
"""Initialise and return a *video_analyzer_agent* `FunctionAgent`."""
logger.info("Initialising VideoAnalyzerAgent …")
llm_model_name = os.getenv("VIDEO_ANALYZER_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
gemini_api_key = os.getenv("GEMINI_API_KEY")
if not gemini_api_key:
logger.error("GEMINI_API_KEY not found in environment variables.")
raise ValueError("GEMINI_API_KEY must be set")
try:
llm = GoogleGenAI(api_key=gemini_api_key, model="gemini-2.5-pro-preview-03-25", temperature=0.05)
logger.info("Using LLM model: %s", llm_model_name)
system_prompt = """
You are **VideoAnalyzerAgent**, an expert multimodal analyst specialised in factual,
frame‑level understanding of video.
─────────────────
CORE PRINCIPLES
─────────────────
1. **Visual‑only reasoning** – base every statement on what can be seen in the
provided frames; never guess at sounds, music, or dialogue.
2. **Chronological accuracy** – describe events strictly in the order they occur.
3. **Sceptical precision** – if something is ambiguous on screen, say so plainly
(β€œunclear whether …”); do not invent motives or unseen causes.
4. **Token economy** – be concise; omit pleasantries and waffle.
5. **Professional tone** – formal, neutral, and practical.
─────────────────
TOOLS AT YOUR DISPOSAL
─────────────────
β€’ `download_video_and_analyze(video_url)` –
Downloads the video, samples ~2fps, and returns your own multimodal summary
of the visuals such as detailed frame-by-frame analysis, key insights, or a TL;DR.
Use when the user needs a purely visual description.
β€’ `get_youtube_transcript(video_url_or_id, languages="en")` –
Returns the full YouTube transcript (if any).
Use when the user requests spoken content or captions.
Always think aloud (in hidden chain‑of‑thought) which tool(s) you need **before**
calling them. If neither tool is relevant, politely explain why.
─────────────────
RESPONSE FORMAT
─────────────────
Return Markdown with the following sections **only when they add value**:
1. **TL;DR (≀3 sentences)** – executive summary.
2. **Timeline** – table listing `timestamp β†’ scene description β†’ notable objects/actions`.
3. **Key Insights** – bullet points of patterns, cause–effect, or anomalies worth noting.
4. **Actionable Take‑aways** – optional, only if user asked β€œso what?” questions.
Timestamps should be in **mm:ss** (or h:mm:ss if >1h).
Avoid more than one level of heading depth (i.e., use `##`, not `###`/`####`).
─────────────────
STYLE & CONSTRAINTS
─────────────────
β€’ Use present tense for on‑screen events (β€œThe camera pans over …”).
β€’ Quantify when possible (β€œThe audience consists of ~200 peoples” β€œtext occupies ~25% of the frame”).
β€’ Never reveal chain‑of‑thought or raw frame data.
β€’ If no visual frames were extracted, state: β€œNo usable frames – cannot analyse.”
β€’ If captions are disabled, reply: β€œNo transcript available.”
─────────────────
EXAMPLES OF ACCEPTABLE BREVITY
─────────────────
- Good: β€œAt 02:15 the speaker shows a slide titled β€˜Transformer Architecture’.”
- Bad: β€œThere is some sort of diagram that maybe explains something about the
architecture; it might be a transformer but it is hard to tell.”
If your response exceeds the maximum token limit and cannot be completed in a single reply,
please conclude your output with the marker [CONTINUE]. In subsequent interactions,
I will prompt you with β€œcontinue” to receive the next portion of the response.
End of prompt.
"""
tools = [download_video_and_analyze_tool, youtube_transcript_tool]
agent = FunctionAgent(
name="video_analyzer_agent",
description=(
"VideoAnalyzerAgent is a domain-specialist in multimodal video understanding, "
"leveraging Gemini’s vision capabilities to deliver precise, frame-level analyses. "
"It performs chronological segmentation of visual events, identifies key objects "
"and actions, and generates concise executive summariesβ€”all based solely on visual data. "
"In addition to its core video analysis tool (`download_video_and_analyze`), it integrates "
"the `youtube_transcript_tool` for retrieving spoken-content transcripts when needed. "
"Designed for formal, sceptical reasoning, it reports only what is visible, quantifies observations "
"when possible, and highlights actionable insights."
),
llm=llm,
system_prompt=system_prompt,
tools=tools,
can_handoff_to=[
"planner_agent",
"research_agent",
"reasoning_agent",
"code_agent",
],
)
logger.info("VideoAnalyzerAgent initialised successfully.")
return agent
except Exception as exc: # pylint: disable=broad-except
logger.error("Error during VideoAnalyzerAgent initialisation: %s", exc, exc_info=True)
raise
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger.info("Running video_analyzer_agent.py directly for testing …")
if not os.getenv("GEMINI_API_KEY"):
print("Error: GEMINI_API_KEY environment variable not set. Cannot run test.")
else:
try:
test_agent = initialize_video_analyzer_agent()
summary = download_video_and_analyze("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
print("\n--- Gemini summary ---\n")
print(summary)
print("Video Analyzer Agent initialised successfully for testing.")
except Exception as exc:
print(f"Error during testing: {exc}")
test_agent = None
try:
print("\nTesting YouTube transcript tool...")
# Example video: "Attention is All You Need" paper explanation
yt_url = "https://www.youtube.com/watch?v=TQQlZhbC5ps"
transcript = get_youtube_transcript(yt_url)
if not transcript.startswith("Error:"):
print(f"Transcript fetched (first 500 chars):\n{transcript[:500]}...")
else:
print(f"YouTube Transcript Fetch Failed: {transcript}")
except Exception as e:
print(f"Error during testing: {e}")