diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,55 +1,64 @@ - +# Install necessary packages (assuming these are already run in your environment) +# !pip install transformers==4.49.0 +# !pip install moviepy gTTS requests pydub pillow +# !pip cache purge +# !apt-get install imagemagick -y +# !pip install kokoro>=0.3.4 soundfile +# !apt-get-qq -y install espeak-ng > /dev/null 2>&1 +# !pip install pysrt +# !pip install gradio # Import necessary libraries from kokoro import KPipeline - import soundfile as sf import torch - -import soundfile as sf +# Removed duplicate import of soundfile as sf import os -from moviepy.editor import VideoFileClip, AudioFileClip, ImageClip +from moviepy.editor import ( + VideoFileClip, AudioFileClip, ImageClip, concatenate_videoclips, + CompositeVideoClip, TextClip, CompositeAudioClip # Added CompositeAudioClip +) from PIL import Image import tempfile import random import cv2 import math -import os, requests, io, time, re, random -from moviepy.editor import ( - VideoFileClip, concatenate_videoclips, AudioFileClip, ImageClip, - CompositeVideoClip, TextClip, CompositeAudioClip -) +import requests, io, time, re +# Removed duplicate import of random import gradio as gr import shutil -import os +# Removed duplicate import of os import moviepy.video.fx.all as vfx import moviepy.config as mpy_config from pydub import AudioSegment from pydub.generators import Sine - -from PIL import Image, ImageDraw, ImageFont +# Removed duplicate import of Image, ImageDraw, ImageFont import numpy as np from bs4 import BeautifulSoup import base64 from urllib.parse import quote import pysrt from gtts import gTTS -import gradio as gr # Import Gradio +# Removed duplicate import of gradio as gr # Initialize Kokoro TTS pipeline (using American English) pipeline = KPipeline(lang_code='a') # Use voice 'af_heart' for American English # Ensure ImageMagick binary is set -mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"}) +try: + mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"}) + print("ImageMagick binary set successfully.") +except Exception as e: + print(f"Warning: Could not set ImageMagick binary automatically: {e}") + print("TextClip functionality might be limited if ImageMagick is not found.") + # ---------------- Global Configuration ---------------- # -PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna' -OPENROUTER_API_KEY = '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' +PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna' # Replace with your key +OPENROUTER_API_KEY = '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' # Replace with your key OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" OUTPUT_VIDEO_FILENAME = "final_video.mp4" USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" - - # Additional global variables needed for the Gradio interface selected_voice = 'af_heart' # Default voice voice_speed = 0.9 # Default voice speed @@ -64,23 +73,13 @@ TEMP_FOLDER = None # ---------------- Helper Functions ---------------- # -# (Your existing helper functions remain unchanged: generate_script, parse_script, -# search_pexels_videos, search_pexels_images, search_google_images, download_image, -# download_video, generate_media, generate_tts, apply_kenburns_effect, -# resize_to_fill, find_mp3_files, add_background_music, create_clip, -# fix_imagemagick_policy) - -# Define these globally as they were in your original code but will be set per run -TARGET_RESOLUTION = None -CAPTION_COLOR = None -TEMP_FOLDER = None def generate_script(user_input): """Generate documentary script with proper OpenRouter handling.""" headers = { 'Authorization': f'Bearer {OPENROUTER_API_KEY}', - 'HTTP-Referer': 'https://your-domain.com', - 'X-Title': 'AI Documentary Maker' + 'HTTP-Referer': 'https://your-domain.com', # Optional: Replace with your actual domain if needed + 'X-Title': 'AI Documentary Maker' # Optional } prompt = f"""Short Documentary Script GeneratorInstructions: @@ -175,22 +174,23 @@ Now here is the Topic/scrip: {user_input} 'https://openrouter.ai/api/v1/chat/completions', headers=headers, json=data, - timeout=30 + timeout=60 # Increased timeout ) - if response.status_code == 200: - response_data = response.json() - if 'choices' in response_data and len(response_data['choices']) > 0: - return response_data['choices'][0]['message']['content'] - else: - print("Unexpected response format:", response_data) - return None + response.raise_for_status() # Raise an exception for bad status codes + + response_data = response.json() + if 'choices' in response_data and len(response_data['choices']) > 0 and 'message' in response_data['choices'][0] and 'content' in response_data['choices'][0]['message']: + return response_data['choices'][0]['message']['content'].strip() else: - print(f"API Error {response.status_code}: {response.text}") + print("Unexpected API response format:", response_data) return None + except requests.exceptions.RequestException as e: + print(f"API request failed: {e}") + return None except Exception as e: - print(f"Request failed: {str(e)}") + print(f"An unexpected error occurred during script generation: {e}") return None def parse_script(script_text): @@ -204,182 +204,293 @@ def parse_script(script_text): current_title = None current_text = "" + if not script_text: + print("Error: Received empty script text for parsing.") + return [] + try: - for line in script_text.splitlines(): + lines = script_text.strip().splitlines() + for line in lines: line = line.strip() - if line.startswith("[") and "]" in line: - bracket_start = line.find("[") - bracket_end = line.find("]", bracket_start) - if bracket_start != -1 and bracket_end != -1: - if current_title is not None: - sections[current_title] = current_text.strip() - current_title = line[bracket_start+1:bracket_end] - current_text = line[bracket_end+1:].strip() - elif current_title: - current_text += line + " " - - if current_title: + if not line: # Skip empty lines + continue + + match = re.match(r'^\[([^\]]+)\](.*)', line) + if match: + # If we were processing a previous title, save it + if current_title is not None and current_text: + sections[current_title] = current_text.strip() + + current_title = match.group(1).strip() + current_text = match.group(2).strip() + " " # Start text for the new title + elif current_title is not None: + # Append line to the current text if it doesn't start a new section + current_text += line + " " + + # Add the last section after the loop ends + if current_title is not None and current_text: sections[current_title] = current_text.strip() elements = [] for title, narration in sections.items(): + narration = narration.strip() # Ensure no leading/trailing whitespace if not title or not narration: + print(f"Warning: Skipping empty title ('{title}') or narration ('{narration}')") continue media_element = {"type": "media", "prompt": title, "effects": "fade-in"} words = narration.split() - duration = max(3, len(words) * 0.5) + # Simple duration estimate: 0.5 seconds per word, minimum 3 seconds + duration = max(3.0, len(words) * 0.5) tts_element = {"type": "tts", "text": narration, "voice": "en", "duration": duration} elements.append(media_element) elements.append(tts_element) + if not elements: + print("Warning: Script parsing resulted in no elements. Check script format.") return elements except Exception as e: print(f"Error parsing script: {e}") + print(f"Problematic script text snippet: {script_text[:200]}") # Log part of the script return [] + def search_pexels_videos(query, pexels_api_key): """Search for a video on Pexels by query and return a random HD video.""" + if not pexels_api_key: + print("Pexels API key is missing. Cannot search for videos.") + return None headers = {'Authorization': pexels_api_key} base_url = "https://api.pexels.com/videos/search" - num_pages = 3 + num_pages = 3 # Search first 3 pages videos_per_page = 15 max_retries = 3 - retry_delay = 1 + retry_delay = 2 # Start with 2 seconds delay search_query = query all_videos = [] + print(f"Searching Pexels videos for: '{query}'") for page in range(1, num_pages + 1): + params = {"query": search_query, "per_page": videos_per_page, "page": page, "orientation": "landscape"} # Added orientation for attempt in range(max_retries): try: - params = {"query": search_query, "per_page": videos_per_page, "page": page} - response = requests.get(base_url, headers=headers, params=params, timeout=10) + response = requests.get(base_url, headers=headers, params=params, timeout=15) # Increased timeout if response.status_code == 200: data = response.json() videos = data.get("videos", []) if not videos: - print(f"No videos found on page {page}.") - break + # print(f"No videos found on page {page} for '{query}'.") # Less verbose + break # Stop searching pages if one is empty for video in videos: video_files = video.get("video_files", []) + # Prefer HD, then SD if HD not found + hd_link = None + sd_link = None for file in video_files: - if file.get("quality") == "hd": - all_videos.append(file.get("link")) - break + if file.get("quality") == "hd" and file.get("link"): + hd_link = file.get("link") + break # Found HD, use it + elif file.get("quality") == "sd" and file.get("link"): + sd_link = file.get("link") # Keep SD as fallback + + link_to_add = hd_link if hd_link else sd_link + if link_to_add: + all_videos.append(link_to_add) - break + break # Success for this page, move to next page elif response.status_code == 429: - print(f"Rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...") + print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 2 + elif response.status_code == 400: # Bad request often means invalid query + print(f"Pexels API bad request (400) for query '{query}'. Skipping.") + return None # Don't retry bad requests else: - print(f"Error fetching videos: {response.status_code} {response.text}") + print(f"Error fetching Pexels videos: {response.status_code} {response.text}") if attempt < max_retries - 1: - print(f"Retrying in {retry_delay} seconds...") + print(f"Retrying Pexels video search in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 2 else: - break + print("Max retries reached for Pexels video search.") + break # Max retries for this page - except requests.exceptions.RequestException as e: - print(f"Request exception: {e}") + except requests.exceptions.Timeout: + print(f"Pexels video search timed out (attempt {attempt+1}/{max_retries}).") if attempt < max_retries - 1: - print(f"Retrying in {retry_delay} seconds...") - time.sleep(retry_delay) - retry_delay *= 2 + print(f"Retrying Pexels video search in {retry_delay} seconds...") + time.sleep(retry_delay) + retry_delay *= 2 else: - break + print("Max retries reached for Pexels video search due to timeout.") + break # Max retries for this page + except requests.exceptions.RequestException as e: + print(f"Pexels video search request exception: {e}") + # Don't retry general request exceptions unless specifically needed + break # Stop trying for this page + + # Reset retry delay for the next page + retry_delay = 2 if all_videos: random_video = random.choice(all_videos) - print(f"Selected random video from {len(all_videos)} HD videos") + print(f"Selected random video from {len(all_videos)} found for '{query}'") return random_video else: - print("No suitable videos found after searching all pages.") + print(f"No suitable Pexels videos found for query: '{query}'") return None def search_pexels_images(query, pexels_api_key): """Search for an image on Pexels by query.""" + if not pexels_api_key: + print("Pexels API key is missing. Cannot search for images.") + return None headers = {'Authorization': pexels_api_key} url = "https://api.pexels.com/v1/search" - params = {"query": query, "per_page": 5, "orientation": "landscape"} + params = {"query": query, "per_page": 10, "orientation": "landscape"} # Get more results, landscape only max_retries = 3 - retry_delay = 1 + retry_delay = 2 + print(f"Searching Pexels images for: '{query}'") for attempt in range(max_retries): try: - response = requests.get(url, headers=headers, params=params, timeout=10) + response = requests.get(url, headers=headers, params=params, timeout=15) if response.status_code == 200: data = response.json() photos = data.get("photos", []) if photos: - photo = random.choice(photos[:min(5, len(photos))]) - img_url = photo.get("src", {}).get("original") - return img_url + # Select from 'original', 'large2x', 'large' in order of preference + valid_photos = [] + for photo in photos: + src = photo.get("src", {}) + img_url = src.get("original") or src.get("large2x") or src.get("large") + if img_url: + valid_photos.append(img_url) + + if valid_photos: + chosen_url = random.choice(valid_photos) + print(f"Found {len(valid_photos)} Pexels images for '{query}', selected one.") + return chosen_url + else: + print(f"No valid image URLs found in Pexels response for '{query}'.") + return None else: - print(f"No images found for query: {query}") + # print(f"No Pexels images found for query: {query}") # Less verbose return None elif response.status_code == 429: - print(f"Rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...") + print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 2 + elif response.status_code == 400: + print(f"Pexels API bad request (400) for query '{query}'. Skipping.") + return None else: - print(f"Error fetching images: {response.status_code} {response.text}") + print(f"Error fetching Pexels images: {response.status_code} {response.text}") if attempt < max_retries - 1: - print(f"Retrying in {retry_delay} seconds...") + print(f"Retrying Pexels image search in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 2 - + else: + print("Max retries reached for Pexels image search.") + return None # Max retries failed + + except requests.exceptions.Timeout: + print(f"Pexels image search timed out (attempt {attempt+1}/{max_retries}).") + if attempt < max_retries - 1: + print(f"Retrying Pexels image search in {retry_delay} seconds...") + time.sleep(retry_delay) + retry_delay *= 2 + else: + print("Max retries reached for Pexels image search due to timeout.") + return None # Max retries failed except requests.exceptions.RequestException as e: - print(f"Request exception: {e}") - if attempt < max_retries - 1: - print(f"Retrying in {retry_delay} seconds...") - time.sleep(retry_delay) - retry_delay *= 2 + print(f"Pexels image search request exception: {e}") + return None # Don't retry - print(f"No Pexels images found for query: {query} after all attempts") + print(f"No Pexels images found for query: '{query}' after all attempts.") return None def search_google_images(query): - """Search for images on Google Images (for news-related queries)""" + """Search for images on Google Images (use cautiously, might break).""" + print(f"Attempting Google Image search for (use with caution): '{query}'") try: - search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch" + # Using a simpler, potentially more stable URL structure + search_url = f"https://www.google.com/search?q={quote(query)}&source=lnms&tbm=isch" headers = {"User-Agent": USER_AGENT} response = requests.get(search_url, headers=headers, timeout=10) + response.raise_for_status() # Check for HTTP errors + soup = BeautifulSoup(response.text, "html.parser") - img_tags = soup.find_all("img") + # Google changes its HTML structure often. This is a common pattern, but might need updates. + # Look for image data embedded in script tags or specific img tags. image_urls = [] + # Try finding JSON data first (often more reliable if present) + scripts = soup.find_all("script") + for script in scripts: + if script.string and 'AF_initDataCallback' in script.string: + # This requires more complex parsing of the JS data structure + # For simplicity, we'll stick to img tags for now. + pass # Placeholder for potential future JSON parsing + + # Fallback to finding img tags (less reliable for direct source URLs) + img_tags = soup.find_all("img") for img in img_tags: - src = img.get("src", "") - if src.startswith("http") and "gstatic" not in src: - image_urls.append(src) + src = img.get("src") or img.get("data-src") # Check both src and data-src + if src and src.startswith("http") and not "gstatic.com" in src: + # Basic filtering, might need refinement + image_urls.append(src) + elif src and src.startswith('data:image'): + # Handle base64 encoded images (less common for main results now) + try: + # Extract base64 data (simplistic extraction) + header, encoded = src.split(",", 1) + # You could save this, but it's often just thumbnails + # print("Found base64 image data (skipping for now)") + except ValueError: + pass # Ignore malformed data URIs if image_urls: - return random.choice(image_urls[:5]) if len(image_urls) >= 5 else image_urls[0] + # Return a random one from the first few potentially relevant results + num_to_consider = min(len(image_urls), 10) + chosen_url = random.choice(image_urls[:num_to_consider]) + print(f"Found {len(image_urls)} potential Google images, selected one.") + return chosen_url else: - print(f"No Google Images found for query: {query}") + print(f"No suitable Google Images found for query: '{query}' with current parsing method.") return None + except requests.exceptions.RequestException as e: + print(f"Error during Google Images request: {e}") + return None except Exception as e: - print(f"Error in Google Images search: {e}") + print(f"Error parsing Google Images HTML: {e}") return None + def download_image(image_url, filename): """Download an image from a URL to a local file with enhanced error handling.""" + if not image_url: + print("Error: No image URL provided for download.") + return None try: - headers = {"User-Agent": USER_AGENT} + headers = {"User-Agent": USER_AGENT, "Accept": "image/*"} # Be more specific about accepted content print(f"Downloading image from: {image_url} to {filename}") - response = requests.get(image_url, headers=headers, stream=True, timeout=15) - response.raise_for_status() + response = requests.get(image_url, headers=headers, stream=True, timeout=20) # Increased timeout + response.raise_for_status() # Check for download errors + + # Check content type if possible + content_type = response.headers.get('Content-Type', '').lower() + if 'image' not in content_type: + print(f"Warning: URL content type ({content_type}) might not be an image. Proceeding anyway.") with open(filename, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): @@ -387,586 +498,1327 @@ def download_image(image_url, filename): print(f"Image downloaded successfully to: {filename}") + # Validate the downloaded image try: img = Image.open(filename) - img.verify() + img.verify() # Check if Pillow can read the header + # Re-open after verify img = Image.open(filename) if img.mode != 'RGB': + print(f"Converting image {filename} from {img.mode} to RGB.") img = img.convert('RGB') - img.save(filename) + img.save(filename, quality=90) # Save with decent quality + img.close() # Close the image file handle print(f"Image validated and processed: {filename}") return filename - except Exception as e_validate: - print(f"Downloaded file is not a valid image: {e_validate}") + except (IOError, SyntaxError, Image.UnidentifiedImageError) as e_validate: + print(f"Downloaded file '{filename}' is not a valid image or is corrupted: {e_validate}") if os.path.exists(filename): - os.remove(filename) + try: + os.remove(filename) + print(f"Removed invalid image file: {filename}") + except OSError as e_remove: + print(f"Error removing invalid image file '{filename}': {e_remove}") return None except requests.exceptions.RequestException as e_download: - print(f"Image download error: {e_download}") + print(f"Image download error from {image_url}: {e_download}") + # Clean up potentially incomplete file if os.path.exists(filename): - os.remove(filename) + try: + os.remove(filename) + except OSError: pass return None except Exception as e_general: - print(f"General error during image processing: {e_general}") + print(f"General error during image processing for {image_url}: {e_general}") if os.path.exists(filename): - os.remove(filename) + try: + os.remove(filename) + except OSError: pass return None def download_video(video_url, filename): """Download a video from a URL to a local file.""" + if not video_url: + print("Error: No video URL provided for download.") + return None try: - response = requests.get(video_url, stream=True, timeout=30) + headers = {"User-Agent": USER_AGENT} # Pexels might not require this, but good practice + print(f"Downloading video from: {video_url} to {filename}") + response = requests.get(video_url, headers=headers, stream=True, timeout=60) # Generous timeout for videos response.raise_for_status() + with open(filename, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): + for chunk in response.iter_content(chunk_size=1024*1024): # Larger chunks for video f.write(chunk) print(f"Video downloaded successfully to: {filename}") + + # Basic validation: check file size + if os.path.getsize(filename) < 1024: # Check if file is suspiciously small (e.g., < 1KB) + print(f"Warning: Downloaded video file '{filename}' is very small. It might be invalid.") + # Keep the file for now, let moviepy handle potential errors later + return filename - except Exception as e: - print(f"Video download error: {e}") + except requests.exceptions.RequestException as e: + print(f"Video download error from {video_url}: {e}") + if os.path.exists(filename): + try: + os.remove(filename) # Clean up failed download + except OSError: pass + return None + except Exception as e_general: + print(f"General error during video download for {video_url}: {e_general}") if os.path.exists(filename): - os.remove(filename) + try: + os.remove(filename) + except OSError: pass return None + def generate_media(prompt, user_image=None, current_index=0, total_segments=1): """ - Generate a visual asset by first searching for a video or using a specific search strategy. - For news-related queries, use Google Images. + Generate a visual asset: Try video (based on probability), then Pexels image, then Google (news), then fallback Pexels image. Returns a dict: {'path': , 'asset_type': 'video' or 'image'}. """ safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_') + if not safe_prompt: # Handle cases where prompt becomes empty after sanitizing + safe_prompt = f"media_{current_index}" + print(f"\n--- Generating Media for Prompt: '{prompt}' (Segment {current_index+1}/{total_segments}) ---") - if "news" in prompt.lower(): - print(f"News-related query detected: {prompt}. Using Google Images...") - image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_news.jpg") - image_url = search_google_images(prompt) - if image_url: - downloaded_image = download_image(image_url, image_file) - if downloaded_image: - print(f"News image saved to {downloaded_image}") - return {"path": downloaded_image, "asset_type": "image"} - else: - print(f"Google Images search failed for prompt: {prompt}") - + # 1. Try Video first based on probability if random.random() < video_clip_probability: - video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4") + print(f"Attempting video search (Probability: {video_clip_probability*100}%)") + video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video_{current_index}.mp4") video_url = search_pexels_videos(prompt, PEXELS_API_KEY) if video_url: downloaded_video = download_video(video_url, video_file) - if downloaded_video: - print(f"Video asset saved to {downloaded_video}") - return {"path": downloaded_video, "asset_type": "video"} + if downloaded_video and os.path.exists(downloaded_video): + # Further check if video is usable by moviepy (optional, adds overhead) + try: + with VideoFileClip(downloaded_video) as test_clip: + if test_clip.duration > 0: + print(f"Video asset usable: {downloaded_video}") + return {"path": downloaded_video, "asset_type": "video"} + else: + print(f"Downloaded video file seems invalid (duration 0): {downloaded_video}") + os.remove(downloaded_video) # Clean up invalid video + except Exception as e: + print(f"Error testing downloaded video {downloaded_video}: {e}") + if os.path.exists(downloaded_video): os.remove(downloaded_video) # Clean up invalid video + else: + print(f"Pexels video download failed for prompt: '{prompt}'") else: - print(f"Pexels video search failed for prompt: {prompt}") - - image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg") - image_url = search_pexels_images(prompt, PEXELS_API_KEY) - if image_url: - downloaded_image = download_image(image_url, image_file) - if downloaded_image: - print(f"Image asset saved to {downloaded_image}") - return {"path": downloaded_image, "asset_type": "image"} + print(f"Pexels video search failed for prompt: '{prompt}'") + + # 2. Try Pexels Image + print("Attempting Pexels image search...") + image_file_pexels = os.path.join(TEMP_FOLDER, f"{safe_prompt}_pexels_{current_index}.jpg") + image_url_pexels = search_pexels_images(prompt, PEXELS_API_KEY) + if image_url_pexels: + downloaded_image_pexels = download_image(image_url_pexels, image_file_pexels) + if downloaded_image_pexels and os.path.exists(downloaded_image_pexels): + print(f"Pexels image asset saved: {downloaded_image_pexels}") + return {"path": downloaded_image_pexels, "asset_type": "image"} else: - print(f"Pexels image download failed for prompt: {prompt}") - - fallback_terms = ["nature", "people", "landscape", "technology", "business"] - for term in fallback_terms: - print(f"Trying fallback image search with term: {term}") - fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg") - fallback_url = search_pexels_images(term, PEXELS_API_KEY) - if fallback_url: - downloaded_fallback = download_image(fallback_url, fallback_file) - if downloaded_fallback: - print(f"Fallback image saved to {downloaded_fallback}") - return {"path": downloaded_fallback, "asset_type": "image"} + print(f"Pexels image download failed for prompt: '{prompt}'") + + # 3. If "news" in prompt, try Google Images as a secondary option + if "news" in prompt.lower(): + print(f"News-related query: '{prompt}'. Trying Google Images as secondary...") + image_file_google = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google_{current_index}.jpg") + image_url_google = search_google_images(prompt) + if image_url_google: + downloaded_image_google = download_image(image_url_google, image_file_google) + if downloaded_image_google and os.path.exists(downloaded_image_google): + print(f"Google image asset saved: {downloaded_image_google}") + return {"path": downloaded_image_google, "asset_type": "image"} else: - print(f"Fallback image download failed for term: {term}") + print(f"Google Images download failed for prompt: '{prompt}'") + else: + print(f"Google Images search failed for prompt: '{prompt}'") + + # 4. Fallback to generic Pexels image search if everything else failed + print("Primary searches failed. Attempting fallback Pexels image search...") + fallback_terms = ["abstract", "texture", "technology", "nature", "background"] + fallback_term = random.choice(fallback_terms) + print(f"Using fallback term: '{fallback_term}'") + fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{fallback_term}_{current_index}.jpg") + fallback_url = search_pexels_images(fallback_term, PEXELS_API_KEY) + if fallback_url: + downloaded_fallback = download_image(fallback_url, fallback_file) + if downloaded_fallback and os.path.exists(downloaded_fallback): + print(f"Fallback image asset saved: {downloaded_fallback}") + return {"path": downloaded_fallback, "asset_type": "image"} + else: + print(f"Fallback image download failed for term: '{fallback_term}'") + else: + print(f"Fallback image search failed for term: '{fallback_term}'") + + # 5. Absolute fallback: Generate a simple color background (if ImageMagick is available) + try: + print("All media generation failed. Creating a simple color background.") + color_bg_path = os.path.join(TEMP_FOLDER, f"color_bg_{current_index}.png") + # Ensure TARGET_RESOLUTION is set before calling this + if TARGET_RESOLUTION: + w, h = TARGET_RESOLUTION + # Pick a random dark color + r, g, b = random.randint(0, 50), random.randint(0, 50), random.randint(0, 50) + color = f"rgb({r},{g},{b})" + # Use ImageMagick 'convert' command - requires it to be installed and accessible + cmd = f"convert -size {w}x{h} xc:'{color}' {color_bg_path}" + os.system(cmd) + if os.path.exists(color_bg_path): + print(f"Generated color background: {color_bg_path}") + return {"path": color_bg_path, "asset_type": "image"} + else: + print("Failed to generate color background using ImageMagick.") + return None else: - print(f"Fallback image search failed for term: {term}") + print("Cannot generate color background: TARGET_RESOLUTION not set.") + return None + except Exception as e: + print(f"Error generating color background: {e}") + return None - print(f"Failed to generate visual asset for prompt: {prompt}") + # Should not be reached if color background works, but as a final safety net: + print(f"ERROR: Failed to generate *any* visual asset for prompt: '{prompt}'") return None + def generate_silent_audio(duration, sample_rate=24000): """Generate a silent WAV audio file lasting 'duration' seconds.""" - num_samples = int(duration * sample_rate) - silence = np.zeros(num_samples, dtype=np.float32) - silent_path = os.path.join(TEMP_FOLDER, f"silent_{int(time.time())}.wav") - sf.write(silent_path, silence, sample_rate) - print(f"Silent audio generated: {silent_path}") - return silent_path + try: + num_samples = int(duration * sample_rate) + silence = np.zeros(num_samples, dtype=np.float32) + # Ensure TEMP_FOLDER exists and is writable + if not TEMP_FOLDER or not os.path.isdir(TEMP_FOLDER): + print("Error: TEMP_FOLDER not set or invalid for silent audio.") + # Create a fallback temporary file + silent_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False) + silent_path = silent_file.name + silent_file.close() # Close handle immediately after getting name + else: + silent_path = os.path.join(TEMP_FOLDER, f"silent_{int(time.time()*1000)}.wav") + + sf.write(silent_path, silence, sample_rate) + print(f"Silent audio generated: {silent_path} ({duration:.2f}s)") + return silent_path + except Exception as e: + print(f"Error generating silent audio: {e}") + # Return None or raise exception? Returning None might hide issues. + # Let's return None and let the calling function handle it. + return None + def generate_tts(text, voice): """ Generate TTS audio using Kokoro, falling back to gTTS or silent audio if needed. + Uses global `selected_voice` and `voice_speed`. """ - safe_text = re.sub(r'[^\w\s-]', '', text[:10]).strip().replace(' ', '_') - file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text}.wav") + if not text: + print("Warning: Empty text received for TTS. Generating silence.") + # Estimate a short duration for empty text, e.g., 1 second + return generate_silent_audio(duration=1.0) - if os.path.exists(file_path): - print(f"Using cached TTS for text '{text[:10]}...'") - return file_path + # Sanitize text slightly for filename (limit length, basic chars) + safe_text_part = re.sub(r'[^\w-]', '', text[:15]).strip().replace(' ', '_') + if not safe_text_part: safe_text_part = f"tts_{int(time.time()*1000)}" + file_path = os.path.join(TEMP_FOLDER, f"{safe_text_part}.wav") + # Decide voice: Use global `selected_voice` if `voice` is the default 'en' + kokoro_voice_to_use = selected_voice if voice == 'en' else voice + print(f"Generating TTS for: '{text[:50]}...' (Voice: {kokoro_voice_to_use}, Speed: {voice_speed})") + + # --- Try Kokoro TTS --- try: - kokoro_voice = selected_voice if voice == 'en' else voice - generator = pipeline(text, voice=kokoro_voice, speed=voice_speed, split_pattern=r'\n+') + # Ensure pipeline is initialized + if pipeline is None: + raise ValueError("Kokoro pipeline is not initialized.") + + generator = pipeline(text, voice=kokoro_voice_to_use, speed=voice_speed, split_pattern=r'\n+') # Split on newlines if any audio_segments = [] + output_sample_rate = 24000 # Kokoro's default rate + for i, (gs, ps, audio) in enumerate(generator): - audio_segments.append(audio) + if audio is not None and audio.ndim > 0 and audio.size > 0: # Check if audio data is valid + # Ensure audio is float32, Kokoro might return different types + if audio.dtype != np.float32: + # Attempt conversion (e.g., from int16) + if audio.dtype == np.int16: + audio = audio.astype(np.float32) / 32768.0 + else: + print(f"Warning: Unexpected audio dtype {audio.dtype} from Kokoro. Trying direct use.") + # If unsure how to convert, might need to skip or handle specific cases + audio_segments.append(audio) + else: + print(f"Warning: Kokoro returned empty or invalid audio segment {i} for text.") + + if not audio_segments: + print("Error: Kokoro generated no valid audio segments.") + raise ValueError("No audio data from Kokoro") + + # Concatenate segments if needed full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0] - sf.write(file_path, full_audio, 24000) - print(f"TTS audio saved to {file_path} (Kokoro)") + + # Check final audio shape and content + if full_audio is None or full_audio.ndim == 0 or full_audio.size == 0: + print("Error: Final concatenated audio from Kokoro is invalid.") + raise ValueError("Invalid final audio data from Kokoro") + + # Check for NaN or Inf values + if np.isnan(full_audio).any() or np.isinf(full_audio).any(): + print("Error: Kokoro audio contains NaN or Inf values. Attempting to clean.") + full_audio = np.nan_to_num(full_audio) # Replace NaN with 0, Inf with large numbers + + # Normalize audio slightly to prevent clipping (optional) + max_val = np.max(np.abs(full_audio)) + if max_val > 1.0: + full_audio = full_audio / max_val * 0.98 + + sf.write(file_path, full_audio, output_sample_rate) + print(f"TTS audio saved: {file_path} (Kokoro)") return file_path - except Exception as e: - print(f"Error with Kokoro TTS: {e}") + + except Exception as e_kokoro: + print(f"Error with Kokoro TTS: {e_kokoro}. Trying gTTS fallback...") + + # --- Try gTTS Fallback --- try: - print("Falling back to gTTS...") - tts = gTTS(text=text, lang='en') - mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text}.mp3") + tts = gTTS(text=text, lang='en', slow= (voice_speed < 0.8) ) # Basic speed control approximation + # Save MP3 temporarily + mp3_path = os.path.join(TEMP_FOLDER, f"{safe_text_part}_gtts.mp3") tts.save(mp3_path) + + # Convert MP3 to WAV using pydub audio = AudioSegment.from_mp3(mp3_path) + # Export as WAV (pydub handles sample rate conversion if needed, defaults reasonable) audio.export(file_path, format="wav") - os.remove(mp3_path) - print(f"Fallback TTS saved to {file_path} (gTTS)") - return file_path - except Exception as fallback_error: - print(f"Both TTS methods failed: {fallback_error}") - return generate_silent_audio(duration=max(3, len(text.split()) * 0.5)) - -def apply_kenburns_effect(clip, target_resolution, effect_type=None): - """Apply a smooth Ken Burns effect with a single movement pattern.""" - target_w, target_h = target_resolution - clip_aspect = clip.w / clip.h - target_aspect = target_w / target_h - - if clip_aspect > target_aspect: - new_height = target_h - new_width = int(new_height * clip_aspect) - else: - new_width = target_w - new_height = int(new_width / clip_aspect) - - clip = clip.resize(newsize=(new_width, new_height)) - base_scale = 1.15 - new_width = int(new_width * base_scale) - new_height = int(new_height * base_scale) - clip = clip.resize(newsize=(new_width, new_height)) - - max_offset_x = new_width - target_w - max_offset_y = new_height - target_h - - available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "up-left"] - if effect_type is None or effect_type == "random": - effect_type = random.choice(available_effects) - - if effect_type == "zoom-in": - start_zoom = 0.9 - end_zoom = 1.1 - start_center = (new_width / 2, new_height / 2) - end_center = start_center - elif effect_type == "zoom-out": - start_zoom = 1.1 - end_zoom = 0.9 - start_center = (new_width / 2, new_height / 2) - end_center = start_center - elif effect_type == "pan-left": - start_zoom = 1.0 - end_zoom = 1.0 - start_center = (max_offset_x + target_w / 2, (max_offset_y // 2) + target_h / 2) - end_center = (target_w / 2, (max_offset_y // 2) + target_h / 2) - elif effect_type == "pan-right": - start_zoom = 1.0 - end_zoom = 1.0 - start_center = (target_w / 2, (max_offset_y // 2) + target_h / 2) - end_center = (max_offset_x + target_w / 2, (max_offset_y // 2) + target_h / 2) - elif effect_type == "up-left": - start_zoom = 1.0 - end_zoom = 1.0 - start_center = (max_offset_x + target_w / 2, max_offset_y + target_h / 2) - end_center = (target_w / 2, target_h / 2) - else: - raise ValueError(f"Unsupported effect_type: {effect_type}") - - def transform_frame(get_frame, t): - frame = get_frame(t) - ratio = t / clip.duration if clip.duration > 0 else 0 - ratio = 0.5 - 0.5 * math.cos(math.pi * ratio) - current_zoom = start_zoom + (end_zoom - start_zoom) * ratio - crop_w = int(target_w / current_zoom) - crop_h = int(target_h / current_zoom) - current_center_x = start_center[0] + (end_center[0] - start_center[0]) * ratio - current_center_y = start_center[1] + (end_center[1] - start_center[1]) * ratio - min_center_x = crop_w / 2 - max_center_x = new_width - crop_w / 2 - min_center_y = crop_h / 2 - max_center_y = new_height - crop_h / 2 - current_center_x = max(min_center_x, min(current_center_x, max_center_x)) - current_center_y = max(min_center_y, min(current_center_y, max_center_y)) - cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (current_center_x, current_center_y)) - resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) - return resized_frame - - return clip.fl(transform_frame) + + # Clean up temporary MP3 + if os.path.exists(mp3_path): + try: + os.remove(mp3_path) + except OSError: pass + + print(f"Fallback TTS saved: {file_path} (gTTS)") + # Check if the generated WAV file is valid + if os.path.exists(file_path) and os.path.getsize(file_path) > 100: # Basic size check + return file_path + else: + print(f"Error: gTTS generated an invalid or empty WAV file: {file_path}") + if os.path.exists(file_path): os.remove(file_path) + raise ValueError("gTTS output file invalid") + + except Exception as e_gtts: + print(f"Error with gTTS fallback: {e_gtts}. Generating silence.") + + # --- Generate Silence as final fallback --- + # Estimate duration based on text length if possible + estimated_duration = max(1.0, len(text.split()) * (0.6 / voice_speed)) # Rough estimate + return generate_silent_audio(duration=estimated_duration) + + +def apply_kenburns_effect(clip, target_resolution, effect_type="random"): + """Apply a smooth Ken Burns effect (zoom/pan) to an image clip.""" + try: + target_w, target_h = target_resolution + # Ensure clip has dimensions (might be None if error occurred) + if not hasattr(clip, 'w') or not hasattr(clip, 'h') or clip.w is None or clip.h is None or clip.w == 0 or clip.h == 0: + print("Error applying Ken Burns: Invalid clip dimensions.") + return clip # Return original clip + + clip_w, clip_h = clip.w, clip.h + clip_aspect = clip_w / clip_h + target_aspect = target_w / target_h + + # --- Resize to cover target area --- + if clip_aspect > target_aspect: + # Image is wider than target: Resize based on height + scale_factor = target_h / clip_h + resized_w = int(clip_w * scale_factor) + resized_h = target_h + else: + # Image is taller than target: Resize based on width + scale_factor = target_w / clip_w + resized_w = target_w + resized_h = int(clip_h * scale_factor) + + # Use LANCZOS for resizing images - better quality + clip = clip.resize(newsize=(resized_w, resized_h)) + + # --- Apply scale for zoom effect --- + # Scale slightly larger to allow for movement without showing edges + zoom_scale = 1.15 # How much larger the image is than the frame initially + zoomed_w = int(resized_w * zoom_scale) + zoomed_h = int(resized_h * zoom_scale) + clip = clip.resize(newsize=(zoomed_w, zoomed_h)) + + # --- Determine movement parameters --- + max_offset_x = max(0, zoomed_w - target_w) + max_offset_y = max(0, zoomed_h - target_h) + + available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "diag-tl-br", "diag-tr-bl"] + if effect_type == "random": + effect_type = random.choice(available_effects) + elif effect_type not in available_effects: + print(f"Warning: Unknown Ken Burns effect '{effect_type}'. Defaulting to zoom-in.") + effect_type = "zoom-in" + + print(f"Applying Ken Burns effect: {effect_type}") + + # Define start and end positions/zooms based on effect type + # Position is the center of the crop window relative to the zoomed image + center_x = zoomed_w / 2 + center_y = zoomed_h / 2 + start_pos = (center_x, center_y) + end_pos = (center_x, center_y) + start_zoom_factor = 1.0 # Relative to the base zoomed size + end_zoom_factor = 1.0 + + if effect_type == "zoom-in": + start_zoom_factor = 1.0 + end_zoom_factor = 1.0 / zoom_scale # Zoom in to fill the original zoomed size + elif effect_type == "zoom-out": + start_zoom_factor = 1.0 / zoom_scale + end_zoom_factor = 1.0 + elif effect_type == "pan-left": + start_pos = (center_x + max_offset_x / 2, center_y) + end_pos = (center_x - max_offset_x / 2, center_y) + elif effect_type == "pan-right": + start_pos = (center_x - max_offset_x / 2, center_y) + end_pos = (center_x + max_offset_x / 2, center_y) + elif effect_type == "pan-up": + start_pos = (center_x, center_y + max_offset_y / 2) + end_pos = (center_x, center_y - max_offset_y / 2) + elif effect_type == "pan-down": + start_pos = (center_x, center_y - max_offset_y / 2) + end_pos = (center_x, center_y + max_offset_y / 2) + elif effect_type == "diag-tl-br": # Top-Left to Bottom-Right + start_pos = (center_x - max_offset_x / 2, center_y - max_offset_y / 2) + end_pos = (center_x + max_offset_x / 2, center_y + max_offset_y / 2) + elif effect_type == "diag-tr-bl": # Top-Right to Bottom-Left + start_pos = (center_x + max_offset_x / 2, center_y - max_offset_y / 2) + end_pos = (center_x - max_offset_x / 2, center_y + max_offset_y / 2) + + + # --- Define the transformation function for moviepy's fl --- + def transform_frame(get_frame, t): + frame = get_frame(t) # Get the frame from the *zoomed* clip at time t + + # Smooth interpolation (cosine ease-in-out) + if clip.duration is None or clip.duration <= 0: + ratio = 0 + else: + ratio = t / clip.duration + ratio = 0.5 - 0.5 * math.cos(math.pi * ratio) # Ease in/out + + # Interpolate zoom and position + current_zoom_factor = start_zoom_factor + (end_zoom_factor - start_zoom_factor) * ratio + current_center_x = start_pos[0] + (end_pos[0] - start_pos[0]) * ratio + current_center_y = start_pos[1] + (end_pos[1] - start_pos[1]) * ratio + + # Calculate the size of the crop window in the zoomed image coordinates + # This needs to be target_w/h divided by the current zoom factor relative to the *original* target size + # The base zoom is `zoom_scale`, current relative zoom is `current_zoom_factor` + effective_zoom = zoom_scale * current_zoom_factor # This isn't quite right. Let's rethink. + + # --- Simpler approach: Define crop window size based on target --- + # The frame we get *is* the zoomed frame. We need to crop *from* it. + # The size of the window we cut *from the zoomed frame* needs to scale inversely with zoom? No. + + # Let's define the zoom based on the *final output size* relative to the *zoomed clip size*. + # If zoom_factor is 1.0, we crop target_w x target_h. + # If zoom_factor is < 1.0 (zoomed out), we crop a larger area and scale down. + # If zoom_factor is > 1.0 (zoomed in), we crop a smaller area and scale up. + + # Let's redefine start/end zoom based on the final *visual* zoom level. + # zoom_level = 1.0 means the final image fills the target resolution exactly. + # zoom_level = 1.1 means the final image is zoomed in by 10%. + + start_visual_zoom = 1.0 + end_visual_zoom = 1.0 + + if effect_type == "zoom-in": + start_visual_zoom = 1.0 + end_visual_zoom = zoom_scale # Zoom in to the max pre-zoom + elif effect_type == "zoom-out": + start_visual_zoom = zoom_scale + end_visual_zoom = 1.0 + # For pans, visual zoom stays constant at 1.0 + + current_visual_zoom = start_visual_zoom + (end_visual_zoom - start_visual_zoom) * ratio + + # Calculate crop window size based on the current visual zoom needed + crop_w = int(target_w / current_visual_zoom) + crop_h = int(target_h / current_visual_zoom) + + # Ensure the crop window isn't larger than the actual frame dimensions + crop_w = min(crop_w, zoomed_w) + crop_h = min(crop_h, zoomed_h) + + # Clamp the center position to prevent cropping outside the image bounds + min_center_x = crop_w / 2 + max_center_x = zoomed_w - crop_w / 2 + min_center_y = crop_h / 2 + max_center_y = zoomed_h - crop_h / 2 + + clamped_center_x = max(min_center_x, min(current_center_x, max_center_x)) + clamped_center_y = max(min_center_y, min(current_center_y, max_center_y)) + + # Use cv2.getRectSubPix for subpixel accuracy cropping + # Input frame should be numpy array + if not isinstance(frame, np.ndarray): + # This shouldn't happen if using ImageClip, but good check + print("Warning: Frame is not numpy array in Ken Burns transform.") + return frame # Or handle conversion + + # Ensure frame is contiguous C-style array if needed by cv2 + frame_contiguous = np.ascontiguousarray(frame) + + try: + cropped_frame = cv2.getRectSubPix(frame_contiguous, (crop_w, crop_h), (clamped_center_x, clamped_center_y)) + except cv2.error as e: + print(f"Error during cv2.getRectSubPix: {e}") + print(f" Frame shape: {frame_contiguous.shape}, dtype: {frame_contiguous.dtype}") + print(f" Crop size: ({crop_w}, {crop_h})") + print(f" Center: ({clamped_center_x}, {clamped_center_y})") + # Fallback: return uncropped frame, maybe resized + return cv2.resize(frame_contiguous, (target_w, target_h), interpolation=cv2.INTER_LINEAR) + + + # Resize the cropped frame to the target resolution + # Use LANCZOS4 for high quality resize + resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) + + return resized_frame + + # Apply the transformation using moviepy's fl method + # ismask=False indicates we are transforming the color channels + # apply_to=['mask'] would apply only to mask if needed + return clip.fl(transform_frame, apply_to='mask') if clip.ismask else clip.fl(transform_frame) + + except Exception as e: + print(f"Error applying Ken Burns effect: {e}") + # Return the original clip (possibly resized to fill initially) if effect fails + return resize_to_fill(clip, target_resolution) # Fallback to simple resize/crop + def resize_to_fill(clip, target_resolution): - """Resize and crop a clip to fill the target resolution while maintaining aspect ratio.""" - target_w, target_h = target_resolution - clip_aspect = clip.w / clip.h - target_aspect = target_w / target_h - - if clip_aspect > target_aspect: - clip = clip.resize(height=target_h) - crop_amount = (clip.w - target_w) / 2 - clip = clip.crop(x1=crop_amount, x2=clip.w - crop_amount, y1=0, y2=clip.h) - else: - clip = clip.resize(width=target_w) - crop_amount = (clip.h - target_h) / 2 - clip = clip.crop(x1=0, x2=clip.w, y1=crop_amount, y2=clip.h - crop_amount) + """Resize and crop a clip (video or image) to fill the target resolution, maintaining aspect ratio.""" + try: + target_w, target_h = target_resolution + if not hasattr(clip, 'size') or clip.size is None or clip.w is None or clip.h is None or clip.w == 0 or clip.h == 0: + print(f"Error: Cannot resize clip with invalid dimensions: size={getattr(clip, 'size', 'N/A')}") + # Return a dummy clip or raise error? Let's return None to signal failure. + # Or maybe return a black clip of target size? + # For now, let's try to return the original clip, maybe it recovers later. + print("Returning original clip due to resize error.") + return clip + + clip_w, clip_h = clip.w, clip.h + clip_aspect = clip_w / clip_h + target_aspect = target_w / target_h + + if clip_aspect > target_aspect: + # Clip is wider than target: Resize based on height, crop width + new_h = target_h + scale_factor = new_h / clip_h + new_w = int(clip_w * scale_factor) + clip_resized = clip.resize(height=new_h) # Moviepy handles width automatically if height is set + + # Calculate cropping amounts (ensure they are integers) + crop_amount = int((new_w - target_w) / 2) + if crop_amount < 0: crop_amount = 0 # Avoid negative crop + + # Ensure crop doesn't exceed bounds + x1 = crop_amount + x2 = new_w - crop_amount + # Adjust if rounding caused issues + if x2 > new_w: x2 = new_w + if x1 >= x2: x1 = 0; x2 = target_w # Fallback if crop is invalid + + clip_cropped = clip_resized.crop(x1=x1, width=target_w, y1=0, height=target_h) # Use width/height args for crop + + elif clip_aspect < target_aspect: + # Clip is taller than target: Resize based on width, crop height + new_w = target_w + scale_factor = new_w / clip_w + new_h = int(clip_h * scale_factor) + clip_resized = clip.resize(width=new_w) # Moviepy handles height automatically + + crop_amount = int((new_h - target_h) / 2) + if crop_amount < 0: crop_amount = 0 + + y1 = crop_amount + y2 = new_h - crop_amount + if y2 > new_h: y2 = new_h + if y1 >= y2: y1 = 0; y2 = target_h + + clip_cropped = clip_resized.crop(y1=y1, height=target_h, x1=0, width=target_w) # Use width/height args for crop + else: + # Aspect ratios match: Just resize + clip_cropped = clip.resize(newsize=(target_w, target_h)) + + # Final check on dimensions + if clip_cropped.w != target_w or clip_cropped.h != target_h: + print(f"Warning: resize_to_fill resulted in unexpected dimensions ({clip_cropped.w}x{clip_cropped.h}). Attempting final resize.") + return clip_cropped.resize(newsize=(target_w, target_h)) + + return clip_cropped + + except Exception as e: + print(f"Error in resize_to_fill: {e}") + print(f"Clip info: duration={getattr(clip, 'duration', 'N/A')}, size={getattr(clip, 'size', 'N/A')}") + # Fallback: Try a simple resize without cropping if complex logic failed + try: + return clip.resize(newsize=target_resolution) + except Exception as e_resize: + print(f"Fallback resize also failed: {e_resize}") + # Return original clip as last resort + return clip - return clip def find_mp3_files(): """Search for any MP3 files in the current directory and subdirectories.""" + # This function is no longer used as music is uploaded via Gradio and copied to "music.mp3" + # Keeping it here for potential future use or reference. mp3_files = [] - for root, dirs, files in os.walk('.'): - for file in files: - if file.endswith('.mp3'): - mp3_path = os.path.join(root, file) - mp3_files.append(mp3_path) - print(f"Found MP3 file: {mp3_path}") - return mp3_files[0] if mp3_files else None + try: + for root, dirs, files in os.walk('.'): + for file in files: + if file.lower().endswith('.mp3'): + mp3_path = os.path.join(root, file) + mp3_files.append(mp3_path) + print(f"Found MP3 file: {mp3_path}") + return mp3_files[0] if mp3_files else None + except Exception as e: + print(f"Error searching for MP3 files: {e}") + return None def add_background_music(final_video, bg_music_volume=0.10): - """Add background music to the final video using any MP3 file found.""" + """Add background music using 'music.mp3' if it exists.""" try: + # Expect the music file to be named 'music.mp3' in the current directory bg_music_path = "music.mp3" - if bg_music_path and os.path.exists(bg_music_path): + if os.path.exists(bg_music_path) and os.path.getsize(bg_music_path) > 100: print(f"Adding background music from: {bg_music_path}") bg_music = AudioFileClip(bg_music_path) - if bg_music.duration < final_video.duration: - loops_needed = math.ceil(final_video.duration / bg_music.duration) - bg_segments = [bg_music] * loops_needed - bg_music = concatenate_audioclips(bg_segments) - bg_music = bg_music.subclip(0, final_video.duration) - bg_music = bg_music.volumex(bg_music_volume) - video_audio = final_video.audio - mixed_audio = CompositeAudioClip([video_audio, bg_music]) - final_video = final_video.set_audio(mixed_audio) - print("Background music added successfully") + + # Ensure video has audio track to mix with + if final_video.audio is None: + print("Warning: Video has no primary audio track. Adding only background music.") + # Create silent audio matching video duration if needed + if bg_music.duration < final_video.duration: + loops_needed = math.ceil(final_video.duration / bg_music.duration) + bg_music = concatenate_audioclips([bg_music] * loops_needed) + final_audio = bg_music.subclip(0, final_video.duration).volumex(bg_music_volume) + else: + # Loop or trim background music to match video duration + if bg_music.duration < final_video.duration: + loops_needed = math.ceil(final_video.duration / bg_music.duration) + # Check if looping is feasible + if loops_needed > 100: # Avoid excessive looping + print(f"Warning: Background music is very short ({bg_music.duration:.1f}s) compared to video ({final_video.duration:.1f}s). Looping capped.") + loops_needed = 100 + bg_segments = [bg_music] * int(loops_needed) + try: + bg_music_looped = concatenate_audioclips(bg_segments) + except Exception as e_concat: + print(f"Error concatenating audio for looping: {e_concat}. Using single instance.") + bg_music_looped = bg_music # Fallback to single instance + bg_music = bg_music_looped + + # Trim precisely to video duration + bg_music = bg_music.subclip(0, final_video.duration) + + # Apply volume adjustment + bg_music = bg_music.volumex(bg_music_volume) + + # Mix audio tracks + video_audio = final_video.audio + # Ensure both clips have the same duration before compositing + if abs(video_audio.duration - bg_music.duration) > 0.1: + print(f"Warning: Audio duration mismatch before mixing (Vid: {video_audio.duration:.2f}s, BG: {bg_music.duration:.2f}s). Adjusting BG music.") + bg_music = bg_music.set_duration(video_audio.duration) + + mixed_audio = CompositeAudioClip([video_audio, bg_music]) + final_audio = mixed_audio + + # Set the composite audio to the video + final_video = final_video.set_audio(final_audio) + print(f"Background music added successfully (Volume: {bg_music_volume:.2f})") else: - print("No MP3 files found, skipping background music") + print("Background music file 'music.mp3' not found or is empty. Skipping background music.") return final_video except Exception as e: print(f"Error adding background music: {e}") - print("Continuing without background music") - return final_video + print("Continuing without background music.") + # Return the video without the potentially failed audio modification + return final_video.set_audio(final_video.audio) # Ensure audio is reset if it failed mid-process + +# --- NEW create_clip Function --- def create_clip(media_path, asset_type, tts_path, duration=None, effects=None, narration_text=None, segment_index=0): """Create a video clip with synchronized subtitles and narration.""" try: - print(f"Creating clip #{segment_index} with asset_type: {asset_type}, media_path: {media_path}") - if not os.path.exists(media_path) or not os.path.exists(tts_path): - print("Missing media or TTS file") + print(f"--- Creating Clip #{segment_index+1} ---") + print(f" Media: {asset_type} at {os.path.basename(media_path)}") + print(f" TTS: {os.path.basename(tts_path)}") + print(f" Narration: '{narration_text[:50]}...'") + + if not media_path or not os.path.exists(media_path) or os.path.getsize(media_path) < 100: + print(f"Error: Invalid or missing media file: {media_path}") return None + if not tts_path or not os.path.exists(tts_path) or os.path.getsize(tts_path) < 100: + print(f"Error: Invalid or missing TTS file: {tts_path}") + # Attempt to use silent audio as fallback? + print("Attempting to generate silent audio as fallback.") + # Use the estimated duration from parse_script if available + fallback_duration = duration if duration else 3.0 + tts_path = generate_silent_audio(fallback_duration) + if not tts_path: + print("Error: Failed to generate fallback silent audio. Cannot create clip.") + return None # Critical failure if no audio + + # Load audio first to get accurate duration + try: + audio_clip = AudioFileClip(tts_path) + # Apply slight fade out to prevent abrupt cuts + audio_clip = audio_clip.audio_fadeout(0.1) + audio_duration = audio_clip.duration + if audio_duration <= 0.1: # Check for very short/empty audio + print(f"Warning: Audio duration is very short ({audio_duration:.2f}s). Adjusting target duration.") + audio_duration = max(audio_duration, 1.0) # Ensure at least 1s duration + except Exception as e: + print(f"Error loading audio file {tts_path}: {e}") + print("Using estimated duration and generating silence.") + audio_duration = duration if duration else 3.0 + silent_audio_path = generate_silent_audio(audio_duration) + if not silent_audio_path: return None # Cannot proceed without audio + audio_clip = AudioFileClip(silent_audio_path) + + + # Add a small buffer to the target duration for visuals + target_duration = audio_duration + 0.2 # e.g., 0.2s buffer + + print(f" Audio Duration: {audio_duration:.2f}s, Target Visual Duration: {target_duration:.2f}s") + + # Create base visual clip (video or image) + clip = None + if asset_type == "video": + try: + clip = VideoFileClip(media_path) + # Ensure video duration is sufficient, loop/subclip as needed + if clip.duration < target_duration: + print(f" Looping video (duration {clip.duration:.2f}s) to match target {target_duration:.2f}s") + # Use loop method carefully, might cause issues if duration is very short + # Alternative: freeze last frame? For now, loop. + clip = clip.loop(duration=target_duration) + else: + # Start from beginning, take required duration + clip = clip.subclip(0, target_duration) + + # Resize/crop video to fill target resolution *after* duration adjustment + clip = resize_to_fill(clip, TARGET_RESOLUTION) + # Apply fade-in/out to video clips too + clip = clip.fadein(0.3).fadeout(0.3) + + except Exception as e: + print(f"Error processing video file {media_path}: {e}") + # Fallback to generating a color background if video fails + fallback_media = generate_media("abstract", current_index=segment_index, total_segments=0) # Use a simple fallback + if fallback_media and fallback_media['asset_type'] == 'image': + print("Falling back to generated image due to video error.") + asset_type = 'image' + media_path = fallback_media['path'] + else: + print("ERROR: Video processing failed, and fallback media generation failed.") + return None # Cannot proceed - audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2) - audio_duration = audio_clip.duration - target_duration = audio_duration + 0.2 + # This needs to handle the case where video processing failed and fell back to image + if asset_type == "image": + try: + # Check image validity again before creating ImageClip + img = Image.open(media_path) + img.verify() + img.close() # Close after verify - if asset_type == "video": - clip = VideoFileClip(media_path) - clip = resize_to_fill(clip, TARGET_RESOLUTION) - if clip.duration < target_duration: - clip = clip.loop(duration=target_duration) - else: - clip = clip.subclip(0, target_duration) - elif asset_type == "image": - img = Image.open(media_path) - if img.mode != 'RGB': - with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp: - img.convert('RGB').save(temp.name) - media_path = temp.name - img.close() - clip = ImageClip(media_path).set_duration(target_duration) - clip = apply_kenburns_effect(clip, TARGET_RESOLUTION) - clip = clip.fadein(0.3).fadeout(0.3) - else: + # Create ImageClip and set duration + clip = ImageClip(media_path).set_duration(target_duration) + + # Apply Ken Burns effect (which includes resizing) + clip = apply_kenburns_effect(clip, TARGET_RESOLUTION, effect_type=effects or "random") # Use specified or random effect + + # Apply fade-in/out (Ken Burns function doesn't handle this) + clip = clip.fadein(0.3).fadeout(0.3) + + except Exception as e: + print(f"Error processing image file {media_path}: {e}") + return None # Fail if image processing has critical error + + if clip is None: + print("Error: Visual clip (video or image) could not be created.") return None - if narration_text and CAPTION_COLOR != "transparent": + # --- SUBTITLE GENERATION START --- + if narration_text and CAPTION_COLOR != "transparent" and audio_duration > 0.1: # Avoid captions on silent/very short clips + print(f" Adding Captions (Color: {CAPTION_COLOR}, Size: {font_size})") + subtitle_clips = [] try: words = narration_text.split() - chunks = [] - current_chunk = [] - for word in words: - current_chunk.append(word) - if len(current_chunk) >= 5: - chunks.append(' '.join(current_chunk)) - current_chunk = [] - if current_chunk: - chunks.append(' '.join(current_chunk)) - - chunk_duration = audio_duration / len(chunks) - subtitle_clips = [] - subtitle_y_position = int(TARGET_RESOLUTION[1] * 0.70) - - for i, chunk_text in enumerate(chunks): - start_time = i * chunk_duration - end_time = (i + 1) * chunk_duration + # Dynamic chunking: aim for ~3-7 words per chunk, max ~3 seconds per chunk + max_words_per_chunk = 6 + min_words_per_chunk = 3 + max_duration_per_chunk = 3.0 # seconds + + chunks_data = [] # Stores (text, start_time, end_time) + current_chunk_words = [] + current_chunk_start_time = 0.0 + approx_time_per_word = audio_duration / len(words) if words else 0 + + for i, word in enumerate(words): + current_chunk_words.append(word) + current_word_end_time = current_chunk_start_time + len(current_chunk_words) * approx_time_per_word + + # Check if chunk should end + time_condition = (current_word_end_time - current_chunk_start_time) >= max_duration_per_chunk + word_count_condition = len(current_chunk_words) >= max_words_per_chunk + is_last_word = (i == len(words) - 1) + + # End chunk if time/word limit reached, or if it's the last word + # Ensure minimum word count unless it's the last segment + if ( (time_condition or word_count_condition) and len(current_chunk_words) >= min_words_per_chunk ) or is_last_word: + chunk_text = ' '.join(current_chunk_words) + # Ensure end time doesn't exceed total audio duration + chunk_end_time = min(current_word_end_time, audio_duration) + # Prevent zero-duration chunks + if chunk_end_time > current_chunk_start_time: + chunks_data.append((chunk_text, current_chunk_start_time, chunk_end_time)) + # Prepare for next chunk + current_chunk_start_time = chunk_end_time + current_chunk_words = [] + + # If loop finished but last chunk wasn't added (e.g., few words left) + if current_chunk_words: + chunk_text = ' '.join(current_chunk_words) + chunk_end_time = audio_duration # Last chunk goes to the end + if chunk_end_time > current_chunk_start_time: + chunks_data.append((chunk_text, current_chunk_start_time, chunk_end_time)) + + + # Calculate subtitle position (e.g., 80% down the screen) + subtitle_y_position = int(TARGET_RESOLUTION[1] * 0.80) # Lower position + + # Create TextClip for each chunk + for chunk_text, start_time, end_time in chunks_data: + # Ensure duration is positive + chunk_duration = end_time - start_time + if chunk_duration <= 0.05: # Skip tiny duration chunks + continue + + try: + # Use global font_size here + txt_clip = TextClip( + txt=chunk_text, + fontsize=font_size, # Use global variable + font='Arial-Bold', # Consider making font configurable? + color=CAPTION_COLOR, + bg_color='rgba(0, 0, 0, 0.4)', # Slightly darker background + method='caption', # Handles word wrapping + align='center', + stroke_color='black', # Black stroke for better contrast + stroke_width=1.5, + # Adjust size: 85% of width, height automatic + size=(TARGET_RESOLUTION[0] * 0.85, None) + ).set_start(start_time).set_duration(chunk_duration) # Use duration + + # Position the text clip + txt_clip = txt_clip.set_position(('center', subtitle_y_position)) + subtitle_clips.append(txt_clip) + except Exception as e_textclip: + # Handle potential errors from TextClip generation (e.g., font not found) + print(f"Error creating TextClip for chunk '{chunk_text}': {e_textclip}") + # Optionally add a fallback simple text clip here if needed + + # Overlay the list of subtitle clips onto the main video/image clip + if subtitle_clips: + clip = CompositeVideoClip([clip] + subtitle_clips) + else: + print("Warning: No subtitle clips were generated despite text being present.") + + except Exception as sub_error: + # Fallback: If complex chunking/styling fails, display the whole text simply + print(f"Subtitle generation error: {sub_error}. Using fallback simple text.") + try: txt_clip = TextClip( - chunk_text, - fontsize=45, - font='Arial-Bold', + narration_text, + fontsize=int(font_size * 0.8), # Slightly smaller for full text color=CAPTION_COLOR, - bg_color='rgba(0, 0, 0, 0.25)', - method='caption', + font='Arial', # Simpler font for fallback align='center', - stroke_width=2, - stroke_color=CAPTION_COLOR, - size=(TARGET_RESOLUTION[0] * 0.8, None) - ).set_start(start_time).set_end(end_time) - txt_clip = txt_clip.set_position(('center', subtitle_y_position)) - subtitle_clips.append(txt_clip) - - clip = CompositeVideoClip([clip] + subtitle_clips) - except Exception as sub_error: - print(f"Subtitle error: {sub_error}") - txt_clip = TextClip( - narration_text, - fontsize=font_size, - color=CAPTION_COLOR, - align='center', - size=(TARGET_RESOLUTION[0] * 0.7, None) - ).set_position(('center', int(TARGET_RESOLUTION[1] / 3))).set_duration(clip.duration) - clip = CompositeVideoClip([clip, txt_clip]) - + method='caption', + bg_color='rgba(0, 0, 0, 0.4)', + size=(TARGET_RESOLUTION[0] * 0.8, None) # Max width + ).set_position(('center', subtitle_y_position)).set_duration(clip.duration) # Show for full clip duration + # Overlay the single fallback text clip + clip = CompositeVideoClip([clip, txt_clip]) + except Exception as e_fallback_text: + print(f"Error creating fallback TextClip: {e_fallback_text}") + # Proceed without captions if fallback also fails + + # --- SUBTITLE GENERATION END --- + + # Set the audio track to the final clip clip = clip.set_audio(audio_clip) - print(f"Clip created: {clip.duration:.1f}s") + + # Final duration check/adjustment (optional but good practice) + if abs(clip.duration - target_duration) > 0.1: + print(f"Warning: Final clip duration ({clip.duration:.2f}s) differs from target ({target_duration:.2f}s). Adjusting.") + clip = clip.set_duration(target_duration) + + + print(f"--- Clip #{segment_index+1} created successfully (Duration: {clip.duration:.2f}s) ---") return clip + except Exception as e: - print(f"Error in create_clip: {str(e)}") - return None + print(f"*************** FATAL ERROR in create_clip (Segment {segment_index+1}) ***************") + import traceback + traceback.print_exc() # Print detailed traceback + print(f"Error details: {str(e)}") + print(f" Media Path: {media_path}") + print(f" TTS Path: {tts_path}") + print(f" Asset Type: {asset_type}") + print("**************************************************************************") + return None # Return None on failure + def fix_imagemagick_policy(): - """Fix ImageMagick security policies.""" + """Attempts to fix ImageMagick security policies on Linux systems.""" + # This is often needed for TextClip with complex features (backgrounds, strokes) on Colab/Linux. + # It might require sudo privileges. + policy_fixed = False try: - print("Attempting to fix ImageMagick security policies...") + print("Attempting to fix ImageMagick security policies (may require sudo)...") + # Common paths for ImageMagick policy files policy_paths = [ "/etc/ImageMagick-6/policy.xml", "/etc/ImageMagick-7/policy.xml", "/etc/ImageMagick/policy.xml", - "/usr/local/etc/ImageMagick-7/policy.xml" + "/usr/local/etc/ImageMagick-7/policy.xml", + # Add other potential paths if needed ] - found_policy = next((path for path in policy_paths if os.path.exists(path)), None) - if not found_policy: - print("No policy.xml found. Using alternative subtitle method.") - return False - print(f"Modifying policy file at {found_policy}") - os.system(f"sudo cp {found_policy} {found_policy}.bak") - os.system(f"sudo sed -i 's/rights=\"none\"/rights=\"read|write\"/g' {found_policy}") - os.system(f"sudo sed -i 's/]*>/]*>//g' {found_policy}") - print("ImageMagick policies updated successfully.") - return True - except Exception as e: - print(f"Error fixing policies: {e}") - return False - - - - - - - - - - - - - - - - - - - - - + found_policy = None + for path in policy_paths: + if os.path.exists(path): + found_policy = path + break + if not found_policy: + print("ImageMagick policy.xml not found in common locations. Skipping policy fix.") + print("TextClip features might be limited if default policies are restrictive.") + return False # Indicate policy wasn't found/fixed + + print(f"Found policy file: {found_policy}. Attempting to modify...") + + # Commands to relax restrictions (use with caution) + # Backup the original file first + backup_cmd = f"sudo cp {found_policy} {found_policy}.bak" + # Allow read/write for common formats (including text/caption) + sed_cmd_rights = f"sudo sed -i 's/rights=\"none\" pattern=\"PS\"/rights=\"read|write\" pattern=\"PS\"/' {found_policy}; " \ + f"sudo sed -i 's/rights=\"none\" pattern=\"EPS\"/rights=\"read|write\" pattern=\"EPS\"/' {found_policy}; " \ + f"sudo sed -i 's/rights=\"none\" pattern=\"PDF\"/rights=\"read|write\" pattern=\"PDF\"/' {found_policy}; " \ + f"sudo sed -i 's/rights=\"none\" pattern=\"XPS\"/rights=\"read|write\" pattern=\"XPS\"/' {found_policy}; " \ + f"sudo sed -i 's/rights=\"none\" pattern=\"LABEL\"/rights=\"read|write\" pattern=\"LABEL\"/' {found_policy}; " \ + f"sudo sed -i 's/rights=\"none\" pattern=\"caption\"/rights=\"read|write\" pattern=\"caption\"/' {found_policy}; " \ + f"sudo sed -i 's/rights=\"none\" pattern=\"TEXT\"/rights=\"read|write\" pattern=\"TEXT\"/' {found_policy}" + # Allow read/write for path operations (needed for fonts, temp files) + sed_cmd_path = f"sudo sed -i 's///' {found_policy}" + + + print("Executing policy modification commands (requires sudo)...") + backup_status = os.system(backup_cmd) + if backup_status == 0: + print("Policy file backed up.") + rights_status = os.system(sed_cmd_rights) + path_status = os.system(sed_cmd_path) + + if rights_status == 0 and path_status == 0: + print("ImageMagick policies potentially updated successfully.") + policy_fixed = True + else: + print("Error executing policy modification commands. Check sudo permissions and sed syntax.") + else: + print("Error backing up policy file. Aborting modifications.") + # Optional: Restart services if needed (usually not required just for policy changes) + # os.system("sudo systemctl restart imagemagick") # Example + return policy_fixed + except Exception as e: + print(f"Error occurred during ImageMagick policy fix: {e}") + return False # ---------------- Main Video Generation Function ---------------- # def generate_video(user_input, resolution, caption_option): - """Generate a video based on user input via Gradio.""" + """Generate a video based on user input via Gradio. Uses global settings.""" global TARGET_RESOLUTION, CAPTION_COLOR, TEMP_FOLDER - + + start_time = time.time() + print("\n=============================================") + print("======= STARTING VIDEO GENERATION =======") + print(f" Concept: '{user_input[:100]}...'") + print(f" Resolution: {resolution}") + print(f" Captions: {caption_option}") + print(f" Voice: {selected_voice} (Speed: {voice_speed})") + print(f" BG Music Vol: {bg_music_volume}, FPS: {fps}, Preset: {preset}") + print(f" Video Clip Prob: {video_clip_probability*100}%, Caption Size: {font_size}") + print("=============================================\n") + + + # --- Setup --- + if not OPENROUTER_API_KEY or not PEXELS_API_KEY: + print("ERROR: API keys (OpenRouter or Pexels) are missing!") + # Gradio doesn't handle exceptions well, return None or error message? + # For now, print and return None. Consider adding gr.Error later. + return None + # Set resolution - if resolution == "Full": + if resolution == "Full": # 16:9 Landscape TARGET_RESOLUTION = (1920, 1080) - elif resolution == "Short": + elif resolution == "Short": # 9:16 Portrait TARGET_RESOLUTION = (1080, 1920) else: - TARGET_RESOLUTION = (1920, 1080) # Default + print(f"Warning: Unknown resolution '{resolution}'. Defaulting to Full HD (1920x1080).") + TARGET_RESOLUTION = (1920, 1080) - # Set caption color + # Set caption color based on user choice CAPTION_COLOR = "white" if caption_option == "Yes" else "transparent" - # Create a unique temporary folder - TEMP_FOLDER = tempfile.mkdtemp() + # Create a unique temporary folder for this run + try: + TEMP_FOLDER = tempfile.mkdtemp() + print(f"Temporary folder created: {TEMP_FOLDER}") + except Exception as e: + print(f"FATAL ERROR: Could not create temporary folder: {e}") + return None # Cannot proceed without temp folder - # Fix ImageMagick policy + # Fix ImageMagick policy (important for captions) fix_success = fix_imagemagick_policy() if not fix_success: - print("Will use alternative methods if needed") + print("Continuing without guaranteed ImageMagick policy fix. Captions might have issues.") + # --- End Setup --- + - print("Generating script from API...") + # --- Script Generation --- + print("\n--- Generating Script ---") script = generate_script(user_input) if not script: - print("Failed to generate script.") - shutil.rmtree(TEMP_FOLDER) + print("FATAL ERROR: Failed to generate script from API.") + shutil.rmtree(TEMP_FOLDER) # Clean up temp folder on failure return None - print("Generated Script:\n", script) + print("Generated Script:\n", script) # Print the full script for debugging + # --- End Script Generation --- + + + # --- Script Parsing --- + print("\n--- Parsing Script ---") elements = parse_script(script) if not elements: - print("Failed to parse script into elements.") + print("FATAL ERROR: Failed to parse script into elements. Check script format and parsing logic.") shutil.rmtree(TEMP_FOLDER) return None - print(f"Parsed {len(elements)//2} script segments.") + num_segments = len(elements) // 2 + print(f"Parsed {num_segments} script segments.") + if num_segments == 0: + print("Warning: Script parsed into 0 segments. No video will be generated.") + shutil.rmtree(TEMP_FOLDER) + return None + # --- End Script Parsing --- + + # --- Pair Elements (Media + TTS) --- paired_elements = [] for i in range(0, len(elements), 2): - if i + 1 < len(elements): + if i + 1 < len(elements) and elements[i]['type'] == 'media' and elements[i+1]['type'] == 'tts': paired_elements.append((elements[i], elements[i + 1])) + else: + print(f"Warning: Skipping invalid element pair at index {i}. Expected media then tts.") if not paired_elements: - print("No valid script segments found.") + print("FATAL ERROR: No valid media-tts pairs found after parsing.") shutil.rmtree(TEMP_FOLDER) return None + # --- End Pairing --- + + # --- Clip Creation Loop --- + print("\n--- Creating Individual Clips ---") clips = [] + successful_clips = 0 for idx, (media_elem, tts_elem) in enumerate(paired_elements): - print(f"\nProcessing segment {idx+1}/{len(paired_elements)} with prompt: '{media_elem['prompt']}'") - media_asset = generate_media(media_elem['prompt'], current_index=idx, total_segments=len(paired_elements)) - if not media_asset: - print(f"Skipping segment {idx+1} due to missing media asset.") - continue + print(f"\n>>> Processing Segment {idx+1}/{len(paired_elements)}: Prompt '{media_elem.get('prompt', 'N/A')}'") + + # 1. Generate Media Asset + media_asset = generate_media( + media_elem['prompt'], + current_index=idx, + total_segments=len(paired_elements) + ) + if not media_asset or not media_asset.get('path'): + print(f"ERROR: Failed to generate media for segment {idx+1}. Skipping segment.") + continue # Skip this segment + + # 2. Generate TTS tts_path = generate_tts(tts_elem['text'], tts_elem['voice']) if not tts_path: - print(f"Skipping segment {idx+1} due to TTS generation failure.") - continue + print(f"ERROR: Failed to generate TTS for segment {idx+1}. Skipping segment.") + # Clean up the potentially downloaded media asset if TTS failed + if media_asset and os.path.exists(media_asset['path']): + try: os.remove(media_asset['path']) + except OSError: pass + continue # Skip this segment + + # 3. Create the Clip (Visual + Audio + Subtitles) clip = create_clip( media_path=media_asset['path'], asset_type=media_asset['asset_type'], tts_path=tts_path, - duration=tts_elem['duration'], - effects=media_elem.get('effects', 'fade-in'), + duration=tts_elem.get('duration'), # Pass estimated duration for potential fallback + effects=media_elem.get('effects'), narration_text=tts_elem['text'], segment_index=idx ) + if clip: - clips.append(clip) + # Validate clip duration and dimensions before adding + if clip.duration > 0 and clip.w == TARGET_RESOLUTION[0] and clip.h == TARGET_RESOLUTION[1]: + clips.append(clip) + successful_clips += 1 + print(f">>> Segment {idx+1} processed successfully.") + else: + print(f"ERROR: Clip for segment {idx+1} has invalid duration ({clip.duration}) or dimensions ({clip.w}x{clip.h}). Skipping.") + # Clean up resources associated with the failed clip + clip.close() # Close moviepy resources if possible + # Files in TEMP_FOLDER will be cleaned later, no need to delete individually here unless necessary else: - print(f"Clip creation failed for segment {idx+1}.") + print(f"ERROR: Clip creation failed for segment {idx+1}. See errors above.") + # Files in TEMP_FOLDER will be cleaned later + # --- End Clip Creation Loop --- + + + # --- Final Video Assembly --- if not clips: - print("No clips were successfully created.") + print("\nFATAL ERROR: No clips were successfully created. Cannot generate video.") shutil.rmtree(TEMP_FOLDER) return None - print("\nConcatenating clips...") - final_video = concatenate_videoclips(clips, method="compose") - final_video = add_background_music(final_video, bg_music_volume=bg_music_volume) + print(f"\n--- Assembling Final Video ({len(clips)} clips) ---") + try: + # Concatenate clips + final_video = concatenate_videoclips(clips, method="compose") # 'compose' is generally safer + print(f"Clips concatenated. Total duration before music: {final_video.duration:.2f}s") + + # Add background music + final_video = add_background_music(final_video, bg_music_volume=bg_music_volume) + + # Write the final video file + print(f"Exporting final video to '{OUTPUT_VIDEO_FILENAME}' (FPS: {fps}, Preset: {preset})...") + # Use threads based on CPU count? Maybe default is fine. logger='bar' for progress bar + final_video.write_videofile( + OUTPUT_VIDEO_FILENAME, + codec='libx264', # Common, good quality codec + audio_codec='aac', # Common audio codec + fps=fps, + preset=preset, # Controls encoding speed vs compression + threads=4, # Use multiple threads if available + logger='bar' # Show progress bar + ) + print(f"Final video saved successfully as {OUTPUT_VIDEO_FILENAME}") + + # Close clips to release resources + for clip in clips: + clip.close() + final_video.close() - print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...") - final_video.write_videofile(OUTPUT_VIDEO_FILENAME, codec='libx264', fps=fps, preset=preset) - print(f"Final video saved as {OUTPUT_VIDEO_FILENAME}") + except Exception as e: + print(f"FATAL ERROR during final video assembly or writing: {e}") + import traceback + traceback.print_exc() + shutil.rmtree(TEMP_FOLDER) + return None + # --- End Final Video Assembly --- + + + # --- Cleanup --- + print("\n--- Cleaning Up Temporary Files ---") + try: + shutil.rmtree(TEMP_FOLDER) + print(f"Temporary folder removed: {TEMP_FOLDER}") + except Exception as e: + print(f"Warning: Could not remove temporary folder {TEMP_FOLDER}: {e}") + # --- End Cleanup --- - # Clean up - print("Cleaning up temporary files...") - shutil.rmtree(TEMP_FOLDER) - print("Temporary files removed.") + end_time = time.time() + total_time = end_time - start_time + print("\n=============================================") + print("======= VIDEO GENERATION COMPLETE =======") + print(f" Total time: {total_time:.2f} seconds") + print("=============================================\n") + # Return the path to the generated video file return OUTPUT_VIDEO_FILENAME -# ---------------- Gradio Interface ---------------- # + +# ---------------- Gradio Interface Setup ---------------- # + +# Dictionary mapping user-friendly names to Kokoro voice IDs VOICE_CHOICES = { - 'Emma (Female)': 'af_heart', - 'Bella (Female)': 'af_bella', - 'Nicole (Female)': 'af_nicole', - 'Aoede (Female)': 'af_aoede', - 'Kore (Female)': 'af_kore', - 'Sarah (Female)': 'af_sarah', - 'Nova (Female)': 'af_nova', - 'Sky (Female)': 'af_sky', - 'Alloy (Female)': 'af_alloy', - 'Jessica (Female)': 'af_jessica', - 'River (Female)': 'af_river', - 'Michael (Male)': 'am_michael', - 'Fenrir (Male)': 'am_fenrir', - 'Puck (Male)': 'am_puck', - 'Echo (Male)': 'am_echo', - 'Eric (Male)': 'am_eric', - 'Liam (Male)': 'am_liam', - 'Onyx (Male)': 'am_onyx', - 'Santa (Male)': 'am_santa', - 'Adam (Male)': 'am_adam', - 'Emma 🇬🇧 (Female)': 'bf_emma', - 'Isabella 🇬🇧 (Female)': 'bf_isabella', - 'Alice 🇬🇧 (Female)': 'bf_alice', - 'Lily 🇬🇧 (Female)': 'bf_lily', - 'George 🇬🇧 (Male)': 'bm_george', - 'Fable 🇬🇧 (Male)': 'bm_fable', - 'Lewis 🇬🇧 (Male)': 'bm_lewis', - 'Daniel 🇬🇧 (Male)': 'bm_daniel' + 'Emma (Female)': 'af_heart', 'Bella (Female)': 'af_bella', 'Nicole (Female)': 'af_nicole', + 'Aoede (Female)': 'af_aoede', 'Kore (Female)': 'af_kore', 'Sarah (Female)': 'af_sarah', + 'Nova (Female)': 'af_nova', 'Sky (Female)': 'af_sky', 'Alloy (Female)': 'af_alloy', + 'Jessica (Female)': 'af_jessica', 'River (Female)': 'af_river', + 'Michael (Male)': 'am_michael', 'Fenrir (Male)': 'am_fenrir', 'Puck (Male)': 'am_puck', + 'Echo (Male)': 'am_echo', 'Eric (Male)': 'am_eric', 'Liam (Male)': 'am_liam', + 'Onyx (Male)': 'am_onyx', 'Santa (Male)': 'am_santa', 'Adam (Male)': 'am_adam', + 'Emma 🇬🇧 (Female)': 'bf_emma', 'Isabella 🇬🇧 (Female)': 'bf_isabella', 'Alice 🇬🇧 (Female)': 'bf_alice', + 'Lily 🇬🇧 (Female)': 'bf_lily', 'George 🇬🇧 (Male)': 'bm_george', 'Fable 🇬🇧 (Male)': 'bm_fable', + 'Lewis 🇬🇧 (Male)': 'bm_lewis', 'Daniel 🇬🇧 (Male)': 'bm_daniel' } def generate_video_with_options(user_input, resolution, caption_option, music_file, voice, vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size): + """Wrapper function for Gradio to set global options before calling generate_video.""" global selected_voice, voice_speed, font_size, video_clip_probability, bg_music_volume, fps, preset - - # Update global variables with user selections - selected_voice = VOICE_CHOICES[voice] + + print("--- Updating Settings from Gradio ---") + # Update global variables with user selections from Gradio interface + selected_voice = VOICE_CHOICES.get(voice, 'af_heart') # Get voice ID, default if key not found voice_speed = v_speed font_size = caption_size - video_clip_probability = vclip_prob / 100 # Convert from percentage to decimal + video_clip_probability = vclip_prob / 100.0 # Convert percentage to decimal bg_music_volume = bg_vol fps = video_fps preset = video_preset - - # Handle music upload + + # Handle music upload: Copy uploaded file to a standard name 'music.mp3' + target_music_path = "music.mp3" + # Remove previous music file if it exists + if os.path.exists(target_music_path): + try: + os.remove(target_music_path) + print(f"Removed previous '{target_music_path}'") + except OSError as e: + print(f"Warning: Could not remove previous music file: {e}") + if music_file is not None: - target_path = "music.mp3" - shutil.copy(music_file.name, target_path) - print(f"Uploaded music saved as: {target_path}") - - # Generate the video - return generate_video(user_input, resolution, caption_option) - -# Create the Gradio interface -iface = gr.Interface( - fn=generate_video_with_options, - inputs=[ - gr.Textbox(label="Video Concept", placeholder="Enter your video concept here..."), - gr.Radio(["Full", "Short"], label="Resolution", value="Full"), - gr.Radio(["Yes", "No"], label="Captions", value="Yes"), - gr.File(label="Upload Background Music (MP3)", file_types=[".mp3"]), - gr.Dropdown(choices=list(VOICE_CHOICES.keys()), label="Choose Voice", value="Emma (Female)"), - gr.Slider(0, 100, value=25, step=1, label="Video Clip Usage Probability (%)"), - gr.Slider(0.0, 1.0, value=0.08, step=0.01, label="Background Music Volume"), - gr.Slider(10, 60, value=30, step=1, label="Video FPS"), - gr.Dropdown(choices=["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow"], - value="veryfast", label="Export Preset"), - gr.Slider(0.5, 1.5, value=0.9, step=0.05, label="Voice Speed"), - gr.Slider(20, 100, value=45, step=1, label="Caption Font Size") - ], - outputs=gr.Video(label="Generated Video"), - title="AI Documentary Video Generator", - description="Create short documentary videos with AI. Upload music, choose voice, and customize settings." -) + try: + # music_file is a TemporaryFileWrapper object in Gradio >= 3.0 + shutil.copy(music_file.name, target_music_path) + print(f"Uploaded music '{os.path.basename(music_file.name)}' copied to '{target_music_path}'") + except Exception as e: + print(f"Error copying uploaded music file: {e}") + # Continue without background music if copy fails + else: + print("No background music file uploaded.") + + # Call the main video generation function with the core inputs + # The function will use the global variables updated above + try: + video_path = generate_video(user_input, resolution, caption_option) + # Check if video generation failed (returned None) + if video_path is None: + # Raise a Gradio error to display it in the interface + raise gr.Error("Video generation failed. Please check the console logs for details.") + return video_path + except gr.Error as e: + # Re-raise Gradio errors to show them in the UI + raise e + except Exception as e: + # Catch unexpected errors during generation + print(f"An unexpected error occurred in generate_video_with_options: {e}") + import traceback + traceback.print_exc() + # Raise a Gradio error for unexpected issues + raise gr.Error(f"An unexpected error occurred: {e}. Check logs.") + + +# Create the Gradio interface definition +with gr.Blocks(theme=gr.themes.Soft()) as iface: # Using Blocks for better layout control + gr.Markdown("# 🤖 AI Documentary Video Generator") + gr.Markdown("Create short, funny documentary-style videos with AI narration and stock footage. Customize voice, music, captions, and more.") + + with gr.Row(): + with gr.Column(scale=2): + user_input = gr.Textbox(label="🎬 Video Concept / Script", placeholder="Enter your video topic (e.g., 'Top 5 facts about cats') or paste a full script formatted like the example...", lines=4) + with gr.Accordion("Example Script Format", open=False): + gr.Markdown(""" + ``` + [Cats] + Cats: tiny ninjas plotting world domination. + [Sleeping] + They sleep 23 hours a day, planning. + [Boxes] + Their mortal enemy? The empty box. It must be contained. + [Zoomies] + Suddenly, zoomies! Because reasons. + [Subscribe] + Subscribe now, or a cat will judge you silently. Forever. + ``` + **Rules:** + - Start each scene with `[Search Term]` (1-2 words for Pexels). + - Follow with 5-15 words of narration. + - Keep it funny and conversational. + - End with a subscribe line related to the topic. + """) + with gr.Column(scale=1): + resolution = gr.Radio(["Full", "Short"], label="📐 Resolution", value="Full", info="Full=16:9 (YouTube), Short=9:16 (TikTok/Shorts)") + caption_option = gr.Radio(["Yes", "No"], label="✍️ Add Captions?", value="Yes") + music_file = gr.File(label="🎵 Upload Background Music (Optional MP3)", file_types=[".mp3"], type="file") # Use type="file" + + with gr.Accordion("⚙️ Advanced Settings", open=False): + with gr.Row(): + voice = gr.Dropdown(choices=list(VOICE_CHOICES.keys()), label="🗣️ Choose Voice", value="Emma (Female)") + v_speed = gr.Slider(minimum=0.5, maximum=1.5, value=0.9, step=0.05, label="💨 Voice Speed", info="0.5=Slow, 1.0=Normal, 1.5=Fast") + with gr.Row(): + caption_size = gr.Slider(minimum=20, maximum=100, value=45, step=1, label="🔡 Caption Font Size") + vclip_prob = gr.Slider(minimum=0, maximum=100, value=25, step=5, label="🎞️ Video Clip %", info="Chance of using a video clip instead of an image for a scene.") + with gr.Row(): + bg_vol = gr.Slider(minimum=0.0, maximum=1.0, value=0.08, step=0.01, label="🔉 BG Music Volume", info="0.0=Silent, 1.0=Full Volume") + video_fps = gr.Slider(minimum=15, maximum=60, value=30, step=1, label="🎬 Video FPS") + video_preset = gr.Dropdown( + choices=["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"], + value="veryfast", label="⚙️ Export Quality/Speed", info="Faster presets = lower quality/size, Slower presets = higher quality/size." + ) + + submit_button = gr.Button("✨ Generate Video ✨", variant="primary") + output_video = gr.Video(label="Generated Video") + + # Define the action when the button is clicked + submit_button.click( + fn=generate_video_with_options, + inputs=[ + user_input, resolution, caption_option, music_file, voice, + vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size + ], + outputs=output_video + ) + + gr.Markdown("---") + gr.Markdown("⚠️ **Note:** Video generation can take several minutes, especially on CPU. Check console logs for progress.") + # Launch the interface if __name__ == "__main__": - iface.launch(share=True) \ No newline at end of file + # Ensure API keys are set before launching + if not PEXELS_API_KEY or not OPENROUTER_API_KEY: + print("####################################################################") + print("ERROR: PEXELS_API_KEY or OPENROUTER_API_KEY is not set!") + print("Please set these variables at the top of the script before running.") + print("####################################################################") + # Optionally exit if keys are missing + # exit(1) + else: + print("API Keys seem to be set. Launching Gradio interface...") + iface.launch(share=True, debug=True) # Enable share=True for public link, debug=True for more logs \ No newline at end of file