|
import gradio as gr |
|
import yt_dlp |
|
import os |
|
import tempfile |
|
import shutil |
|
from pathlib import Path |
|
import re |
|
import uuid |
|
import json |
|
from datetime import datetime |
|
import google.generativeai as genai |
|
|
|
class YouTubeDownloader:
    """Analyse and download YouTube videos with yt-dlp, optionally using Gemini.

    The instance owns two temporary directories (removed by :meth:`cleanup`)
    and a per-user ``~/Downloads/YouTube_Downloads`` folder that receives a
    copy of every finished download.

    Metadata dictionaries passed to the ``detect_*``/``format_*`` helpers are
    the info dicts produced by ``yt_dlp.YoutubeDL.extract_info``; any field in
    them may be missing or ``None``, so every helper coerces before use.
    """

    # NOTE(review): the emoji and rule characters in the user-facing strings of
    # this class were restored from mis-decoded text; glyphs whose bytes were
    # non-printing were inferred from context - confirm against the original.

    # Upper bound on scenes produced by the fallback breakdown (also stated in
    # the Gemini prompt).
    _MAX_SCENES = 15

    # match() anchors at the start only. Optional scheme, optional
    # www./m./music. host prefix, then an 11-character video id.
    _YOUTUBE_URL_RE = re.compile(
        r'(https?://)?(www\.|m\.|music\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/'
        r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'
    )

    # Ordered (label, keywords) rules; the first rule that matches wins.
    _VIDEO_TYPE_RULES = (
        ("🎵 Music Video", ('music', 'song', 'album', 'artist', 'band', 'lyrics')),
        ("📚 Tutorial/Educational", ('tutorial', 'how to', 'guide', 'learn', 'teaching')),
        ("🎭 Entertainment/Comedy", ('funny', 'comedy', 'entertainment', 'vlog', 'challenge')),
        ("📰 News/Information", ('news', 'breaking', 'report', 'update')),
        ("⭐ Review/Unboxing", ('review', 'unboxing', 'test', 'comparison')),
        ("📺 Commercial/Advertisement", ('commercial', 'ad', 'brand', 'product')),
    )

    # Ordered (label, keywords) rules applied to the title only.
    _BACKGROUND_MUSIC_RULES = (
        ("🎵 Original Music/Soundtrack - Primary audio content", ('music', 'song', 'soundtrack')),
        ("🎶 Upbeat Commercial Music - Designed to enhance brand appeal", ('commercial', 'ad', 'brand')),
        ("🔇 Minimal/No Background Music - Focus on instruction", ('tutorial', 'how to', 'guide')),
        ("🎼 Ambient Background Music - Complementary to narration", ('vlog', 'daily', 'life')),
    )

    # (exclusive lower bound on subscribers, label), largest tier first.
    _INFLUENCER_TIERS = (
        (10000000, "👑 Mega Influencer (10M+ subscribers)"),
        (1000000, "⭐ Major Influencer (1M+ subscribers)"),
        (100000, "🎯 Mid-tier Influencer (100K+ subscribers)"),
        (10000, "📈 Micro Influencer (10K+ subscribers)"),
    )

    # yt-dlp format selectors. The trailing "/best" keeps the download working
    # when no single-file format satisfies the height filter.
    _FORMAT_BY_QUALITY = {
        "best": 'best[height<=1080]/best',
        "720p": 'best[height<=720]/best',
        "480p": 'best[height<=480]/best',
    }

    def __init__(self):
        """Create the working directories and start without a Gemini model."""
        self.download_dir = tempfile.mkdtemp()
        self.temp_downloads = tempfile.mkdtemp(prefix="youtube_downloads_")
        self.downloads_folder = os.path.join(os.path.expanduser("~"), "Downloads", "YouTube_Downloads")
        try:
            os.makedirs(self.downloads_folder, exist_ok=True)
        except OSError as e:
            # download_video() already tolerates a failed copy into this
            # folder, so an unwritable home directory must not abort start-up.
            print(f"⚠️ Warning: Could not create Downloads folder: {e}")
        self.gemini_model = None

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_int(value):
        """Coerce an optional numeric metadata field to a non-negative int (0 if unusable)."""
        try:
            return max(int(value), 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    @staticmethod
    def _format_timestamp(seconds):
        """Render a whole number of seconds as ``M:SS`` (minutes are not wrapped into hours)."""
        return f"{seconds // 60}:{seconds % 60:02d}"

    @staticmethod
    def _mentions(text, keywords):
        """Return True if *text* contains any keyword as a whole word.

        A plain substring test would fire on fragments (``'ad'`` inside
        "download", ``'test'`` inside "latest"), so word boundaries are
        required; a few common suffixes are allowed so "songs" or "learning"
        still count.
        """
        alternatives = '|'.join(re.escape(word) for word in keywords)
        return re.search(r'\b(?:' + alternatives + r')(?:s|es|ed|ing)?\b', text) is not None

    @staticmethod
    def _make_progress_hook(progress):
        """Build a yt-dlp progress hook that forwards status updates to *progress*."""
        def hook(status):
            state = status.get('status')
            if state == 'downloading':
                # total_bytes can be absent or None; fall back to the estimate.
                total = status.get('total_bytes') or status.get('total_bytes_estimate')
                done = status.get('downloaded_bytes') or 0
                if total:
                    percent = min(done / total, 1.0) * 100
                    progress(0.1 + (percent / 100) * 0.7, desc=f"Downloading... {percent:.1f}%")
                else:
                    progress(0.5, desc="Downloading...")
            elif state == 'finished':
                progress(0.8, desc="Processing download...")
        return hook

    def _locate_download(self, info, tag):
        """Return the path of the finished download, or None if it cannot be found.

        Prefers the path reported by yt-dlp itself and falls back to scanning
        the temp directory for a file whose name carries *tag*.
        """
        for entry in (info or {}).get('requested_downloads') or []:
            path = entry.get('filepath')
            if path and os.path.exists(path):
                return path
        for name in sorted(os.listdir(self.temp_downloads)):
            # Skip yt-dlp's partial/bookkeeping files.
            if tag in name and not name.endswith(('.part', '.ytdl')):
                return os.path.join(self.temp_downloads, name)
        return None

    # ------------------------------------------------------------------
    # Configuration / lifecycle
    # ------------------------------------------------------------------

    def configure_gemini(self, api_key):
        """Configure Gemini API with the provided key.

        Returns:
            ``(True, message)`` on success, ``(False, message)`` on failure.
            On failure no model is left configured.
        """
        try:
            genai.configure(api_key=api_key)
            self.gemini_model = genai.GenerativeModel(model_name="gemini-1.5-flash-latest")
            return True, "✅ Gemini API configured successfully!"
        except Exception as e:
            self.gemini_model = None
            return False, f"❌ Failed to configure Gemini API: {str(e)}"

    def cleanup(self):
        """Clean up temporary directories and files.

        Each directory is removed independently so a failure on one does not
        leave the other behind. Failures are reported, never raised.
        """
        targets = (
            ('download_dir', "temporary directory"),
            ('temp_downloads', "temp downloads directory"),
        )
        for attr, label in targets:
            path = getattr(self, attr, None)
            if not path or not os.path.exists(path):
                continue
            try:
                shutil.rmtree(path)
                print(f"✅ Cleaned up {label}: {path}")
            except Exception as e:
                print(f"⚠️ Warning: Could not clean up temporary directory: {e}")

    def is_valid_youtube_url(self, url):
        """Return True if *url* (ignoring surrounding whitespace) looks like a YouTube video URL."""
        if not isinstance(url, str):
            return False
        return self._YOUTUBE_URL_RE.match(url.strip()) is not None

    # ------------------------------------------------------------------
    # Scene breakdown
    # ------------------------------------------------------------------

    def generate_scene_breakdown_gemini(self, video_info):
        """Generate AI-powered scene breakdown using Gemini.

        Falls back to :meth:`generate_scene_breakdown_fallback` when no model
        is configured, when the call fails, or when the reply contains no
        usable lines.

        Returns:
            A list of markdown-formatted scene lines.
        """
        if not self.gemini_model:
            return self.generate_scene_breakdown_fallback(video_info)

        try:
            duration = self._as_int(video_info.get('duration'))
            title = video_info.get('title') or ''
            # Cap the description so the prompt stays small.
            description = (video_info.get('description') or '')[:1000]

            if not duration:
                return ["**[Duration Unknown]**: Unable to generate timestamped breakdown - video duration not available"]

            prompt = "\n".join([
                "",
                "Analyze this YouTube video and create a detailed scene-by-scene breakdown with timestamps:",
                "",
                f"Title: {title}",
                f"Duration: {duration} seconds",
                f"Description: {description}",
                "",
                "Please provide a scene breakdown with the following format:",
                "- Divide the video into logical segments based on typical content flow",
                "- For videos under 2 minutes: 10-15 second segments",
                "- For videos 2-10 minutes: 30-45 second segments",
                "- For videos over 10 minutes: 60-90 second segments",
                "- Maximum 15 scenes total",
                "",
                "For each scene, provide:",
                "**[START_TIME-END_TIME]**: Detailed description of what likely happens in this segment, "
                "including visual elements, audio cues, potential dialogue or narration, and scene transitions.",
                "",
                "Consider the video type (tutorial, music video, vlog, etc.) and provide contextually appropriate descriptions.",
                "Format timestamps as MM:SS.",
                "",
            ])

            response = self.gemini_model.generate_content(prompt)

            if response and response.text:
                # Keep only lines that look like scene entries or bullets.
                stripped = (line.strip() for line in response.text.split('\n'))
                scenes = [line for line in stripped if line and ('**[' in line or line.startswith('*'))]
                if scenes:
                    return scenes
            return self.generate_scene_breakdown_fallback(video_info)

        except Exception as e:
            print(f"Gemini API error: {e}")
            return self.generate_scene_breakdown_fallback(video_info)

    def generate_scene_breakdown_fallback(self, video_info):
        """Fallback scene generation when Gemini is not available.

        Splits the whole duration into at most ``_MAX_SCENES`` consecutive
        segments. The nominal segment length is 15 s (videos up to 2 min),
        45 s (up to 10 min) or 90 s; it is widened when that would exceed the
        scene cap so the final "Conclusion" scene always ends at the end of
        the video.

        Returns:
            A list of ``**[start-end]**: description`` strings.
        """
        duration = self._as_int(video_info.get('duration'))
        if not duration:
            return ["**[Duration Unknown]**: Unable to generate timestamped breakdown"]

        if duration <= 120:
            segment_length = 15
        elif duration <= 600:
            segment_length = 45
        else:
            segment_length = 90

        # Ceiling division: an exact multiple must not yield an empty last scene.
        num_segments = -(-duration // segment_length)
        if num_segments > self._MAX_SCENES:
            segment_length = -(-duration // self._MAX_SCENES)
            num_segments = -(-duration // segment_length)

        scenes = []
        last_index = num_segments - 1
        for i in range(num_segments):
            start_time = i * segment_length
            end_time = duration if i == last_index else start_time + segment_length - 1

            if i == 0:
                desc = "Opening sequence with introduction and setup"
            elif i == last_index:
                desc = "Conclusion with final thoughts and call-to-action"
            else:
                desc = f"Main content segment {i} with key information and details"

            span = f"{self._format_timestamp(start_time)}-{self._format_timestamp(end_time)}"
            scenes.append(f"**[{span}]**: {desc}")

        return scenes

    # ------------------------------------------------------------------
    # Heuristic classifiers
    # ------------------------------------------------------------------

    def detect_video_type(self, title, description):
        """Detect video type based on title and description (first matching rule wins)."""
        text = f"{title or ''} {description or ''}".lower()
        for label, keywords in self._VIDEO_TYPE_RULES:
            if self._mentions(text, keywords):
                return label
        return "🎬 General Content"

    def detect_background_music(self, video_info):
        """Detect background music style from keywords in the video title."""
        title = (video_info.get('title') or '').lower()
        for label, keywords in self._BACKGROUND_MUSIC_RULES:
            if self._mentions(title, keywords):
                return label
        return "🎵 Background Music - Complementing video mood and pacing"

    def detect_influencer_status(self, video_info):
        """Detect influencer status from subscriber count, then view count.

        yt-dlp reports subscribers as ``channel_follower_count``; the legacy
        ``channel_followers`` key is still honoured for callers that supply it.
        """
        subscriber_count = self._as_int(
            video_info.get('channel_follower_count') or video_info.get('channel_followers')
        )
        view_count = self._as_int(video_info.get('view_count'))

        for threshold, label in self._INFLUENCER_TIERS:
            if subscriber_count > threshold:
                return label
        if view_count > 100000:
            return "🔥 Viral Content Creator"
        return "👤 Regular Content Creator"

    def format_number(self, num):
        """Abbreviate a count with a K/M/B suffix (one decimal); ``None`` and 0 give ``"0"``."""
        if num is None or num == 0:
            return "0"
        if num >= 1_000_000_000:
            return f"{num/1_000_000_000:.1f}B"
        elif num >= 1_000_000:
            return f"{num/1_000_000:.1f}M"
        elif num >= 1_000:
            return f"{num/1_000:.1f}K"
        return str(num)

    # ------------------------------------------------------------------
    # Report
    # ------------------------------------------------------------------

    def format_video_info(self, video_info):
        """Streamlined video information formatting.

        Args:
            video_info: yt-dlp info dict; missing or ``None`` fields are
                rendered as "Unknown" / 0 rather than raising.

        Returns:
            The multi-section text report, or an error line when
            *video_info* is empty.
        """
        if not video_info:
            return "❌ No video information available."

        title = video_info.get("title") or "Unknown"
        uploader = video_info.get("uploader") or "Unknown"
        description = video_info.get("description") or ""
        duration = self._as_int(video_info.get("duration"))
        duration_str = self._format_timestamp(duration) if duration else "Unknown"
        view_count = self._as_int(video_info.get("view_count"))
        like_count = self._as_int(video_info.get("like_count"))
        comment_count = self._as_int(video_info.get("comment_count"))
        upload_date = str(video_info.get("upload_date") or "Unknown")

        # An 8-character upload date is reformatted from YYYYMMDD to YYYY-MM-DD.
        if len(upload_date) == 8:
            formatted_date = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:8]}"
        else:
            formatted_date = upload_date

        scene_descriptions = self.generate_scene_breakdown_gemini(video_info)
        video_type = self.detect_video_type(title, description)
        background_music = self.detect_background_music(video_info)
        influencer_status = self.detect_influencer_status(video_info)

        engagement_rate = (like_count / view_count) * 100 if view_count > 0 else 0

        heavy_rule = '=' * 50
        light_rule = '─' * 25

        lines = [
            "🎬 YOUTUBE VIDEO ANALYSIS REPORT",
            heavy_rule,
            "",
            "📋 BASIC INFORMATION",
            light_rule,
            f"📹 **Title:** {title}",
            f"👤 **Uploader:** {uploader}",
            f"📅 **Upload Date:** {formatted_date}",
            f"⏱️ **Duration:** {duration_str}",
            f"🆔 **Video ID:** {video_info.get('id') or 'Unknown'}",
            "",
            "📊 PERFORMANCE METRICS",
            light_rule,
            f"👀 **Views:** {self.format_number(view_count)} ({view_count:,})",
            f"👍 **Likes:** {self.format_number(like_count)} ({like_count:,})",
            f"💬 **Comments:** {self.format_number(comment_count)} ({comment_count:,})",
            f"📈 **Engagement Rate:** {engagement_rate:.2f}%",
            "",
            "🎯 CONTENT ANALYSIS",
            light_rule,
            f"📂 **Video Type:** {video_type}",
            f"🎵 **Background Music:** {background_music}",
            f"👑 **Creator Status:** {influencer_status}",
            "",
            "🎬 DETAILED SCENE BREAKDOWN",
            '─' * 30,
        ]
        lines.extend(scene_descriptions)
        lines.extend([
            "",
            "📝 DESCRIPTION PREVIEW",
            light_rule,
            description[:500] or 'No description available',
        ])
        if len(description) > 500:
            lines.append('...(truncated)')
        lines.extend([
            "",
            heavy_rule,
            f"🕐 **Analysis completed:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"🤖 **AI Enhancement:** {'Gemini AI' if self.gemini_model else 'Standard Analysis'}",
        ])

        return "\n".join(lines).strip()

    # ------------------------------------------------------------------
    # yt-dlp operations
    # ------------------------------------------------------------------

    def get_video_info(self, url, progress=gr.Progress(), cookiefile=None):
        """Extract video information without downloading.

        Args:
            url: YouTube video URL; surrounding whitespace is ignored.
            progress: Gradio progress callback.
            cookiefile: optional path to a cookies file, used only if it exists.

        Returns:
            ``(info_dict, message)`` on success, ``(None, error_message)`` otherwise.
        """
        if not url or not url.strip():
            return None, "❌ Please enter a YouTube URL"
        url = url.strip()

        if not self.is_valid_youtube_url(url):
            return None, "❌ Invalid YouTube URL format"

        try:
            progress(0.1, desc="Initializing YouTube extractor...")

            ydl_opts = {
                'noplaylist': True,
                'extract_flat': False,
            }
            if cookiefile and os.path.exists(cookiefile):
                ydl_opts['cookiefile'] = cookiefile

            progress(0.5, desc="Extracting video metadata...")

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)

            if not info:
                # Never pair a success message with an empty result.
                return None, "❌ Error: No video information was returned"

            progress(1.0, desc="✅ Analysis complete!")
            return info, "✅ Video information extracted successfully"

        except Exception as e:
            return None, f"❌ Error: {str(e)}"

    def download_video(self, url, quality="best", audio_only=False, progress=gr.Progress(), cookiefile=None):
        """Download video with progress tracking.

        The file is downloaded into the instance's temp directory and then
        copied to the Downloads folder; a failed copy is reported in the
        status message but does not fail the download.

        Args:
            url: YouTube video URL; surrounding whitespace is ignored.
            quality: ``"best"``, ``"720p"`` or ``"480p"``; anything else
                selects yt-dlp's ``best``. Ignored when *audio_only* is set.
            audio_only: extract audio as 192 kbps MP3 (postprocessor needs FFmpeg).
            progress: Gradio progress callback.
            cookiefile: optional path to a cookies file, used only if it exists.

        Returns:
            ``(temp_file_path, status_message)`` on success,
            ``(None, error_message)`` otherwise.
        """
        if not url or not url.strip():
            return None, "❌ Please enter a YouTube URL"
        url = url.strip()

        if not self.is_valid_youtube_url(url):
            return None, "❌ Invalid YouTube URL format"

        try:
            progress(0.1, desc="Preparing download...")

            # The timestamp keeps names readable; the random suffix keeps two
            # downloads started in the same second from sharing a tag.
            tag = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:8]}"

            ydl_opts = {
                'outtmpl': os.path.join(self.temp_downloads, f'%(title)s_{tag}.%(ext)s'),
                'noplaylist': True,
                'progress_hooks': [self._make_progress_hook(progress)],
            }

            if audio_only:
                ydl_opts['format'] = 'bestaudio/best'
                ydl_opts['postprocessors'] = [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }]
            else:
                ydl_opts['format'] = self._FORMAT_BY_QUALITY.get(quality, 'best')

            if cookiefile and os.path.exists(cookiefile):
                ydl_opts['cookiefile'] = cookiefile

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)

            progress(0.9, desc="Copying to Downloads folder...")

            downloaded_file_temp = self._locate_download(info, tag)
            if not downloaded_file_temp:
                return None, "❌ Downloaded file not found in temp directory"

            final_path = os.path.join(self.downloads_folder, os.path.basename(downloaded_file_temp))
            try:
                shutil.copy2(downloaded_file_temp, final_path)
                permanent_location = final_path
            except Exception as e:
                print(f"Warning: Could not copy to Downloads folder: {e}")
                permanent_location = 'Copy failed'

            progress(1.0, desc="✅ Download complete!")

            size_mb = os.path.getsize(downloaded_file_temp) / (1024 * 1024)
            success_msg = "\n".join([
                "✅ Download successful!",
                f"📁 Temp file (for download): {os.path.basename(downloaded_file_temp)}",
                f"📂 Permanent location: {permanent_location}",
                f"🎯 File size: {size_mb:.1f} MB",
            ])

            return downloaded_file_temp, success_msg

        except Exception as e:
            return None, f"❌ Download failed: {str(e)}"
|
|
|
|
|
# Module-level instance used by the callback functions defined below
# (configure_api_key, analyze_with_cookies, download_with_cookies).
downloader = YouTubeDownloader()
|
|
|
def configure_api_key(api_key):
    """Configure Gemini API key.

    Args:
        api_key: text entered by the user; surrounding whitespace is ignored.

    Returns:
        ``(status_message, visibility_update)`` - the update shows the main
        interface only when configuration succeeded.
    """
    if not api_key or not api_key.strip():
        return "❌ Please enter a valid Google API key", gr.update(visible=False)

    success, message = downloader.configure_gemini(api_key.strip())
    return message, gr.update(visible=bool(success))
|
|
|
def analyze_with_cookies(url, cookies_file, progress=gr.Progress()):
    """Main analysis function.

    Args:
        url: YouTube video URL.
        cookies_file: optional path to an uploaded cookies file; ignored
            unless it exists on disk.
        progress: Gradio progress callback.

    Returns:
        The formatted analysis report, or a single-line error message.
    """
    try:
        progress(0.05, desc="Starting analysis...")

        cookiefile = cookies_file if cookies_file and os.path.exists(cookies_file) else None

        info, msg = downloader.get_video_info(url, progress=progress, cookiefile=cookiefile)
        if not info:
            return f"❌ Analysis Failed: {msg}"

        progress(0.95, desc="Generating comprehensive report...")
        formatted_info = downloader.format_video_info(info)
        progress(1.0, desc="✅ Complete!")
        return formatted_info

    except Exception as e:
        return f"❌ System Error: {str(e)}"
|
|
|
def download_with_cookies(url, quality, audio_only, cookies_file, progress=gr.Progress()):
    """Main download function.

    Args:
        url: YouTube video URL.
        quality: quality label forwarded to ``downloader.download_video``.
        audio_only: when true, request an MP3 audio extraction.
        cookies_file: optional path to an uploaded cookies file; ignored
            unless it exists on disk.
        progress: Gradio progress callback.

    Returns:
        ``(file_path, message)`` on success, ``(None, message)`` on failure.
    """
    try:
        progress(0.05, desc="Preparing download...")

        cookiefile = cookies_file if cookies_file and os.path.exists(cookies_file) else None

        file_path, msg = downloader.download_video(
            url, quality, audio_only, progress=progress, cookiefile=cookiefile
        )
        # Normalise any falsy path to None so callers can test a single value.
        return (file_path or None), msg

    except Exception as e:
        return None, f"❌ System Error: {str(e)}"
|
|
|
def create_interface():
    """Create and configure the Gradio interface.

    The main analysis/download area starts hidden and is revealed either by a
    successful API-key configuration or by the fallback-mode button.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    # NOTE(review): the source this was restored from had lost its indentation
    # and had mis-decoded emoji; component nesting (which widgets share a Row or
    # Group) and glyphs were reconstructed from statement order and context -
    # confirm the layout against the running app.
    with gr.Blocks(theme=gr.themes.Soft(), title="🎥 YouTube Video Analyzer & Downloader Pro") as interface:

        gr.HTML("<h1>🎥 YouTube Video Analyzer & Downloader Pro</h1>")

        # --- API key configuration -------------------------------------
        with gr.Group():
            gr.HTML("<h3>🔑 Google Gemini API Configuration</h3>")
            with gr.Row():
                api_key_input = gr.Textbox(
                    label="🔑 Google API Key",
                    placeholder="Enter your Google API Key for enhanced AI analysis...",
                    type="password",
                    value=""
                )
                configure_btn = gr.Button("🔧 Configure API", variant="secondary")

            api_status = gr.Textbox(
                label="API Status",
                value="❌ Gemini API not configured - Using fallback analysis",
                interactive=False,
                lines=1
            )

        # --- Main area, hidden until a mode is chosen --------------------
        main_interface = gr.Group(visible=False)

        with main_interface:
            with gr.Row():
                url_input = gr.Textbox(
                    label="🔗 YouTube URL",
                    placeholder="Paste your YouTube video URL here...",
                    value=""
                )

                cookies_input = gr.File(
                    label="🍪 Upload cookies.txt (Optional)",
                    file_types=[".txt"],
                    type="filepath"
                )

            with gr.Tabs():
                with gr.TabItem("📊 Video Analysis"):
                    analyze_btn = gr.Button("🔍 Analyze Video", variant="primary")

                    analysis_output = gr.Textbox(
                        label="📋 Analysis Report",
                        lines=25,
                        show_copy_button=True
                    )

                    analyze_btn.click(
                        fn=analyze_with_cookies,
                        inputs=[url_input, cookies_input],
                        outputs=analysis_output,
                        show_progress=True
                    )

                with gr.TabItem("⬇️ Video Download"):
                    with gr.Row():
                        quality_dropdown = gr.Dropdown(
                            choices=["best", "720p", "480p"],
                            value="best",
                            label="📺 Video Quality"
                        )

                        audio_only_checkbox = gr.Checkbox(
                            label="🎵 Audio Only (MP3)",
                            value=False
                        )

                    download_btn = gr.Button("⬇️ Download Video", variant="primary")

                    download_status = gr.Textbox(
                        label="📥 Download Status",
                        lines=5,
                        show_copy_button=True
                    )

                    download_file = gr.File(
                        label="📁 Downloaded File",
                        visible=False
                    )

                    def download_and_update(url, quality, audio_only, cookies_file, progress=gr.Progress()):
                        """Run the download and reveal the file widget only when a file exists."""
                        file_path, status = download_with_cookies(url, quality, audio_only, cookies_file, progress)
                        if file_path and os.path.exists(file_path):
                            return status, gr.update(value=file_path, visible=True)
                        return status, gr.update(visible=False)

                    download_btn.click(
                        fn=download_and_update,
                        inputs=[url_input, quality_dropdown, audio_only_checkbox, cookies_input],
                        outputs=[download_status, download_file],
                        show_progress=True
                    )

        # --- Wiring for the two ways of revealing the main area ----------
        configure_btn.click(
            fn=configure_api_key,
            inputs=[api_key_input],
            outputs=[api_status, main_interface]
        )

        with gr.Row():
            show_interface_btn = gr.Button("🚀 Use Without Gemini API (Fallback Mode)", variant="secondary")

        def show_fallback_interface():
            """Reveal the main area without configuring Gemini."""
            return "⚠️ Using fallback analysis mode", gr.update(visible=True)

        show_interface_btn.click(
            fn=show_fallback_interface,
            outputs=[api_status, main_interface]
        )

        gr.HTML("""
        <div style="margin-top: 20px; padding: 15px; background-color: #f0f8ff; border-radius: 10px; border-left: 5px solid #4285f4;">
        <h3>🔑 How to Get Google API Key:</h3>
        <ol>
        <li>Go to <a href="https://console.cloud.google.com/" target="_blank">Google Cloud Console</a></li>
        <li>Create a new project or select an existing one</li>
        <li>Enable the "Generative Language API"</li>
        <li>Go to "Credentials" and create an API key</li>
        <li>Copy the API key and paste it above</li>
        </ol>
        <p><strong>✨ Benefits of using Gemini API:</strong></p>
        <ul>
        <li>🤖 AI-powered scene descriptions with contextual understanding</li>
        <li>🎯 More accurate content type detection</li>
        <li>📊 Enhanced analysis based on video content</li>
        <li>⏰ Intelligent timestamp segmentation</li>
        </ul>
        </div>
        """)

    return interface
|
|
|
if __name__ == "__main__":
    import atexit

    # Build the UI, make sure the temp directories are removed when the
    # interpreter exits, then serve the app.
    app = create_interface()
    atexit.register(downloader.cleanup)
    app.launch(debug=True, show_error=True)