# Captured from a Hugging Face Space (status when captured: Sleeping).
import atexit
import json
import os
import re
import shutil
import tempfile
import uuid
from datetime import datetime
from pathlib import Path

import google.generativeai as genai
import gradio as gr
import yt_dlp
class YouTubeDownloader:
    """Fetch metadata for, analyse, and download YouTube videos via yt-dlp.

    Scene breakdowns are produced by Gemini once ``configure_gemini`` has
    succeeded, and by a duration-based heuristic otherwise.

    Attributes:
        download_dir: Scratch directory created at construction time.
        temp_downloads: Temporary directory yt-dlp downloads into.
        downloads_folder: ``~/Downloads/YouTube_Downloads``; finished files
            are copied here on a best-effort basis.
        gemini_model: Configured ``genai.GenerativeModel`` or ``None``.
    """

    # Upper bound on scenes per breakdown (the same cap the Gemini prompt uses).
    MAX_SCENES = 15

    # Same pattern as before, additionally accepting the "m." and "music." hosts.
    _YOUTUBE_URL_RE = re.compile(
        r'(https?://)?((www|m|music)\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/'
        r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'
    )

    # (keywords, label) pairs, checked in order; the first match wins.
    _VIDEO_TYPE_RULES = (
        (('music', 'song', 'album', 'artist', 'band', 'lyrics'), "π΅ Music Video"),
        (('tutorial', 'how to', 'guide', 'learn', 'teaching'), "π Tutorial/Educational"),
        (('funny', 'comedy', 'entertainment', 'vlog', 'challenge'), "π Entertainment/Comedy"),
        (('news', 'breaking', 'report', 'update'), "π° News/Information"),
        (('review', 'unboxing', 'test', 'comparison'), "β Review/Unboxing"),
        (('commercial', 'ad', 'brand', 'product'), "πΊ Commercial/Advertisement"),
    )

    # (keywords, label) pairs applied to the title only, first match wins.
    _MUSIC_STYLE_RULES = (
        (('music', 'song', 'soundtrack'), "π΅ Original Music/Soundtrack - Primary audio content"),
        (('commercial', 'ad', 'brand'), "πΆ Upbeat Commercial Music - Designed to enhance brand appeal"),
        (('tutorial', 'how to', 'guide'), "π Minimal/No Background Music - Focus on instruction"),
        (('vlog', 'daily', 'life'), "πΌ Ambient Background Music - Complementary to narration"),
    )

    # (exclusive subscriber threshold, label), highest tier first.
    _INFLUENCER_TIERS = (
        (10_000_000, "π Mega Influencer (10M+ subscribers)"),
        (1_000_000, "β Major Influencer (1M+ subscribers)"),
        (100_000, "π― Mid-tier Influencer (100K+ subscribers)"),
        (10_000, "π Micro Influencer (10K+ subscribers)"),
    )

    def __init__(self):
        """Create the working directories and start without a Gemini model."""
        self.download_dir = tempfile.mkdtemp()
        # Use temp directory for Gradio compatibility
        self.temp_downloads = tempfile.mkdtemp(prefix="youtube_downloads_")
        # Also create user downloads folder for copying
        self.downloads_folder = os.path.join(os.path.expanduser("~"), "Downloads", "YouTube_Downloads")
        try:
            os.makedirs(self.downloads_folder, exist_ok=True)
        except OSError as e:
            # Not fatal: download_video already treats a failed copy into this
            # folder as a warning and still returns the temp file.
            print(f"Warning: Could not create Downloads folder: {e}")
        self.gemini_model = None

    @staticmethod
    def _has_keyword(text, keywords):
        """Return True if any keyword occurs in ``text`` as a whole word/phrase.

        A trailing "s" is tolerated so simple plurals still match. Whole-word
        matching keeps short keywords such as "ad" from firing inside
        unrelated words like "download" or "made".
        """
        alternatives = '|'.join(re.escape(word) for word in keywords)
        return re.search(r'\b(?:' + alternatives + r')s?\b', text) is not None

    @staticmethod
    def _format_timestamp(seconds):
        """Format a number of seconds as ``M:SS`` (minutes are not wrapped into hours)."""
        seconds = int(seconds)
        return f"{seconds // 60}:{seconds % 60:02d}"

    def configure_gemini(self, api_key):
        """Configure the Gemini client with ``api_key``.

        Returns:
            ``(success, message)`` where ``message`` is suitable for display.
        """
        try:
            genai.configure(api_key=api_key)
            self.gemini_model = genai.GenerativeModel(model_name="gemini-1.5-flash-latest")
            return True, "β Gemini API configured successfully!"
        except Exception as e:
            # Drop any previously configured model so the reported status and
            # the analysis mode actually used stay in agreement.
            self.gemini_model = None
            return False, f"β Failed to configure Gemini API: {str(e)}"

    def cleanup(self):
        """Remove the temporary directories created by ``__init__`` (best effort)."""
        targets = (
            ('download_dir', "β Cleaned up temporary directory: "),
            ('temp_downloads', "β Cleaned up temp downloads directory: "),
        )
        for attr_name, message in targets:
            path = getattr(self, attr_name, None)
            if not path or not os.path.exists(path):
                continue
            # Each directory is handled on its own so a failure on one does
            # not prevent the other from being removed.
            try:
                shutil.rmtree(path)
                print(f"{message}{path}")
            except OSError as e:
                print(f"β οΈ Warning: Could not clean up temporary directory: {e}")

    def is_valid_youtube_url(self, url):
        """Return True if ``url`` looks like a YouTube video URL.

        Surrounding whitespace is ignored; non-string input is rejected.
        """
        if not isinstance(url, str):
            return False
        return self._YOUTUBE_URL_RE.match(url.strip()) is not None

    def generate_scene_breakdown_gemini(self, video_info):
        """Generate an AI-powered scene breakdown using Gemini.

        Falls back to ``generate_scene_breakdown_fallback`` when no model is
        configured, when the API call fails, or when the response contains no
        scene lines.

        Returns:
            A list of markdown-formatted scene description strings.
        """
        if not self.gemini_model:
            return self.generate_scene_breakdown_fallback(video_info)

        # yt-dlp may supply keys whose value is None, so "or" is used rather
        # than a .get() default.
        duration = video_info.get('duration') or 0
        if not duration:
            return ["**[Duration Unknown]**: Unable to generate timestamped breakdown - video duration not available"]
        title = video_info.get('title') or ''
        description = (video_info.get('description') or '')[:1000]  # Limit description length

        # Create prompt for Gemini
        prompt = (
            "\n"
            "Analyze this YouTube video and create a detailed scene-by-scene breakdown with timestamps:\n"
            f"Title: {title}\n"
            f"Duration: {duration} seconds\n"
            f"Description: {description}\n"
            "Please provide a scene breakdown with the following format:\n"
            "- Divide the video into logical segments based on typical content flow\n"
            "- For videos under 2 minutes: 10-15 second segments\n"
            "- For videos 2-10 minutes: 30-45 second segments\n"
            "- For videos over 10 minutes: 60-90 second segments\n"
            f"- Maximum {self.MAX_SCENES} scenes total\n"
            "For each scene, provide:\n"
            "**[START_TIME-END_TIME]**: Detailed description of what likely happens in this segment, "
            "including visual elements, audio cues, potential dialogue or narration, and scene transitions.\n"
            "Consider the video type (tutorial, music video, vlog, etc.) and provide contextually "
            "appropriate descriptions.\n"
            "Format timestamps as MM:SS.\n"
        )

        try:
            response = self.gemini_model.generate_content(prompt)
            # Reading .text stays inside the try block so a failure there also
            # falls back instead of propagating.
            text = (response.text if response else '') or ''
        except Exception as e:
            print(f"Gemini API error: {e}")
            return self.generate_scene_breakdown_fallback(video_info)

        # Parse the response into individual scenes: keep timestamped lines
        # and markdown bullet lines only.
        stripped_lines = (raw_line.strip() for raw_line in text.split('\n'))
        scenes = [line for line in stripped_lines if '**[' in line or line.startswith('*')]
        return scenes if scenes else self.generate_scene_breakdown_fallback(video_info)

    def generate_scene_breakdown_fallback(self, video_info):
        """Build a generic, evenly spaced scene breakdown from the duration alone.

        Segments are 15 s (videos up to 2 min), 45 s (up to 10 min) or 90 s
        long. When that would exceed ``MAX_SCENES`` the segments are widened so
        the breakdown still spans the whole video.

        Returns:
            A list of ``**[start-end]**: description`` strings.
        """
        # Durations can arrive as float or None; whole seconds are enough here.
        duration = int(video_info.get('duration') or 0)
        if duration <= 0:
            return ["**[Duration Unknown]**: Unable to generate timestamped breakdown"]

        if duration <= 120:
            segment_length = 15
        elif duration <= 600:
            segment_length = 45
        else:
            segment_length = 90

        # Ceiling division: an exact multiple must not add an empty trailing scene.
        num_segments = -(-duration // segment_length)
        if num_segments > self.MAX_SCENES:
            # Widen the segments so the capped scene count still reaches the end.
            segment_length = -(-duration // self.MAX_SCENES)
            num_segments = -(-duration // segment_length)

        scenes = []
        for index in range(num_segments):
            is_last = index == num_segments - 1
            start_time = index * segment_length
            # The final scene always runs to the end of the video.
            end_time = duration if is_last else start_time + segment_length - 1
            if index == 0:
                desc = "Opening sequence with introduction and setup"
            elif is_last:
                desc = "Conclusion with final thoughts and call-to-action"
            else:
                desc = f"Main content segment {index} with key information and details"
            start_formatted = self._format_timestamp(start_time)
            end_formatted = self._format_timestamp(end_time)
            scenes.append(f"**[{start_formatted}-{end_formatted}]**: {desc}")
        return scenes

    def detect_video_type(self, title, description):
        """Classify the video from keywords in its title and description.

        ``None`` is accepted for either argument. Keywords match whole words
        only (see ``_has_keyword``).
        """
        text = f"{title or ''} {description or ''}".lower()
        for keywords, label in self._VIDEO_TYPE_RULES:
            if self._has_keyword(text, keywords):
                return label
        return "π¬ General Content"

    def detect_background_music(self, video_info):
        """Guess the background-music style from keywords in the title."""
        title = (video_info.get('title') or '').lower()
        for keywords, label in self._MUSIC_STYLE_RULES:
            if self._has_keyword(title, keywords):
                return label
        return "π΅ Background Music - Complementing video mood and pacing"

    def detect_influencer_status(self, video_info):
        """Rank the uploader by subscriber count, falling back to view count."""
        # yt-dlp exposes the subscriber count as "channel_follower_count"; the
        # key read previously ("channel_followers") is kept as a fallback.
        subscriber_count = (
            video_info.get('channel_follower_count')
            or video_info.get('channel_followers')
            or 0
        )
        view_count = video_info.get('view_count') or 0
        for threshold, label in self._INFLUENCER_TIERS:
            if subscriber_count > threshold:
                return label
        if view_count > 100000:
            return "π₯ Viral Content Creator"
        return "π€ Regular Content Creator"

    def format_number(self, num):
        """Abbreviate a count with a K/M/B suffix; ``None`` and 0 become "0"."""
        if not num:
            return "0"
        for threshold, suffix in ((1_000_000_000, "B"), (1_000_000, "M"), (1_000, "K")):
            if num >= threshold:
                return f"{num / threshold:.1f}{suffix}"
        return str(num)

    def format_video_info(self, video_info):
        """Render the analysis report for a yt-dlp info dict.

        Missing or ``None`` metadata (hidden likes, disabled comments, absent
        description or upload date) is shown as zero or "Unknown" instead of
        raising.

        Returns:
            The report as a single string.
        """
        if not video_info:
            return "β No video information available."

        # Basic information
        title = video_info.get("title") or "Unknown"
        uploader = video_info.get("uploader") or "Unknown"
        duration = video_info.get("duration") or 0
        duration_str = self._format_timestamp(duration) if duration else "Unknown"
        view_count = video_info.get("view_count") or 0
        like_count = video_info.get("like_count") or 0
        comment_count = video_info.get("comment_count") or 0
        upload_date = str(video_info.get("upload_date") or "Unknown")
        description = video_info.get("description")

        # Format upload date (an 8-character value is treated as YYYYMMDD)
        if len(upload_date) == 8:
            formatted_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:8]}"
        else:
            formatted_date = upload_date

        # Generate enhanced analysis
        scene_descriptions = self.generate_scene_breakdown_gemini(video_info)
        video_type = self.detect_video_type(title, description or '')
        background_music = self.detect_background_music(video_info)
        influencer_status = self.detect_influencer_status(video_info)

        # Calculate engagement metrics
        engagement_rate = (like_count / view_count) * 100 if view_count > 0 else 0

        if description is None:
            description_preview = 'No description available'
            truncation_note = ''
        else:
            description_preview = description[:500]
            truncation_note = '...(truncated)' if len(description) > 500 else ''

        # Generate streamlined report, one entry per output line
        report_lines = [
            "π¬ YOUTUBE VIDEO ANALYSIS REPORT",
            '=' * 50,
            "π BASIC INFORMATION",
            'β' * 25,
            f"πΉ **Title:** {title}",
            f"π€ **Uploader:** {uploader}",
            f"π **Upload Date:** {formatted_date}",
            f"β±οΈ **Duration:** {duration_str}",
            f"π **Video ID:** {video_info.get('id', 'Unknown')}",
            "π PERFORMANCE METRICS",
            'β' * 25,
            f"π **Views:** {self.format_number(view_count)} ({view_count:,})",
            f"π **Likes:** {self.format_number(like_count)} ({like_count:,})",
            f"π¬ **Comments:** {self.format_number(comment_count)} ({comment_count:,})",
            f"π **Engagement Rate:** {engagement_rate:.2f}%",
            "π― CONTENT ANALYSIS",
            'β' * 25,
            f"π **Video Type:** {video_type}",
            f"π΅ **Background Music:** {background_music}",
            f"π **Creator Status:** {influencer_status}",
            "π¬ DETAILED SCENE BREAKDOWN",
            'β' * 30,
            "\n".join(scene_descriptions),
            "π DESCRIPTION PREVIEW",
            'β' * 25,
            description_preview,
            truncation_note,
            '=' * 50,
            f"π **Analysis completed:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"π€ **AI Enhancement:** {'Gemini AI' if self.gemini_model else 'Standard Analysis'}",
        ]
        return "\n".join(report_lines).strip()

    def get_video_info(self, url, progress=gr.Progress(), cookiefile=None):
        """Extract video metadata without downloading.

        Args:
            url: YouTube video URL.
            progress: Gradio progress callback.
            cookiefile: Optional path to a cookies.txt file; ignored when the
                path does not exist.

        Returns:
            ``(info, message)``; ``info`` is ``None`` on failure.
        """
        if not url or not url.strip():
            return None, "β Please enter a YouTube URL"
        if not self.is_valid_youtube_url(url):
            return None, "β Invalid YouTube URL format"
        url = url.strip()
        try:
            progress(0.1, desc="Initializing YouTube extractor...")
            ydl_opts = {
                'noplaylist': True,
                'extract_flat': False,
            }
            if cookiefile and os.path.exists(cookiefile):
                ydl_opts['cookiefile'] = cookiefile
            progress(0.5, desc="Extracting video metadata...")
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)
            progress(1.0, desc="β Analysis complete!")
            return info, "β Video information extracted successfully"
        except Exception as e:
            return None, f"β Error: {str(e)}"

    def download_video(self, url, quality="best", audio_only=False, progress=gr.Progress(), cookiefile=None):
        """Download a video (or its audio as MP3) with progress tracking.

        The file is written to ``temp_downloads`` and then copied to
        ``downloads_folder`` on a best-effort basis.

        Args:
            url: YouTube video URL.
            quality: "best" (capped at 1080p), "720p" or "480p"; any other
                value selects yt-dlp's plain ``best``.
            audio_only: Extract the audio track as 192 kbps MP3.
            progress: Gradio progress callback.
            cookiefile: Optional path to a cookies.txt file; ignored when the
                path does not exist.

        Returns:
            ``(temp_file_path, message)``; the path is ``None`` on failure.
        """
        if not url or not url.strip():
            return None, "β Please enter a YouTube URL"
        if not self.is_valid_youtube_url(url):
            return None, "β Invalid YouTube URL format"
        url = url.strip()
        try:
            progress(0.1, desc="Preparing download...")
            # Create unique filename. The random suffix keeps two downloads
            # started within the same second from matching each other's files.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            tag = f"{timestamp}_{uuid.uuid4().hex[:8]}"

            # Download to temp directory first (Gradio compatible)
            ydl_opts = {
                'outtmpl': os.path.join(self.temp_downloads, f'%(title)s_{tag}.%(ext)s'),
                'noplaylist': True,
            }
            if audio_only:
                ydl_opts['format'] = 'bestaudio/best'
                ydl_opts['postprocessors'] = [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }]
            else:
                format_by_quality = {
                    "best": 'best[height<=1080]',
                    "720p": 'best[height<=720]',
                    "480p": 'best[height<=480]',
                }
                ydl_opts['format'] = format_by_quality.get(quality, 'best')
            if cookiefile and os.path.exists(cookiefile):
                ydl_opts['cookiefile'] = cookiefile

            # Progress hook
            def progress_hook(d):
                status = d.get('status')
                if status == 'downloading':
                    # total_bytes can be missing or None; fall back to the estimate.
                    total = d.get('total_bytes') or d.get('total_bytes_estimate')
                    downloaded = d.get('downloaded_bytes') or 0
                    if total:
                        percent = min(downloaded / total * 100, 100.0)
                        progress(0.1 + (percent / 100) * 0.7, desc=f"Downloading... {percent:.1f}%")
                    else:
                        progress(0.5, desc="Downloading...")
                elif status == 'finished':
                    progress(0.8, desc="Processing download...")

            ydl_opts['progress_hooks'] = [progress_hook]
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.extract_info(url, download=True)
            progress(0.9, desc="Copying to Downloads folder...")

            # Find the downloaded file in temp directory, ignoring yt-dlp's
            # partial-download bookkeeping files.
            candidates = [
                os.path.join(self.temp_downloads, name)
                for name in os.listdir(self.temp_downloads)
                if tag in name and not name.endswith(('.part', '.ytdl'))
            ]
            if not candidates:
                return None, "β Downloaded file not found in temp directory"
            downloaded_file_temp = max(candidates, key=os.path.getmtime)

            # Copy to user's Downloads folder
            final_filename = os.path.basename(downloaded_file_temp)
            final_path = os.path.join(self.downloads_folder, final_filename)
            try:
                shutil.copy2(downloaded_file_temp, final_path)
                permanent_location = final_path
            except Exception as e:
                print(f"Warning: Could not copy to Downloads folder: {e}")
                permanent_location = 'Copy failed'

            progress(1.0, desc="β Download complete!")
            size_mb = os.path.getsize(downloaded_file_temp) / (1024 * 1024)
            success_msg = "\n".join([
                "β Download successful!",
                f"π Temp file (for download): {final_filename}",
                f"π Permanent location: {permanent_location}",
                f"π― File size: {size_mb:.1f} MB",
            ])
            return downloaded_file_temp, success_msg
        except Exception as e:
            return None, f"β Download failed: {str(e)}"
# Single module-level downloader shared by every Gradio callback below.
downloader = YouTubeDownloader()
def configure_api_key(api_key):
    """Apply the Gemini API key entered in the UI.

    Returns:
        ``(status_message, update)`` where ``update`` makes the main interface
        visible only when configuration succeeded.
    """
    cleaned_key = (api_key or "").strip()
    if not cleaned_key:
        return "β Please enter a valid Google API key", gr.update(visible=False)

    configured, status_message = downloader.configure_gemini(cleaned_key)
    # The main interface is revealed exactly when configuration succeeded.
    return status_message, gr.update(visible=bool(configured))
def analyze_with_cookies(url, cookies_file, progress=gr.Progress()):
    """Gradio callback: fetch metadata for ``url`` and return the text report.

    ``cookies_file`` is forwarded only when it names an existing file. Any
    exception is converted into an error string rather than raised.
    """
    try:
        progress(0.05, desc="Starting analysis...")

        usable_cookies = bool(cookies_file) and os.path.exists(cookies_file)
        info, msg = downloader.get_video_info(
            url,
            progress=progress,
            cookiefile=cookies_file if usable_cookies else None,
        )
        if not info:
            return f"β Analysis Failed: {msg}"

        progress(0.95, desc="Generating comprehensive report...")
        report = downloader.format_video_info(info)
        progress(1.0, desc="β Complete!")
        return report
    except Exception as e:
        return f"β System Error: {str(e)}"
def download_with_cookies(url, quality, audio_only, cookies_file, progress=gr.Progress()):
    """Gradio callback: download ``url`` and return ``(file_path, message)``.

    ``cookies_file`` is forwarded only when it names an existing file.
    ``file_path`` is ``None`` whenever the download did not produce a file;
    any exception is converted into an error message rather than raised.
    """
    try:
        progress(0.05, desc="Preparing download...")

        usable_cookies = bool(cookies_file) and os.path.exists(cookies_file)
        file_path, msg = downloader.download_video(
            url,
            quality,
            audio_only,
            progress=progress,
            cookiefile=cookies_file if usable_cookies else None,
        )
        # Normalise any falsy path to None for the caller.
        return (file_path if file_path else None), msg
    except Exception as e:
        return None, f"β System Error: {str(e)}"
def create_interface():
    """Create and configure the Gradio interface.

    Layout, top to bottom: API-key section, the main analysis/download area
    (hidden until a key is configured or fallback mode is chosen), the
    fallback-mode button, and a help panel.

    Returns:
        The assembled ``gr.Blocks`` application.
    """
    # NOTE(review): the nesting of the layout containers below was
    # reconstructed from the statement order; confirm against the deployed UI.

    def download_and_update(url, quality, audio_only, cookies_file, progress=gr.Progress()):
        # Reveal the file widget only when a real file came back.
        file_path, status = download_with_cookies(url, quality, audio_only, cookies_file, progress)
        has_file = bool(file_path) and os.path.exists(file_path)
        if not has_file:
            return status, gr.update(visible=False)
        return status, gr.update(value=file_path, visible=True)

    def show_fallback_interface():
        return "β οΈ Using fallback analysis mode", gr.update(visible=True)

    app_title = "π₯ YouTube Video Analyzer & Downloader Pro"

    help_html = (
        '\n'
        '<div style="margin-top: 20px; padding: 15px; background-color: #f0f8ff; '
        'border-radius: 10px; border-left: 5px solid #4285f4;">\n'
        '<h3>π How to Get Google API Key:</h3>\n'
        '<ol>\n'
        '<li>Go to <a href="https://console.cloud.google.com/" target="_blank">Google Cloud Console</a></li>\n'
        '<li>Create a new project or select an existing one</li>\n'
        '<li>Enable the "Generative Language API"</li>\n'
        '<li>Go to "Credentials" and create an API key</li>\n'
        '<li>Copy the API key and paste it above</li>\n'
        '</ol>\n'
        '<p><strong>β¨ Benefits of using Gemini API:</strong></p>\n'
        '<ul>\n'
        '<li>π€ AI-powered scene descriptions with contextual understanding</li>\n'
        '<li>π― More accurate content type detection</li>\n'
        '<li>π Enhanced analysis based on video content</li>\n'
        '<li>β° Intelligent timestamp segmentation</li>\n'
        '</ul>\n'
        '</div>\n'
    )

    with gr.Blocks(theme=gr.themes.Soft(), title=app_title) as interface:
        gr.HTML(f"<h1>{app_title}</h1>")

        # --- API key configuration -------------------------------------
        with gr.Group():
            gr.HTML("<h3>π Google Gemini API Configuration</h3>")
            with gr.Row():
                api_key_input = gr.Textbox(
                    label="π Google API Key",
                    placeholder="Enter your Google API Key for enhanced AI analysis...",
                    type="password",
                    value="",
                )
                configure_btn = gr.Button("π§ Configure API", variant="secondary")
            api_status = gr.Textbox(
                label="API Status",
                value="β Gemini API not configured - Using fallback analysis",
                interactive=False,
                lines=1,
            )

        # --- Main area (hidden until unlocked) --------------------------
        with gr.Group(visible=False) as main_interface:
            with gr.Row():
                url_input = gr.Textbox(
                    label="π YouTube URL",
                    placeholder="Paste your YouTube video URL here...",
                    value="",
                )
                cookies_input = gr.File(
                    label="πͺ Upload cookies.txt (Optional)",
                    file_types=[".txt"],
                    type="filepath",
                )
            with gr.Tabs():
                with gr.TabItem("π Video Analysis"):
                    analyze_btn = gr.Button("π Analyze Video", variant="primary")
                    analysis_output = gr.Textbox(
                        label="π Analysis Report",
                        lines=25,
                        show_copy_button=True,
                    )
                with gr.TabItem("β¬οΈ Video Download"):
                    with gr.Row():
                        quality_dropdown = gr.Dropdown(
                            choices=["best", "720p", "480p"],
                            value="best",
                            label="πΊ Video Quality",
                        )
                        audio_only_checkbox = gr.Checkbox(
                            label="π΅ Audio Only (MP3)",
                            value=False,
                        )
                    download_btn = gr.Button("β¬οΈ Download Video", variant="primary")
                    download_status = gr.Textbox(
                        label="π₯ Download Status",
                        lines=5,
                        show_copy_button=True,
                    )
                    download_file = gr.File(
                        label="π Downloaded File",
                        visible=False,
                    )

        # --- Fallback-mode entry point and help panel -------------------
        with gr.Row():
            show_interface_btn = gr.Button("π Use Without Gemini API (Fallback Mode)", variant="secondary")
        gr.HTML(help_html)

        # --- Event wiring (registered in the original order) ------------
        analyze_btn.click(
            fn=analyze_with_cookies,
            inputs=[url_input, cookies_input],
            outputs=analysis_output,
            show_progress=True,
        )
        download_btn.click(
            fn=download_and_update,
            inputs=[url_input, quality_dropdown, audio_only_checkbox, cookies_input],
            outputs=[download_status, download_file],
            show_progress=True,
        )
        configure_btn.click(
            fn=configure_api_key,
            inputs=[api_key_input],
            outputs=[api_status, main_interface],
        )
        show_interface_btn.click(
            fn=show_fallback_interface,
            outputs=[api_status, main_interface],
        )

    return interface
if __name__ == "__main__":
    # Script entry point: build the UI, arrange for the temp directories to be
    # removed at interpreter exit, then start the Gradio server.
    demo = create_interface()
    # atexit is imported with the other modules at the top of the file rather
    # than inside this block.
    atexit.register(downloader.cleanup)
    demo.launch(debug=True, show_error=True)