# Hugging Face Space source listing (Space status at capture time: Build error)
import gradio as gr | |
import os | |
import shutil | |
import requests | |
import io | |
import time | |
import re | |
import random | |
import math | |
import tempfile | |
import traceback | |
import numpy as np | |
import soundfile as sf | |
# import pysrt # Not strictly needed if embedding captions directly | |
import cv2 # OpenCV for potential image processing, though PIL is often enough | |
from moviepy.editor import ( | |
VideoFileClip, AudioFileClip, ImageClip, concatenate_videoclips, | |
CompositeVideoClip, TextClip, CompositeAudioClip, ColorClip | |
) | |
import moviepy.video.fx.all as vfx | |
from pydub import AudioSegment | |
from PIL import Image, ImageDraw, ImageFont | |
from bs4 import BeautifulSoup | |
from urllib.parse import quote | |
from gtts import gTTS | |
import logging | |
# --- Configuration ---
# API keys are read from the environment (e.g. Hugging Face Space secrets).
# Real credentials must never be committed to the source file; an unset
# variable yields an empty string, which the rest of the app treats as
# "not configured".
PEXELS_API_KEY = os.environ.get("PEXELS_API_KEY", "")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")
OPENROUTER_MODEL = "mistralai/mistral-7b-instruct:free"  # Using a known free model
# OPENROUTER_MODEL = "mistralai/mistral-small-latest"  # Or a small paid one if needed
TEMP_FOLDER_BASE = "/tmp/ai_doc_generator"
OUTPUT_VIDEO_FILENAME = "final_documentary.mp4"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
DEFAULT_FONT = "DejaVuSans.ttf"  # A common font available in many Linux distros, adjust if needed
BGM_FILE = "background_music.mp3"  # Optional: Place a royalty-free mp3 here
BGM_VOLUME = 0.1  # Background music volume multiplier (0.0 to 1.0)

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Kokoro TTS Initialization (Optional) ---
# Kokoro is disabled; gTTS is the only active TTS backend.
KOKORO_ENABLED = False
pipeline = None
# try: | |
# from kokoro import KPipeline | |
# # Check for GPU availability if desired, default to CPU | |
# device = 'cuda' if torch.cuda.is_available() else 'cpu' | |
# pipeline = KPipeline(lang_code='a', device=device) # 'a' for multilingual? Check Kokoro docs | |
# KOKORO_ENABLED = True | |
# logging.info("β Kokoro TTS Initialized.") | |
# except ImportError: | |
# logging.warning("Kokoro library not found. Falling back to gTTS.") | |
# pipeline = None | |
# except Exception as e: | |
# logging.warning(f"β οΈ Error initializing Kokoro TTS: {e}. Using gTTS fallback.") | |
# pipeline = None | |
# --- Helper Functions --- | |
def generate_script(topic, api_key, model):
    """Generate a documentary script for ``topic`` via the OpenRouter chat API.

    Args:
        topic: Subject of the documentary. Blank or non-string values are
            rejected without contacting the API.
        api_key: OpenRouter API key, sent as a Bearer token. An empty key is
            rejected without contacting the API.
        model: OpenRouter model identifier.

    Returns:
        The raw script text containing ``[SCENE: ...]`` / ``[NARRATION: ...]``
        tags, or ``None`` if the input is unusable, the request fails, or the
        response does not have the expected shape/format.
    """
    # Guard clauses: do not spend a network round-trip on input that cannot work.
    if not isinstance(topic, str) or not topic.strip():
        logging.error("Script generation skipped: topic is empty.")
        return None
    if not api_key:
        logging.error("Script generation skipped: OpenRouter API key is not configured.")
        return None

    logging.info(f"Generating script for topic: {topic}")
    prompt = f"""Create a short documentary script about '{topic}'.
The script should be structured as a sequence of scenes and narrations.
Each scene description should be enclosed in [SCENE: description] tags. The description should be concise and suggest visuals (e.g., 'drone shot of mountains', 'close up of a historical artifact', 'archival footage of protests').
Each narration segment should follow its corresponding scene and be enclosed in [NARRATION: text] tags. The narration should be engaging and informative, broken into short sentences suitable for ~5-10 second clips.
Keep the total number of scenes between 5 and 8.
Example:
[SCENE: Time-lapse of a bustling city street at night]
[NARRATION: Cities are centers of human activity, constantly evolving.]
[SCENE: Close up on intricate gears of an old clock]
[NARRATION: But how do we measure the relentless march of time?]
Generate the script now:
"""
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    data = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 1000,  # Adjust as needed
    }
    response = None
    try:
        response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=60)
        response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
        result = response.json()
        script_content = result['choices'][0]['message']['content']
        # The API may return a null/non-text content; treat it like a format error
        # instead of letting the membership test below raise TypeError.
        if not isinstance(script_content, str) or "[SCENE:" not in script_content or "[NARRATION:" not in script_content:
            logging.error("β Script generation failed: Output format incorrect.")
            logging.debug(f"Raw script output: {script_content}")
            return None
        logging.info("β Script generated successfully.")
        return script_content
    except requests.exceptions.RequestException as e:
        logging.error(f"β Script generation failed: API request error: {e}")
        return None
    except (KeyError, IndexError, TypeError) as e:
        logging.error(f"β Script generation failed: Unexpected API response format: {e}")
        if response is not None:
            logging.debug(f"Raw API response: {response.text}")
        return None
    except Exception as e:
        logging.error(f"β Script generation failed: An unexpected error occurred: {e}")
        traceback.print_exc()
        return None
def parse_script(script_text):
    """Split a generated script into alternating scene and narration entries.

    Args:
        script_text: Script containing ``[SCENE: ...]`` blocks, each directly
            followed by a ``[NARRATION: ...]`` block (tags are case-insensitive).

    Returns:
        A flat list of dicts alternating ``{"type": "scene", "prompt": ...}``
        and ``{"type": "narration", "text": ...}``; ``None`` when the input is
        empty or contains no scene/narration pair. Pairs with an empty scene
        or narration are skipped.
    """
    logging.info("Parsing script...")
    if not script_text:
        return None

    # One match per adjacent [SCENE: ...][NARRATION: ...] pair
    pair_regex = re.compile(r"\[SCENE:\s*(.*?)\s*\]\s*\[NARRATION:\s*(.*?)\s*\]", re.DOTALL | re.IGNORECASE)
    found_pairs = pair_regex.findall(script_text)
    if not found_pairs:
        logging.error("β Script parsing failed: No valid [SCENE]/[NARRATION] pairs found.")
        logging.debug(f"Script content for parsing: {script_text}")
        return None

    parsed = []
    for raw_scene, raw_narration in found_pairs:
        visual_prompt = raw_scene.strip()
        # Narration is spoken as one sentence flow, so newlines become spaces
        spoken_text = raw_narration.strip().replace('\n', ' ')
        if not (visual_prompt and spoken_text):
            logging.warning(f"β οΈ Skipping invalid pair: Scene='{visual_prompt}', Narration='{spoken_text}'")
            continue
        parsed.extend((
            {"type": "scene", "prompt": visual_prompt},
            {"type": "narration", "text": spoken_text},
        ))

    logging.info(f"β Script parsed into {len(parsed)//2} scene/narration pairs.")
    return parsed
def _best_video_link(video_files):
    """Return the preferred download URL from a Pexels ``video_files`` list, or None."""
    # Pexels reports ``width: null`` for some renditions; ``or 0`` keeps the
    # sort key numeric so one such entry cannot abort the whole search.
    candidates = sorted(
        (entry for entry in (video_files or []) if entry.get('link')),
        key=lambda entry: entry.get('width') or 0,
        reverse=True,
    )
    if not candidates:
        return None
    # Prefer a 1920-wide HD rendition; otherwise take the widest available.
    for entry in candidates:
        if entry.get('quality') == 'hd' and entry.get('width') == 1920:
            return entry['link']
    return candidates[0]['link']


def _best_photo_link(src):
    """Return the preferred image URL from a Pexels photo ``src`` mapping, or None."""
    src = src or {}
    for size_name in ('large2x', 'large', 'original'):
        if src.get(size_name):
            return src[size_name]
    return None


def search_pexels(query, api_key, media_type="videos", per_page=5):
    """Search the Pexels API for landscape videos or photos.

    Args:
        query: Free-text search query.
        api_key: Pexels API key. An empty key or the placeholder
            ``"YOUR_PEXELS_API_KEY_HERE"`` skips the search.
        media_type: ``"videos"`` for video search; any other value is
            treated as a photo search.
        per_page: Maximum number of results requested.

    Returns:
        A list of ``{'url': ..., 'type': 'video' | 'image'}`` dicts; an empty
        list when the key is missing, nothing is found, or the request fails.
    """
    if not api_key or api_key == "YOUR_PEXELS_API_KEY_HERE":
        logging.warning("β οΈ Pexels API key not configured. Skipping search.")
        return []
    logging.info(f"Searching Pexels {media_type} for: {query}")

    wants_video = media_type == 'videos'
    # Video search lives under /videos/search, but photo search is /v1/search
    # (there is no /photos/search endpoint).
    if wants_video:
        base_url = "https://api.pexels.com/videos/search"
    else:
        base_url = "https://api.pexels.com/v1/search"
    headers = {"Authorization": api_key}
    params = {"query": query, "per_page": per_page, "orientation": "landscape"}  # Default landscape

    try:
        response = requests.get(base_url, headers=headers, params=params, timeout=20)
        response.raise_for_status()
        data = response.json()
        results = []
        if wants_video:
            for item in data.get('videos', []):
                url = _best_video_link(item.get('video_files'))
                if url:
                    results.append({'url': url, 'type': 'video'})
        else:
            for item in data.get('photos', []):
                url = _best_photo_link(item.get('src'))
                if url:
                    results.append({'url': url, 'type': 'image'})
        logging.info(f"β Found {len(results)} Pexels {media_type} results.")
        return results
    except requests.exceptions.RequestException as e:
        logging.error(f"β Pexels API request error: {e}")
        return []
    except Exception as e:
        logging.error(f"β Error processing Pexels response: {e}")
        traceback.print_exc()
        return []
def _media_filename(url, headers):
    """Derive a safe local filename for a download from its URL and response headers."""
    filename = url.split('/')[-1].split('?')[0]
    if not filename or '.' not in filename:  # No usable name in the URL
        cd = headers.get('content-disposition')
        if cd:
            # Stop at the closing quote / next parameter so the name is not
            # polluted by a trailing quote or "; size=..." suffix.
            fname = re.findall(r'filename="?([^";]+)"?', cd)
            if fname:
                filename = fname[0].strip()
    # The name comes from the remote side: drop any directory components so it
    # cannot escape the save directory.
    filename = os.path.basename(filename.replace('\\', '/'))
    if not filename or '.' not in filename:
        content_type = headers.get('content-type', '').lower()
        ext = '.jpg'  # default
        if 'video' in content_type:
            ext = '.mp4'
        elif 'png' in content_type:
            ext = '.png'
        filename = f"media_{int(time.time())}{ext}"
    return filename


def _unique_path(directory, filename):
    """Return a path in ``directory`` that does not collide with an existing file."""
    stem, ext = os.path.splitext(filename)
    candidate = os.path.join(directory, filename)
    counter = 1
    while os.path.exists(candidate):
        candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
        counter += 1
    return candidate


def _remove_partial(path):
    """Best-effort removal of a partially written download."""
    if path and os.path.exists(path):
        try:
            os.remove(path)
        except OSError as e:
            logging.warning(f"Could not remove partial download {path}: {e}")


def download_media(url, save_dir):
    """Download a video or image from ``url`` into ``save_dir``.

    Args:
        url: Direct media URL. An empty value is rejected immediately.
        save_dir: Existing directory to write the file into.

    Returns:
        The path of the saved file, or ``None`` on any failure. A file that
        was only partially written is removed before returning ``None``.
    """
    if not url:
        logging.error("Media download failed: no URL provided.")
        return None
    logging.info(f"Downloading media from: {url}")
    save_path = None
    try:
        # The context manager releases the streamed connection even on errors.
        with requests.get(url, stream=True, timeout=60, headers={'User-Agent': USER_AGENT}) as response:
            response.raise_for_status()
            # A unique path avoids truncating a same-named file that an
            # earlier clip may still be reading from.
            save_path = _unique_path(save_dir, _media_filename(url, response.headers))
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        logging.info(f"β Media downloaded successfully to: {save_path}")
        return save_path
    except requests.exceptions.RequestException as e:
        logging.error(f"β Media download failed: Request error: {e}")
        _remove_partial(save_path)
        return None
    except Exception as e:
        logging.error(f"β Media download failed: An unexpected error occurred: {e}")
        traceback.print_exc()
        _remove_partial(save_path)
        return None
def generate_tts(text, lang, save_dir, segment_index):
    """Synthesize narration audio for one segment with gTTS.

    Args:
        text: Narration text to speak.
        lang: gTTS language code.
        save_dir: Directory the mp3 is written into.
        segment_index: Segment number, used in the output filename.

    Returns:
        ``(filepath, duration_seconds)`` on success, ``(None, 0)`` on failure.
        The duration is read with soundfile, then pydub, and finally estimated
        from the word count (minimum 2 seconds) if both readers fail.
    """
    out_path = os.path.join(save_dir, f"narration_{segment_index}.mp3")
    logging.info(f"Generating TTS for segment {segment_index}: '{text[:50]}...'")

    # NOTE: the optional Kokoro backend is disabled in this file (see
    # KOKORO_ENABLED), so gTTS is the only synthesis path.
    try:
        logging.info("Attempting TTS generation with gTTS...")
        gTTS(text=text, lang=lang).save(out_path)

        # Measure the clip length, degrading gracefully across readers
        try:
            seconds = sf.info(out_path).duration
        except Exception as e_dur:
            logging.warning(f"β οΈ Could not get duration using soundfile ({e_dur}), trying pydub...")
            try:
                seconds = len(AudioSegment.from_mp3(out_path)) / 1000.0
            except Exception as e_dur_pd:
                logging.error(f"β Failed to get duration with pydub as well ({e_dur_pd}). Setting duration to estimated.")
                # Very rough estimate: 150 spoken words per minute
                words_per_minute = 150
                seconds = (len(text.split()) / words_per_minute) * 60
                if seconds < 2:
                    seconds = 2  # Minimum duration

        logging.info(f"β gTTS generated successfully ({seconds:.2f}s).")
    except Exception as e:
        logging.error(f"β gTTS failed: {e}")
        traceback.print_exc()
        return None, 0

    return out_path, seconds
def resize_media_to_fill(clip, target_size):
    """Scale a MoviePy clip so it covers ``target_size``, center-cropping any overflow.

    Args:
        clip: MoviePy video or image clip exposing ``size``, ``resize`` and ``fx``.
        target_size: ``(width, height)`` of the desired output frame.

    Returns:
        A clip whose frame is ``target_size``.
    """
    target_w, target_h = target_size
    target_aspect = target_w / target_h
    clip_w, clip_h = clip.size
    clip_aspect = clip_w / clip_h

    if abs(clip_aspect - target_aspect) < 0.01:
        # Aspect ratios are close enough: set both dimensions explicitly.
        # Resizing by width alone could leave the height a few pixels off
        # the target, producing clips of slightly different frame sizes.
        return clip.resize(newsize=(target_w, target_h))

    if clip_aspect > target_aspect:
        # Clip is wider than the target: match the height, crop the sides.
        scaled = clip.resize(height=target_h)
        left = max(0, int(scaled.w / 2 - target_w / 2))
        return scaled.fx(vfx.crop, x1=left, y1=0, x2=left + target_w, y2=target_h)

    # Clip is taller than the target: match the width, crop top and bottom.
    scaled = clip.resize(width=target_w)
    top = max(0, int(scaled.h / 2 - target_h / 2))
    return scaled.fx(vfx.crop, x1=0, y1=top, x2=target_w, y2=top + target_h)
def apply_ken_burns(image_clip, duration, target_size, zoom_factor=1.1):
    """Apply a subtle, centered zoom-out (Ken Burns) effect to an image clip.

    The image starts scaled by ``zoom_factor`` and shrinks linearly to its
    native size over ``duration`` seconds, always framed at ``target_size``.

    Args:
        image_clip: MoviePy ImageClip, ideally already at ``target_size``.
        duration: Length of the effect in seconds.
        target_size: ``(width, height)`` of the output frame.
        zoom_factor: Scale at ``t = 0`` (1.0 disables the zoom).

    Returns:
        A clip of length ``duration`` with a constant ``target_size`` frame.
    """
    target_size = tuple(target_size)
    # Compare as tuples so a list-valued ``size`` is not mistaken for a mismatch.
    if tuple(image_clip.size) != target_size:
        logging.warning("Applying Ken Burns to an image not matching target size, resizing first.")
        image_clip = resize_media_to_fill(image_clip, target_size)

    def zoom_at(t):
        # Zoom out: start at zoom_factor, end at 1.0. Progress is clamped so
        # the scale never drops below 1.0, and a non-positive duration cannot
        # divide by zero.
        progress = min(max(t / duration, 0.0), 1.0) if duration and duration > 0 else 1.0
        return 1 + (zoom_factor - 1) * (1 - progress)

    zoomed_clip = image_clip.set_duration(duration).fx(vfx.resize, zoom_at)
    # The zoomed frame changes size over time, so a fixed-coordinate crop would
    # yield frames of varying dimensions. Compositing it centered on a canvas
    # of the target size keeps every frame exactly target_size and centered.
    framed_clip = CompositeVideoClip(
        [zoomed_clip.set_position(('center', 'center'))],
        size=target_size,
    )
    return framed_clip.set_duration(duration)
def _wrap_caption_words(text, max_width, measure):
    """Greedily wrap ``text`` into lines whose measured width fits ``max_width``."""
    lines = []
    current_line = ""
    for word in text.split():
        candidate = f"{current_line} {word}".strip()
        if measure(candidate) <= max_width:
            current_line = candidate
            continue
        if current_line:  # Flush the line that was full
            lines.append(current_line)
        current_line = word  # Start a new line with the current word
        if measure(current_line) > max_width:
            logging.warning(f"Word '{current_line}' is too long for caption width.")
    if current_line:
        lines.append(current_line)
    return "\n".join(lines)


def _caption_offset(anchor, frame_extent, caption_extent, margin, near_name, far_name):
    """Turn a position keyword (or a number) into a pixel offset along one axis."""
    if not isinstance(anchor, str):
        return anchor  # Already numeric: use as given
    if near_name in anchor:
        return margin
    if far_name in anchor:
        return frame_extent - caption_extent - margin
    return (frame_extent - caption_extent) / 2  # 'center'


def create_caption_clip(text, duration, clip_size, font_size=None, font_path=DEFAULT_FONT, color='white', stroke_color='black', stroke_width=1.5, position=('center', 'bottom'), margin=20):
    """Create a MoviePy TextClip caption with basic word wrapping.

    Args:
        text: Caption text.
        duration: How long the caption is shown, in seconds.
        clip_size: ``(width, height)`` of the video frame the caption sits on.
        font_size: Font size in pixels; defaults to ``height / 25`` (min 20).
        font_path: Font used for measuring and rendering.
        color: Text fill color.
        stroke_color: Text outline color.
        stroke_width: Text outline width.
        position: ``(x, y)`` where each item is a keyword
            (``'left'``/``'center'``/``'right'``, ``'top'``/``'center'``/``'bottom'``)
            or a pixel offset.
        margin: Distance in pixels kept from the edge a keyword anchors to.

    Returns:
        A positioned TextClip lasting ``duration`` seconds.
    """
    width, height = clip_size
    max_text_width = width * 0.8  # Allow text to occupy 80% of the width
    if font_size is None:
        font_size = max(20, int(height / 25))  # Dynamic font size based on height

    # Load the font with PIL only to measure line widths for wrapping
    try:
        pil_font = ImageFont.truetype(font_path, font_size)
    except IOError:
        logging.warning(f"Font '{font_path}' not found. Using MoviePy default.")
        pil_font = None

    def measure(txt):
        if pil_font is not None:
            return pil_font.getbbox(txt)[2]
        return len(txt) * font_size * 0.6  # Rough estimate without font metrics

    wrapped_text = _wrap_caption_words(text, max_text_width, measure)

    try:
        caption = TextClip(
            wrapped_text,
            fontsize=font_size,
            color=color,
            font=font_path,  # MoviePy might handle font lookup differently
            stroke_color=stroke_color,
            stroke_width=stroke_width,
            method='caption',  # Use caption method for better wrapping if available
            size=(int(max_text_width), None),  # Constrain width for wrapping
            align='center'
        )
    except Exception as e:
        logging.error(f"Error creating TextClip (maybe font issue?): {e}. Using simpler TextClip.")
        # Fallback to simpler TextClip without stroke/specific font if needed
        caption = TextClip(wrapped_text, fontsize=font_size, color=color, method='caption', size=(int(max_text_width), None), align='center')

    # Resolve the anchor keywords to pixel coordinates here. ``caption.pos`` is
    # the clip's position-over-time function taking one argument ``t``, so it
    # cannot be called with the two anchor keywords to obtain numbers.
    pos_x, pos_y = position
    x = _caption_offset(pos_x, width, caption.w, margin, 'left', 'right')
    y = _caption_offset(pos_y, height, caption.h, margin, 'top', 'bottom')
    return caption.set_position((x, y)).set_duration(duration)
def _black_clip(target_size, duration):
    """Return a solid black clip used when no usable media is available."""
    return ColorClip(size=target_size, color=(0, 0, 0), duration=duration)


def _build_visual_clip(media_path, media_type, duration, target_size):
    """Build the picture track for one segment; falls back to black on any failure."""
    if media_type == 'video':
        video_clip = None
        try:
            video_clip = VideoFileClip(media_path, target_resolution=(target_size[1], target_size[0]))  # height, width
            if video_clip.duration >= duration:
                video_clip = video_clip.subclip(0, duration)
            else:
                # Repeat the footage until it covers the narration, then trim
                logging.warning(f"Video duration ({video_clip.duration:.2f}s) shorter than audio ({duration:.2f}s). Looping video.")
                num_loops = math.ceil(duration / video_clip.duration)
                video_clip = concatenate_videoclips([video_clip] * num_loops).subclip(0, duration)
            return resize_media_to_fill(video_clip, target_size)
        except Exception as e:
            logging.error(f"β Failed to load or process video file '{media_path}': {e}. Creating black clip.")
            if video_clip is not None:
                try:
                    video_clip.close()  # Release the file reader
                except Exception:
                    logging.debug("Ignoring error while closing failed video clip.")
            return _black_clip(target_size, duration)

    if media_type == 'image':
        try:
            # Load image, resize to fill target, apply Ken Burns
            img_clip_resized = resize_media_to_fill(ImageClip(media_path), target_size)
            return apply_ken_burns(img_clip_resized, duration, target_size)
        except Exception as e:
            logging.error(f"β Failed to load or process image file '{media_path}': {e}. Creating black clip.")
            return _black_clip(target_size, duration)

    if media_type == 'color':
        # The caller's explicit "no media found" fallback: expected, not an error
        logging.info("No media for this segment. Using black clip.")
        return _black_clip(target_size, duration)

    logging.error(f"β Unknown media type: {media_type}. Creating black clip.")
    return _black_clip(target_size, duration)


def create_clip(media_path, media_type, audio_path, audio_duration, target_size, add_captions, narration_text, segment_index):
    """Create one video segment from media, narration audio and optional captions.

    Args:
        media_path: Path of the downloaded video/image, or ``None`` for ``'color'``.
        media_type: ``'video'``, ``'image'`` or ``'color'`` (black background).
        audio_path: Path of the narration audio file.
        audio_duration: Expected narration length in seconds.
        target_size: ``(width, height)`` of the output frame.
        add_captions: Whether to overlay ``narration_text`` as a caption.
        narration_text: Caption text.
        segment_index: Segment number, used in log messages.

    Returns:
        The assembled MoviePy clip, or ``None`` if the segment could not be built.
    """
    logging.info(f"Creating clip {segment_index} - Type: {media_type}, Duration: {audio_duration:.2f}s")
    audio_clip = None
    try:
        audio_clip = AudioFileClip(audio_path)
        # Trust the decoded file over the reported duration when they disagree
        if abs(audio_clip.duration - audio_duration) > 0.1:
            logging.warning(f"Audio file duration ({audio_clip.duration:.2f}s) differs from expected ({audio_duration:.2f}s). Using file duration.")
            audio_duration = audio_clip.duration
        # Ensure minimum duration to avoid issues
        if audio_duration < 0.1:
            logging.warning(f"Audio duration is very short ({audio_duration:.2f}s). Setting minimum 0.5s.")
            audio_duration = 0.5
            audio_clip = audio_clip.subclip(0, audio_duration)

        main_clip = _build_visual_clip(media_path, media_type, audio_duration, target_size)
        # Set duration definitively and add audio
        main_clip = main_clip.set_duration(audio_duration).set_audio(audio_clip)

        final_clip = main_clip
        if add_captions and narration_text:
            try:
                caption_clip = create_caption_clip(narration_text, audio_duration, target_size)
                final_clip = CompositeVideoClip([main_clip, caption_clip], size=target_size)
            except Exception as e:
                # A caption problem should not cost the whole segment
                logging.warning(f"Caption creation failed for clip {segment_index}: {e}. Continuing without captions.")
                final_clip = main_clip

        logging.info(f"β Clip {segment_index} created successfully.")
        return final_clip
    except Exception as e:
        logging.error(f"β Failed to create clip {segment_index}: {e}")
        traceback.print_exc()
        if audio_clip is not None:
            try:
                audio_clip.close()  # Do not leak the audio reader on failure
            except Exception:
                logging.debug("Ignoring error while closing failed audio clip.")
        return None
def add_background_music(video_clip, music_file=BGM_FILE, volume=BGM_VOLUME):
    """Mix looping/trimmed background music under a clip's existing audio.

    Args:
        video_clip: MoviePy clip to add music to.
        music_file: Path of the music file; if it does not exist the clip is
            returned unchanged.
        volume: Multiplier applied to the music volume.

    Returns:
        The clip with music mixed in, or the original clip if the music file
        is missing or mixing fails.
    """
    if not os.path.exists(music_file):
        logging.warning(f"Background music file '{music_file}' not found. Skipping BGM.")
        return video_clip
    logging.info(f"Adding background music from {music_file}")
    try:
        bgm_clip = AudioFileClip(music_file)
        video_duration = video_clip.duration
        # Loop or trim BGM to match video duration
        if bgm_clip.duration < video_duration:
            # Imported here because the module never imports the audio fx
            # package; without it the ``afx`` name is undefined.
            import moviepy.audio.fx.all as afx
            bgm_clip = bgm_clip.fx(afx.audio_loop, duration=video_duration)
        else:
            bgm_clip = bgm_clip.subclip(0, video_duration)
        # Adjust volume
        bgm_clip = bgm_clip.volumex(volume)
        # Combine with existing audio
        original_audio = video_clip.audio
        if original_audio:
            combined_audio = CompositeAudioClip([original_audio, bgm_clip])
        else:
            # Handle case where video might not have narration audio (e.g., if all TTS failed)
            logging.warning("Video clip has no primary audio. Adding BGM only.")
            combined_audio = bgm_clip
        video_clip = video_clip.set_audio(combined_audio)
        logging.info("β Background music added.")
        return video_clip
    except Exception as e:
        logging.error(f"β Failed to add background music: {e}")
        traceback.print_exc()
        return video_clip  # Return original clip on failure
# --- Main Gradio Function --- | |
def _find_scene_media(scene_prompt, temp_dir, status_log):
    """Find and download media for a scene: Pexels video first, then Pexels image.

    Returns ``(media_path, media_type)``; ``(None, 'color')`` when nothing
    could be found or downloaded.
    """
    video_results = search_pexels(scene_prompt, PEXELS_API_KEY, media_type="videos")
    if video_results:
        selected_media = random.choice(video_results)
        status_log.append(f"π₯ Downloading Pexels video: {selected_media['url']}")
        media_path = download_media(selected_media['url'], temp_dir)
        if media_path:
            return media_path, 'video'
        status_log.append("β οΈ Video download failed.")

    status_log.append("π No suitable video found/downloaded. Searching Pexels images...")
    image_results = search_pexels(scene_prompt, PEXELS_API_KEY, media_type="photos")
    if image_results:
        selected_media = random.choice(image_results)
        status_log.append(f"π₯ Downloading Pexels image: {selected_media['url']}")
        media_path = download_media(selected_media['url'], temp_dir)
        if media_path:
            return media_path, 'image'
        status_log.append("β οΈ Image download failed.")

    status_log.append(f"β οΈ No suitable media found for '{scene_prompt}'. Using black screen.")
    return None, 'color'  # Special type for ColorClip


def _close_clip_quietly(clip):
    """Release a MoviePy clip's readers, ignoring (but logging) cleanup errors."""
    if clip is None:
        return
    try:
        clip.close()
        if clip.audio:
            clip.audio.close()
    except Exception as e:
        logging.debug(f"Ignoring error while closing clip: {e}")


def generate_video_process(topic, resolution_choice, add_captions_option, add_bgm_option, progress=gr.Progress()):
    """Generate a documentary video for ``topic``; the handler behind the Gradio button.

    Pipeline: script generation -> parsing -> per-segment TTS, media search
    and clip creation -> concatenation -> optional background music -> encode.

    Args:
        topic: Documentary subject entered by the user.
        resolution_choice: ``"Full HD (16:9)"`` for 1920x1080; anything else
            produces a 1080x1920 vertical video.
        add_captions_option: Whether to overlay narration captions.
        add_bgm_option: Whether to mix in background music.
        progress: Gradio progress tracker.

    Returns:
        ``(status_log_text, video_path)``; ``video_path`` is ``None`` on failure.
    """
    start_time = time.time()
    status_log = []
    temp_dir = None
    final_video_path = None
    clips = []
    final_clip = None

    if not topic or not str(topic).strip():
        return "Please enter a video topic.", None

    try:
        # Create a unique temporary directory for this run
        temp_dir = tempfile.mkdtemp(prefix=TEMP_FOLDER_BASE + "_")
        status_log.append(f"Temporary directory created: {temp_dir}")
        logging.info(f"Using temp directory: {temp_dir}")

        # --- 1. Generate Script ---
        progress(0.1, desc="Generating script...")
        status_log.append("π Generating script...")
        script = generate_script(topic, OPENROUTER_API_KEY, OPENROUTER_MODEL)
        if not script:
            status_log.append("β Script generation failed. Check API key and model.")
            return "\n".join(status_log), None
        status_log.append("β Script generated.")

        # --- 2. Parse Script ---
        progress(0.2, desc="Parsing script...")
        status_log.append("π Parsing script...")
        elements = parse_script(script)
        if not elements:
            status_log.append("β Script parsing failed. Check script format.")
            return "\n".join(status_log), None
        num_segments = len(elements) // 2
        status_log.append(f"β Script parsed into {num_segments} segments.")

        # --- 3. Process Segments (Media Search, Download, TTS, Clip Creation) ---
        target_size = (1920, 1080) if resolution_choice == "Full HD (16:9)" else (1080, 1920)  # W, H
        status_log.append(f"Target resolution: {target_size[0]}x{target_size[1]}")

        # Elements alternate scene, narration, scene, narration, ...
        for segment_index, (scene_elem, narration_elem) in enumerate(zip(elements[0::2], elements[1::2])):
            current_progress = 0.2 + (0.6 * (segment_index / num_segments))
            progress(current_progress, desc=f"Processing segment {segment_index + 1}/{num_segments}")
            scene_prompt = scene_elem['prompt']
            narration_text = narration_elem['text']
            status_log.append(f"\n--- Segment {segment_index + 1}/{num_segments} ---")
            status_log.append(f"Scene Prompt: {scene_prompt}")
            status_log.append(f"Narration: {narration_text[:100]}...")

            # 3a. Generate TTS
            status_log.append("π Generating narration audio...")
            tts_path, tts_duration = generate_tts(narration_text, 'en', temp_dir, segment_index)
            if not tts_path or tts_duration <= 0.1:  # Check for valid duration
                status_log.append(f"β οΈ TTS generation failed for segment {segment_index + 1}. Skipping segment.")
                logging.warning(f"Skipping segment {segment_index+1} due to TTS failure.")
                continue
            status_log.append(f"β Narration audio generated ({tts_duration:.2f}s): {os.path.basename(tts_path)}")

            # 3b. Search for Media
            status_log.append("π Searching for media...")
            media_path, media_type = _find_scene_media(scene_prompt, temp_dir, status_log)

            # 3c. Create Clip
            status_log.append(f"π Creating video clip for segment {segment_index + 1}...")
            clip = create_clip(
                media_path=media_path if media_type != 'color' else None,  # Pass None if color
                media_type=media_type,
                audio_path=tts_path,
                audio_duration=tts_duration,
                target_size=target_size,
                add_captions=add_captions_option,
                narration_text=narration_text,
                segment_index=segment_index
            )
            if clip:
                clips.append(clip)
                status_log.append(f"β Clip {segment_index + 1} created.")
            else:
                status_log.append(f"β Failed to create clip for segment {segment_index + 1}. Skipping.")
                logging.error(f"Failed to create clip {segment_index+1}, skipping.")

        if not clips:
            status_log.append("\nβ No valid clips were created. Cannot generate video.")
            return "\n".join(status_log), None

        # --- 4. Concatenate Clips ---
        progress(0.85, desc="Combining video clips...")
        status_log.append("\nπ Combining video clips...")
        try:
            final_clip = concatenate_videoclips(clips, method="compose")
            status_log.append("β Clips combined successfully.")
        except Exception as e:
            status_log.append(f"β Error concatenating clips: {e}")
            logging.error(f"Concatenation failed: {e}")
            traceback.print_exc()
            return "\n".join(status_log), None  # Clips are closed in the outer finally

        # --- 5. Add Background Music (Optional) ---
        if add_bgm_option:
            progress(0.9, desc="Adding background music...")
            status_log.append("π Adding background music...")
            final_clip = add_background_music(final_clip, music_file=BGM_FILE, volume=BGM_VOLUME)

        # --- 6. Write Final Video ---
        progress(0.95, desc="Writing final video file...")
        status_log.append("π Writing final video file (this may take time)...")
        output_path = os.path.join(temp_dir, OUTPUT_VIDEO_FILENAME)
        try:
            # 'medium' preset balances quality/size; libx264 + aac for wide compatibility
            final_clip.write_videofile(
                output_path,
                codec='libx264',
                audio_codec='aac',
                fps=24,
                preset='medium',
                threads=4,
                logger='bar'  # Use None for less verbose output, or 'bar' for progress
            )
            # The working directory is deleted during cleanup below, so the
            # finished video is moved to its own directory first; otherwise the
            # path returned to Gradio would point at a file that no longer exists.
            keep_dir = tempfile.mkdtemp(prefix="ai_doc_output_")
            final_video_path = shutil.move(output_path, os.path.join(keep_dir, OUTPUT_VIDEO_FILENAME))
            status_log.append(f"β Final video saved to: {final_video_path}")
        except Exception as e:
            status_log.append(f"β Error writing final video file: {e}")
            logging.error(f"Final video write failed: {e}")
            traceback.print_exc()
            final_video_path = None  # Ensure no path is returned on failure
    except Exception as e:
        status_log.append(f"\nβ An unexpected error occurred during video generation: {e}")
        logging.error("An unexpected error occurred in generate_video_process:")
        logging.error(traceback.format_exc())
        final_video_path = None  # Ensure failure state
    finally:
        # Release MoviePy readers before deleting the files they point at
        _close_clip_quietly(final_clip)
        for clip in clips:
            _close_clip_quietly(clip)
        # --- 7. Cleanup ---
        if temp_dir and os.path.exists(temp_dir):
            try:
                shutil.rmtree(temp_dir)
                status_log.append(f"π§Ή Temporary directory cleaned up: {temp_dir}")
                logging.info(f"Cleaned up temp directory: {temp_dir}")
            except Exception as e:
                status_log.append(f"β οΈ Error cleaning up temporary directory {temp_dir}: {e}")
                logging.warning(f"Cleanup failed for {temp_dir}: {e}")

    end_time = time.time()
    total_time = end_time - start_time
    status_log.append(f"\n--- Generation Complete ---")
    status_log.append(f"Total time: {total_time:.2f} seconds")
    progress(1.0, desc="Finished!")
    return "\n".join(status_log), final_video_path
# --- Gradio Interface Definition --- | |
# Whether the optional music file is present decides both the checkbox label
# and whether the checkbox can be toggled.
_bgm_available = os.path.exists(BGM_FILE)
_bgm_name = os.path.basename(BGM_FILE) if _bgm_available else 'No BGM file found'

with gr.Blocks() as iface:
    gr.Markdown("# π€ AI Documentary Generator")
    gr.Markdown("Enter a topic, choose your settings, and let the AI create a short video documentary!")

    with gr.Row():
        # Left column: user inputs
        with gr.Column(scale=1):
            topic_input = gr.Textbox(
                label="Video Topic",
                placeholder="e.g., The History of Coffee, The Secrets of the Deep Ocean, The Rise of Quantum Computing",
                lines=2,
            )
            resolution_input = gr.Radio(
                label="Video Format",
                choices=["Short (9:16)", "Full HD (16:9)"],
                value="Short (9:16)",
            )
            captions_input = gr.Checkbox(label="Add Captions", value=True)
            bgm_input = gr.Checkbox(
                label=f"Add Background Music ({_bgm_name})",
                value=True,
                interactive=_bgm_available,
            )
            generate_button = gr.Button("Generate Video", variant="primary")

        # Right column: progress log and the resulting video
        with gr.Column(scale=2):
            status_output = gr.Textbox(label="Status Log", lines=15, interactive=False)
            video_output = gr.Video(label="Generated Video")

    generate_button.click(
        fn=generate_video_process,
        inputs=[topic_input, resolution_input, captions_input, bgm_input],
        outputs=[status_output, video_output],
    )

    gr.Examples(
        examples=[
            ["The lifecycle of a butterfly", "Short (9:16)", True, True],
            ["Ancient Roman Engineering", "Full HD (16:9)", True, False],
            ["The impact of social media", "Short (9:16)", False, True],
        ],
        inputs=[topic_input, resolution_input, captions_input, bgm_input],
    )
# --- Launch the App --- | |
if __name__ == "__main__":
    # Startup check: warn when an API key is empty or still the placeholder
    _key_checks = (
        (PEXELS_API_KEY, "YOUR_PEXELS_API_KEY_HERE",
         "PEXELS_API_KEY is not set. Media search will be limited."),
        (OPENROUTER_API_KEY, "YOUR_OPENROUTER_API_KEY_HERE",
         "OPENROUTER_API_KEY is not set. Script generation will fail."),
    )
    for _key_value, _placeholder, _message in _key_checks:
        if not _key_value or _key_value == _placeholder:
            logging.warning(_message)
            print(f"WARNING: {_message}")

    # Optional: create a silent stand-in when no background music file exists
    if not os.path.exists(BGM_FILE):
        logging.warning(f"Background music file '{BGM_FILE}' not found. Creating a silent placeholder.")
        try:
            # One second of silence exported as mp3 via pydub
            AudioSegment.silent(duration=1000).export(BGM_FILE, format="mp3")
            logging.info(f"Created silent placeholder BGM file: {BGM_FILE}")
        except Exception as e:
            logging.error(f"Could not create placeholder BGM file: {e}")

    # Fix ImageMagick policy (attempt) - May need sudo/root privileges not available in all environments
    # def fix_imagemagick_policy():
    #     policy_path = "/etc/ImageMagick-6/policy.xml" # Adjust path if needed
    #     if os.path.exists(policy_path):
    #         try:
    #             # Use sed to modify the policy file (requires sed command)
    #             os.system(f"sed -i 's/rights=\"none\" pattern=\"PS\"/rights=\"read|write\" pattern=\"PS\"/' {policy_path}")
    #             os.system(f"sed -i 's/rights=\"none\" pattern=\"LABEL\"/rights=\"read|write\" pattern=\"LABEL\"/' {policy_path}")
    #             os.system(f"sed -i 's/rights=\"none\" pattern=\"TEXT\"/rights=\"read|write\" pattern=\"TEXT\"/' {policy_path}") # Add TEXT pattern
    #             logging.info(f"Attempted to update ImageMagick policy at {policy_path}")
    #         except Exception as e:
    #             logging.warning(f"Failed to automatically update ImageMagick policy: {e}. Manual adjustment might be needed if text rendering fails.")
    # fix_imagemagick_policy()

    iface.launch(debug=True, share=True)  # Set share=True for public link if needed