# Source: Hugging Face Space application script.
# (The hosting page reported "Build error" when this file was captured.)
import os
import random
import re
import shutil
import time
from urllib.parse import quote_plus

import google.generativeai as genai
import gradio as gr
import requests
import soundfile as sf
import torch
import yt_dlp
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fake_useragent import UserAgent
from googleapiclient.discovery import build
from moviepy.editor import (
    AudioFileClip,
    CompositeAudioClip,
    CompositeVideoClip,
    ImageClip,
    TextClip,
    VideoFileClip,
    concatenate_audioclips,
    concatenate_videoclips,
    vfx,
)
from parler_tts import ParlerTTSForConditionalGeneration
from transformers import AutoTokenizer
# --- CONFIGURATION ---
# Read API keys from a local .env file (when present) and the process environment.
load_dotenv()

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")

# Missing keys are reported but not fatal here; the pipeline entry point
# refuses to run later if a required service could not be initialised.
if not GEMINI_API_KEY:
    print("WARNING: GEMINI_API_KEY not found in .env file or environment.")
if not YOUTUBE_API_KEY:
    print("WARNING: YOUTUBE_API_KEY not found in .env file or environment.")

# Working directory for downloaded images, generated audio and the final video.
TEMP_DIR = "temp_files_youtube_creator"

# Natural-language voice description handed to Parler-TTS for every segment.
SPEAKER_DESCRIPTION_FOR_TTS = (
    "A clear, engaging, and expressive male voice with a standard American "
    "accent, speaking at a moderate pace. The recording is of high quality "
    "with minimal background noise."
)

IMAGES_PER_SEGMENT = 1
VIDEO_WIDTH = 1280  # 1280x720 renders faster; 1920x1080 is also good
VIDEO_HEIGHT = 720
VIDEO_FPS = 24
MAX_SCRIPT_SEGMENTS_FOR_DEMO = 5  # keeps processing time reasonable for Gradio
# --- END CONFIGURATION ---

# --- Initialize Models (module-level so they are created once per process) ---
# All handles start as None and are filled in by initialize_models().
gemini_model = None
parler_model = None
parler_tokenizer = None
parler_description_tokenizer = None
youtube_service = None

# Source of randomised User-Agent headers for the image scraper.
ua = UserAgent()
def initialize_models():
    """Create the Gemini, Parler-TTS and YouTube clients that are still unset.

    Each service is initialised independently: a failure is printed and
    leaves that service's module-level handle(s) as ``None`` so the rest of
    the app can detect it. Handles that are already set are not recreated,
    which makes repeated calls cheap.
    """
    global gemini_model, parler_model, parler_tokenizer
    global parler_description_tokenizer, youtube_service

    if GEMINI_API_KEY and gemini_model is None:
        try:
            genai.configure(api_key=GEMINI_API_KEY)
            gemini_model = genai.GenerativeModel("gemini-1.5-flash-latest")
            print("Gemini model initialized.")
        except Exception as e:
            print(f"Error initializing Gemini model: {e}")
            gemini_model = None  # ensure it's None if init fails

    if parler_model is None:
        tts_repo = "parler-tts/parler-tts-mini-v1.1"
        try:
            print("Loading Parler-TTS models...")
            device = "cuda:0" if torch.cuda.is_available() else "cpu"
            model = ParlerTTSForConditionalGeneration.from_pretrained(tts_repo).to(device)
            tokenizer = AutoTokenizer.from_pretrained(tts_repo)
            description_tokenizer = AutoTokenizer.from_pretrained(
                model.config.text_encoder._name_or_path
            )
            # Publish the globals only once every piece has loaded, so a
            # partial failure can never leave a model without its tokenizers.
            parler_model = model
            parler_tokenizer = tokenizer
            parler_description_tokenizer = description_tokenizer
            print("Parler-TTS models loaded.")
        except Exception as e:
            print(f"Error initializing Parler-TTS models: {e}")
            parler_model = None
            parler_tokenizer = None
            parler_description_tokenizer = None

    if YOUTUBE_API_KEY and youtube_service is None:
        try:
            youtube_service = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)
            print("YouTube service initialized.")
        except Exception as e:
            print(f"Error initializing YouTube service: {e}")
            youtube_service = None


# Call initialization at the start (runs once when the module is imported).
initialize_models()
# --- Prompts --- | |
def get_idea_generation_prompt_template(niche, num_ideas=5):
    """Build the Gemini prompt that asks for video ideas in a niche.

    Args:
        niche: Topic or niche the ideas should belong to.
        num_ideas: How many ideas to request (defaults to 5, the previously
            hard-coded amount).

    Returns:
        The prompt text. It asks for ``Title:``/``Description:``/``Keywords:``
        fields per idea, separated by ``---`` lines, which is the layout
        ``parse_generated_ideas`` reads.
    """
    return f"""
Generate {num_ideas} diverse and highly engaging YouTube video ideas for the niche: '{niche}'.
For each idea, provide:
1. **Title:** A very catchy, short, and SEO-friendly Title (max 10 words).
2. **Description:** A compelling 1-2 sentence hook.
3. **Keywords:** 3-5 specific keywords for YouTube search.
Format each idea clearly, separated by '---'.
Example:
Title: Zen Masters' Morning Secrets
Description: Unlock ancient Zen rituals for a peaceful and productive morning. Transform your day before it even begins!
Keywords: zen, morning routine, mindfulness, productivity, meditation
---
"""
def get_viral_selection_prompt_template(ideas_text):
    """Build the Gemini prompt that picks the single most viral idea.

    Args:
        ideas_text: The candidate ideas, already formatted as text.

    Returns:
        The prompt text; it instructs the model to answer with only the
        title of the selected idea.
    """
    prompt_lines = [
        "",
        "Analyze the following YouTube video ideas. Select the ONE idea with the highest potential for virality and broad appeal within its niche.",
        "Consider factors like curiosity gap, emotional impact, and shareability.",
        "Provide ONLY the Title of the selected idea. No extra text.",
        "Video Ideas:",
        f"{ideas_text}",
        "Most Viral Title:",
        "",
    ]
    return "\n".join(prompt_lines)
def get_script_generation_prompt_template(title, description, target_duration_seconds=60):
    """Build the Gemini prompt that asks for a scene-by-scene video script.

    Args:
        title: Title of the chosen video idea.
        description: Short hook/description of the idea.
        target_duration_seconds: Approximate length of the finished video
            (60 by default to keep the demo short).

    Returns:
        The prompt text. Every scene is requested as a ``VOICEOVER:`` part
        followed by ``IMAGE_KEYWORDS: [...]``, which is the layout
        ``parse_generated_script`` looks for.
    """
    # The example scene spells out the IMAGE_KEYWORDS label: the script
    # parser keys on that label, so the example must show the same format
    # the instructions above it describe.
    return f"""
Create a captivating YouTube video script for:
Title: "{title}"
Description: "{description}"
The script should be for a video of approximately {target_duration_seconds} seconds.
Break it into distinct scenes/segments. For each scene:
1. **VOICEOVER:** (The text to be spoken)
2. **IMAGE_KEYWORDS:** [keyword1, keyword2, visual detail] (Suggest 2-3 descriptive keywords for Unsplash image search for this scene)
The voiceover should be conversational, engaging, and clear.
Include an intro, main points, and a concluding call to action (e.g., subscribe).
Each voiceover part should be a few sentences long, suitable for a single visual scene.
Example Scene:
VOICEOVER: Imagine a world where time slows down, and every moment is an opportunity for peace.
IMAGE_KEYWORDS: [serene landscape, misty mountains, calm lake]
---
Script:
"""
# --- Gemini Handler --- | |
def query_gemini(prompt_text):
    """Send a prompt to the module-level Gemini model and return its text.

    Failures are reported in-band rather than raised: the returned string
    then starts with ``"Error:"``, which is the marker the callers in this
    file test for.

    Args:
        prompt_text: Prompt to send.

    Returns:
        The model's text response, or a message starting with ``"Error:"``.
    """
    if not gemini_model:
        return "Error: Gemini model not initialized. Check API Key."
    try:
        response = gemini_model.generate_content(prompt_text)
        return response.text
    except Exception as e:
        # Keep the "Error:" prefix: the previous wording ("Error calling
        # Gemini API: ...") did not contain that marker, so callers treated
        # a failed call as real model output.
        return f"Error: Gemini API call failed: {e}"
def _extract_idea_field(block, label):
    """Return the text following ``label:`` in one idea block, or None if absent."""
    # Tolerate markdown emphasis around the label or value
    # ("**Title:** Foo", "Title: **Foo**").
    match = re.search(rf"{label}:\s*\**\s*(.*)", block, re.IGNORECASE)
    if match is None:
        return None
    return match.group(1).strip().strip("*").strip()


def parse_generated_ideas(text):
    """Parse Gemini's idea list into dictionaries.

    Ideas are expected to be separated by ``---`` lines and to carry
    ``Title:``, ``Description:`` and (optionally) ``Keywords:`` fields.
    Blocks lacking a title or a description are skipped.

    Args:
        text: Raw model output.

    Returns:
        A list of ``{"title": str, "description": str, "keywords": list[str]}``
        dictionaries, in the order the ideas appear.
    """
    ideas = []
    if not text:
        return ideas

    # "(?:\n|$)" also splits off a separator that ends the (stripped) text.
    idea_blocks = re.split(r'\n\s*---\s*(?:\n|$)', text.strip())
    for block in idea_blocks:
        if not block.strip():
            continue
        title = _extract_idea_field(block, "Title")
        description = _extract_idea_field(block, "Description")
        if not title or description is None:
            continue
        keywords_raw = _extract_idea_field(block, "Keywords") or ""
        keywords = [k.strip() for k in keywords_raw.split(',') if k.strip()]
        ideas.append({"title": title, "description": description, "keywords": keywords})
    return ideas
def _clean_voiceover_text(raw):
    """Strip markdown, list numbering and scene separators left around a voiceover."""
    cleaned = raw.strip().strip("*").strip()
    # A dangling list number ("2.") that belonged to the next label.
    cleaned = re.sub(r"\n\s*\d+[.)]\s*$", "", cleaned)
    # A trailing '---' scene separator.
    cleaned = re.sub(r"\s*-{3,}\s*$", "", cleaned)
    return cleaned.strip().strip("*").strip()


def _split_image_keywords(keyword_text):
    """Turn a comma-separated keyword string into a list, with a generic default."""
    keywords = [k.strip() for k in keyword_text.split(',') if k.strip()]
    return keywords if keywords else ["general background"]


def parse_generated_script(text):
    """Parse Gemini's script into voiceover/keyword segments.

    The text is cut at every ``VOICEOVER:`` label so each scene is parsed on
    its own; a scene without keywords can therefore never absorb the
    following scene. Within a scene, keywords come from
    ``IMAGE_KEYWORDS: [...]`` or, failing that, from a bracketed list ending
    the voiceover. Scenes with an empty voiceover are dropped.

    Args:
        text: Raw model output.

    Returns:
        At most ``MAX_SCRIPT_SEGMENTS_FOR_DEMO`` dictionaries of the form
        ``{"voiceover": str, "image_keywords": list[str]}``.
    """
    segments = []
    if not text:
        return segments

    flags = re.DOTALL | re.IGNORECASE
    # Everything before the first label is preamble and is discarded.
    scenes = re.split(r"\**VOICEOVER:\**", text, flags=re.IGNORECASE)[1:]
    for scene in scenes:
        kw_match = re.search(r"IMAGE_KEYWORDS:\**\s*\[(.*?)\]", scene, flags)
        if kw_match:
            voiceover = _clean_voiceover_text(scene[:kw_match.start()])
            keywords = _split_image_keywords(kw_match.group(1))
        else:
            voiceover = _clean_voiceover_text(scene)
            keywords = ["general background"]
            # Unlabelled layout: "spoken text. [kw1, kw2]".
            trailing = re.search(r"\[([^\[\]]*)\]\s*$", voiceover)
            if trailing:
                keywords = _split_image_keywords(trailing.group(1))
                voiceover = voiceover[:trailing.start()].strip()
        if voiceover:
            segments.append({"voiceover": voiceover, "image_keywords": keywords})
    return segments[:MAX_SCRIPT_SEGMENTS_FOR_DEMO]  # limit for demo
# --- TTS Handler --- | |
def text_to_speech(text_prompt, speaker_desc, output_filename="segment_audio.wav"):
    """Synthesize speech for one script segment with Parler-TTS.

    Args:
        text_prompt: Text to be spoken.
        speaker_desc: Natural-language description of the desired voice.
        output_filename: File name of the WAV written under
            ``TEMP_DIR/audio_segments``.

    Returns:
        Path of the written WAV file, or ``None`` when the model is not
        initialised or synthesis fails. (Callers test the result for
        truthiness, so failures must never be a non-empty string.)
    """
    if not parler_model:
        print("Error: Parler-TTS model not initialized.")
        return None
    try:
        # Put the inputs on whichever device the model was loaded onto.
        device = parler_model.device
        input_ids = parler_description_tokenizer(speaker_desc, return_tensors="pt").input_ids.to(device)
        prompt_input_ids = parler_tokenizer(text_prompt, return_tensors="pt").input_ids.to(device)
        generation = parler_model.generate(
            input_ids=input_ids,
            prompt_input_ids=prompt_input_ids,
            do_sample=True,
            temperature=0.7,
            repetition_penalty=1.1,
        )
        audio_arr = generation.cpu().numpy().squeeze()

        # The directory may not exist yet (e.g. after the temp tree was wiped).
        audio_dir = os.path.join(TEMP_DIR, "audio_segments")
        os.makedirs(audio_dir, exist_ok=True)
        full_output_path = os.path.join(audio_dir, output_filename)
        sf.write(full_output_path, audio_arr, parler_model.config.sampling_rate)
        return full_output_path
    except Exception as e:
        print(f"Parler-TTS Error for '{text_prompt[:30]}...': {e}")
        return None
# --- Image Scraper (Improved Unsplash Scraper) --- | |
def _keywords_to_slug(keywords, max_length=60):
    """Turn keywords into a short, filesystem-safe file-name fragment."""
    slug = "".join(c if c.isalnum() else "_" for c in "_".join(keywords))
    return slug[:max_length] or "image"


def get_placeholder_images(keywords, num_images=1):
    """Create plain stand-in frames for scenes that have no downloaded image.

    Args:
        keywords: Keywords of the scene; only used to name the files.
        num_images: Number of placeholder paths to return.

    Returns:
        A list of image paths under ``TEMP_DIR/images``. The list is shorter
        than requested (possibly empty) when Pillow is unavailable or a file
        cannot be written.
    """
    try:
        from PIL import Image  # imported lazily so the scraper works without Pillow
    except ImportError:
        print("PIL/Pillow not installed, cannot create placeholder image.")
        return []

    images_dir = os.path.join(TEMP_DIR, "images")
    os.makedirs(images_dir, exist_ok=True)
    slug = _keywords_to_slug(keywords or ["generic"])
    paths = []
    for index in range(num_images):
        path = os.path.join(images_dir, f"placeholder_{slug}_{index}.jpg")
        try:
            if not os.path.exists(path):
                Image.new('RGB', (VIDEO_WIDTH, VIDEO_HEIGHT), color=(24, 24, 32)).save(path)
            paths.append(path)
        except OSError as e:
            print(f"Could not create placeholder image {path}: {e}")
    return paths


def _collect_unsplash_image_urls(soup, num_images):
    """Extract candidate image URLs from a parsed Unsplash search page."""
    limit = num_images * 2  # collect a few extra so there is something to choose from
    image_urls = []

    # Attempt 1: <figure itemprop="image"> tags whose <img> carries a srcset.
    for fig in soup.find_all('figure', itemprop="image"):
        img_tag = fig.find('img', srcset=True)
        if not img_tag:
            continue
        # The last srcset entry is taken as the highest resolution,
        # e.g. "url1 300w, url2 600w, url3 1000w".
        best_url = img_tag['srcset'].split(',')[-1].strip().split(' ')[0]
        if best_url and best_url not in image_urls:
            image_urls.append(best_url)
        if len(image_urls) >= limit:
            return image_urls

    # Attempt 2: links to photo pages, when attempt 1 found too few.
    if len(image_urls) < num_images:
        for link in soup.find_all('a', href=True):
            href = link['href']
            if href.startswith('/photos/') and 'plus.unsplash.com' not in href:  # avoid premium
                photo_id = href.split('/')[-1].split('?')[0]
                # NOTE(review): guessed direct-image URL; it may not resolve.
                # Failed downloads are replaced by placeholders by the caller.
                direct_img_url = f"https://source.unsplash.com/{photo_id}/{VIDEO_WIDTH}x{VIDEO_HEIGHT}"
                if direct_img_url not in image_urls:
                    image_urls.append(direct_img_url)
                if len(image_urls) >= limit:
                    break
    return image_urls


def fetch_unsplash_images(keywords, num_images=1):
    """Download images matching the keywords from Unsplash search results.

    Scraping is best effort: any request or parsing problem is printed, and
    the result is topped up with locally generated placeholder frames.

    Args:
        keywords: Search keywords; a generic default is used when empty.
        num_images: Number of images wanted.

    Returns:
        A list of local image paths. It holds ``num_images`` entries unless
        placeholders could not be created either.
    """
    if not keywords:
        keywords = ["video background"]  # more generic default
    query = "+".join(quote_plus(k) for k in keywords)  # URL-encode keywords
    search_url = f"https://unsplash.com/s/photos/{query}"
    downloaded_image_paths = []
    headers = {'User-Agent': ua.random, 'Accept-Language': 'en-US,en;q=0.5'}

    try:
        print(f"Searching Unsplash: {search_url}")
        response = requests.get(search_url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        image_urls = _collect_unsplash_image_urls(soup, num_images)

        if not image_urls:
            print("Using placeholder image as Unsplash scraping yielded no results.")
        else:
            print(f"Found {len(image_urls)} potential image URLs for '{query}'. Downloading {num_images}...")
            images_dir = os.path.join(TEMP_DIR, "images")
            os.makedirs(images_dir, exist_ok=True)
            safe_keywords = _keywords_to_slug(keywords)
            selected_urls = random.sample(image_urls, min(num_images, len(image_urls)))
            for i, img_url in enumerate(selected_urls):
                try:
                    time.sleep(random.uniform(0.5, 1.5))  # respectful delay
                    # "with" releases the streamed connection even on errors.
                    with requests.get(img_url, headers=headers, timeout=10, stream=True) as img_response:
                        img_response.raise_for_status()
                        content_type = img_response.headers.get('Content-Type', '')
                        if content_type and not content_type.startswith('image/'):
                            # An HTML error page saved as .jpg would break video assembly.
                            raise ValueError(f"unexpected content type '{content_type}'")
                        filepath = os.path.join(images_dir, f"unsplash_{safe_keywords}_{i}.jpg")
                        with open(filepath, 'wb') as f:
                            for chunk in img_response.iter_content(chunk_size=8192):
                                f.write(chunk)
                    downloaded_image_paths.append(filepath)
                    print(f"Downloaded: {filepath}")
                except Exception as e_img:
                    print(f"Failed to download image {img_url}: {e_img}")
    except requests.exceptions.RequestException as e_req:
        print(f"Request error scraping Unsplash for '{query}': {e_req}")
    except Exception as e_gen:
        print(f"General error scraping Unsplash: {e_gen}")

    # If not enough images were downloaded, fill up with placeholders.
    shortfall = num_images - len(downloaded_image_paths)
    if shortfall > 0:
        print("Not enough images from Unsplash, adding placeholder.")
        downloaded_image_paths.extend(get_placeholder_images(keywords, shortfall))
    return downloaded_image_paths
# --- Music Handler --- | |
def find_and_download_music(keywords, output_dir=TEMP_DIR):
    """Search YouTube for Creative Commons music and download the first hit as MP3.

    Args:
        keywords: Words describing the mood/topic; joined into the query.
        output_dir: Directory that receives ``background_music.mp3``.

    Returns:
        A ``(status_message, audio_path)`` tuple; ``audio_path`` is ``None``
        when the service is unavailable, nothing was found, or an error
        occurred.
    """
    if not youtube_service:
        return "Error: YouTube service not initialized. Check API Key.", None

    search_query = " ".join(keywords) + " copyright free instrumental background music"
    try:
        request = youtube_service.search().list(
            q=search_query,
            part='id,snippet',
            maxResults=5,  # get a few options
            type='video',
            videoLicense='creativeCommon',
        )
        search_response = request.execute()
        results = search_response.get('items')
        if not results:
            return "No Creative Commons music found on YouTube.", None

        # Simple selection: the first result wins.
        top_result = results[0]
        video_id = top_result['id']['videoId']
        video_title = top_result['snippet']['title']
        status_msg = f"Found music: '{video_title}'. Downloading..."
        print(status_msg)

        audio_path = os.path.join(output_dir, "background_music.mp3")
        extract_mp3 = {
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }
        ydl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': audio_path,
            'postprocessors': [extract_mp3],
            'quiet': True,
            'no_warnings': True,
        }
        video_url = f"https://www.youtube.com/watch?v={video_id}"
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])
        return f"{status_msg} Downloaded to {audio_path}", audio_path
    except Exception as e:
        return f"Error fetching/downloading music: {e}", None
# --- Video Processor --- | |
def _close_clip_quietly(clip):
    """Release a moviepy clip (and its audio, if any) without raising."""
    if clip is None:
        return
    try:
        audio = getattr(clip, "audio", None)
        if audio is not None:
            audio.close()
        clip.close()
    except Exception as e:
        print(f"Warning: could not close clip cleanly: {e}")


def create_video(image_paths, voiceover_audio_paths, script_segments, background_music_path=None):
    """Assemble the final video from per-scene images, voiceovers and subtitles.

    The three lists are matched by index. On a length mismatch they are all
    truncated to the shortest one. Scenes that fail to build are skipped.

    Args:
        image_paths: One image file per scene.
        voiceover_audio_paths: One voiceover audio file per scene.
        script_segments: One dict per scene; its ``'voiceover'`` text is
            shown as the subtitle.
        background_music_path: Optional music file mixed in at low volume.

    Returns:
        A ``(status_message, video_path)`` tuple; ``video_path`` is ``None``
        on failure.
    """
    min_segment_duration = 2.0  # minimum duration for a scene
    crossfade = 0.5  # seconds of overlap between consecutive scenes

    image_paths = list(image_paths or [])
    voiceover_audio_paths = list(voiceover_audio_paths or [])
    script_segments = list(script_segments or [])

    counts = (len(image_paths), len(voiceover_audio_paths), len(script_segments))
    if 0 in counts or len(set(counts)) != 1:
        print(f"Warning: Mismatch in number of images ({counts[0]}), voiceovers ({counts[1]}), or script segments ({counts[2]}). Adjusting.")
        # Proceed with as many complete scenes as the shortest list allows.
        num_segments = min(counts)
        if num_segments == 0:
            return "Error: Not enough assets to create video segments.", None
        image_paths = image_paths[:num_segments]
        voiceover_audio_paths = voiceover_audio_paths[:num_segments]
        script_segments = script_segments[:num_segments]

    video_clips_list = []
    scenes = zip(image_paths, voiceover_audio_paths, script_segments)
    for i, (img_path, vo_path, segment) in enumerate(scenes):
        script_text = segment['voiceover']
        audio_clip = None
        try:
            audio_clip = AudioFileClip(vo_path)
            # The extra tail keeps speech out of the crossfade overlap with
            # the following scene.
            segment_dur = max(audio_clip.duration, min_segment_duration) + crossfade

            # Scale so the image covers the whole frame whatever its aspect
            # ratio, plus 20% head-room, then crop the centre.
            base_img = ImageClip(img_path)
            scale = max(VIDEO_WIDTH / base_img.w, VIDEO_HEIGHT / base_img.h) * 1.2
            img_zoomed = base_img.resize(scale).set_duration(segment_dur).set_fps(VIDEO_FPS)
            img_animated = img_zoomed.fx(
                vfx.crop,
                width=VIDEO_WIDTH,
                height=VIDEO_HEIGHT,
                x_center=img_zoomed.w / 2,
                y_center=img_zoomed.h / 2,
            )

            layers = [img_animated]
            if script_text and script_text.strip():
                txt = (TextClip(script_text, fontsize=30, color='yellow', font='Arial-Unicode-MS',
                                bg_color='rgba(0,0,0,0.5)', size=(int(VIDEO_WIDTH * 0.9), None),
                                method='caption', align='South')
                       .set_duration(audio_clip.duration)  # sync with actual voiceover length
                       .set_start(0)  # start text when audio starts
                       .set_position(('center', 'bottom')))
                layers.append(txt)

            video_segment = (CompositeVideoClip(layers, size=(VIDEO_WIDTH, VIDEO_HEIGHT))
                             .set_duration(segment_dur)
                             .set_audio(audio_clip))
            video_clips_list.append(video_segment)
        except Exception as e:
            print(f"Error processing segment {i+1} with image {img_path} and audio {vo_path}: {e}")
            _close_clip_quietly(audio_clip)
            continue  # skip problematic segment

    if not video_clips_list:
        return "Error: No video segments could be created.", None

    final_vid = None
    music_source = None
    output_filepath = os.path.join(TEMP_DIR, "final_output_video.mp4")
    try:
        if len(video_clips_list) > 1:
            # Crossfade: every scene after the first fades in while a
            # negative padding makes it overlap the end of the previous one.
            faded = [video_clips_list[0]]
            faded.extend(clip.crossfadein(crossfade) for clip in video_clips_list[1:])
            final_vid = concatenate_videoclips(faded, method="compose", padding=-crossfade)
        else:
            final_vid = concatenate_videoclips(video_clips_list, method="compose")

        if background_music_path and os.path.exists(background_music_path):
            music_source = AudioFileClip(background_music_path)
            music = music_source.volumex(0.15)  # keep music well below the voice
            if music.duration > final_vid.duration:
                music = music.subclip(0, final_vid.duration)

            # If concatenation dropped the audio, rebuild it from the scenes.
            if final_vid.audio is None:
                voice_tracks = [vc.audio for vc in video_clips_list if vc.audio]
                if voice_tracks:
                    final_vid = final_vid.set_audio(concatenate_audioclips(voice_tracks))

            if final_vid.audio:
                final_audio = CompositeAudioClip([final_vid.audio, music])
                final_vid = final_vid.set_audio(final_audio)
            else:
                print("Warning: Final video has no primary audio track to mix music with.")
                final_vid = final_vid.set_audio(music)  # use only music if no VOs

        os.makedirs(TEMP_DIR, exist_ok=True)
        final_vid.write_videofile(output_filepath, codec="libx264", audio_codec="aac",
                                  fps=VIDEO_FPS, threads=4, preset='medium')
        return f"Video created: {output_filepath}", output_filepath
    except Exception as e:
        return f"Error writing final video: {e}", None
    finally:
        # Release file readers; a clean-up failure must not mask the result.
        for clip in video_clips_list:
            _close_clip_quietly(clip)
        _close_clip_quietly(final_vid)
        _close_clip_quietly(music_source)
# --- Main Gradio Function --- | |
def _reset_temp_dirs():
    """Start a run from an empty working tree under TEMP_DIR."""
    shutil.rmtree(TEMP_DIR, ignore_errors=True)
    for subdir in ("images", "audio_segments"):
        os.makedirs(os.path.join(TEMP_DIR, subdir), exist_ok=True)


def _gemini_call_failed(response_text):
    """Return True when query_gemini returned an error message instead of content."""
    if not response_text:
        return True
    # Match on the start of the text only, so genuine content that merely
    # mentions "Error:" somewhere is not mistaken for a failure.
    return response_text.lstrip().startswith(("Error:", "Error calling Gemini API"))


def _ensure_default_image():
    """Return the path of a black fallback frame, creating it if needed; None on failure."""
    default_img_path = os.path.join(TEMP_DIR, "images", "default_img.jpg")
    if os.path.exists(default_img_path):
        return default_img_path
    try:
        from PIL import Image
    except ImportError:
        return None
    try:
        os.makedirs(os.path.dirname(default_img_path), exist_ok=True)
        Image.new('RGB', (VIDEO_WIDTH, VIDEO_HEIGHT), color='black').save(default_img_path)
    except OSError:
        return None
    return default_img_path


def generate_youtube_video(niche_input, progress=gr.Progress(track_tqdm=True)):
    """Run the whole pipeline for a niche: ideas, script, voice, images, music, video.

    Args:
        niche_input: Niche or topic entered by the user.
        progress: Gradio progress tracker (injected by Gradio).

    Returns:
        A ``(video_path, log_text)`` tuple; ``video_path`` is ``None`` when
        the run did not produce a video.
    """
    readiness = (
        ("Gemini API Key", GEMINI_API_KEY),
        ("YouTube API Key", YOUTUBE_API_KEY),
        ("Parler-TTS models", parler_model),
        ("YouTube service", youtube_service),
        ("Gemini service", gemini_model),
    )
    missing = [label for label, value in readiness if not value]
    if missing:
        return None, f"ERROR: Required services/API keys not initialized: {', '.join(missing)}. Please check your .env file and console logs."

    niche_input = (niche_input or "").strip()
    if not niche_input:
        return None, "ERROR: Please enter a video niche or topic."

    _reset_temp_dirs()
    log_messages = ["Process Started...\n"]

    progress(0.05, desc="Generating video ideas...")
    log_messages.append("1. Generating Video Ideas...")
    raw_ideas_text = query_gemini(get_idea_generation_prompt_template(niche_input))
    if _gemini_call_failed(raw_ideas_text):
        log_messages.append(str(raw_ideas_text))
        return None, "\n".join(log_messages)
    parsed_ideas = parse_generated_ideas(raw_ideas_text)
    if not parsed_ideas:
        log_messages.append("Error: No ideas parsed from Gemini response.")
        return None, "\n".join(log_messages)
    log_messages.append(f"Generated {len(parsed_ideas)} ideas.")

    progress(0.15, desc="Selecting viral idea...")
    log_messages.append("\n2. Selecting Most Viral Idea...")
    ideas_for_selection_prompt = "\n---\n".join(
        f"Title: {idea['title']}\nDescription: {idea['description']}" for idea in parsed_ideas
    )
    selected_title_raw = query_gemini(get_viral_selection_prompt_template(ideas_for_selection_prompt))
    if _gemini_call_failed(selected_title_raw):
        log_messages.append(f"Error selecting idea: {selected_title_raw}. Using first idea.")
        chosen_idea = parsed_ideas[0]
    else:
        # The model may echo the label or wrap the title in quotes/markdown.
        selected_title = selected_title_raw.replace("Most Viral Title:", "").strip().strip("\"'* ")
        chosen_idea = next(
            (idea for idea in parsed_ideas
             if idea["title"].strip().lower() == selected_title.lower()),
            parsed_ideas[0],  # fall back to the first idea when nothing matches
        )
    log_messages.append(f"Chosen Idea: '{chosen_idea['title']}'")

    progress(0.25, desc="Generating script...")
    log_messages.append(f"\n3. Generating Script for '{chosen_idea['title']}'...")
    raw_script_text = query_gemini(
        get_script_generation_prompt_template(chosen_idea['title'], chosen_idea['description'])
    )
    if _gemini_call_failed(raw_script_text):
        log_messages.append(str(raw_script_text))
        return None, "\n".join(log_messages)
    script_segments = parse_generated_script(raw_script_text)
    if not script_segments:
        log_messages.append("Error: No script segments parsed.")
        return None, "\n".join(log_messages)
    log_messages.append(f"Script generated with {len(script_segments)} segments (limited to {MAX_SCRIPT_SEGMENTS_FOR_DEMO} for demo).")

    progress(0.40, desc="Generating voiceovers...")
    log_messages.append("\n4. Generating Voiceovers...")
    # Keep each voiceover together with its segment so that a failed or
    # empty segment cannot shift audio against images and subtitles.
    voiced_segments = []  # (segment number, segment, audio path)
    for number, segment in enumerate(progress.tqdm(script_segments, desc="TTS Progress"), start=1):
        vo_text = segment['voiceover']
        if not vo_text:
            continue  # skip if no voiceover text
        path = text_to_speech(vo_text, SPEAKER_DESCRIPTION_FOR_TTS, f"segment_{number}_audio.wav")
        # Only accept a result that is a file on disk.
        if path and os.path.exists(path):
            voiced_segments.append((number, segment, path))
            log_messages.append(f" - Voiceover for segment {number} created.")
        else:
            log_messages.append(f" - Failed voiceover for segment {number}.")
    if len(voiced_segments) < len(script_segments):
        log_messages.append("Warning: Not all voiceovers could be generated.")
    if not voiced_segments:
        return None, "\n".join(log_messages)  # critical failure if NO voiceovers

    progress(0.60, desc="Fetching images...")
    log_messages.append("\n5. Fetching Images...")
    final_images = []
    for number, segment, _ in progress.tqdm(voiced_segments, desc="Image Fetching"):
        keywords = segment['image_keywords'] or [chosen_idea['title']]  # fall back to title
        try:
            img_path_list = fetch_unsplash_images(keywords, num_images=IMAGES_PER_SEGMENT)
        except Exception as e:
            # Scraping is best effort; one failed search must not abort the run.
            log_messages.append(f" - Image search failed for segment {number}: {e}")
            img_path_list = []
        usable = [p for p in (img_path_list or []) if p and os.path.exists(p)]
        if usable:
            final_images.append(usable[0])  # take the first image found
            log_messages.append(f" - Image for segment {number} using keywords '{', '.join(keywords)}' fetched: {os.path.basename(usable[0])}")
            continue

        log_messages.append(f" - No image found for segment {number} with keywords '{', '.join(keywords)}'. Using placeholder.")
        fallback = _ensure_default_image()
        if fallback is None and final_images:
            fallback = final_images[-1]  # reuse the previous scene's image
        if fallback is None:
            log_messages.append(" - CRITICAL: Could not get even a placeholder image. Video might fail.")
            log_messages.append("PIL/Pillow not installed, cannot create dummy image.")
            return None, "\n".join(log_messages)  # can't proceed without images
        final_images.append(fallback)

    progress(0.75, desc="Finding background music...")
    log_messages.append("\n6. Finding Background Music...")
    music_search_keywords = chosen_idea.get("keywords", []) + [niche_input, "cinematic", "calm"]
    music_status, music_file_path = find_and_download_music(music_search_keywords)
    log_messages.append(f" - {music_status}")

    progress(0.85, desc="Assembling video...")
    log_messages.append("\n7. Assembling Video...")
    voiceover_paths = [path for _, _, path in voiced_segments]
    kept_segments = [segment for _, segment, _ in voiced_segments]
    video_status, final_video_path = create_video(final_images, voiceover_paths, kept_segments, music_file_path)
    log_messages.append(f" - {video_status}")
    if not final_video_path:
        return None, "\n".join(log_messages)

    progress(1.0, desc="Process Complete!")
    log_messages.append("\nProcess Complete! Video ready.")
    return final_video_path, "\n".join(log_messages)
# --- Gradio UI --- | |
# Custom styling; the last rule hides the default Gradio footer.
css = """
.gradio-container { font-family: 'Roboto', sans-serif; }
.gr-button { background-color: #FF7F50; color: white; border-radius: 8px; }
.gr-button:hover { background-color: #FF6347; }
footer {display: none !important;}
"""

# NOTE(review): the nesting of the components below was reconstructed from a
# flattened copy of this file; confirm the layout against the running app.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange", secondary_hue="red"), css=css) as demo:
    # The Markdown text stays at column 0 so it is not rendered as a code block.
    gr.Markdown(
        """
<div style="text-align: center;">
<img src="https://i.imgur.com/J20hQ9h.png" alt="RoboNuggets Logo" style="width:100px; height:auto; margin-bottom: 5px;">
<h1>AI YouTube Video Creator (R28 LongForm Style)</h1>
<p>Automate your YouTube content creation! Enter a niche, and let AI handle the rest.</p>
</div>
"""
    )
    with gr.Row():
        niche_textbox = gr.Textbox(
            label="Enter Video Niche or Specific Topic",
            placeholder="e.g., 'The Philosophy of Stoicism for Modern Life', 'Beginner's Guide to Urban Gardening'",
            value="The Stoic Lion: Finding Calm in Chaos",  # default value from video
        )
    # Label repaired from mis-decoded text (the sparkles emoji had been garbled).
    create_button = gr.Button("✨ Create Video ✨", variant="primary")
    # The label's leading emoji was unrecoverable and has been dropped.
    with gr.Accordion("Process Log & Output", open=True):
        log_output = gr.Textbox(label="Log", lines=15, interactive=False,
                                placeholder="Process updates will appear here...")
        video_output = gr.Video(label="Generated Video")
    # Output order matches the (video_path, log_text) tuple the handler returns.
    create_button.click(
        fn=generate_youtube_video,
        inputs=[niche_textbox],
        outputs=[video_output, log_output],
    )
    gr.Markdown(
        """
---
*Powered by RoboNuggets AI*
*(Note: This is a demo. Image scraping from Unsplash can be unreliable. Ensure API keys are set in .env)*
"""
    )
if __name__ == "__main__":
    # Make sure the working directories exist before the first request.
    for _subdir in ("images", "audio_segments"):
        os.makedirs(os.path.join(TEMP_DIR, _subdir), exist_ok=True)
    print("Starting Gradio App...")
    # share=True would create a public link (use with caution).
    demo.launch(debug=True, share=False)