import os
import shutil  # For temp-directory cleanup
import requests
import io
import time
import re
import random
import tempfile
import math
import cv2
import numpy as np
import soundfile as sf
import torch
import gradio as gr
import pysrt
from bs4 import BeautifulSoup
from urllib.parse import quote
from PIL import Image, ImageDraw, ImageFont
from gtts import gTTS
from pydub import AudioSegment
from pydub.generators import Sine
# Import moviepy components
try:
    from moviepy.editor import (
        VideoFileClip, AudioFileClip, ImageClip, ColorClip,
        concatenate_videoclips, concatenate_audioclips,
        CompositeVideoClip, TextClip, CompositeAudioClip
    )
    import moviepy.video.fx.all as vfx
    import moviepy.config as mpy_config
    # Set the ImageMagick binary (adjust the path for your environment).
    # If ImageMagick is not available, TextClip will fail.
    try:
        # If running locally, ensure ImageMagick is installed and on your PATH.
        # On Hugging Face Spaces, add 'imagemagick' to a packages.txt file.
        mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"})  # Common Linux path
        print("ImageMagick path set.")
        # Verify this path works in your specific deployment environment.
    except Exception as e:
        print(f"Warning: Could not configure ImageMagick path. TextClip might fail. Error: {e}")
        # Consider a fallback or disabling captions if ImageMagick is essential and not found.
except ImportError:
    print("Error: moviepy library not found. Please install it with 'pip install moviepy'.")
    exit()  # Exit because moviepy is required
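# The imports above assume the dependencies below are installed. This is only a
# sketch of what requirements.txt / packages.txt might contain for this app;
# exact package names and versions are assumptions, not taken from this repo:
#
#   requirements.txt: gradio, moviepy, gTTS, pydub, pysrt, beautifulsoup4, requests,
#                     Pillow, numpy, opencv-python, soundfile, torch, kokoro
#   packages.txt (system packages on Hugging Face Spaces): imagemagick, ffmpeg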
# Import Kokoro (ensure it is installed)
try:
    from kokoro import KPipeline
    # Initialize the Kokoro TTS pipeline.
    # Note: Kokoro uses single-letter language codes ('a' = American English);
    # adjust this if your Kokoro version expects a different code.
    pipeline = KPipeline(lang_code='a')
    print("Kokoro Pipeline Initialized.")
except ImportError:
    print("Warning: Kokoro library not found. TTS generation will rely solely on gTTS.")
    pipeline = None
except Exception as e:
    print(f"Warning: Failed to initialize Kokoro Pipeline. TTS generation will rely solely on gTTS. Error: {e}")
    pipeline = None
# Global Configuration
# --- IMPORTANT: Replace the placeholders with your actual keys or use environment variables ---
PEXELS_API_KEY = os.getenv('PEXELS_API_KEY', 'YOUR_PEXELS_API_KEY_HERE')
OPENROUTER_API_KEY = os.getenv('OPENROUTER_API_KEY', 'YOUR_OPENROUTER_API_KEY_HERE')
# --- ---
if PEXELS_API_KEY == 'YOUR_PEXELS_API_KEY_HERE' or OPENROUTER_API_KEY == 'YOUR_OPENROUTER_API_KEY_HERE':
    print("\n*** WARNING: API keys are not set. Please set the PEXELS_API_KEY and OPENROUTER_API_KEY environment variables or replace the placeholders in the script. ***\n")
OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free"  # Or choose another model
TEMP_FOLDER = "temp_video_processing"
OUTPUT_VIDEO_FILENAME = "final_video.mp4"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
BACKGROUND_MUSIC_PATH = "background_music.mp3"  # Ensure this file exists
# --- Global variables set from Gradio input ---
TARGET_RESOLUTION = (1920, 1080)  # Default, updated at runtime
CAPTION_COLOR = "white"           # Default, updated at runtime
# Helper Functions
def generate_script(user_input):
    """Generates a script using the OpenRouter API based on user input."""
    if not OPENROUTER_API_KEY or OPENROUTER_API_KEY == 'YOUR_OPENROUTER_API_KEY_HERE':
        print("Error: OpenRouter API Key not configured.")
        return None
    headers = {
        'Authorization': f'Bearer {OPENROUTER_API_KEY}',
        'HTTP-Referer': 'https://your-app-name.hf.space',  # Optional: replace with your app's URL if deployed
        'X-Title': 'AI Documentary Maker'  # Optional
    }
    # Prompt based on the original instructions
    prompt = f"""Short Documentary Script Generator Instructions:
If I say "use this," output the script exactly as I gave it.
If I only give topics, generate a script based on them.
If I provide a full script, rewrite it without any changes.
Make everything short, simple, and humorous; act serious but stay funny.
Don't say anything off-topic.
Always end with a funny call to subscribe that fits the video topic.
Use normal conversational language, avoid AI-sounding phrases, and keep statements humanized.
Keep a negative, cynical slant; avoid positive statements.
The full script is one video about one topic, so keep that in mind.
Formatting Rules:
Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage.
Casual & Funny Narration: Each section has 5-15 words of narration. Keep it natural, funny, and unpredictable (not robotic, poetic, or rhythmic).
No Special Formatting: No bold, italics, or special characters. You are an assistant AI whose only task is to create the script; don't write any extra text.
Generalized Search Terms: If a term is too specific, make it more general for the Pexels search.
Scene-Specific Writing: Each section describes only what should be shown in the video.
Output Only the Script: No extra text, just the script.
Example Output:
[North Korea]
Top 5 unknown facts about North Korea, maybe.
[Invisibility]
North Korea's internet speed is so fast… it's basically dial-up from 1998.
[Leadership]
Kim Jong-un once won an election with 100% of the votes… because who would vote against him?
[Magic]
North Korea discovered unicorns. They're delicious, apparently.
[Warning]
Subscribe now, or Kim Jong-un might send you a strongly worded letter.
[Freedom]
North Korean citizens enjoy unparalleled freedom... to agree with the government.
Now here is the Topic/script: {user_input}
"""
    data = {
        'model': OPENROUTER_MODEL,
        'messages': [{'role': 'user', 'content': prompt}],
        'temperature': 0.5,  # Slightly increased for more variety in humor
        'max_tokens': 1000   # Adjust if scripts get cut off
    }
    try:
        response = requests.post(
            'https://openrouter.ai/api/v1/chat/completions',
            headers=headers,
            json=data,
            timeout=45
        )
        response.raise_for_status()  # Raise an exception for 4xx/5xx status codes
        response_data = response.json()
        if 'choices' in response_data and len(response_data['choices']) > 0:
            script_content = response_data['choices'][0]['message']['content']
            # Basic cleaning: drop any preamble the model adds before the first [Title]
            script_content = re.sub(r'^.*?\n*\[', '[', script_content, flags=re.DOTALL)
            script_content = script_content.strip()
            print(f"Generated Script:\n{script_content}")
            return script_content
        else:
            print(f"Error: No choices found in OpenRouter response. Response: {response_data}")
            return None
    except requests.exceptions.RequestException as e:
        print(f"Error calling OpenRouter API: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred during script generation: {e}")
        return None
def parse_script(script_text):
    """Parses the generated script text into structured elements."""
    if not script_text:
        return []
    sections = {}
    current_title = None
    current_text = ""
    try:
        for line in script_text.splitlines():
            line = line.strip()
            if not line:  # Skip empty lines
                continue
            match = re.match(r'^\[([^\]]+)\](.*)', line)
            if match:
                # If a title was being processed, save it
                if current_title is not None and current_text:
                    sections[current_title] = current_text.strip()
                current_title = match.group(1).strip()
                current_text = match.group(2).strip()
            elif current_title:  # Append to the text of the current title
                current_text += " " + line  # Add a space between lines
        # Save the last section
        if current_title is not None and current_text:
            sections[current_title] = current_text.strip()
        elements = []
        if not sections:
            print("Warning: Script parsing resulted in no sections.")
            # Basic fallback: assume the first line is the title and the rest is narration
            lines = [l.strip() for l in script_text.splitlines() if l.strip()]
            if len(lines) >= 2:
                print("Attempting basic fallback parsing.")
                title = lines[0].replace('[', '').replace(']', '')
                narration = ' '.join(lines[1:])
                sections[title] = narration
        print(f"Parsed Sections: {sections}")
        for title, narration in sections.items():
            if not title or not narration:
                print(f"Skipping empty section: Title='{title}', Narration='{narration}'")
                continue
            # Use the title as the media search prompt
            media_element = {"type": "media", "prompt": title, "effects": "random"}  # Random Ken Burns effect
            # Rough duration estimate: base 2s + 0.4s per word, clamped to 3-10s
            words = narration.split()
            duration = min(10.0, max(3.0, 2.0 + len(words) * 0.4))
            tts_element = {"type": "tts", "text": narration, "voice": "en", "duration": duration}  # Duration is approximate
            elements.append(media_element)
            elements.append(tts_element)
        if not elements:
            print("Error: No elements created after parsing.")
        return elements
    except Exception as e:
        print(f"Error parsing script: {e}\nScript content was:\n{script_text}")
        return []
def search_pexels(query, api_key, media_type="videos"):
    """Searches Pexels for videos or images."""
    if not api_key or api_key == 'YOUR_PEXELS_API_KEY_HERE':
        print("Error: Pexels API Key not configured.")
        return None
    headers = {'Authorization': api_key}
    # Pexels uses different base URLs for video and photo search
    if media_type == "videos":
        base_url = "https://api.pexels.com/videos/search"
    else:
        base_url = "https://api.pexels.com/v1/search"
    results = []
    # Search several pages for better results
    for page in range(1, 4):  # Check the first 3 pages
        try:
            params = {"query": query, "per_page": 15, "page": page}
            params["orientation"] = "landscape" if TARGET_RESOLUTION[0] > TARGET_RESOLUTION[1] else "portrait"
            response = requests.get(base_url, headers=headers, params=params, timeout=15)
            response.raise_for_status()
            data = response.json()
            if media_type == "videos":
                media_items = data.get("videos", [])
                for item in media_items:
                    video_files = item.get("video_files", [])
                    # Prefer HD or FHD based on the target resolution, fall back to the first available file
                    target_quality = "hd"  # 1280x720 or 1920x1080
                    if TARGET_RESOLUTION[0] >= 1920 or TARGET_RESOLUTION[1] >= 1920:
                        target_quality = "fhd"  # Often not available, but check anyway
                    link = None
                    for file in video_files:
                        # Pexels sometimes labels 1920x1080 as 'hd' too
                        if file.get("quality") == target_quality or file.get("quality") == "hd":
                            link = file.get("link")
                            break
                    if not link and video_files:  # Fall back to the first link if the preferred quality is missing
                        link = video_files[0].get("link")
                    if link:
                        results.append(link)
            else:  # images
                media_items = data.get("photos", [])
                for item in media_items:
                    # Get the original size; resizing happens later
                    link = item.get("src", {}).get("original")
                    if link:
                        results.append(link)
        except requests.exceptions.RequestException as e:
            print(f"Warning: Pexels API request failed for '{query}' (page {page}, {media_type}): {e}")
            # Don't stop searching because a single page failed
            continue
        except Exception as e:
            print(f"Warning: Unexpected error during Pexels search for '{query}': {e}")
            continue
    if results:
        print(f"Found {len(results)} Pexels {media_type} for '{query}'. Choosing one randomly.")
        return random.choice(results)
    else:
        print(f"Warning: No Pexels {media_type} found for query: '{query}'")
        return None
def search_google_images(query):
    """Searches Google Images (use cautiously; scraping is fragile)."""
    print(f"Attempting Google Image search for: '{query}' (use with caution)")
    try:
        search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch&safe=active"  # Safe search enabled
        headers = {"User-Agent": USER_AGENT}
        response = requests.get(search_url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        image_urls = []
        # Google changes its markup often, so this may need updates.
        # Look for plain <img> tags; this is a common pattern but highly unstable.
        img_tags = soup.find_all("img")
        for img in img_tags:
            src = img.get("src") or img.get("data-src")
            if src and src.startswith("http") and "gstatic" not in src and "googlelogo" not in src:
                # Basic check for valid image extensions or base64 data URIs
                if re.search(r'\.(jpg|jpeg|png|webp)$', src, re.IGNORECASE) or src.startswith('data:image'):
                    image_urls.append(src)
        # Limit the number of results to avoid processing too many
        image_urls = image_urls[:10]
        if image_urls:
            print(f"Found {len(image_urls)} potential Google Images for '{query}'. Choosing one.")
            return random.choice(image_urls)
        else:
            print(f"Warning: No suitable Google Images found for query: '{query}'")
            return None
    except requests.exceptions.RequestException as e:
        print(f"Warning: Google Image search failed for '{query}': {e}")
        return None
    except Exception as e:
        print(f"Warning: Error parsing Google Image search results for '{query}': {e}")
        return None
def download_media(media_url, filename):
    """Downloads media (image or video) from a URL."""
    try:
        headers = {"User-Agent": USER_AGENT}  # Use a User-Agent for downloads too
        response = requests.get(media_url, headers=headers, stream=True, timeout=30)  # Longer timeout for large files
        response.raise_for_status()
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"Successfully downloaded media to {filename}")
        # Verify image integrity and convert if necessary
        if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.webp')):
            try:
                img = Image.open(filename)
                img.verify()  # Verify that it is, in fact, an image
                # Re-open the image for the conversion check (verify() invalidates the handle)
                img = Image.open(filename)
                if img.mode != 'RGB':
                    print(f"Converting image {filename} to RGB.")
                    img = img.convert('RGB')
                    img.save(filename, "JPEG")  # Save as JPEG for compatibility
                img.close()
            except (IOError, SyntaxError, Image.UnidentifiedImageError) as img_e:
                print(f"Warning: Downloaded file {filename} is not a valid image or is corrupted: {img_e}. Removing.")
                os.remove(filename)
                return None
        # Basic video check (can be expanded)
        elif filename.lower().endswith(('.mp4', '.mov', '.avi')):
            if os.path.getsize(filename) < 1024:  # A tiny file is almost certainly an error response
                print(f"Warning: Downloaded video file {filename} is suspiciously small. Removing.")
                os.remove(filename)
                return None
        return filename
    except requests.exceptions.RequestException as e:
        print(f"Error downloading media from {media_url}: {e}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
    except Exception as e:
        print(f"An unexpected error occurred during media download: {e}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
def generate_media(prompt, current_index=0, total_segments=1):
    """Generates media (video or image) for the given prompt."""
    safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
    if not safe_prompt:
        safe_prompt = f"media_{current_index}"  # Fallback filename
    print(f"\n--- Generating Media for Prompt: '{prompt}' ---")
    # --- Strategy ---
    # 1. Try a Pexels video
    # 2. Try a Pexels image
    # 3. If the prompt looks newsy or very specific, try Google Images as a fallback
    # 4. Use a generic Pexels image as the last resort
    # 1. Try a Pexels video
    video_url = search_pexels(prompt, PEXELS_API_KEY, media_type="videos")
    if video_url:
        video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4")
        if download_media(video_url, video_file):
            print(f"Using Pexels video for '{prompt}'")
            return {"path": video_file, "asset_type": "video"}
        else:
            print(f"Failed to download Pexels video for '{prompt}'.")
    # 2. Try a Pexels image
    image_url = search_pexels(prompt, PEXELS_API_KEY, media_type="photos")
    if image_url:
        image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_image.jpg")
        if download_media(image_url, image_file):
            print(f"Using Pexels image for '{prompt}'")
            return {"path": image_file, "asset_type": "image"}
        else:
            print(f"Failed to download Pexels image for '{prompt}'.")
    # 3. Try Google Images (especially for specific or newsy terms)
    if "news" in prompt.lower() or "breaking" in prompt.lower() or len(prompt.split()) > 4:  # Heuristic for specific terms
        google_image_url = search_google_images(prompt)
        if google_image_url:
            google_image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google_image.jpg")
            if download_media(google_image_url, google_image_file):
                print(f"Using Google image for '{prompt}' as fallback.")
                return {"path": google_image_file, "asset_type": "image"}
            else:
                print(f"Failed to download Google image for '{prompt}'.")
    # 4. Fall back to generic Pexels images
    print(f"Could not find specific media for '{prompt}'. Using generic fallback.")
    fallback_terms = ["abstract", "technology", "texture", "nature", "cityscape"]
    random.shuffle(fallback_terms)  # Try the fallbacks in a random order
    for term in fallback_terms:
        fallback_url = search_pexels(term, PEXELS_API_KEY, media_type="photos")
        if fallback_url:
            fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}_{current_index}.jpg")
            if download_media(fallback_url, fallback_file):
                print(f"Using fallback Pexels image ('{term}')")
                return {"path": fallback_file, "asset_type": "image"}
            else:
                print(f"Failed to download fallback Pexels image ('{term}')")
    print(f"Error: Failed to generate any media for prompt: '{prompt}'")
    return None  # Failed to get any media
def generate_tts(text, voice="en"):
    """Generates text-to-speech audio using Kokoro, falling back to gTTS."""
    safe_text = re.sub(r'[^\w\s-]', '', text[:15]).strip().replace(' ', '_')
    if not safe_text:
        safe_text = f"tts_{random.randint(1000, 9999)}"
    file_path = os.path.join(TEMP_FOLDER, f"{safe_text}.wav")
    # Attempt Kokoro first if available
    if pipeline:
        try:
            print(f"Generating TTS with Kokoro for: '{text[:30]}...'")
            # Map the generic 'en' voice to a Kokoro voice; the original code used 'af_heart'
            kokoro_voice = 'af_heart' if voice == 'en' else voice
            generator = pipeline(text, voice=kokoro_voice, speed=0.95, split_pattern=r'\n+|[.!?]+')
            audio_segments = [audio for _, _, audio in generator]
            if not audio_segments:
                raise ValueError("Kokoro returned no audio segments.")
            # Keep only non-empty numpy arrays before concatenating
            valid_segments = [seg for seg in audio_segments if isinstance(seg, np.ndarray) and seg.size > 0]
            if not valid_segments:
                raise ValueError("Kokoro returned empty or invalid audio segments.")
            full_audio = np.concatenate(valid_segments)
            # Ensure the audio is float32 for soundfile
            if full_audio.dtype != np.float32:
                full_audio = full_audio.astype(np.float32)
            # Normalize if needed (Kokoro might output values outside [-1, 1])
            max_val = np.max(np.abs(full_audio))
            if max_val > 1.0:
                full_audio /= max_val
            sf.write(file_path, full_audio, 24000)  # Kokoro typically outputs at 24 kHz
            print(f"Kokoro TTS generated successfully: {file_path}")
            return file_path
        except Exception as e:
            print(f"Warning: Kokoro TTS failed: {e}. Falling back to gTTS.")
            # Fall through to gTTS
    # Fallback to gTTS
    try:
        print(f"Generating TTS with gTTS for: '{text[:30]}...'")
        tts = gTTS(text=text, lang=voice, slow=False)  # Use the voice as the gTTS language code
        # Save as mp3 first, then convert to WAV
        mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text}.mp3")
        tts.save(mp3_path)
        audio = AudioSegment.from_mp3(mp3_path)
        # Export as WAV for consistency with moviepy
        audio.export(file_path, format="wav")
        os.remove(mp3_path)  # Clean up the temporary mp3
        print(f"gTTS generated successfully: {file_path}")
        return file_path
    except Exception as e:
        print(f"Error: gTTS also failed: {e}. Generating silence.")
        # Final fallback: generate silence
        try:
            # Estimate the duration from the text length (mirrors the parsing logic)
            words = text.split()
            duration_seconds = min(10.0, max(3.0, 2.0 + len(words) * 0.4))
            samplerate = 24000  # Match Kokoro's typical rate
            num_samples = int(duration_seconds * samplerate)
            silence = np.zeros(num_samples, dtype=np.float32)
            sf.write(file_path, silence, samplerate)
            print(f"Generated silence fallback: {file_path} ({duration_seconds:.1f}s)")
            return file_path
        except Exception as silence_e:
            print(f"Error: Failed even to generate silence: {silence_e}")
            return None  # Complete failure
def apply_kenburns_effect(clip, target_resolution, effect_type="random"):
    """Applies a Ken Burns effect (zoom/pan) to an ImageClip."""
    target_w, target_h = target_resolution
    # Ensure the clip has dimensions (they may be missing if the ImageClip wasn't fully initialized)
    if not hasattr(clip, 'w') or not hasattr(clip, 'h') or clip.w == 0 or clip.h == 0:
        print("Warning: Clip dimensions not found for Ken Burns effect. Using target resolution.")
        # Attempt to read a frame to determine the size, or fall back to the target
        try:
            frame = clip.get_frame(0)
            clip.w, clip.h = frame.shape[1], frame.shape[0]
        except Exception:
            clip.w, clip.h = target_w, target_h  # Fallback
    # Resize the image to cover the target area while maintaining its aspect ratio
    clip_aspect = clip.w / clip.h
    target_aspect = target_w / target_h
    if clip_aspect > target_aspect:  # Image is wider than the target
        new_height = target_h
        new_width = int(new_height * clip_aspect)
    else:  # Image is taller than the target
        new_width = target_w
        new_height = int(new_width / clip_aspect)
    # Resize slightly larger than needed to leave room for the effect
    base_scale = 1.20  # Zoom factor range
    zoom_width = int(new_width * base_scale)
    zoom_height = int(new_height * base_scale)
    # Use PIL for the initial resize - usually better quality for large changes
    try:
        pil_img = Image.fromarray(clip.get_frame(0))  # Get the frame as a PIL image
        resized_pil = pil_img.resize((zoom_width, zoom_height), Image.Resampling.LANCZOS)
        resized_clip = ImageClip(np.array(resized_pil)).set_duration(clip.duration)
        clip = resized_clip  # Use the better resized clip
        clip.w, clip.h = zoom_width, zoom_height  # Update dimensions
    except Exception as pil_e:
        print(f"Warning: PIL resize failed ({pil_e}). Using moviepy resize.")
        clip = clip.resize(newsize=(zoom_width, zoom_height))
    # Maximum offsets for panning
    max_offset_x = max(0, clip.w - target_w)
    max_offset_y = max(0, clip.h - target_h)
    # Available effect types
    available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "slow-zoom"]
    if effect_type == "random":
        effect_type = random.choice(available_effects)
    print(f"Applying Ken Burns effect: {effect_type}")
    # Determine start/end zoom and center positions based on the effect
    start_zoom, end_zoom = 1.0, 1.0
    start_center_x, start_center_y = clip.w / 2, clip.h / 2
    end_center_x, end_center_y = clip.w / 2, clip.h / 2
    if effect_type == "zoom-in":
        start_zoom = 1.0
        end_zoom = 1 / base_scale  # Zoom factor applied to the crop size
    elif effect_type == "zoom-out":
        start_zoom = 1 / base_scale
        end_zoom = 1.0
    elif effect_type == "slow-zoom":
        start_zoom = 1.0
        end_zoom = 1 / 1.05  # Very subtle zoom in
    elif effect_type == "pan-left":
        start_center_x = target_w / 2
        end_center_x = clip.w - target_w / 2
        start_center_y = end_center_y = clip.h / 2  # Center vertically
    elif effect_type == "pan-right":
        start_center_x = clip.w - target_w / 2
        end_center_x = target_w / 2
        start_center_y = end_center_y = clip.h / 2
    elif effect_type == "pan-up":
        start_center_y = target_h / 2
        end_center_y = clip.h - target_h / 2
        start_center_x = end_center_x = clip.w / 2  # Center horizontally
    elif effect_type == "pan-down":
        start_center_y = clip.h - target_h / 2
        end_center_y = target_h / 2
        start_center_x = end_center_x = clip.w / 2
    # Additional effects (e.g. diagonal pans) could be added here

    def transform_frame(get_frame, t):
        frame = get_frame(t)  # Frame from the (potentially PIL-resized) clip
        # Smooth interpolation (ease-in, ease-out)
        ratio = 0.5 - 0.5 * math.cos(math.pi * t / clip.duration) if clip.duration > 0 else 0
        current_zoom = start_zoom + (end_zoom - start_zoom) * ratio
        crop_w = int(target_w / current_zoom)
        crop_h = int(target_h / current_zoom)
        # Ensure the crop is not larger than the frame itself
        crop_w = min(crop_w, clip.w)
        crop_h = min(crop_h, clip.h)
        current_center_x = start_center_x + (end_center_x - start_center_x) * ratio
        current_center_y = start_center_y + (end_center_y - start_center_y) * ratio
        # Clamp the center so the crop stays inside the image boundaries
        min_center_x = crop_w / 2
        max_center_x = clip.w - crop_w / 2
        min_center_y = crop_h / 2
        max_center_y = clip.h - crop_h / 2
        current_center_x = max(min_center_x, min(current_center_x, max_center_x))
        current_center_y = max(min_center_y, min(current_center_y, max_center_y))
        # Crop with cv2.getRectSubPix for subpixel accuracy; the frame must be contiguous for cv2
        if not frame.flags['C_CONTIGUOUS']:
            frame = np.ascontiguousarray(frame)
        try:
            cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (current_center_x, current_center_y))
            # Resize the cropped area to the final target resolution (LANCZOS4 for better quality)
            final_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
            return final_frame
        except cv2.error as cv2_err:
            print(f"Error during cv2 operation in Ken Burns: {cv2_err}")
            print(f"Frame shape: {frame.shape}, Crop W/H: {crop_w}/{crop_h}, Center X/Y: {current_center_x}/{current_center_y}")
            # Fallback: return the uncropped frame resized to the target
            return cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LINEAR)

    # Apply the transformation to the clip (and its mask, if present)
    return clip.fl(transform_frame, apply_to=['mask'])
def resize_to_fill(clip, target_resolution):
    """Resizes a video clip to fill the target resolution, cropping if necessary."""
    target_w, target_h = target_resolution
    clip_w, clip_h = clip.w, clip.h
    if clip_w == 0 or clip_h == 0:
        print("Warning: Clip has zero dimensions before resize_to_fill. Cannot resize.")
        # Return a black clip of the target size instead
        return ColorClip(size=target_resolution, color=(0, 0, 0), duration=clip.duration)
    clip_aspect = clip_w / clip_h
    target_aspect = target_w / target_h
    if math.isclose(clip_aspect, target_aspect, rel_tol=1e-3):
        # Aspect ratios are close enough; just resize
        print(f"Resizing video clip {clip.filename} to {target_resolution} (aspect match).")
        return clip.resize(newsize=target_resolution)
    elif clip_aspect > target_aspect:
        # Clip is wider than the target aspect ratio: resize to the target height and crop the width
        print(f"Resizing video clip {clip.filename} to height {target_h} and cropping width.")
        clip = clip.resize(height=target_h)
        # Amount to crop from each side
        crop_amount = (clip.w - target_w) / 2
        if crop_amount < 0:  # Avoid a negative crop
            print("Warning: Negative crop amount calculated in resize_to_fill (width). Resizing only.")
            return clip.resize(newsize=target_resolution)
        return clip.crop(x1=crop_amount, width=target_w)
    else:
        # Clip is taller than the target aspect ratio: resize to the target width and crop the height
        print(f"Resizing video clip {clip.filename} to width {target_w} and cropping height.")
        clip = clip.resize(width=target_w)
        # Amount to crop from the top and bottom
        crop_amount = (clip.h - target_h) / 2
        if crop_amount < 0:  # Avoid a negative crop
            print("Warning: Negative crop amount calculated in resize_to_fill (height). Resizing only.")
            return clip.resize(newsize=target_resolution)
        return clip.crop(y1=crop_amount, height=target_h)
def add_background_music(final_video, bg_music_path=BACKGROUND_MUSIC_PATH, bg_music_volume=0.08):
    """Adds background music to the final video."""
    if not os.path.exists(bg_music_path):
        print(f"Warning: Background music file not found at {bg_music_path}. Skipping.")
        return final_video
    try:
        print("Adding background music...")
        bg_music = AudioFileClip(bg_music_path)
        if final_video.duration is None or final_video.duration <= 0:
            print("Warning: Final video has no duration. Cannot add background music.")
            return final_video
        if bg_music.duration is None or bg_music.duration <= 0:
            print("Warning: Background music has no duration. Skipping.")
            return final_video
        # Loop or trim the background music to match the video duration
        if bg_music.duration < final_video.duration:
            loops_needed = math.ceil(final_video.duration / bg_music.duration)
            print(f"Looping background music {loops_needed} times.")
            bg_music = concatenate_audioclips([bg_music] * loops_needed)
        # Trim to the exact duration
        bg_music = bg_music.subclip(0, final_video.duration)
        # Adjust the volume
        bg_music = bg_music.volumex(bg_music_volume)
        # Combine with the existing audio (if any)
        video_audio = final_video.audio
        if video_audio:
            # Optionally the main audio could be normalized here, e.g. video_audio = video_audio.volumex(1.0)
            print("Mixing existing audio with background music.")
            mixed_audio = CompositeAudioClip([video_audio, bg_music])
        else:
            print("No existing audio found. Using only background music.")
            mixed_audio = bg_music
        # Set the new audio track
        final_video = final_video.set_audio(mixed_audio)
        print("Background music added successfully.")
        return final_video
    except Exception as e:
        print(f"Error adding background music: {e}")
        # Return the original video without crashing
        return final_video
def create_clip(media_path, asset_type, tts_path, duration=None, effects=None, narration_text=None, segment_index=0):
    """Creates a single video clip segment with media, audio, and optional captions."""
    print(f"\n--- Creating Clip Segment {segment_index} ---")
    print(f"Media: {media_path} ({asset_type})")
    print(f"TTS: {tts_path}")
    if narration_text:
        print(f"Narration: '{narration_text[:50]}...'")
    try:
        # Validate inputs
        if not media_path or not os.path.exists(media_path):
            print(f"Error: Media path not found or invalid: {media_path}")
            return None
        if not tts_path or not os.path.exists(tts_path):
            print(f"Error: TTS path not found or invalid: {tts_path}")
            # We could use the media without audio, but fail for now.
            return None
        # Load the audio first to determine the segment duration
        audio_clip = AudioFileClip(tts_path)
        # Slight fade out to avoid abrupt cuts
        audio_clip = audio_clip.audio_fadeout(0.2)
        target_duration = audio_clip.duration
        if target_duration is None or target_duration <= 0.1:  # Check for a valid duration
            print(f"Warning: Audio clip {tts_path} has invalid duration ({target_duration}). Estimating 3 seconds.")
            target_duration = 3.0  # Fallback duration
            audio_clip = audio_clip.set_duration(target_duration)
        print(f"Audio Duration: {target_duration:.2f}s")
        # --- Create the video/image clip ---
        clip = None
        if asset_type == "video":
            try:
                clip = VideoFileClip(media_path, target_resolution=TARGET_RESOLUTION[::-1])  # Target resolution hint (height, width)
                # Drop the source audio; the TTS track is overlaid instead
                clip = clip.without_audio()
                # Resize/crop to fill the target resolution
                clip = resize_to_fill(clip, TARGET_RESOLUTION)
                # Loop or cut the video to match the audio duration
                if clip.duration < target_duration:
                    print(f"Looping video (duration {clip.duration:.2f}s) to match audio.")
                    clip = clip.loop(duration=target_duration)
                else:
                    start_time = 0
                    # Optional: pick a random start time if the video is much longer, e.g.
                    # if clip.duration > target_duration + 2:
                    #     start_time = random.uniform(0, clip.duration - target_duration)
                    print(f"Subclipping video from {start_time:.2f}s to {start_time + target_duration:.2f}s.")
                    clip = clip.subclip(start_time, start_time + target_duration)
                # Fade in/out for smoother transitions
                clip = clip.fadein(0.3).fadeout(0.3)
            except Exception as video_e:
                print(f"Error processing video file {media_path}: {video_e}")
                # Fall back to a black screen
                clip = ColorClip(size=TARGET_RESOLUTION, color=(0, 0, 0), duration=target_duration)
        elif asset_type == "image":
            try:
                # Load the image clip (conversion to RGB is already handled in download_media)
                clip = ImageClip(media_path).set_duration(target_duration)
                # Apply the Ken Burns effect
                clip = apply_kenburns_effect(clip, TARGET_RESOLUTION, effect_type=effects or "random")
                # Fades work well for images too
                clip = clip.fadein(0.3).fadeout(0.3)
            except Exception as img_e:
                print(f"Error processing image file {media_path}: {img_e}")
                # Fall back to a grey screen
                clip = ColorClip(size=TARGET_RESOLUTION, color=(50, 50, 50), duration=target_duration)
        else:
            print(f"Error: Unknown asset type '{asset_type}'")
            return None
        # Ensure the clip has the correct duration after processing
        clip = clip.set_duration(target_duration)
        # --- Add captions ---
        subtitle_clips = []
        if narration_text and CAPTION_COLOR != "transparent":
            print("Adding captions...")
            try:
                # Simple word splitting for timing (could be improved with proper SRT/timing info)
                words = narration_text.split()
                words_per_chunk = 5  # Words per caption line
                chunks = [' '.join(words[i:i + words_per_chunk]) for i in range(0, len(words), words_per_chunk)]
                if not chunks:
                    chunks = [narration_text]  # Handle empty or very short text
                chunk_duration = target_duration / len(chunks) if len(chunks) > 0 else target_duration
                # Font size based on resolution (heuristic)
                font_size = int(TARGET_RESOLUTION[1] / 25)  # Adjust the divisor as needed
                # Position captions towards the bottom
                subtitle_y_position = int(TARGET_RESOLUTION[1] * 0.85)
                for i, chunk_text in enumerate(chunks):
                    start_time = i * chunk_duration
                    # Don't let the end time exceed the clip duration
                    end_time = min((i + 1) * chunk_duration, target_duration)
                    # Avoid zero-duration captions
                    if end_time <= start_time:
                        end_time = start_time + 0.1
                    # Create a TextClip for the chunk. The font must be available in the environment
                    # (Arial is common but may need installing; 'Liberation-Sans-Bold' is a typical
                    # substitute on Linux). A black stroke improves visibility.
                    txt_clip = TextClip(
                        chunk_text,
                        fontsize=font_size,
                        font='Arial-Bold',
                        color=CAPTION_COLOR,
                        bg_color='rgba(0, 0, 0, 0.5)',  # Semi-transparent dark background
                        method='caption',  # Wraps text
                        align='center',
                        stroke_color='black',
                        stroke_width=max(1, font_size // 20),  # Stroke width relative to the font size
                        size=(TARGET_RESOLUTION[0] * 0.85, None)  # Limit the width
                    ).set_start(start_time).set_duration(end_time - start_time).set_position(('center', subtitle_y_position))
                    subtitle_clips.append(txt_clip)
                # Composite the main clip with the subtitles
                if subtitle_clips:
                    clip = CompositeVideoClip([clip] + subtitle_clips, size=TARGET_RESOLUTION)
                    print(f"Added {len(subtitle_clips)} caption segments.")
            except Exception as caption_e:
                # This usually happens when ImageMagick or the font is missing or misconfigured
                print(f"ERROR: Failed to create captions: {caption_e}")
                print("Check that ImageMagick is installed and configured and that the font (e.g. Arial-Bold) is available.")
                # Continue without captions if they fail
        # Set the audio track
        clip = clip.set_audio(audio_clip)
        print(f"Clip Segment {segment_index} created successfully.")
        return clip
    except Exception as e:
        print(f"FATAL ERROR creating clip segment {segment_index}: {e}")
        import traceback
        traceback.print_exc()  # Detailed traceback for debugging
        # Return a short, silent black clip so concatenation doesn't crash
        return ColorClip(size=TARGET_RESOLUTION, color=(0, 0, 0), duration=1.0)
# Main Gradio Function
def generate_video(video_concept, resolution_choice, caption_option):
    """The main function called by Gradio to generate the video."""
    print("\n\n--- Starting Video Generation ---")
    print(f"Concept: {video_concept}")
    print(f"Resolution: {resolution_choice}")
    print(f"Captions: {caption_option}")
    global TARGET_RESOLUTION, CAPTION_COLOR
    # Set the global config based on the input
    if resolution_choice == "Short (9:16)":
        TARGET_RESOLUTION = (1080, 1920)
    else:  # Default to Full HD
        TARGET_RESOLUTION = (1920, 1080)
    CAPTION_COLOR = "white" if caption_option == "Yes" else "transparent"  # "transparent" disables captions
    # --- Cleanup and setup ---
    if os.path.exists(TEMP_FOLDER):
        print(f"Removing existing temp folder: {TEMP_FOLDER}")
        shutil.rmtree(TEMP_FOLDER)
    try:
        os.makedirs(TEMP_FOLDER)
        print(f"Created temp folder: {TEMP_FOLDER}")
    except OSError as e:
        print(f"Error creating temp folder {TEMP_FOLDER}: {e}")
        return f"Error: Could not create temporary directory. Check permissions. {e}"  # Return an error message to Gradio
    # --- Script generation ---
    print("Generating script...")
    script = generate_script(video_concept)
    if not script:
        print("Error: Failed to generate script.")
        shutil.rmtree(TEMP_FOLDER)  # Clean up
        return "Error: Failed to generate script from AI. Please try a different concept or check API keys."
    # --- Script parsing ---
    print("Parsing script...")
    elements = parse_script(script)
    if not elements:
        print("Error: Failed to parse script into elements.")
        shutil.rmtree(TEMP_FOLDER)  # Clean up
        return "Error: Failed to parse the generated script. The script might be malformed."
    # Pair media prompts with TTS elements
    paired_elements = []
    if len(elements) >= 2:
        for i in range(0, len(elements), 2):
            if i + 1 < len(elements) and elements[i]['type'] == 'media' and elements[i + 1]['type'] == 'tts':
                paired_elements.append((elements[i], elements[i + 1]))
            else:
                print(f"Warning: Skipping mismatched elements at index {i}")
    if not paired_elements:
        print("Error: No valid media/TTS pairs found after parsing.")
        shutil.rmtree(TEMP_FOLDER)  # Clean up
        return "Error: Could not find valid [Title]/Narration pairs in the script."
    print(f"Found {len(paired_elements)} pairs of media prompts and narrations.")
    # --- Clip generation loop ---
    clips = []
    total_segments = len(paired_elements)
    for idx, (media_elem, tts_elem) in enumerate(paired_elements):
        print(f"\nProcessing Segment {idx + 1}/{total_segments}: Prompt='{media_elem['prompt']}'")
        # 1. Generate media (video/image)
        media_asset = generate_media(media_elem['prompt'], current_index=idx, total_segments=total_segments)
        if not media_asset or not media_asset.get('path'):
            print(f"Warning: Failed to generate media for '{media_elem['prompt']}'. Skipping segment.")
            # Option: create a placeholder clip instead of skipping
            continue  # Skip this segment
        # 2. Generate TTS
        tts_path = generate_tts(tts_elem['text'], tts_elem['voice'])
        if not tts_path:
            print(f"Warning: Failed to generate TTS for segment {idx}. Skipping segment.")
            # Option: create a clip without audio (would require adjusting create_clip)
            continue  # Skip this segment
        # 3. Create the moviepy clip segment
        clip = create_clip(
            media_path=media_asset['path'],
            asset_type=media_asset['asset_type'],
            tts_path=tts_path,
            duration=tts_elem['duration'],  # Duration hint (create_clip prioritizes the actual audio length)
            effects=media_elem.get('effects', 'random'),
            narration_text=tts_elem['text'],
            segment_index=idx
        )
        if clip:
            clips.append(clip)
        else:
            print(f"Warning: Failed to create clip for segment {idx}. Skipping.")
    # --- Final video assembly ---
    if not clips:
        print("Error: No clips were successfully created.")
        shutil.rmtree(TEMP_FOLDER)  # Clean up
        return "Error: Failed to create any video segments. Check the logs for media/TTS/clip creation errors."
    print(f"\nConcatenating {len(clips)} video clips...")
    try:
        # Concatenate all the generated clips
        final_video = concatenate_videoclips(clips, method="compose")  # 'compose' handles differing sizes/transparency
    except Exception as concat_e:
        print(f"Error during video concatenation: {concat_e}")
        shutil.rmtree(TEMP_FOLDER)
        return f"Error: Failed to combine video segments: {concat_e}"
    # --- Add background music ---
    final_video = add_background_music(final_video, bg_music_volume=0.08)  # Adjust the volume as needed
    # --- Write the output file ---
    print(f"Writing final video to {OUTPUT_VIDEO_FILENAME}...")
    try:
        # Write the final video file.
        # 'libx264' for wide compatibility, 'aac' for the audio codec.
        # Use preset 'medium' or 'slow' for a better quality/size ratio if time allows.
        # threads=4 can speed up encoding on multi-core CPUs.
        final_video.write_videofile(
            OUTPUT_VIDEO_FILENAME,
            codec='libx264',
            audio_codec='aac',
            fps=24,            # Standard frame rate
            preset='medium',   # 'veryfast', 'fast', 'medium', 'slow', 'veryslow'
            threads=4,         # Adjust based on CPU cores
            logger='bar'       # Show a progress bar
        )
        print("Final video written successfully.")
    except Exception as write_e:
        print(f"Error writing final video file: {write_e}")
        return f"Error: Failed to write the final video file: {write_e}"
    finally:
        # --- Cleanup ---
        # Close clips to release file handles (important on some operating systems)
        for clip in clips:
            clip.close()
        if final_video:
            final_video.close()
        print(f"Cleaning up temporary folder: {TEMP_FOLDER}")
        shutil.rmtree(TEMP_FOLDER)
    print("--- Video Generation Complete ---")
    # Return the path of the generated video to Gradio
    return OUTPUT_VIDEO_FILENAME
# --- Gradio Interface Definition ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
# 🎬 AI Documentary Video Generator 🎥
Enter a concept or topic, and the AI will generate a short, humorous documentary-style video.
Configure the API keys (Pexels, OpenRouter) and make sure `background_music.mp3` exists before running.
        """
    )
    with gr.Row():
        with gr.Column(scale=2):
            video_concept = gr.Textbox(
                label="Video Concept / Topic / Script",
                placeholder="e.g., 'The secret life of squirrels', 'Why cats secretly judge us', or paste a full script starting with [Title]...",
                lines=4
            )
            with gr.Row():
                resolution = gr.Dropdown(
                    ["Full HD (16:9)", "Short (9:16)"],
                    label="Resolution",
                    value="Full HD (16:9)"
                )
                caption_option = gr.Dropdown(
                    ["Yes", "No"],
                    label="Add Captions",
                    value="Yes"
                )
            generate_btn = gr.Button("✨ Generate Video ✨", variant="primary")
        with gr.Column(scale=3):
            output_video = gr.Video(label="Generated Video")
            status_message = gr.Textbox(label="Status", interactive=False)  # For errors or progress
    # Connect the button click to the main function
    generate_btn.click(
        fn=generate_video,
        inputs=[video_concept, resolution, caption_option],
        outputs=[output_video]  # Could also report to status_message, e.g. outputs=[output_video, status_message]
    )
# Launch the Gradio app
if __name__ == "__main__":
    # Check for the background music file on startup
    if not os.path.exists(BACKGROUND_MUSIC_PATH):
        print(f"\n*** WARNING: Background music file '{BACKGROUND_MUSIC_PATH}' not found. Background music will be skipped. ***\n")
    demo.launch(debug=True)  # debug=True provides more detailed logs