video / app.py
testdeep123's picture
Update app.py
1125cdd verified
raw
history blame
74.4 kB
# Import necessary libraries (Ensure all are installed: moviepy, soundfile, torch,
# pydub, requests, pillow, numpy, beautifulsoup4, gtts, gradio, kokoro, opencv-python)
from kokoro import KPipeline
import soundfile as sf
import torch
import soundfile as sf
import os
from moviepy.editor import VideoFileClip, AudioFileClip, ImageClip
from PIL import Image
import tempfile
import random
import cv2
import math
import os, requests, io, time, re, random
from moviepy.editor import (
VideoFileClip, concatenate_videoclips, AudioFileClip, ImageClip,
CompositeVideoClip, TextClip, CompositeAudioClip
)
import moviepy.video.fx.all as vfx
import moviepy.config as mpy_config
from pydub import AudioSegment
from pydub.generators import Sine
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from bs4 import BeautifulSoup
import base64
from urllib.parse import quote
# pysrt is imported but not used in the provided code snippets, keeping for completeness
# import pysrt
from gtts import gTTS
import gradio as gr # Import Gradio
import shutil # Needed for temp folder cleanup
# Initialize Kokoro TTS pipeline (using American English)
# Ensure you have the required voice models downloaded for Kokoro if needed,
# or it will fall back to gTTS. 'a' for American English uses voice 'af_heart'.
try:
pipeline = KPipeline(lang_code='a')
print("Kokoro TTS pipeline initialized.")
except Exception as e:
print(f"Warning: Could not initialize Kokoro TTS pipeline: {e}. Will rely on gTTS.")
pipeline = None # Set pipeline to None if initialization fails
# Ensure ImageMagick binary is set (Adjust path as needed for your system)
# This line requires imagemagick to be installed and the path correct.
# If TextClip fails, check ImageMagick installation and policy.xml (handled by fix_imagemagick_policy).
mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"})
# ---------------- Global Configuration (Some now used as defaults/initial values) ---------------- #
# Replace with your actual keys
PEXELS_API_KEY = os.environ.get('PEXELS_API_KEY', 'YOUR_PEXELS_API_KEY') # Use env var or placeholder
OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY', 'YOUR_OPENROUTER_API_KEY') # Use env var or placeholder
OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" # Or another preferred model
OUTPUT_VIDEO_FILENAME = "final_video.mp4"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
# Maximum number of script segments to display for editing
MAX_SEGMENTS_FOR_EDITING = 15
# Global placeholder for the temporary folder, will be created per run
TEMP_FOLDER = None
# ---------------- Helper Functions ---------------- #
# Most helper functions remain the same, but some might need minor adjustments
# to use passed parameters instead of global state (e.g., resolution).
# We'll update create_clip and add_background_music accordingly.
def generate_script(user_input, api_key, model):
"""Generate documentary script with proper OpenRouter handling."""
if not api_key or api_key == 'YOUR_OPENROUTER_API_KEY':
print("OpenRouter API key not set. Skipping script generation.")
return "[Error] API key not configured."
headers = {
'Authorization': f'Bearer {api_key}',
'HTTP-Referer': 'https://your-domain.com', # Replace with your app's domain
'X-Title': 'AI Documentary Maker'
}
# Updated prompt based on user's specific requirements for tone and style
prompt = f"""Short Documentary Script Generator
Instructions:
If I say "use this," output the script exactly as given.
If I only give topics, generate a script based on them.
If I provide a full script, rewrite it without any changes.
Make everything short, simple, humorous, funny, and act serious but be humorous.
Don't say anything off topic.
Always end with a funny call to subscribe based on the video topic.
Use normal conversational text, avoid AI phrases, humanize the statements.
Always focus on negative or cynical views, avoid positive statements.
The full script should be for a single video about one topic.
Formatting Rules:
Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage.
Casual & Funny Narration: Each section has 5-15 words of narration. Keep it natural, funny, and unpredictable.
No Special Formatting: No bold, italics, or special characters.
Generalized Search Terms: If a term is too specific, make it more general for Pexels search.
Scene-Specific Writing: Each section describes only what should be shown in the video.
Output Only the Script, make it funny/humorous/hilarious, and add a funny subscribe statement.
No extra text, just the script.
Example Output:
[Cats]
They plot world domination while napping.
[Dogs]
Loyalty is just a bribe for snacks.
[Humans]
The only species that pays to live on a planet they destroy.
[Future]
It looks suspiciously like the present, but with more screens.
[Warning]
Subscribe or a cat will steal your bandwidth.
Now here is the Topic/script: {user_input}
"""
data = {
'model': model,
'messages': [{'role': 'user', 'content': prompt}],
'temperature': 0.7, # Increased temperature slightly for more unpredictable humor
'max_tokens': 500 # Limit token response to keep scripts short
}
try:
response = requests.post(
'https://openrouter.ai/api/v1/chat/completions',
headers=headers,
json=data,
timeout=45 # Increased timeout
)
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
response_data = response.json()
if 'choices' in response_data and len(response_data['choices']) > 0:
script_text = response_data['choices'][0]['message']['content']
# Basic post-processing to remove potential markdown code blocks
if script_text.startswith("```") and script_text.endswith("```"):
script_text = script_text[script_text.find('\n')+1:script_text.rfind('\n')].strip()
return script_text
else:
print("Unexpected response format:", response_data)
return "[Error] Unexpected API response format."
except requests.exceptions.RequestException as e:
print(f"API Request failed: {str(e)}")
return f"[Error] API request failed: {str(e)}"
except Exception as e:
print(f"An unexpected error occurred during script generation: {e}")
return f"[Error] An unexpected error occurred: {str(e)}"
def parse_script(script_text):
"""
Parse the generated script into a list of segment dictionaries.
Each dictionary includes original prompt, narration text, estimated duration, and placeholder for uploaded media.
Handles potential API errors returned as strings.
"""
if script_text.startswith("[Error]"):
print(f"Skipping parse due to script generation error: {script_text}")
return []
segments = []
current_title = None
current_text = ""
try:
lines = script_text.strip().splitlines()
if not lines:
print("Script text is empty.")
return []
for line in lines:
line = line.strip()
if line.startswith("[") and "]" in line:
bracket_start = line.find("[")
bracket_end = line.find("]", bracket_start)
if bracket_start != -1 and bracket_end != -1:
if current_title is not None and current_text.strip():
# Estimate duration based on word count (adjust factor as needed)
duration = max(2.0, len(current_text.split()) * 0.4) # Minimum 2s, approx 0.4s per word
segments.append({
"original_prompt": current_title.strip(),
"text": current_text.strip(),
"duration": duration,
"uploaded_media": None # Placeholder for user uploaded file path
})
current_title = line[bracket_start+1:bracket_end].strip()
current_text = line[bracket_end+1:].strip()
elif current_title: # Append text if no new title found but currently parsing
current_text += line + " "
elif current_title: # Append text to the current segment
current_text += line + " "
# Add the last segment
if current_title is not None and current_text.strip():
duration = max(2.0, len(current_text.split()) * 0.4)
segments.append({
"original_prompt": current_title.strip(),
"text": current_text.strip(),
"duration": duration,
"uploaded_media": None
})
# Limit segments to MAX_SEGMENTS_FOR_EDITING
if len(segments) > MAX_SEGMENTS_FOR_EDITING:
print(f"Warning: Script generated {len(segments)} segments, limiting to {MAX_SEGMENTS_FOR_EDITING} for editing.")
segments = segments[:MAX_SEGMENTS_FOR_EDITING]
print(f"Parsed {len(segments)} segments.")
return segments
except Exception as e:
print(f"Error parsing script: {e}")
return []
# Pexels and Google Image search and download functions remain unchanged
def search_pexels_videos(query, pexels_api_key):
"""Search for a video on Pexels by query and return a random HD video."""
if not pexels_api_key or pexels_api_key == 'YOUR_PEXELS_API_KEY':
print("Pexels API key not set. Skipping video search.")
return None
headers = {'Authorization': pexels_api_key}
base_url = "https://api.pexels.com/videos/search"
num_pages = 3
videos_per_page = 15
max_retries = 2 # Reduced retries for faster failure
retry_delay = 1
search_query = query
all_videos = []
for page in range(1, num_pages + 1):
for attempt in range(max_retries):
try:
params = {"query": search_query, "per_page": videos_per_page, "page": page}
response = requests.get(base_url, headers=headers, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
videos = data.get("videos", [])
if not videos: break # No videos on this page
for video in videos:
video_files = video.get("video_files", [])
for file in video_files:
# Prioritize HD, fall back to SD if no HD found
if file.get("quality") == "hd":
all_videos.append(file.get("link"))
break # Found HD, move to next video
elif file.get("quality") == "sd": # Add SD as fallback
all_videos.append(file.get("link")) # Don't break, keep looking for HD
# After checking all files for a video, if HD was added, break inner loop
# If not, continue to next attempt if needed, otherwise break attempt loop
if any(link for link in all_videos if 'hd' in link.lower()): # Simple check if HD was added
break # Found some HD videos, move to next page or finish
elif response.status_code == 429:
print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s...")
time.sleep(retry_delay)
retry_delay *= 2
else:
print(f"Pexels video search error {response.status_code}: {response.text}")
break # Non-recoverable error or too many retries
except requests.exceptions.RequestException as e:
print(f"Pexels video request exception (attempt {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
retry_delay *= 2
else:
break # Too many retries
if not videos and page > 1: break # If no videos found on subsequent pages, stop.
if all_videos:
# Try to pick an HD video if available, otherwise pick any
hd_videos = [link for link in all_videos if 'hd' in link.lower()]
if hd_videos:
random_video = random.choice(hd_videos)
print(f"Selected random HD video from {len(hd_videos)} options.")
else:
random_video = random.choice(all_videos)
print(f"Selected random SD video from {len(all_videos)} options (no HD found).")
return random_video
else:
print("No suitable videos found after searching all pages.")
return None
def search_pexels_images(query, pexels_api_key):
"""Search for an image on Pexels by query."""
if not pexels_api_key or pexels_api_key == 'YOUR_PEXELS_API_KEY':
print("Pexels API key not set. Skipping image search.")
return None
headers = {'Authorization': pexels_api_key}
url = "https://api.pexels.com/v1/search"
params = {"query": query, "per_page": 15, "orientation": "landscape"} # Increased per_page
max_retries = 2
retry_delay = 1
for attempt in range(max_retries):
try:
response = requests.get(url, headers=headers, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
photos = data.get("photos", [])
if photos:
# Choose from the top results
photo = random.choice(photos[:min(10, len(photos))])
img_url = photo.get("src", {}).get("original")
print(f"Found {len(photos)} images, selected one.")
return img_url
else:
print(f"No images found for query: {query} on Pexels.")
return None
elif response.status_code == 429:
print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s...")
time.sleep(retry_delay)
retry_delay *= 2
else:
print(f"Pexels image search error {response.status_code}: {response.text}")
break # Non-recoverable error or too many retries
except requests.exceptions.RequestException as e:
print(f"Pexels image request exception (attempt {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
retry_delay *= 2
else:
break # Too many retries
print(f"No Pexels images found for query: {query} after all attempts")
return None
def search_google_images(query):
"""Search for images on Google Images (fallback/news)"""
try:
# Using a simple text search method; dedicated Google Image Search APIs are better but may require setup.
# This is prone to breaking if Google changes its HTML structure.
search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch"
headers = {"User-Agent": USER_AGENT}
print(f"Searching Google Images for: {query}")
response = requests.get(search_url, headers=headers, timeout=15)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Find img tags, look for src attributes
# This is a very fragile parsing method, might need adjustment
img_tags = soup.find_all("img")
image_urls = []
# Look for src attributes that start with http and aren't data URIs or specific gstatic patterns
# This is a heuristic and might grab incorrect URLs
for img in img_tags:
src = img.get("src", "")
if src.startswith("http") and "encrypted" not in src and "base64" not in src: # Basic filtering
image_urls.append(src)
elif img.get("data-src", "").startswith("http"): # Some sites use data-src
image_urls.append(img.get("data-src", ""))
# Filter out potential tiny icons or invalid URLs
valid_image_urls = [url for url in image_urls if url and "gstatic" not in url and url.split('.')[-1].lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp']]
if valid_image_urls:
print(f"Found {len(valid_image_urls)} potential Google Images, picking one.")
return random.choice(valid_image_urls[:min(10, len(valid_image_urls))])
else:
print(f"No valid Google Images found for query: {query}")
return None
except Exception as e:
print(f"Error in Google Images search: {e}")
return None
def download_image(image_url, filename):
"""Download an image from a URL to a local file with enhanced error handling."""
if not image_url:
print("No image URL provided for download.")
return None
try:
headers = {"User-Agent": USER_AGENT}
print(f"Attempting to download image from: {image_url}")
response = requests.get(image_url, headers=headers, stream=True, timeout=20) # Increased timeout
response.raise_for_status()
# Check content type before saving
content_type = response.headers.get('Content-Type', '')
if not content_type.startswith('image/'):
print(f"URL did not return an image Content-Type ({content_type}). Skipping download.")
return None
# Ensure the directory exists
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"Potential image downloaded to: {filename}")
# Validate and process the image
try:
img = Image.open(filename)
img.verify() # Verify it's an image file
img = Image.open(filename) # Re-open after verify
if img.mode != 'RGB':
img = img.convert('RGB')
img.save(filename)
print(f"Image validated and converted to RGB: {filename}")
return filename
except Exception as e_validate:
print(f"Downloaded file is not a valid image or processing failed: {e_validate}")
if os.path.exists(filename):
os.remove(filename) # Clean up invalid file
return None
except requests.exceptions.RequestException as e_download:
print(f"Image download error for {image_url}: {e_download}")
if os.path.exists(filename):
os.remove(filename) # Clean up partially downloaded file
return None
except Exception as e_general:
print(f"General error during image download/processing: {e_general}")
if os.path.exists(filename):
os.remove(filename) # Clean up if needed
return None
def download_video(video_url, filename):
"""Download a video from a URL to a local file."""
if not video_url:
print("No video URL provided for download.")
return None
try:
headers = {"User-Agent": USER_AGENT} # Some sites block direct downloads
print(f"Attempting to download video from: {video_url}")
response = requests.get(video_url, stream=True, timeout=45) # Increased timeout for videos
response.raise_for_status()
# Check content type
content_type = response.headers.get('Content-Type', '')
if not content_type.startswith('video/'):
print(f"URL did not return a video Content-Type ({content_type}). Skipping download.")
return None
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"Video downloaded successfully to: {filename}")
# Basic check if the file seems valid (not just 0 bytes)
if os.path.exists(filename) and os.path.getsize(filename) > 1024: # Check for > 1KB
return filename
else:
print(f"Downloaded video file {filename} is too small or empty.")
if os.path.exists(filename):
os.remove(filename)
return None
except requests.exceptions.RequestException as e:
print(f"Video download error for {video_url}: {e}")
if os.path.exists(filename):
os.remove(filename)
return None
except Exception as e_general:
print(f"General error during video download: {e_general}")
if os.path.exists(filename):
os.remove(filename)
return None
def generate_media_asset(prompt, uploaded_media_path):
"""
Generate a visual asset (video or image). Prioritizes user upload,
then searches Pexels video, then Pexels image, then Google Image.
Returns a dict: {'path': <file_path>, 'asset_type': 'video' or 'image'}.
"""
safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists
# 1. Use user uploaded media if provided
if uploaded_media_path and os.path.exists(uploaded_media_path):
print(f"Using user uploaded media: {uploaded_media_path}")
file_ext = os.path.splitext(uploaded_media_path)[1].lower()
asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm'] else 'image'
# Copy the user file to temp folder to manage cleanup
temp_user_path = os.path.join(TEMP_FOLDER, f"user_upload_{os.path.basename(uploaded_media_path)}")
try:
shutil.copy2(uploaded_media_path, temp_user_path)
print(f"Copied user upload to temp: {temp_user_path}")
return {"path": temp_user_path, "asset_type": asset_type}
except Exception as e:
print(f"Error copying user file {uploaded_media_path}: {e}. Falling back to search.")
# 2. Search Pexels Videos (25% chance if no user upload)
# Let's slightly increase video search preference when available
if random.random() < 0.4: # Increase video search chance
video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4")
print(f"Attempting Pexels video search for: {prompt}")
video_url = search_pexels_videos(prompt, PEXELS_API_KEY)
if video_url:
downloaded_video = download_video(video_url, video_file)
if downloaded_video:
print(f"Pexels video asset saved to {downloaded_video}")
return {"path": downloaded_video, "asset_type": "video"}
else:
print(f"Pexels video search failed or found no video for: {prompt}")
# 3. Search Pexels Images
image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg")
print(f"Attempting Pexels image search for: {prompt}")
image_url = search_pexels_images(prompt, PEXELS_API_KEY)
if image_url:
downloaded_image = download_image(image_url, image_file)
if downloaded_image:
print(f"Pexels image asset saved to {downloaded_image}")
return {"path": downloaded_image, "asset_type": "image"}
else:
print(f"Pexels image search failed or found no image for: {prompt}")
# 4. Fallback: Search Google Images (especially useful for news/specific things Pexels might not have)
print(f"Attempting Google Images fallback for: {prompt}")
google_image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google.jpg")
google_image_url = search_google_images(prompt)
if google_image_url:
downloaded_google_image = download_image(google_image_url, google_image_file)
if downloaded_google_image:
print(f"Google Image asset saved to {downloaded_google_image}")
return {"path": downloaded_google_image, "asset_type": "image"}
else:
print(f"Google Images fallback failed for: {prompt}")
# 5. Final Fallback: Generic Images if specific search failed
fallback_terms = ["nature", "city", "abstract", "background"] # More generic fallbacks
for term in fallback_terms:
print(f"Trying generic fallback image search with term: {term}")
fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg")
fallback_url = search_pexels_images(term, PEXELS_API_KEY) # Use Pexels for fallbacks
if fallback_url:
downloaded_fallback = download_image(fallback_url, fallback_file)
if downloaded_fallback:
print(f"Generic fallback image saved to {downloaded_fallback}")
return {"path": downloaded_fallback, "asset_type": "image"}
else:
print(f"Generic fallback image download failed for term: {term}")
else:
print(f"Generic fallback image search failed for term: {term}")
print(f"Failed to generate any visual asset for prompt: {prompt} after all attempts.")
return None
def generate_silent_audio(duration, sample_rate=24000):
"""Generate a silent WAV audio file lasting 'duration' seconds."""
print(f"Generating {duration:.2f}s of silent audio.")
num_samples = int(duration * sample_rate)
silence = np.zeros(num_samples, dtype=np.float32)
# Use unique filename to avoid conflicts
silent_path = os.path.join(TEMP_FOLDER, f"silent_{abs(hash(duration)) % (10**8)}_{int(time.time())}.wav")
try:
sf.write(silent_path, silence, sample_rate)
print(f"Silent audio generated: {silent_path}")
return silent_path
except Exception as e:
print(f"Error generating silent audio: {e}")
return None
def generate_tts(text, voice='en'):
"""
Generate TTS audio using Kokoro, falling back to gTTS or silent audio if needed.
Ensures temp folder exists.
"""
os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists
safe_text_hash = str(abs(hash(text)) % (10**10)) # Use a hash for potentially long text
file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.wav")
if os.path.exists(file_path):
print(f"Using cached TTS for text hash '{safe_text_hash}'")
return file_path
target_duration = max(2.0, len(text.split()) * 0.4) # Estimate duration if TTS fails
if pipeline:
try:
print(f"Attempting Kokoro TTS for text: '{text[:50]}...'")
kokoro_voice = 'af_heart' if voice == 'en' else voice # Kokoro default American English voice
# Kokoro pipeline might return multiple segments for long text
generator = pipeline(text, voice=kokoro_voice, speed=1.0, split_pattern=r'\n+') # Use speed 1.0
audio_segments = []
total_duration = 0
for i, (gs, ps, audio) in enumerate(generator):
audio_segments.append(audio)
total_duration += len(audio) / 24000.0 # Assuming 24000 Hz sample rate
if audio_segments:
full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
sf.write(file_path, full_audio, 24000) # Use 24000Hz standard
print(f"TTS audio saved to {file_path} (Kokoro, {total_duration:.2f}s)")
return file_path
else:
print("Kokoro pipeline returned no audio segments.")
except Exception as e:
print(f"Error with Kokoro TTS: {e}")
# Continue to gTTS fallback
try:
print(f"Falling back to gTTS for text: '{text[:50]}...'")
tts = gTTS(text=text, lang='en', slow=False) # Use standard speed
mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.mp3")
tts.save(mp3_path)
audio = AudioSegment.from_mp3(mp3_path)
audio.export(file_path, format="wav")
os.remove(mp3_path)
print(f"Fallback TTS saved to {file_path} (gTTS, {audio.duration_seconds:.2f}s)")
return file_path
except Exception as fallback_error:
print(f"Both TTS methods failed for text: '{text[:50]}...'. Error: {fallback_error}")
# Use the estimated duration for silent audio
return generate_silent_audio(duration=target_duration)
def apply_kenburns_effect(clip, target_resolution, effect_type=None):
"""Apply a smooth Ken Burns effect with a single movement pattern."""
target_w, target_h = target_resolution
clip_aspect = clip.w / clip.h
target_aspect = target_w / target_h
# Resize clip to fill target resolution while maintaining aspect ratio, then scale up
if clip_aspect > target_aspect:
# Wider than target: match height, scale width
clip = clip.resize(height=target_h)
initial_w, initial_h = clip.size
scale_factor = 1.15
new_width = int(initial_w * scale_factor)
new_height = int(initial_h * scale_factor)
clip = clip.resize(newsize=(new_width, new_height))
else:
# Taller than target: match width, scale height
clip = clip.resize(width=target_w)
initial_w, initial_h = clip.size
scale_factor = 1.15
new_width = int(initial_w * scale_factor)
new_height = int(initial_h * scale_factor)
clip = clip.resize(newsize=(new_width, new_height))
max_offset_x = new_width - target_w
max_offset_y = new_height - target_h
available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "up-left", "down-right"]
if effect_type is None or effect_type == "random":
effect_type = random.choice(available_effects)
# Define start and end scale factors and positions relative to the scaled image size
# Position is the top-left corner of the target resolution frame within the scaled image
start_scale = 1.0 / (1.15 * 1.0) # Scale is relative to the final cropped size. Let's use position instead.
end_scale = 1.0 / (1.15 * 1.0)
# Start and end positions of the top-left corner of the target_resolution window
start_x, start_y = 0, 0
end_x, end_y = 0, 0
start_zoom_factor = 1.0
end_zoom_factor = 1.0
if effect_type == "zoom-in":
start_zoom_factor = 1.0
end_zoom_factor = 1.15
# Stay centered
start_x = max_offset_x / 2
start_y = max_offset_y / 2
end_x = max_offset_x / 2
end_y = max_offset_y / 2
elif effect_type == "zoom-out":
start_zoom_factor = 1.15
end_zoom_factor = 1.0
# Stay centered
start_x = max_offset_x / 2
start_y = max_offset_y / 2
end_x = max_offset_x / 2
end_y = max_offset_y / 2
elif effect_type == "pan-left":
start_x = max_offset_x
start_y = max_offset_y / 2
end_x = 0
end_y = max_offset_y / 2
elif effect_type == "pan-right":
start_x = 0
start_y = max_offset_y / 2
end_x = max_offset_x
end_y = max_offset_y / 2
elif effect_type == "pan-up":
start_x = max_offset_x / 2
start_y = max_offset_y
end_x = max_offset_x / 2
end_y = 0
elif effect_type == "pan-down":
start_x = max_offset_x / 2
start_y = 0
end_x = max_offset_x / 2
end_y = max_offset_y
elif effect_type == "up-left":
start_x = max_offset_x
start_y = max_offset_y
end_x = 0
end_y = 0
elif effect_type == "down-right":
start_x = 0
start_y = 0
end_x = max_offset_x
end_y = max_offset_y
else:
# Default to pan-right if type is random but somehow invalid
effect_type = 'pan-right'
start_x = 0
start_y = max_offset_y / 2
end_x = max_offset_x
end_y = max_offset_y / 2
def transform_frame(get_frame, t):
frame = get_frame(t)
# Use a smooth ease-in/ease-out function
progress = t / clip.duration if clip.duration > 0 else 0
eased_progress = 0.5 - 0.5 * math.cos(math.pi * progress) # Cosine easing
# Interpolate position
current_x = start_x + (end_x - start_x) * eased_progress
current_y = start_y + (end_y - start_y) * eased_progress
# Interpolate zoom (relative to the scaled-up size)
current_zoom_factor = start_zoom_factor + (end_zoom_factor - start_zoom_factor) * eased_progress
# Calculate crop size based on current zoom
crop_w = int(target_w / current_zoom_factor)
crop_h = int(target_h / current_zoom_factor)
# Calculate the center point of the crop window
center_x = current_x + crop_w / 2
center_y = current_y + crop_h / 2
# Ensure center stays within the bounds of the scaled image
center_x = max(crop_w / 2, min(center_x, new_width - crop_w / 2))
center_y = max(crop_h / 2, min(center_y, new_height - crop_h / 2))
# Perform the crop using cv2.getRectSubPix (expects floating point center)
# Ensure frame is a numpy array (moviepy returns numpy arrays)
try:
cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (center_x, center_y))
# Resize the cropped frame back to the target resolution
resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
return resized_frame
except Exception as e:
print(f"Error applying Ken Burns transform at t={t:.2f}s: {e}")
# Return a black frame or placeholder in case of error
return np.zeros((target_h, target_w, 3), dtype=np.uint8)
# Need to return a new clip instance with the effect applied
return clip.fl(transform_frame)
def resize_to_fill(clip, target_resolution):
"""Resize and crop a clip to fill the target resolution while maintaining aspect ratio."""
target_w, target_h = target_resolution
clip_aspect = clip.w / clip.h
target_aspect = target_w / target_h
if clip_aspect > target_aspect: # Clip is wider than target
clip = clip.resize(height=target_h)
# Calculate crop amount to make width match target_w
crop_amount_x = (clip.w - target_w) / 2
clip = clip.crop(x1=crop_amount_x, x2=clip.w - crop_amount_x, y1=0, y2=clip.h)
else: # Clip is taller than target or same aspect
clip = clip.resize(width=target_w)
# Calculate crop amount to make height match target_h
crop_amount_y = (clip.h - target_h) / 2
clip = clip.crop(x1=0, x2=clip.w, y1=crop_amount_y, y2=clip.h - crop_amount_y)
# Ensure dimensions are exactly target_resolution after crop
if clip.size != target_resolution:
print(f"Warning: Clip size {clip.size} after resize_to_fill does not match target {target_resolution}. Resizing again.")
clip = clip.resize(newsize=target_resolution)
return clip
def find_mp3_files():
"""Search for any MP3 files in the current directory and subdirectories."""
mp3_files = []
# Check relative paths first
for root, dirs, files in os.walk('.'):
for file in files:
if file.lower().endswith('.mp3'):
mp3_path = os.path.join(root, file)
mp3_files.append(mp3_path)
print(f"Found MP3 file: {mp3_path}")
if mp3_files:
return mp3_files[0] # Return the first one found
else:
print("No MP3 files found in the current directory or subdirectories.")
return None
def add_background_music(final_video, bg_music_path, bg_music_volume=0.08):
"""Add background music to the final video."""
if not bg_music_path or not os.path.exists(bg_music_path):
print("No valid background music path provided or file not found. Skipping background music.")
return final_video
try:
print(f"Adding background music from: {bg_music_path}")
bg_music = AudioFileClip(bg_music_path)
# Loop background music if shorter than video
if bg_music.duration < final_video.duration:
loops_needed = math.ceil(final_video.duration / bg_music.duration)
bg_segments = [bg_music] * loops_needed
bg_music = concatenate_audioclips(bg_segments)
# Subclip background music to match video duration
bg_music = bg_music.subclip(0, final_video.duration)
# Adjust volume
bg_music = bg_music.volumex(bg_music_volume)
# Composite audio
video_audio = final_video.audio
if video_audio:
mixed_audio = CompositeAudioClip([video_audio, bg_music])
else:
# Handle case where video might not have audio track initially
mixed_audio = bg_music
print("Warning: Video had no audio track, only adding background music.")
final_video = final_video.set_audio(mixed_audio)
print("Background music added successfully.")
return final_video
except Exception as e:
print(f"Error adding background music: {e}")
print("Continuing without background music.")
return final_video
def create_clip(media_asset, tts_path, duration, target_resolution,
caption_enabled, caption_color, caption_size, caption_position,
caption_bg_color, caption_stroke_color, caption_stroke_width,
narration_text, segment_index):
"""Create a video clip with synchronized subtitles and narration."""
try:
print(f"Creating clip #{segment_index} from asset: {media_asset.get('path')}, type: {media_asset.get('asset_type')}")
media_path = media_asset.get('path')
asset_type = media_asset.get('asset_type')
if not media_path or not os.path.exists(media_path):
print(f"Skipping clip {segment_index}: Missing media file {media_path}")
# Create a black clip with silent audio for this segment duration
black_clip = ColorClip(size=target_resolution, color=(0,0,0), duration=duration)
silent_audio_path = generate_silent_audio(duration)
if silent_audio_path and os.path.exists(silent_audio_path):
silent_audio_clip = AudioFileClip(silent_audio_path)
if silent_audio_clip.duration < duration: # Should not happen if silent_audio is correct
silent_audio_clip = silent_audio_clip.loop(duration=duration)
black_clip = black_clip.set_audio(silent_audio_clip.subclip(0, duration))
print(f"Created placeholder black clip for segment {segment_index}")
# Add placeholder text if captions are enabled
if caption_enabled and narration_text and caption_color != "transparent":
txt_clip = TextClip(
"[Missing Media]\n" + narration_text, # Indicate missing media
fontsize=caption_size,
font='Arial-Bold',
color=caption_color,
bg_color=caption_bg_color,
method='caption',
align='center',
stroke_width=caption_stroke_width,
stroke_color=caption_stroke_color,
size=(target_resolution[0] * 0.9, None)
).set_position('center').set_duration(duration) # Duration matches black clip
black_clip = CompositeVideoClip([black_clip, txt_clip])
return black_clip
# Determine actual audio duration
audio_clip = None
audio_duration = duration # Default to estimated duration
if tts_path and os.path.exists(tts_path):
try:
audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2)
audio_duration = audio_clip.duration
# Ensure clip duration is slightly longer than audio for transitions/padding
target_clip_duration = audio_duration + 0.3 # Add a small buffer
print(f"TTS audio duration: {audio_duration:.2f}s. Target clip duration: {target_clip_duration:.2f}s")
except Exception as e:
print(f"Error loading TTS audio clip {tts_path}: {e}. Using estimated duration {duration:.2f}s.")
audio_clip = None # Ensure audio_clip is None if loading fails
target_clip_duration = duration # Fallback to estimated duration
if asset_type == "video":
try:
clip = VideoFileClip(media_path)
print(f"Loaded video clip with duration {clip.duration:.2f}s")
clip = resize_to_fill(clip, target_resolution)
if clip.duration < target_clip_duration:
print("Looping video clip")
clip = clip.loop(duration=target_clip_duration)
else:
clip = clip.subclip(0, target_clip_duration)
clip = clip.fadein(0.2).fadeout(0.2) # Add simple transitions
except Exception as e:
print(f"Error processing video clip {media_path}: {e}")
# Fallback to a black clip if video processing fails
print(f"Creating placeholder black clip instead for segment {segment_index}")
clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration)
elif asset_type == "image":
try:
img = Image.open(media_path)
# Ensure image is in RGB format before passing to ImageClip
if img.mode != 'RGB':
print("Converting image to RGB")
img = img.convert('RGB')
# Save back to a temp file or pass numpy array directly if ImageClip supports it
# ImageClip accepts numpy arrays, let's convert
img_array = np.array(img)
img.close() # Close the PIL image
clip = ImageClip(img_array).set_duration(target_clip_duration)
else:
img.close() # Close the PIL image
clip = ImageClip(media_path).set_duration(target_clip_duration)
print(f"Loaded image clip with duration {clip.duration:.2f}s")
clip = apply_kenburns_effect(clip, target_resolution, effect_type="random") # Random Ken Burns
clip = clip.fadein(0.3).fadeout(0.3) # Add simple transitions
except Exception as e:
print(f"Error processing image clip {media_path}: {e}")
# Fallback to a black clip if image processing fails
print(f"Creating placeholder black clip instead for segment {segment_index}")
clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration)
else:
print(f"Unknown asset type {asset_type} for segment {segment_index}. Skipping.")
return None
# Set the audio for the clip if audio_clip was loaded successfully
if audio_clip:
# Ensure audio clip duration matches video clip duration after processing
if abs(audio_clip.duration - clip.duration) > 0.1: # Allow slight difference
print(f"Adjusting audio duration ({audio_clip.duration:.2f}s) to match video duration ({clip.duration:.2f}s)")
audio_clip = audio_clip.fx(vfx.speedx, factor=audio_clip.duration / clip.duration)
clip = clip.set_audio(audio_clip)
else:
# If TTS failed or audio loading failed, ensure video clip has no audio or silent audio
print(f"No valid audio for clip {segment_index}. Setting silent audio.")
silent_audio_path = generate_silent_audio(clip.duration)
if silent_audio_path and os.path.exists(silent_audio_path):
silent_audio_clip = AudioFileClip(silent_audio_path)
if abs(silent_audio_clip.duration - clip.duration) > 0.1:
silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration)
clip = clip.set_audio(silent_audio_clip)
else:
clip = clip.set_audio(None) # Set audio to None if silent audio fails
# Add subtitles if enabled
if caption_enabled and narration_text and caption_color != "transparent":
try:
# Simple word-based chunking for subtitles
words = narration_text.split()
# Calculate word timings based on total audio duration and word count
# This is a simple approach; for better sync, use a forced aligner or whisper
words_per_second = len(words) / audio_duration if audio_duration > 0 else len(words)
word_duration = 1.0 / words_per_second if words_per_second > 0 else 0.5 # Default if 0
subtitle_clips = []
current_time = 0
chunk_size = 6 # Words per caption chunk (adjust as needed)
for i in range(0, len(words), chunk_size):
chunk_words = words[i:i+chunk_size]
chunk_text = ' '.join(chunk_words)
# Estimate chunk duration based on word count * average word duration
estimated_chunk_duration = len(chunk_words) * word_duration
start_time = current_time
end_time = min(current_time + estimated_chunk_duration, clip.duration) # Ensure end time doesn't exceed clip duration
if start_time >= end_time: break # Avoid 0 or negative duration clips
# Determine vertical position
if caption_position == "Top":
subtitle_y_position = int(target_resolution[1] * 0.1)
elif caption_position == "Middle":
subtitle_y_position = int(target_resolution[1] * 0.5)
else: # Default to Bottom
subtitle_y_position = int(target_resolution[1] * 0.85) # Closer to bottom
txt_clip = TextClip(
chunk_text,
fontsize=caption_size,
font='Arial-Bold', # Ensure this font is available or use a common system font
color=caption_color,
bg_color=caption_bg_color, # Use background color
method='caption', # Enables text wrapping
align='center',
stroke_width=caption_stroke_width, # Use stroke
stroke_color=caption_stroke_color, # Use stroke color
size=(target_resolution[0] * 0.9, None) # Caption width max 90% of video width
).set_start(start_time).set_end(end_time)
txt_clip = txt_clip.set_position(('center', subtitle_y_position))
subtitle_clips.append(txt_clip)
current_time = end_time # Move to the end of the current chunk
if subtitle_clips:
clip = CompositeVideoClip([clip] + subtitle_clips)
print(f"Added {len(subtitle_clips)} subtitle chunks to clip {segment_index}.")
else:
print(f"No subtitle clips generated for segment {segment_index}.")
except Exception as sub_error:
print(f"Error adding subtitles for segment {segment_index}: {sub_error}")
# Fallback to a single centered text overlay if detailed subtitling fails
try:
txt_clip = TextClip(
narration_text,
fontsize=caption_size,
font='Arial-Bold',
color=caption_color,
bg_color=caption_bg_color,
method='caption',
align='center',
stroke_width=caption_stroke_width,
stroke_color=caption_stroke_color,
size=(target_resolution[0] * 0.8, None)
).set_position(('center', int(target_resolution[1] * 0.75))).set_duration(clip.duration)
clip = CompositeVideoClip([clip, txt_clip])
print(f"Added simple fallback subtitle for segment {segment_index}.")
except Exception as fallback_sub_error:
print(f"Simple fallback subtitle failed for segment {segment_index}: {fallback_sub_error}")
# Ensure final clip duration is set
clip = clip.set_duration(clip.duration) # This might seem redundant but can help fix issues
print(f"Clip {segment_index} created: {clip.duration:.2f}s")
return clip
except Exception as e:
print(f"Critical error in create_clip for segment {segment_index}: {str(e)}")
# Create a black clip with error message if anything goes wrong
error_duration = duration if duration else 3 # Use estimated duration or default
black_clip = ColorClip(size=target_resolution, color=(0,0,0), duration=error_duration)
error_text = f"Error in segment {segment_index}"
if narration_text: error_text += f":\n{narration_text[:50]}..."
error_txt_clip = TextClip(
error_text,
fontsize=30,
color="red",
align='center',
size=(target_resolution[0] * 0.9, None)
).set_position('center').set_duration(error_duration)
clip = CompositeVideoClip([black_clip, error_txt_clip])
silent_audio_path = generate_silent_audio(error_duration)
if silent_audio_path and os.path.exists(silent_audio_path):
clip = clip.set_audio(AudioFileClip(silent_audio_path))
print(f"Created error placeholder clip for segment {segment_index}.")
return clip
def fix_imagemagick_policy():
"""Attempt to fix ImageMagick security policies required by TextClip."""
print("Attempting to fix ImageMagick security policies...")
policy_paths = [
"/etc/ImageMagick-6/policy.xml",
"/etc/ImageMagick-7/policy.xml",
"/etc/ImageMagick/policy.xml", # Common symlink path
"/usr/local/etc/ImageMagick-7/policy.xml", # macports/homebrew path
"/usr/share/ImageMagick/policy.xml", # Another common path
"/usr/share/ImageMagick-6/policy.xml",
"/usr/share/ImageMagick-7/policy.xml",
# Add more paths if needed based on typical installations
]
found_policy = None
for path in policy_paths:
if os.path.exists(path):
found_policy = path
break
if not found_policy:
print("No policy.xml found in common locations. TextClip may fail.")
print("Consider installing ImageMagick and checking its installation path.")
return False
print(f"Attempting to modify policy file at {found_policy}")
try:
# Create a backup
backup_path = f"{found_policy}.bak_aivgen_{int(time.time())}"
shutil.copy2(found_policy, backup_path)
print(f"Created backup at {backup_path}")
# Read the original policy file
with open(found_policy, 'r') as f:
policy_content = f.read()
# Use regex to find and replace the specific policy lines
# Allow read and write rights for PDF, EPS, PS, etc. potentially restricted formats
# Also ensure path policies allow reading/writing files
modified_content = re.sub(
r'<policy domain="coder" rights="none" pattern="(PDF|EPS|PS|XPS)"',
r'<policy domain="coder" rights="read|write" pattern="\1"', # Changed rights to read|write
policy_content
)
modified_content = re.sub(
r'<policy domain="path" pattern="@\*"[^>]*>',
r'<policy domain="path" pattern="@*" rights="read|write"/>', # Ensure path rights are read|write
modified_content
)
# More general rights none replacement, be careful with this one
modified_content = re.sub(
r'<policy domain="[^"]+" rights="none"[^>]*>',
lambda m: m.group(0).replace('rights="none"', 'rights="read|write"'),
modified_content
)
# Write the modified content back
# Use sudo if running as a non-root user in a typical Linux install
try:
with open(found_policy, 'w') as f:
f.write(modified_content)
print("ImageMagick policies updated successfully (direct write).")
return True
except IOError as e:
print(f"Direct write failed: {e}. Attempting with sudo...")
# Fallback to using os.system with sudo if direct write fails
# This requires the user to be able to run sudo commands without a password prompt for the script's execution
temp_policy_file = os.path.join(TEMP_FOLDER, "temp_policy.xml")
with open(temp_policy_file, 'w') as f:
f.write(modified_content)
cmd = f"sudo cp {temp_policy_file} {found_policy}"
print(f"Executing: {cmd}")
result = os.system(cmd) # Returns 0 on success
if result == 0:
print("ImageMagick policies updated successfully using sudo.")
return True
else:
print(f"Failed to update ImageMagick policies using sudo. Result code: {result}.")
print("Please manually edit your policy.xml to grant read/write rights for coder and path domains.")
print("Example: Change <policy domain='coder' rights='none' pattern='PDF'> to <policy domain='coder' rights='read|write' pattern='PDF'>")
return False
finally:
if os.path.exists(temp_policy_file):
os.remove(temp_policy_file)
except Exception as e:
print(f"Error during ImageMagick policy modification: {e}")
print("Manual intervention may be required.")
return False
# ---------------- Gradio Interface Functions ---------------- #
def generate_script_and_show_editor(user_input, resolution_choice,
caption_enabled_choice, caption_color,
caption_size, caption_position, caption_bg_color,
caption_stroke_color, caption_stroke_width):
"""
Generates the script, parses it, stores segments in state,
and prepares the UI updates to show the editing interface.
"""
global TEMP_FOLDER
# Clean up previous run's temp folder if it exists
if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
print(f"Cleaning up previous temp folder: {TEMP_FOLDER}")
try:
shutil.rmtree(TEMP_FOLDER)
except Exception as e:
print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}")
# Create a new unique temporary folder for this run
TEMP_FOLDER = tempfile.mkdtemp()
print(f"Created new temp folder: {TEMP_FOLDER}")
# Store global style choices in state or use them directly (let's store in state)
# Gradio State can hold a single object. Let's use a dict.
run_config = {
"resolution": (1920, 1080) if resolution_choice == "Full" else (1080, 1920),
"caption_enabled": caption_enabled_choice == "Yes",
"caption_color": caption_color,
"caption_size": caption_size,
"caption_position": caption_position,
"caption_bg_color": caption_bg_color,
"caption_stroke_color": caption_stroke_color,
"caption_stroke_width": caption_stroke_width,
"temp_folder": TEMP_FOLDER # Store temp folder path
}
yield run_config, gr.update(value="Generating script...", visible=True), gr.update(visible=False) # Update status
script_text = generate_script(user_input, OPENROUTER_API_KEY, OPENROUTER_MODEL)
if not script_text or script_text.startswith("[Error]"):
yield run_config, gr.update(value=f"Script generation failed: {script_text}", visible=True), gr.update(visible=False)
return run_config, gr.update(visible=True), gr.update(visible=False), [], [], [], [] # Clear segment components
yield run_config, gr.update(value="Parsing script...", visible=True), gr.update(visible=False)
segments = parse_script(script_text)
if not segments:
yield run_config, gr.update(value="Failed to parse script or script is empty.", visible=True), gr.update(visible=False)
return run_config, gr.update(visible=True), gr.update(visible=False), [], [], [], [] # Clear segment components
# Prepare updates for dynamic editing components
# We need to return lists of gr.update() calls for the visibility and content
# of each textbox and file component in the editing groups.
textbox_updates = []
file_updates = []
group_visibility_updates = []
for i in range(MAX_SEGMENTS_FOR_EDITING):
if i < len(segments):
# Show group, populate text, clear file upload
textbox_updates.append(gr.update(value=segments[i]['text'], visible=True))
file_updates.append(gr.update(value=None, visible=True)) # Clear previous uploads
group_visibility_updates.append(gr.update(visible=True))
else:
# Hide unused groups
textbox_updates.append(gr.update(value="", visible=False))
file_updates.append(gr.update(value=None, visible=False))
group_visibility_updates.append(gr.update(visible=False))
yield (run_config,
gr.update(value="Script generated. Edit segments below.", visible=True),
gr.update(visible=True), # Show Generate Video button
group_visibility_updates, # Update visibility of groups
textbox_updates, # Update textboxes
file_updates, # Update file uploads
segments) # Update the state with parsed segments
def generate_video_from_edited(run_config, segments_data, segment_texts, segment_uploads):
"""
Takes the edited segment data (text, uploaded files) and configuration,
and generates the final video.
"""
if not segments_data:
yield "No segments to process. Generate script first.", None
return
global TEMP_FOLDER
# Ensure TEMP_FOLDER is correctly set from run_config
TEMP_FOLDER = run_config.get("temp_folder")
if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER):
yield "Error: Temporary folder not found. Please regenerate script.", None
return
# Extract config from run_config
TARGET_RESOLUTION = run_config["resolution"]
CAPTION_ENABLED = run_config["caption_enabled"]
CAPTION_COLOR = run_config["caption_color"]
CAPTION_SIZE = run_config["caption_size"]
CAPTION_POSITION = run_config["caption_position"]
CAPTION_BG_COLOR = run_config["caption_bg_color"]
CAPTION_STROKE_COLOR = run_config["caption_stroke_color"]
CAPTION_STROKE_WIDTH = run_config["caption_stroke_width"]
# Update segments_data with potentially edited text and uploaded file paths
# segment_texts and segment_uploads are lists of values from the Gradio components
processed_segments = []
for i, segment in enumerate(segments_data):
if i < len(segment_texts): # Ensure we have corresponding input values
processed_segment = segment.copy() # Make a copy
processed_segment['text'] = segment_texts[i] # Use the edited text
processed_segment['uploaded_media'] = segment_uploads[i] # Use the uploaded file path (None if not uploaded)
processed_segments.append(processed_segment)
else:
# This shouldn't happen if state and UI updates are in sync, but as a safeguard
print(f"Warning: Missing input value for segment index {i}. Skipping segment.")
# Or perhaps use the original segment data if no edited input? Let's skip for safety.
# processed_segments.append(segment) # Append original if no input? Depends on desired behavior.
if not processed_segments:
yield "No valid segments to process after editing.", None
return
yield "Fixing ImageMagick policy...", None
fix_imagemagick_policy() # Attempt policy fix before creating clips
clips = []
yield "Generating media and audio for clips...", None
total_segments = len(processed_segments)
for idx, segment in enumerate(processed_segments):
yield f"Processing segment {idx+1}/{total_segments}...", None
print(f"\nProcessing segment {idx+1}/{total_segments}...")
# Determine media source: uploaded or generated
media_asset = None
if segment.get('uploaded_media') and os.path.exists(segment['uploaded_media']):
print(f"Using uploaded media for segment {idx+1}: {segment['uploaded_media']}")
file_ext = os.path.splitext(segment['uploaded_media'])[1].lower()
asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm'] else 'image'
# Need to copy the uploaded file to the temp folder if it's not already there
try:
temp_upload_path = os.path.join(TEMP_FOLDER, f"user_upload_{idx}{file_ext}")
shutil.copy2(segment['uploaded_media'], temp_upload_path)
media_asset = {"path": temp_upload_path, "asset_type": asset_type}
except Exception as e:
print(f"Error copying user upload {segment['uploaded_media']}: {e}. Attempting to generate media instead.")
media_asset = generate_media_asset(segment['original_prompt'], None) # Pass None for uploaded_media
else:
print(f"No user upload for segment {idx+1}. Generating media from prompt: '{segment['original_prompt']}'")
media_asset = generate_media_asset(segment['original_prompt'], None) # Pass None for uploaded_media
if not media_asset:
print(f"Failed to generate or use media asset for segment {idx+1}. Creating placeholder.")
# Create a dummy asset dict pointing to a non-existent path so create_clip makes a black clip
media_asset = {"path": os.path.join(TEMP_FOLDER, f"dummy_missing_media_{idx}.txt"), "asset_type": "image"} # Use image as dummy type
# Generate TTS audio
tts_path = generate_tts(segment['text'], voice='en') # Using 'en' voice
# Create the video clip for this segment
clip = create_clip(
media_asset=media_asset,
tts_path=tts_path,
duration=segment['duration'], # Use estimated duration as a fallback reference
target_resolution=TARGET_RESOLUTION,
caption_enabled=CAPTION_ENABLED,
caption_color=CAPTION_COLOR,
caption_size=CAPTION_SIZE,
caption_position=CAPTION_POSITION,
caption_bg_color=CAPTION_BG_COLOR,
caption_stroke_color=CAPTION_STROKE_COLOR,
caption_stroke_width=CAPTION_STROKE_WIDTH,
narration_text=segment['text'],
segment_index=idx+1
)
if clip:
clips.append(clip)
else:
print(f"Skipping segment {idx+1} due to clip creation failure.")
# Create a placeholder black clip if create_clip returned None
placeholder_duration = segment.get('duration', 3.0) # Use estimated duration or default
placeholder_clip = ColorClip(size=TARGET_RESOLUTION, color=(0,0,0), duration=placeholder_duration)
silent_audio_path = generate_silent_audio(placeholder_duration)
if silent_audio_path and os.path.exists(silent_audio_path):
placeholder_clip = placeholder_clip.set_audio(AudioFileClip(silent_audio_path))
error_text = f"Segment {idx+1} Failed"
if segment.get('text'): error_text += f":\n{segment['text'][:50]}..."
error_txt_clip = TextClip(error_text, fontsize=30, color="red", align='center', size=(TARGET_RESOLUTION[0] * 0.9, None)).set_position('center').set_duration(placeholder_duration)
placeholder_clip = CompositeVideoClip([placeholder_clip, error_txt_clip])
clips.append(placeholder_clip)
if not clips:
yield "No clips were successfully created. Video generation failed.", None
# Clean up
if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
try:
shutil.rmtree(TEMP_FOLDER)
print(f"Cleaned up temp folder: {TEMP_FOLDER}")
except Exception as e:
print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}")
TEMP_FOLDER = None # Reset global
return
yield "Concatenating clips...", None
print("\nConcatenating clips...")
final_video = concatenate_videoclips(clips, method="compose")
yield "Adding background music...", None
bg_music_path = find_mp3_files() # Find background music
final_video = add_background_music(final_video, bg_music_path, bg_music_volume=0.08) # Use default volume
yield f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...", None
print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...")
try:
# Use a temporary output file first for safety
temp_output_filename = os.path.join(TEMP_FOLDER, f"temp_{OUTPUT_VIDEO_FILENAME}")
final_video.write_videofile(temp_output_filename, codec='libx264', fps=24, preset='veryfast')
# Move the final file to the intended location after successful export
shutil.move(temp_output_filename, OUTPUT_VIDEO_FILENAME)
print(f"Final video saved as {OUTPUT_VIDEO_FILENAME}")
output_path = OUTPUT_VIDEO_FILENAME
except Exception as e:
print(f"Error exporting video: {e}")
output_path = None
yield f"Video export failed: {e}", None # Provide error message in status
# Clean up temporary folder
yield "Cleaning up temporary files...", output_path # Update status before cleanup
if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
try:
shutil.rmtree(TEMP_FOLDER)
print(f"Cleaned up temp folder: {TEMP_FOLDER}")
except Exception as e:
print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}")
TEMP_FOLDER = None # Reset global
yield "Done!", output_path # Final status update
# ---------------- Gradio Interface Definition (Blocks) ---------------- #
# Need lists to hold the dynamic UI components for segments
segment_editing_groups = []
segment_text_inputs = []
segment_file_inputs = []
with gr.Blocks() as demo:
gr.Markdown("# 🤖 AI Documentary Video Generator 🎬")
gr.Markdown("Enter a concept to generate a funny documentary script. You can then edit the script text and replace the suggested media for each segment before generating the final video.")
# --- Global Settings ---
with gr.Accordion("Global Settings", open=True):
user_concept_input = gr.Textbox(label="Video Concept", placeholder="e.g., The secret life of pigeons, Why socks disappear in the laundry, The futility of alarm clocks...")
with gr.Row():
resolution_radio = gr.Radio(["Full (1920x1080)", "Short (1080x1920)"], label="Video Resolution", value="Full (1920x1080)")
bg_music_volume_slider = gr.Slider(minimum=0, maximum=1.0, value=0.08, step=0.01, label="Background Music Volume")
# --- Caption Settings ---
with gr.Accordion("Caption Settings", open=False):
caption_enabled_radio = gr.Radio(["Yes", "No"], label="Show Captions?", value="Yes")
caption_color_picker = gr.ColorPicker(label="Caption Text Color", value="#FFFFFF") # Default white
caption_bg_color_picker = gr.ColorPicker(label="Caption Background Color (with transparency)", value="rgba(0, 0, 0, 0.25)") # Default semi-transparent black
caption_size_slider = gr.Slider(minimum=20, maximum=100, value=45, step=1, label="Caption Font Size")
caption_position_radio = gr.Radio(["Top", "Middle", "Bottom"], label="Caption Position", value="Bottom")
caption_stroke_color_picker = gr.ColorPicker(label="Caption Stroke Color", value="#000000") # Default black stroke
caption_stroke_width_slider = gr.Slider(minimum=0, maximum=5, value=2, step=0.5, label="Caption Stroke Width")
generate_script_btn = gr.Button("Generate Script", variant="primary")
# --- Status and Script Output ---
status_output = gr.Label(label="Status", value="")
script_preview_markdown = gr.Markdown("### Generated Script Preview\n\nScript will appear here...") # Optional raw script preview
# --- State to hold parsed segments data and run config ---
segments_state = gr.State([]) # List of segment dictionaries
run_config_state = gr.State({}) # Dictionary for run configuration
# --- Dynamic Editing Area (Initially hidden) ---
# We create MAX_SEGMENTS_FOR_EDITING groups, and show/hide them dynamically
with gr.Column(visible=False) as editing_area:
gr.Markdown("### Edit Script Segments")
gr.Markdown("Review the AI-generated text and media suggestions below. Edit the text and/or upload your own image/video for any segment. If no file is uploaded, AI will fetch media based on the original prompt.")
for i in range(MAX_SEGMENTS_FOR_EDITING):
with gr.Group(visible=False) as segment_group: # Each group represents one segment
segment_editing_groups.append(segment_group)
gr.Markdown(f"**Segment {i+1}** (Prompt: <span id='segment-prompt-{i}'></span>)") # Placeholder for prompt text
# Using JS to update prompt text because Textbox is used for narration
# Alternatively, could use a non-editable gr.Label or gr.Textbox for prompt
segment_text = gr.Textbox(label="Narration Text", lines=2, interactive=True)
segment_text_inputs.append(segment_text)
segment_file = gr.File(label="Upload Custom Media (Image or Video)", type="filepath", interactive=True)
segment_file_inputs.append(segment_file)
generate_video_btn = gr.Button("Generate Video", variant="primary")
# --- Final Video Output ---
final_video_output = gr.Video(label="Generated Video")
# --- Event Handlers ---
# Generate Script Button Click
generate_script_btn.click(
fn=generate_script_and_show_editor,
inputs=[
user_concept_input,
resolution_radio,
caption_enabled_radio,
caption_color_picker,
caption_size_slider,
caption_position_radio,
caption_bg_color_picker,
caption_stroke_color_picker,
caption_stroke_width_slider
],
outputs=[
run_config_state,
status_output,
editing_area, # Show the editing area
# Outputs to update visibility of segment groups
*segment_editing_groups,
# Outputs to update values of segment textboxes
*segment_text_inputs,
# Outputs to update values (clear) of segment file uploads
*segment_file_inputs,
# Output to update the segments_state
segments_state
]
)
# Generate Video Button Click
generate_video_btn.click(
fn=generate_video_from_edited,
inputs=[
run_config_state, # Pass run config
segments_state, # Pass the original parsed segments data
*segment_text_inputs, # Pass list of edited text values
*segment_file_inputs # Pass list of uploaded file paths
],
outputs=[status_output, final_video_output] # Yield status updates and final video
)
# Add JS to update segment prompt labels after script generation
# This requires defining IDs in the Markdown previously
demo.load(
None,
None,
None,
_js=f"""
function updateSegmentPrompts(segments_data) {{
if (!segments_data) return;
for (let i = 0; i < segments_data.length; i++) {{
const promptSpan = document.getElementById('segment-prompt-' + i);
if (promptSpan) {{
promptSpan.textContent = segments_data[i].original_prompt;
}}
}}
// Hide unused prompt spans
for (let i = segments_data.length; i < {MAX_SEGMENTS_FOR_EDITING}; i++) {{
const promptSpan = document.getElementById('segment-prompt-' + i);
if (promptSpan) {{
promptSpan.textContent = ''; // Clear text
}}
}}
}}
"""
)
# Trigger the JS function whenever segments_state changes
segments_state.change(
None,
segments_state,
None,
_js="""
(segments_data) => {
updateSegmentPrompts(segments_data);
}
"""
)
# Launch the interface
if __name__ == "__main__":
# Attempt ImageMagick policy fix on script startup
# This helps but might still require manual sudo depending on system config
fix_imagemagick_policy()
print("Launching Gradio interface...")
# Make sure to set PEXELS_API_KEY and OPENROUTER_API_KEY environment variables
# or replace 'YOUR_PEXELS_API_KEY' and 'YOUR_OPENROUTER_API_KEY' above.
if PEXELS_API_KEY == 'YOUR_PEXELS_API_KEY':
print("Warning: PEXELS_API_KEY is not configured. Media search may fail.")
if OPENROUTER_API_KEY == 'YOUR_OPENROUTER_API_KEY':
print("Warning: OPENROUTER_API_KEY is not configured. Script generation will fail.")
demo.launch(share=True) # Set share=True to get a public link