video / app.py
testdeep123's picture
Update app.py
479d5b6 verified
raw
history blame
51.3 kB
import os
import shutil # Added for directory cleanup
import requests
import io
import time
import re
import random
import tempfile # Added for use in create_clip
import math
import cv2
import numpy as np
import soundfile as sf
import torch
import gradio as gr
import pysrt
from bs4 import BeautifulSoup
from urllib.parse import quote
from PIL import Image, ImageDraw, ImageFont
from gtts import gTTS
from pydub import AudioSegment
from pydub.generators import Sine
# Import moviepy components. moviepy is a hard requirement: without it the
# app cannot build any clip, so a failed import terminates the process.
try:
    from moviepy.editor import (
        VideoFileClip, AudioFileClip, ImageClip, concatenate_videoclips,
        CompositeVideoClip, TextClip, CompositeAudioClip
    )
    import moviepy.video.fx.all as vfx
    import moviepy.config as mpy_config
except ImportError:
    print("Error: moviepy library not found. Please install it using 'pip install moviepy'.")
    # Raise SystemExit directly instead of calling exit(): exit() is a helper
    # injected by the `site` module for interactive use (it may be absent) and,
    # called without an argument, reports success (status 0) for what is a failure.
    raise SystemExit(1)

# Set ImageMagick binary (adjust path if necessary for your environment).
# TextClip relies on ImageMagick and might fail if this path is wrong.
# If running locally, ensure ImageMagick is installed and in your PATH.
# If on Hugging Face Spaces, add 'imagemagick' to a packages.txt file.
try:
    mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"})  # Common Linux path
    print("ImageMagick path set.")
    # NOTE(review): the path is only recorded here, not checked for existence;
    # verify it in the specific deployment environment.
except Exception as e:
    print(f"Warning: Could not configure ImageMagick path. TextClip might fail. Error: {e}")
# Import Kokoro (optional dependency) and build the shared TTS pipeline.
# `pipeline` is left as None when Kokoro is missing or fails to initialise;
# generate_tts() checks it and falls back to gTTS.
try:
    from kokoro import KPipeline
    # Kokoro identifies languages by single-letter codes; 'a' is American
    # English and matches the 'af_heart' voice requested in generate_tts().
    # NOTE(review): 'en' (used previously) does not appear to be an accepted
    # code or alias, which made initialisation fail and silently disabled
    # Kokoro -- confirm against the installed Kokoro version.
    pipeline = KPipeline(lang_code='a')
    print("Kokoro Pipeline Initialized.")
except ImportError:
    print("Warning: Kokoro library not found. TTS generation will rely solely on gTTS.")
    pipeline = None
except Exception as e:
    print(f"Warning: Failed to initialize Kokoro Pipeline. TTS generation will rely solely on gTTS. Error: {e}")
    pipeline = None
# Global Configuration
# --- IMPORTANT: Replace placeholders with your actual keys or use environment variables ---
PEXELS_API_KEY = os.getenv('PEXELS_API_KEY', 'YOUR_PEXELS_API_KEY_HERE')  # Use environment variable or replace
OPENROUTER_API_KEY = os.getenv('OPENROUTER_API_KEY', 'YOUR_OPENROUTER_API_KEY_HERE')  # Use environment variable or replace
# --- ---
# Warn at startup when either key is unusable. An environment variable that is
# set but empty counts as "not set" too, matching the checks performed by
# generate_script() and search_pexels() before they call their APIs.
if (
    not PEXELS_API_KEY
    or PEXELS_API_KEY == 'YOUR_PEXELS_API_KEY_HERE'
    or not OPENROUTER_API_KEY
    or OPENROUTER_API_KEY == 'YOUR_OPENROUTER_API_KEY_HERE'
):
    print("\n*** WARNING: API keys are not set. Please set PEXELS_API_KEY and OPENROUTER_API_KEY environment variables or replace the placeholders in the script. ***\n")
OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free"  # Or choose another model
TEMP_FOLDER = "temp_video_processing"  # Directory that downloaded media and TTS audio are written to
OUTPUT_VIDEO_FILENAME = "final_video.mp4"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
BACKGROUND_MUSIC_PATH = "background_music.mp3"  # Ensure this file exists
# --- Global variables to be set by Gradio input ---
TARGET_RESOLUTION = (1920, 1080)  # (width, height). Default, will be updated
CAPTION_COLOR = "white"  # Default, will be updated
# Helper Functions
def generate_script(user_input):
    """Request a short documentary script from the OpenRouter chat API.

    Args:
        user_input: Topic or full script text; it is interpolated at the end
            of the instruction prompt sent to the model.

    Returns:
        The model's reply with everything before the first ``[`` removed and
        surrounding whitespace stripped, or ``None`` when the API key is not
        configured, the request fails, or the response contains no choices.
    """
    key_missing = not OPENROUTER_API_KEY or OPENROUTER_API_KEY == 'YOUR_OPENROUTER_API_KEY_HERE'
    if key_missing:
        print("Error: OpenRouter API Key not configured.")
        return None

    request_headers = {
        'Authorization': f'Bearer {OPENROUTER_API_KEY}',
        'HTTP-Referer': 'https://your-app-name.hf.space',  # Optional: app URL when deployed
        'X-Title': 'AI Documentary Maker',  # Optional
    }

    # The prompt lines are deliberately flush-left: any indentation would
    # become part of the text sent to the model.
    instructions = f"""Short Documentary Script Generator Instructions:
If I say "use this," just output the script exactly as I gave it.
If I only give topics, generate a script based on them.
If I provide a full script, rewrite it without any changes.
Make everything short, simple, humorous, funny, and act serious but humorous.
Don't say anything off-topic.
Always include a funny statement to subscribe based on the video topic at the end.
Use normal conversational text like a normal person talking, avoid AI phrases, make statements humanized and normal conversational.
Always talk negative views, don't really want any positive statements.
The full script will be one video about one topic, so keep that in mind.
Formatting Rules:
Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage.
Casual & Funny Narration: Each section has 5-15 words of narration. Keep it natural, funny, and unpredictable (not robotic, poetic, or rhythmic).
No Special Formatting: No bold, italics, or special characters. You are an assistant AI; your task is to create the script. You aren't a chatbot. So, don't write extra text.
Generalized Search Terms: If a term is too specific, make it more general for Pexels search.
Scene-Specific Writing: Each section describes only what should be shown in the video.
Output Only the Script: No extra text, just the script.
Example Output:
[North Korea]
Top 5 unknown facts about North Korea, maybe.
[Invisibility]
North Korea’s internet speed is so fast… it’s basically dial-up from 1998.
[Leadership]
Kim Jong-un once won an election with 100% votes… because who would vote against him?
[Magic]
North Korea discovered unicorns. They're delicious, apparently.
[Warning]
Subscribe now, or Kim Jong-un might send you a strongly worded letter.
[Freedom]
North Korean citizens enjoy unparalleled freedom... to agree with the government.
Now here is the Topic/script: {user_input}
"""

    payload = {
        'model': OPENROUTER_MODEL,
        'messages': [{'role': 'user', 'content': instructions}],
        'temperature': 0.5,
        'max_tokens': 1000,
    }

    try:
        api_response = requests.post(
            'https://openrouter.ai/api/v1/chat/completions',
            headers=request_headers,
            json=payload,
            timeout=45,
        )
        # Turn 4xx/5xx answers into exceptions handled below.
        api_response.raise_for_status()
        body = api_response.json()

        has_choices = 'choices' in body and len(body['choices']) > 0
        if not has_choices:
            print(f"Error: No choices found in OpenRouter response. Response: {body}")
            return None

        reply = body['choices'][0]['message']['content']
        # Drop any preamble the model put in front of the first "[Title]".
        reply = re.sub(r'^.*?\n*\[', '[', reply, flags=re.DOTALL)
        reply = reply.strip()
        print(f"Generated Script:\n{reply}")
        return reply
    except requests.exceptions.RequestException as e:
        print(f"Error calling OpenRouter API: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred during script generation: {e}")
        return None
def parse_script(script_text):
    """Parse a bracket-titled script into alternating media and TTS elements.

    A section starts on a line of the form ``[Title] optional text``; the
    following non-empty lines are appended to that section's narration until
    the next title line. Sections are kept in script order, and sections that
    reuse a title are all preserved (they are not merged or overwritten).

    If no ``[Title]`` section is found and the text has at least two non-empty
    lines, the first line (brackets removed) is used as the title and the
    remaining lines as the narration.

    Args:
        script_text: The script as a single string; may be empty or ``None``.

    Returns:
        A list with two dicts per section: ``{"type": "media", "prompt",
        "effects"}`` followed by ``{"type": "tts", "text", "voice",
        "duration"}``. ``duration`` is an estimate of 2 s + 0.4 s per word,
        clamped to the range 3-10 s. Returns ``[]`` for empty input or when
        parsing raises.
    """
    if not script_text:
        return []
    try:
        # Ordered (title, narration) pairs. A dict keyed by title would drop
        # the earlier narration whenever a title is repeated in the script.
        sections = []
        current_title = None
        current_text = ""
        for line in script_text.splitlines():
            line = line.strip()
            if not line:  # Skip empty lines
                continue
            match = re.match(r'^\[([^\]]+)\](.*)', line)
            if match:
                # A new title closes the section that was being collected.
                if current_title is not None and current_text:
                    sections.append((current_title, current_text.strip()))
                current_title = match.group(1).strip()
                current_text = match.group(2).strip()
            elif current_title:
                # Continuation line of the current section.
                current_text += " " + line
        # Save the last section
        if current_title is not None and current_text:
            sections.append((current_title, current_text.strip()))

        if not sections:
            print("Warning: Script parsing resulted in no sections.")
            lines = [l.strip() for l in script_text.splitlines() if l.strip()]
            if len(lines) >= 2:  # Basic fallback: first line is title, rest is text
                print("Attempting basic fallback parsing.")
                title = lines[0].replace('[', '').replace(']', '')
                sections.append((title, ' '.join(lines[1:])))
        print(f"Parsed Sections: {sections}")

        elements = []
        for title, narration in sections:
            if not title or not narration:
                print(f"Skipping empty section: Title='{title}', Narration='{narration}'")
                continue
            # The title doubles as the media search prompt.
            elements.append({"type": "media", "prompt": title, "effects": "random"})
            # Rough duration from word count: 2s base + 0.4s per word, within 3-10s.
            word_count = len(narration.split())
            duration = min(10.0, max(3.0, 2.0 + word_count * 0.4))
            elements.append({"type": "tts", "text": narration, "voice": "en", "duration": duration})
        if not elements:
            print("Error: No elements created after parsing.")
        return elements
    except Exception as e:
        print(f"Error parsing script: {e}\nScript content was:\n{script_text}")
        return []
def search_pexels(query, api_key, media_type="videos"):
    """Search Pexels and return the URL of one randomly chosen result.

    Up to three result pages (15 items each) are requested; a failure on one
    page is logged and the remaining pages are still tried. The requested
    orientation follows the global TARGET_RESOLUTION (landscape when it is
    wider than tall, otherwise portrait).

    Args:
        query: Search term.
        api_key: Pexels API key, sent as the ``Authorization`` header.
        media_type: ``"videos"`` for video search, or ``"photos"`` /
            ``"images"`` for photo search.

    Returns:
        A download URL (a video file link, or a photo's ``original`` source),
        or ``None`` when the key is not configured, the media type is not
        supported, or nothing was found.
    """
    if not api_key or api_key == 'YOUR_PEXELS_API_KEY_HERE':
        print("Error: Pexels API Key not configured.")
        return None

    # Video and photo search live under different paths: photo search is
    # /v1/search, not /photos/search, so the URL cannot be built from
    # media_type directly. Each entry is (endpoint URL, key of the result list).
    photo_endpoint = ("https://api.pexels.com/v1/search", "photos")
    endpoints = {
        "videos": ("https://api.pexels.com/videos/search", "videos"),
        "photos": photo_endpoint,
        "images": photo_endpoint,
    }
    if media_type not in endpoints:
        print(f"Error: Unsupported Pexels media type: '{media_type}'")
        return None
    base_url, result_key = endpoints[media_type]

    headers = {'Authorization': api_key}
    target_w, target_h = TARGET_RESOLUTION
    orientation = "landscape" if target_w > target_h else "portrait"
    # "hd" is always acceptable; "fhd" is additionally accepted for large
    # targets (often not available, but checked anyway).
    accepted_qualities = {"hd"}
    if target_w >= 1920 or target_h >= 1920:
        accepted_qualities.add("fhd")

    results = []
    # Search multiple pages for better results
    for page in range(1, 4):
        try:
            params = {"query": query, "per_page": 15, "page": page, "orientation": orientation}
            response = requests.get(base_url, headers=headers, params=params, timeout=15)
            response.raise_for_status()
            for item in response.json().get(result_key, []):
                if result_key == "videos":
                    video_files = item.get("video_files", [])
                    # First file of an accepted quality wins ...
                    link = next(
                        (vf.get("link") for vf in video_files
                         if vf.get("quality") in accepted_qualities),
                        None,
                    )
                    # ... otherwise fall back to the first listed file.
                    if not link and video_files:
                        link = video_files[0].get("link")
                else:
                    # Original size; resizing happens later in the pipeline.
                    link = item.get("src", {}).get("original")
                if link:
                    results.append(link)
        except requests.exceptions.RequestException as e:
            print(f"Warning: Pexels API request failed for '{query}' (page {page}, {media_type}): {e}")
            # Don't stop searching on a single page failure
            continue
        except Exception as e:
            print(f"Warning: Unexpected error during Pexels search for '{query}': {e}")
            continue

    if results:
        print(f"Found {len(results)} Pexels {media_type} for '{query}'. Choosing one randomly.")
        return random.choice(results)
    print(f"Warning: No Pexels {media_type} found for query: '{query}'")
    return None
def search_google_images(query):
    """Scrape a Google Images result page for one directly downloadable image URL.

    Scraping is fragile: Google changes its markup often, so this is a
    best-effort fallback only.

    Candidate URLs are ``<img>`` sources (``src`` or ``data-src``) that start
    with ``http``, do not contain ``gstatic`` or ``googlelogo``, and whose URL
    path ends in .jpg/.jpeg/.png/.webp. The extension is tested on the path
    only, so URLs carrying a query string (``photo.jpg?w=640``) still qualify.

    Args:
        query: Search term.

    Returns:
        One URL chosen at random among the first 10 candidates, or ``None``
        when the request fails or no candidate is found.
    """
    # Local import: only `quote` is imported from urllib.parse at module level.
    from urllib.parse import urlparse

    print(f"Attempting Google Image search for: '{query}' (Use with caution)")
    try:
        search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch&safe=active"  # safe search on
        headers = {"User-Agent": USER_AGENT}
        response = requests.get(search_url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        image_urls = []
        for img in soup.find_all("img"):
            src = img.get("src") or img.get("data-src")
            # Only real http(s) URLs can be downloaded later; this also
            # excludes inline "data:image" sources.
            if not src or not src.startswith("http"):
                continue
            if "gstatic" in src or "googlelogo" in src:
                continue
            if re.search(r'\.(jpg|jpeg|png|webp)$', urlparse(src).path, re.IGNORECASE):
                image_urls.append(src)

        # Limit the number of results to avoid processing too many
        image_urls = image_urls[:10]
        if image_urls:
            print(f"Found {len(image_urls)} potential Google Images for '{query}'. Choosing one.")
            return random.choice(image_urls)
        print(f"Warning: No suitable Google Images found for query: '{query}'")
        return None
    except requests.exceptions.RequestException as e:
        print(f"Warning: Google Image search failed for '{query}': {e}")
        return None
    except Exception as e:
        print(f"Warning: Error parsing Google Image search results for '{query}': {e}")
        return None
def download_media(media_url, filename):
    """Download an image or video to *filename* and sanity-check the result.

    Files with an image extension (.png/.jpg/.jpeg/.webp) are verified with
    PIL and, when their mode is not RGB, re-saved in place as an RGB JPEG.
    Files with a video extension (.mp4/.mov/.avi) are rejected when smaller
    than 1 KiB. Rejected or partially downloaded files are deleted.

    Args:
        media_url: URL to fetch.
        filename: Destination path; its extension selects the check applied.

    Returns:
        *filename* on success, ``None`` on any failure.
    """
    try:
        headers = {"User-Agent": USER_AGENT}  # Use User-Agent for downloads too
        # The context manager releases the streamed connection even when
        # raise_for_status() or the file write fails.
        with requests.get(media_url, headers=headers, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        print(f"Successfully downloaded media to {filename}")

        lowered = filename.lower()
        if lowered.endswith(('.png', '.jpg', '.jpeg', '.webp')):
            try:
                with Image.open(filename) as probe:
                    probe.verify()  # Verify that it is, in fact, an image
                # The image is opened a second time for the mode check rather
                # than reusing the verified object.
                converted = None
                with Image.open(filename) as img:
                    if img.mode != 'RGB':
                        print(f"Converting image {filename} to RGB.")
                        converted = img.convert('RGB')
                if converted is not None:
                    # The source handle is closed by now, so the file can be
                    # overwritten safely. Saved as JPEG for compatibility.
                    converted.save(filename, "JPEG")
                    converted.close()
            except (IOError, SyntaxError, Image.UnidentifiedImageError) as img_e:
                print(f"Warning: Downloaded file {filename} is not a valid image or is corrupted: {img_e}. Removing.")
                os.remove(filename)
                return None
        elif lowered.endswith(('.mp4', '.mov', '.avi')):
            # Basic video check: a tiny file is most likely an error page.
            if os.path.getsize(filename) < 1024:
                print(f"Warning: Downloaded video file {filename} is suspiciously small. Removing.")
                os.remove(filename)
                return None
        return filename
    except requests.exceptions.RequestException as e:
        print(f"Error downloading media from {media_url}: {e}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
    except Exception as e:
        print(f"An unexpected error occurred during media download: {e}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
def generate_media(prompt, current_index=0, total_segments=1):
    """Find and download one visual asset (video or image) for *prompt*.

    Strategy, stopping at the first successful download:
      1. Pexels video for the prompt.
      2. Pexels photo for the prompt.
      3. Google Images, only when the prompt contains "news" or "breaking"
         or has more than four words.
      4. A Pexels photo for a generic term (abstract, technology, texture,
         nature, cityscape), tried in random order.

    Every file name includes *current_index*, so two segments that share a
    prompt do not overwrite each other's download in TEMP_FOLDER.

    Args:
        prompt: Search text; also used (sanitised) in the file name.
        current_index: Index of the segment this asset belongs to.
        total_segments: Accepted for interface compatibility; not used.

    Returns:
        ``{"path": <file path>, "asset_type": "video" | "image"}`` or
        ``None`` when every source failed.
    """
    safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
    # Cap the length so a long prompt cannot produce an over-long file name.
    safe_prompt = safe_prompt[:60]
    if not safe_prompt:
        safe_prompt = "media"  # Fallback when the prompt has no usable characters
    file_stem = f"{safe_prompt}_{current_index}"
    print(f"\n--- Generating Media for Prompt: '{prompt}' ---")

    # 1. Try Pexels Video
    video_url = search_pexels(prompt, PEXELS_API_KEY, media_type="videos")
    if video_url:
        asset = _download_media_candidate(
            video_url,
            os.path.join(TEMP_FOLDER, f"{file_stem}_video.mp4"),
            "video",
            f"Using Pexels video for '{prompt}'",
            f"Failed to download Pexels video for '{prompt}'.",
        )
        if asset:
            return asset

    # 2. Try Pexels Image
    image_url = search_pexels(prompt, PEXELS_API_KEY, media_type="photos")
    if image_url:
        asset = _download_media_candidate(
            image_url,
            os.path.join(TEMP_FOLDER, f"{file_stem}_image.jpg"),
            "image",
            f"Using Pexels image for '{prompt}'",
            f"Failed to download Pexels image for '{prompt}'.",
        )
        if asset:
            return asset

    # 3. Try Google Image (heuristic for specific/newsy terms)
    lowered_prompt = prompt.lower()
    if "news" in lowered_prompt or "breaking" in lowered_prompt or len(prompt.split()) > 4:
        google_image_url = search_google_images(prompt)
        if google_image_url:
            asset = _download_media_candidate(
                google_image_url,
                os.path.join(TEMP_FOLDER, f"{file_stem}_google_image.jpg"),
                "image",
                f"Using Google image for '{prompt}' as fallback.",
                f"Failed to download Google image for '{prompt}'.",
            )
            if asset:
                return asset

    # 4. Fallback to generic Pexels images
    print(f"Could not find specific media for '{prompt}'. Using generic fallback.")
    fallback_terms = ["abstract", "technology", "texture", "nature", "cityscape"]
    random.shuffle(fallback_terms)  # Try different fallbacks
    for term in fallback_terms:
        fallback_url = search_pexels(term, PEXELS_API_KEY, media_type="photos")
        if not fallback_url:
            continue
        asset = _download_media_candidate(
            fallback_url,
            os.path.join(TEMP_FOLDER, f"fallback_{term}_{current_index}.jpg"),
            "image",
            f"Using fallback Pexels image ('{term}')",
            f"Failed to download fallback Pexels image ('{term}')",
        )
        if asset:
            return asset

    print(f"Error: Failed to generate any media for prompt: '{prompt}'")
    return None  # Failed to get any media


def _download_media_candidate(url, path, asset_type, success_msg, failure_msg):
    """Download *url* to *path*; return the asset dict on success, else ``None``."""
    if download_media(url, path):
        print(success_msg)
        return {"path": path, "asset_type": asset_type}
    print(failure_msg)
    return None
def generate_tts(text, voice="en"):
    """Synthesise *text* to a WAV file, trying Kokoro, then gTTS, then silence.

    The output name combines a sanitised prefix of the text with a short hash
    of the whole text, so narrations that start with the same words do not
    overwrite each other's file in TEMP_FOLDER.

    Args:
        text: Narration to speak.
        voice: ``"en"`` selects Kokoro's ``af_heart`` voice; any other value
            is passed to Kokoro unchanged. gTTS receives the value as its
            language code.

    Returns:
        Path of the WAV file, or ``None`` if even the silent fallback could
        not be written.
    """
    import hashlib  # local import: hashlib is not among the module-level imports

    safe_text = re.sub(r'[^\w\s-]', '', text[:15]).strip().replace(' ', '_')
    if not safe_text:
        safe_text = "tts"
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:8]
    file_path = os.path.join(TEMP_FOLDER, f"{safe_text}_{digest}.wav")

    # Attempt Kokoro first if available
    if pipeline:
        try:
            print(f"Generating TTS with Kokoro for: '{text[:30]}...'")
            kokoro_voice = 'af_heart' if voice == 'en' else voice
            generator = pipeline(text, voice=kokoro_voice, speed=0.95, split_pattern=r'\n+|[.!?]+')
            yielded = 0
            valid_segments = []
            for _, _, audio in generator:
                yielded += 1
                if audio is None:
                    continue
                # Accept tensor output as well as numpy arrays: anything with
                # a detach() method is treated as a tensor and moved to numpy.
                # NOTE(review): recent Kokoro releases appear to yield torch
                # tensors -- confirm against the installed version.
                if hasattr(audio, "detach"):
                    audio = audio.detach().cpu().numpy()
                audio = np.asarray(audio)
                if audio.size > 0:
                    valid_segments.append(audio)
            if yielded == 0:
                raise ValueError("Kokoro returned no audio segments.")
            if not valid_segments:
                raise ValueError("Kokoro returned empty or invalid audio segments.")
            full_audio = np.concatenate(valid_segments)
            # Ensure audio is float32 for soundfile
            if full_audio.dtype != np.float32:
                full_audio = full_audio.astype(np.float32)
            # Scale down if samples exceed [-1, 1] (e.g. integer-valued output)
            max_val = np.max(np.abs(full_audio))
            if max_val > 1.0:
                full_audio = full_audio / max_val
            sf.write(file_path, full_audio, 24000)  # Kokoro typically outputs at 24kHz
            print(f"Kokoro TTS generated successfully: {file_path}")
            return file_path
        except Exception as e:
            print(f"Warning: Kokoro TTS failed: {e}. Falling back to gTTS.")
            # Fall through to gTTS

    # Fallback to gTTS
    try:
        print(f"Generating TTS with gTTS for: '{text[:30]}...'")
        tts = gTTS(text=text, lang=voice, slow=False)  # voice doubles as the gTTS language code
        # gTTS produces mp3; convert to WAV so every backend yields the same format.
        mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text}_{digest}.mp3")
        try:
            tts.save(mp3_path)
            AudioSegment.from_mp3(mp3_path).export(file_path, format="wav")
        finally:
            # Remove the intermediate mp3 whether or not the conversion worked.
            if os.path.exists(mp3_path):
                os.remove(mp3_path)
        print(f"gTTS TTS generated successfully: {file_path}")
        return file_path
    except Exception as e:
        print(f"Error: gTTS also failed: {e}. Generating silence.")
        # Final fallback: silence, so the segment still has an audio track.
        try:
            # Same duration estimate as parse_script: 2s + 0.4s/word, within 3-10s.
            words = text.split()
            duration_seconds = min(10.0, max(3.0, 2.0 + len(words) * 0.4))
            samplerate = 24000  # Same rate as the Kokoro branch
            num_samples = int(duration_seconds * samplerate)
            silence = np.zeros(num_samples, dtype=np.float32)
            sf.write(file_path, silence, samplerate)
            print(f"Generated silence fallback: {file_path} ({duration_seconds:.1f}s)")
            return file_path
        except Exception as silence_e:
            print(f"Error: Failed even to generate silence: {silence_e}")
            return None  # Complete failure
def apply_kenburns_effect(clip, target_resolution, effect_type="random"):
    """Apply a Ken Burns (slow zoom / pan) effect to an image clip.

    The first frame of *clip* is enlarged so that it covers the target area
    with a 20% margin. For every output frame a crop window is then taken
    from that enlarged image and scaled to *target_resolution*; the window's
    size and centre are interpolated over the clip duration with a cosine
    ease-in/ease-out.

    Effects:
        zoom-in / zoom-out: the window shrinks from 1.2x the target size to
            the target size (zoom-in), or grows the other way (zoom-out).
        slow-zoom: a subtle zoom-in, from 1.05x the target size.
        pan-left / pan-right: a target-sized window travels from the left
            edge to the right edge (pan-left) or right to left (pan-right).
        pan-up / pan-down: the same, from top to bottom (pan-up) or bottom
            to top (pan-down).
        random: one of the above, chosen at random.
        An unknown name leaves the window static and centred.

    Args:
        clip: Source clip; only its first frame and duration are used when
            the PIL resize succeeds.
        target_resolution: ``(width, height)`` of the output frames.
        effect_type: Effect name (see above).

    Returns:
        A clip whose frames are produced by the crop-and-scale transform.
    """
    target_w, target_h = target_resolution

    # Sizes are tracked in local variables instead of being written back to
    # clip.w / clip.h: assigning those attributes fails on moviepy versions
    # where they are read-only properties derived from clip.size.
    src_w = getattr(clip, 'w', 0) or 0
    src_h = getattr(clip, 'h', 0) or 0
    if src_w == 0 or src_h == 0:
        print("Warning: Clip dimensions not found for Ken Burns effect. Using target resolution.")
        try:
            first_frame = clip.get_frame(0)
            src_w, src_h = first_frame.shape[1], first_frame.shape[0]
        except Exception:
            src_w, src_h = target_w, target_h  # Fallback

    # Size that covers the target area while keeping the aspect ratio.
    clip_aspect = src_w / src_h
    target_aspect = target_w / target_h
    if clip_aspect > target_aspect:  # Image is wider than target
        new_height = target_h
        new_width = int(new_height * clip_aspect)
    else:  # Image is taller than target
        new_width = target_w
        new_height = int(new_width / clip_aspect)

    # Enlarge beyond the cover size to leave room for zooming and panning.
    base_scale = 1.20
    zoom_width = int(new_width * base_scale)
    zoom_height = int(new_height * base_scale)

    # Use PIL for the initial resize; fall back to moviepy if that fails.
    try:
        pil_img = Image.fromarray(clip.get_frame(0))
        resized_pil = pil_img.resize((zoom_width, zoom_height), Image.Resampling.LANCZOS)
        clip = ImageClip(np.array(resized_pil)).set_duration(clip.duration)
    except Exception as pil_e:
        print(f"Warning: PIL resize failed ({pil_e}). Using moviepy resize.")
        clip = clip.resize(newsize=(zoom_width, zoom_height))
    frame_w, frame_h = zoom_width, zoom_height
    duration = clip.duration

    available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "slow-zoom"]
    if effect_type == "random":
        effect_type = random.choice(available_effects)
    print(f"Applying Ken Burns effect: {effect_type}")

    # "zoom" divides the crop size: 1.0 crops exactly the target size (the
    # closest view), 1 / base_scale crops base_scale times the target size
    # (the widest view). Zooming in therefore runs from the small value to 1.0.
    start_zoom, end_zoom = 1.0, 1.0
    start_center_x, start_center_y = frame_w / 2, frame_h / 2
    end_center_x, end_center_y = frame_w / 2, frame_h / 2
    if effect_type == "zoom-in":
        start_zoom, end_zoom = 1 / base_scale, 1.0
    elif effect_type == "zoom-out":
        start_zoom, end_zoom = 1.0, 1 / base_scale
    elif effect_type == "slow-zoom":
        start_zoom, end_zoom = 1 / 1.05, 1.0  # Very subtle zoom in
    elif effect_type == "pan-left":
        start_center_x = target_w / 2
        end_center_x = frame_w - target_w / 2
    elif effect_type == "pan-right":
        start_center_x = frame_w - target_w / 2
        end_center_x = target_w / 2
    elif effect_type == "pan-up":
        start_center_y = target_h / 2
        end_center_y = frame_h - target_h / 2
    elif effect_type == "pan-down":
        start_center_y = frame_h - target_h / 2
        end_center_y = target_h / 2

    def transform_frame(get_frame, t):
        frame = get_frame(t)
        # Smooth interpolation (ease-in, ease-out); static when the clip has
        # no usable duration.
        if duration and duration > 0:
            ratio = 0.5 - 0.5 * math.cos(math.pi * t / duration)
        else:
            ratio = 0
        current_zoom = start_zoom + (end_zoom - start_zoom) * ratio
        # The crop can never be larger than the frame itself.
        crop_w = min(int(target_w / current_zoom), frame_w)
        crop_h = min(int(target_h / current_zoom), frame_h)
        current_center_x = start_center_x + (end_center_x - start_center_x) * ratio
        current_center_y = start_center_y + (end_center_y - start_center_y) * ratio
        # Clamp the centre so the crop window stays inside the image.
        current_center_x = max(crop_w / 2, min(current_center_x, frame_w - crop_w / 2))
        current_center_y = max(crop_h / 2, min(current_center_y, frame_h - crop_h / 2))
        # cv2 needs a contiguous array.
        if not frame.flags['C_CONTIGUOUS']:
            frame = np.ascontiguousarray(frame)
        try:
            # getRectSubPix crops around a floating-point centre.
            cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (current_center_x, current_center_y))
            return cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
        except cv2.error as cv2_err:
            print(f"Error during cv2 operation in Ken Burns: {cv2_err}")
            print(f"Frame shape: {frame.shape}, Crop W/H: {crop_w}/{crop_h}, Center X/Y: {current_center_x}/{current_center_y}")
            # Fallback: the whole frame scaled to the target size.
            return cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LINEAR)

    # Apply the transformation function to the clip (and its mask, if any).
    return clip.fl(transform_frame, apply_to=['mask'])
def resize_to_fill(clip, target_resolution):
    """Resize a video clip so it fills *target_resolution*, cropping the excess.

    When the aspect ratios match (within 0.1%) the clip is simply resized.
    A wider clip is scaled to the target height and cropped equally on the
    left and right; a taller clip is scaled to the target width and cropped
    equally at the top and bottom.

    Args:
        clip: Clip exposing ``w``, ``h``, ``duration``, ``resize`` and ``crop``.
        target_resolution: ``(width, height)`` to fill.

    Returns:
        The resized/cropped clip, or a black clip of the target size when the
        input reports a zero width or height.
    """
    target_w, target_h = target_resolution
    clip_w, clip_h = clip.w, clip.h
    if clip_w == 0 or clip_h == 0:
        print("Warning: Clip has zero dimensions before resize_to_fill. Cannot resize.")
        # Imported here because ColorClip is not among the names imported
        # from moviepy.editor at the top of the file.
        from moviepy.editor import ColorClip
        return ColorClip(size=target_resolution, color=(0, 0, 0), duration=clip.duration)

    # Not every clip carries a filename attribute; it is only used for logging.
    clip_name = getattr(clip, "filename", "<unnamed clip>")
    clip_aspect = clip_w / clip_h
    target_aspect = target_w / target_h

    if math.isclose(clip_aspect, target_aspect, rel_tol=1e-3):
        # Aspect ratios are close enough, just resize
        print(f"Resizing video clip {clip_name} to {target_resolution} (aspect match).")
        return clip.resize(newsize=target_resolution)

    if clip_aspect > target_aspect:
        # Wider than the target: match the height, crop the width.
        print(f"Resizing video clip {clip_name} to height {target_h} and cropping width.")
        clip = clip.resize(height=target_h)
        crop_amount = (clip.w - target_w) / 2  # removed from each side
        if crop_amount < 0:  # Avoid negative crop
            print("Warning: Negative crop amount calculated in resize_to_fill (width). Resizing only.")
            return clip.resize(newsize=target_resolution)
        return clip.crop(x1=crop_amount, width=target_w)

    # Taller than the target: match the width, crop the height.
    print(f"Resizing video clip {clip_name} to width {target_w} and cropping height.")
    clip = clip.resize(width=target_w)
    crop_amount = (clip.h - target_h) / 2  # removed from top and bottom
    if crop_amount < 0:  # Avoid negative crop
        print("Warning: Negative crop amount calculated in resize_to_fill (height). Resizing only.")
        return clip.resize(newsize=target_resolution)
    return clip.crop(y1=crop_amount, height=target_h)
def add_background_music(final_video, bg_music_path=BACKGROUND_MUSIC_PATH, bg_music_volume=0.08):
    """Mix a background music track under the video's existing audio.

    The music is looped when shorter than the video, trimmed to the video's
    duration, scaled by *bg_music_volume* and combined with the current audio
    track (or used alone when the video has none).

    Args:
        final_video: Video clip to add music to.
        bg_music_path: Path of the music file.
        bg_music_volume: Volume multiplier applied to the music.

    Returns:
        The video with the mixed audio track, or *final_video* unchanged when
        the file is missing, a duration is invalid, or any step raises.
    """
    if not os.path.exists(bg_music_path):
        print(f"Warning: Background music file not found at {bg_music_path}. Skipping.")
        return final_video
    try:
        print("Adding background music...")
        bg_music = AudioFileClip(bg_music_path)
        if final_video.duration is None or final_video.duration <= 0:
            print("Warning: Final video has no duration. Cannot add background music.")
            bg_music.close()  # release the reader; the clip is not used
            return final_video
        if bg_music.duration is None or bg_music.duration <= 0:
            print("Warning: Background music has no duration. Skipping.")
            bg_music.close()
            return final_video

        # Loop or trim background music to match video duration
        if bg_music.duration < final_video.duration:
            # Imported here because concatenate_audioclips is not among the
            # names imported from moviepy.editor at the top of the file.
            from moviepy.editor import concatenate_audioclips
            loops_needed = math.ceil(final_video.duration / bg_music.duration)
            print(f"Looping background music {loops_needed} times.")
            bg_music = concatenate_audioclips([bg_music] * loops_needed)
        # Trim to exact duration
        bg_music = bg_music.subclip(0, final_video.duration)
        # Adjust volume
        bg_music = bg_music.volumex(bg_music_volume)

        # Combine with existing audio (if any); its volume is left unchanged.
        video_audio = final_video.audio
        if video_audio:
            print("Mixing existing audio with background music.")
            mixed_audio = CompositeAudioClip([video_audio, bg_music])
        else:
            print("No existing audio found. Using only background music.")
            mixed_audio = bg_music

        # Set the new audio track
        final_video = final_video.set_audio(mixed_audio)
        print("Background music added successfully.")
        return final_video
    except Exception as e:
        print(f"Error adding background music: {e}")
        # Return the original video without crashing
        return final_video
def _solid_color_clip(color, duration):
    """Return a solid-colour placeholder clip at the current TARGET_RESOLUTION."""
    # Imported locally: ColorClip is not among the moviepy names imported in
    # the visible import block at the top of the file.
    from moviepy.editor import ColorClip

    return ColorClip(size=TARGET_RESOLUTION, color=color, duration=duration)


def create_clip(media_path, asset_type, tts_path, duration=None, effects=None,
                narration_text=None, segment_index=0):
    """Create a single video clip segment with media, audio, and optional captions.

    Args:
        media_path: Path to the video or image file used as the visual.
        asset_type: ``"video"`` or ``"image"``.
        tts_path: Path to the narration audio; its length sets the clip length.
        duration: Accepted for interface compatibility; not used by this function.
        effects: Ken Burns effect name for images (``"random"`` when falsy).
        narration_text: Text rendered as captions; may be ``None``.
        segment_index: Index used only in log messages.

    Returns:
        The finished clip; ``None`` when an input path is missing or the asset
        type is unknown; a one-second silent black clip when an unexpected
        error occurs.
    """
    print(f"\n--- Creating Clip Segment {segment_index} ---")
    print(f"Media: {media_path} ({asset_type})")
    print(f"TTS: {tts_path}")
    # narration_text defaults to None, so it must not be sliced directly.
    narration_preview = str(narration_text or "")[:50]
    print(f"Narration: '{narration_preview}...'")

    try:
        # Validate inputs
        if not media_path or not os.path.exists(media_path):
            print(f"Error: Media path not found or invalid: {media_path}")
            return None
        if not tts_path or not os.path.exists(tts_path):
            print(f"Error: TTS path not found or invalid: {tts_path}")
            return None

        # Load audio first to determine duration
        audio_clip = AudioFileClip(tts_path)
        # Add slight fade out to avoid abrupt cuts
        audio_clip = audio_clip.audio_fadeout(0.2)
        target_duration = audio_clip.duration
        if target_duration is None or target_duration <= 0.1:
            print(f"Warning: Audio clip {tts_path} has invalid duration ({target_duration}). Estimating 3 seconds.")
            target_duration = 3.0  # Fallback duration
            audio_clip = audio_clip.set_duration(target_duration)
        print(f"Audio Duration: {target_duration:.2f}s")

        # --- Create Video/Image Clip ---
        clip = None
        if asset_type == "video":
            try:
                # NOTE(review): TARGET_RESOLUTION is reversed for this argument;
                # passing both dimensions presumably makes the reader scale to
                # exactly that size, which could distort sources with another
                # aspect ratio before resize_to_fill runs - confirm.
                clip = VideoFileClip(media_path, target_resolution=TARGET_RESOLUTION[::-1])
                # The narration replaces whatever audio the footage had.
                clip = clip.without_audio()
                # Resize/Crop to fill target resolution
                clip = resize_to_fill(clip, TARGET_RESOLUTION)

                # Loop or cut video to match audio duration
                if clip.duration < target_duration:
                    print(f"Looping video (duration {clip.duration:.2f}s) to match audio.")
                    clip = clip.loop(duration=target_duration)
                else:
                    start_time = 0
                    print(f"Subclipping video from {start_time:.2f}s to {start_time + target_duration:.2f}s.")
                    clip = clip.subclip(start_time, start_time + target_duration)

                # Add fade in/out for smoother transitions
                clip = clip.fadein(0.3).fadeout(0.3)
            except Exception as video_e:
                print(f"Error processing video file {media_path}: {video_e}")
                # Fall back to a black screen so the narration is still used.
                clip = _solid_color_clip((0, 0, 0), target_duration)
        elif asset_type == "image":
            try:
                clip = ImageClip(media_path).set_duration(target_duration)
                # Apply Ken Burns effect
                clip = apply_kenburns_effect(clip, TARGET_RESOLUTION, effect_type=effects or "random")
                # Fades are good for images too
                clip = clip.fadein(0.3).fadeout(0.3)
            except Exception as img_e:
                print(f"Error processing image file {media_path}: {img_e}")
                # Fall back to a grey screen so the narration is still used.
                clip = _solid_color_clip((50, 50, 50), target_duration)
        else:
            print(f"Error: Unknown asset type '{asset_type}'")
            return None

        # Ensure clip has the correct duration after processing
        clip = clip.set_duration(target_duration)

        # --- Add Captions ---
        subtitle_clips = []
        if narration_text and CAPTION_COLOR != "transparent":
            print("Adding captions...")
            try:
                # Evenly spaced chunks of a few words each; no real word timing.
                words = narration_text.split()
                words_per_chunk = 5
                chunks = [
                    ' '.join(words[i:i + words_per_chunk])
                    for i in range(0, len(words), words_per_chunk)
                ]
                if not chunks:
                    chunks = [narration_text]
                chunk_duration = target_duration / len(chunks)

                # Font size and vertical position are heuristics tied to the frame height.
                font_size = int(TARGET_RESOLUTION[1] / 25)
                subtitle_y_position = int(TARGET_RESOLUTION[1] * 0.85)
                # Whole pixels only: the caption box width is a pixel count.
                caption_width = int(TARGET_RESOLUTION[0] * 0.85)

                for i, chunk_text in enumerate(chunks):
                    start_time = i * chunk_duration
                    # Ensure end time doesn't exceed clip duration
                    end_time = min((i + 1) * chunk_duration, target_duration)
                    # Avoid zero-duration captions
                    if end_time <= start_time:
                        end_time = start_time + 0.1

                    txt_clip = TextClip(
                        chunk_text,
                        fontsize=font_size,
                        font='Arial-Bold',
                        color=CAPTION_COLOR,
                        bg_color='rgba(0, 0, 0, 0.5)',
                        method='caption',  # Wraps text
                        align='center',
                        stroke_color='black',
                        stroke_width=max(1, font_size // 20),
                        size=(caption_width, None),
                    ).set_start(start_time).set_duration(end_time - start_time).set_position(('center', subtitle_y_position))
                    subtitle_clips.append(txt_clip)

                # Composite the main clip with subtitles
                if subtitle_clips:
                    clip = CompositeVideoClip([clip] + subtitle_clips, size=TARGET_RESOLUTION)
                    print(f"Added {len(subtitle_clips)} caption segments.")
            except Exception as caption_e:
                # Captions are optional: log and continue without them.
                print(f"ERROR: Failed to create captions: {caption_e}")
                print("Check if ImageMagick is installed and configured, and if the font (e.g., Arial-Bold) is available.")

        # Set the audio track
        clip = clip.set_audio(audio_clip)
        print(f"Clip Segment {segment_index} created successfully.")
        return clip
    except Exception as e:
        print(f"FATAL ERROR creating clip segment {segment_index}: {e}")
        import traceback
        traceback.print_exc()  # Print detailed traceback for debugging
        # Return a short, silent black clip to avoid crashing the concatenation
        return _solid_color_clip((0, 0, 0), 1.0).set_audio(None)
# Main Gradio Function
def _close_clips_quietly(clips):
    """Close every clip, logging instead of raising so later cleanup still runs."""
    for clip in clips:
        if clip is None:
            continue
        try:
            clip.close()
        except Exception as close_e:
            print(f"Warning: Failed to close clip: {close_e}")


def generate_video(video_concept, resolution_choice, caption_option):
    """Generate the video for the Gradio UI.

    Sets the module-level TARGET_RESOLUTION and CAPTION_COLOR from the UI
    choices, recreates TEMP_FOLDER, generates and parses a script, builds one
    clip per media/narration pair, concatenates the clips, adds background
    music and writes OUTPUT_VIDEO_FILENAME.

    Args:
        video_concept: Topic or script text entered by the user.
        resolution_choice: ``"Short (9:16)"`` selects 1080x1920; any other
            value selects 1920x1080.
        caption_option: ``"Yes"`` enables captions; any other value disables them.

    Returns:
        OUTPUT_VIDEO_FILENAME on success, otherwise a string starting with
        ``"Error:"`` that describes the failure.

    NOTE(review): the click handler in this file routes the return value to a
    ``gr.Video`` output, so the error strings are presumably interpreted as a
    video path there - consider surfacing them through the status textbox.
    """
    global TARGET_RESOLUTION, CAPTION_COLOR

    print("\n\n--- Starting Video Generation ---")
    print(f"Concept: {video_concept}")
    print(f"Resolution: {resolution_choice}")
    print(f"Captions: {caption_option}")

    # Set global config based on input
    if resolution_choice == "Short (9:16)":
        TARGET_RESOLUTION = (1080, 1920)
    else:  # Default to Full HD
        TARGET_RESOLUTION = (1920, 1080)
    # "transparent" is the sentinel create_clip checks to skip captions.
    CAPTION_COLOR = "white" if caption_option == "Yes" else "transparent"

    # --- Cleanup and Setup ---
    if os.path.exists(TEMP_FOLDER):
        print(f"Removing existing temp folder: {TEMP_FOLDER}")
        shutil.rmtree(TEMP_FOLDER)
    try:
        os.makedirs(TEMP_FOLDER)
        print(f"Created temp folder: {TEMP_FOLDER}")
    except OSError as e:
        print(f"Error creating temp folder {TEMP_FOLDER}: {e}")
        return f"Error: Could not create temporary directory. Check permissions. {e}"

    # --- Script Generation ---
    print("Generating script...")
    script = generate_script(video_concept)
    if not script:
        print("Error: Failed to generate script.")
        shutil.rmtree(TEMP_FOLDER, ignore_errors=True)
        return "Error: Failed to generate script from AI. Please try a different concept or check API keys."

    # --- Script Parsing ---
    print("Parsing script...")
    elements = parse_script(script)
    if not elements:
        print("Error: Failed to parse script into elements.")
        shutil.rmtree(TEMP_FOLDER, ignore_errors=True)
        return "Error: Failed to parse the generated script. The script might be malformed."

    # Pair media prompts with TTS elements (expected to alternate media, tts).
    paired_elements = []
    if len(elements) >= 2:
        for i in range(0, len(elements), 2):
            if i + 1 < len(elements) and elements[i]['type'] == 'media' and elements[i+1]['type'] == 'tts':
                paired_elements.append((elements[i], elements[i+1]))
            else:
                print(f"Warning: Skipping mismatched elements at index {i}")
    if not paired_elements:
        print("Error: No valid media/TTS pairs found after parsing.")
        shutil.rmtree(TEMP_FOLDER, ignore_errors=True)
        return "Error: Could not find valid [Title]/Narration pairs in the script."
    print(f"Found {len(paired_elements)} pairs of media prompts and narrations.")

    # --- Clip Generation Loop ---
    clips = []
    total_segments = len(paired_elements)
    for idx, (media_elem, tts_elem) in enumerate(paired_elements):
        print(f"\nProcessing Segment {idx+1}/{total_segments}: Prompt='{media_elem['prompt']}'")

        # 1. Generate Media (Video/Image)
        media_asset = generate_media(media_elem['prompt'], current_index=idx, total_segments=total_segments)
        if not media_asset or not media_asset.get('path'):
            print(f"Warning: Failed to generate media for '{media_elem['prompt']}'. Skipping segment.")
            continue

        # 2. Generate TTS
        tts_path = generate_tts(tts_elem['text'], tts_elem['voice'])
        if not tts_path:
            print(f"Warning: Failed to generate TTS for segment {idx}. Skipping segment.")
            continue

        # 3. Create MoviePy Clip Segment
        clip = create_clip(
            media_path=media_asset['path'],
            asset_type=media_asset['asset_type'],
            tts_path=tts_path,
            duration=tts_elem['duration'],
            effects=media_elem.get('effects', 'random'),
            narration_text=tts_elem['text'],
            segment_index=idx
        )
        if clip:
            clips.append(clip)
        else:
            print(f"Warning: Failed to create clip for segment {idx}. Skipping.")

    # --- Final Video Assembly ---
    if not clips:
        print("Error: No clips were successfully created.")
        shutil.rmtree(TEMP_FOLDER, ignore_errors=True)
        return "Error: Failed to create any video segments. Check logs for media/TTS/clip creation errors."

    print(f"\nConcatenating {len(clips)} video clips...")
    try:
        final_video = concatenate_videoclips(clips, method="compose")
    except Exception as concat_e:
        print(f"Error during video concatenation: {concat_e}")
        # Release the segment clips before deleting the files they read from.
        _close_clips_quietly(clips)
        shutil.rmtree(TEMP_FOLDER, ignore_errors=True)
        return f"Error: Failed to combine video segments: {concat_e}"

    # --- Add Background Music ---
    final_video = add_background_music(final_video, bg_music_volume=0.08)

    # --- Write Output File ---
    print(f"Writing final video to {OUTPUT_VIDEO_FILENAME}...")
    try:
        final_video.write_videofile(
            OUTPUT_VIDEO_FILENAME,
            codec='libx264',
            audio_codec='aac',
            fps=24,
            preset='medium',
            threads=4,
            logger='bar'
        )
        print("Final video written successfully.")
    except Exception as write_e:
        print(f"Error writing final video file: {write_e}")
        # The temp folder is removed exactly once, in the finally clause;
        # removing it here as well made the second removal raise and hide
        # this error message.
        return f"Error: Failed to write the final video file: {write_e}"
    finally:
        # --- Cleanup ---
        # Close clips to release file handles before deleting their files.
        _close_clips_quietly(clips + [final_video])
        print(f"Cleaning up temporary folder: {TEMP_FOLDER}")
        shutil.rmtree(TEMP_FOLDER, ignore_errors=True)

    print("--- Video Generation Complete ---")
    # Return the path to the generated video for Gradio
    return OUTPUT_VIDEO_FILENAME
# --- Gradio Interface Definition ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # Page header. The literal is kept flush-left so its text is unchanged.
    gr.Markdown(
        """
# 🎬 AI Documentary Video Generator 🎥
Enter a concept or topic, and the AI will generate a short, humorous documentary-style video.
Configure API keys (Pexels, OpenRouter) and ensure `background_music.mp3` exists before running.
"""
    )

    with gr.Row():
        # Left column: user inputs and the trigger button.
        with gr.Column(scale=2):
            video_concept = gr.Textbox(
                lines=4,
                label="Video Concept / Topic / Script",
                placeholder="e.g., 'The secret life of squirrels', 'Why cats secretly judge us', or paste a full script starting with [Title]...",
            )
            with gr.Row():
                resolution = gr.Dropdown(
                    choices=["Full HD (16:9)", "Short (9:16)"],
                    value="Full HD (16:9)",
                    label="Resolution",
                )
                caption_option = gr.Dropdown(
                    choices=["Yes", "No"],
                    value="Yes",
                    label="Add Captions",
                )
            generate_btn = gr.Button(value="✨ Generate Video ✨", variant="primary")

        # Right column: the rendered video plus a read-only status box.
        with gr.Column(scale=3):
            output_video = gr.Video(label="Generated Video")
            # Declared for errors/progress; not wired to any event in this layout.
            status_message = gr.Textbox(label="Status", interactive=False)

    # The button feeds the three inputs to generate_video and shows its
    # return value in the video component.
    generate_btn.click(
        fn=generate_video,
        outputs=[output_video],
        inputs=[video_concept, resolution, caption_option],
    )
# Launch the Gradio app
if __name__ == "__main__":
    # Startup check: a missing music file is only warned about here, it does
    # not stop the app from launching.
    music_missing = not os.path.exists(BACKGROUND_MUSIC_PATH)
    if music_missing:
        print(f"\n*** WARNING: Background music file '{BACKGROUND_MUSIC_PATH}' not found. Background music will be skipped. ***\n")

    # debug=True provides more detailed logs
    demo.launch(debug=True)