diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,866 +1,1632 @@
-
-
-# Import necessary libraries
-from kokoro import KPipeline
-
-import soundfile as sf
-import torch
-
-import soundfile as sf
+# --- Import necessary libraries ---
+import gradio as gr
 import os
-from moviepy.editor import VideoFileClip, AudioFileClip, ImageClip
-from PIL import Image
+import shutil
 import tempfile
+import time
+import re
 import random
-import cv2
 import math
-import os, requests, io, time, re, random
+import requests
+import io
+import uuid  # For unique IDs
+import traceback  # For detailed error printing
+import numpy as np
+from PIL import Image, ImageDraw, ImageFont
+import cv2
 from moviepy.editor import (
     VideoFileClip, concatenate_videoclips, AudioFileClip, ImageClip,
-    CompositeVideoClip, TextClip
+    CompositeVideoClip, TextClip, CompositeAudioClip,
+    concatenate_audioclips  # Used by add_background_music below
 )
 import moviepy.video.fx.all as vfx
 import moviepy.config as mpy_config
 from pydub import AudioSegment
 from pydub.generators import Sine
-
-from PIL import Image, ImageDraw, ImageFont
-import numpy as np
+import soundfile as sf
+import torch  # Assuming Kokoro needs it
+from kokoro import KPipeline  # Kokoro TTS
 from bs4 import BeautifulSoup
-import base64
 from urllib.parse import quote
-import pysrt
 from gtts import gTTS
-import gradio as gr # Import Gradio
-
-# Initialize Kokoro TTS pipeline (using American English)
-pipeline = KPipeline(lang_code='a') # Use voice 'af_heart' for American English
-# Ensure ImageMagick binary is set
-mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"})
-
-# ---------------- Global Configuration ---------------- #
-PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna'
-OPENROUTER_API_KEY = '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'
-OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free"
-OUTPUT_VIDEO_FILENAME = "final_video.mp4"
+from functools import partial  # For event handlers
+
+# --- Initialize Kokoro TTS Pipeline ---
+# Ensure this is done safely (e.g., check if already initialized if run multiple times)
+try:
+    # Use American English voice provided by Kokoro library example
+    pipeline = KPipeline(lang_code='a')  # 'a' often corresponds to American English variant
+    print("Kokoro TTS Pipeline Initialized.")
+except Exception as e:
+    print(f"Error initializing Kokoro TTS: {e}. TTS functionality might be limited.")
+    pipeline = None  # Set pipeline to None if initialization fails
+
+
+# --- Configuration ---
+PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna'  # Replace with your actual key
+OPENROUTER_API_KEY = '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'  # Replace with your actual key
+OPENROUTER_MODEL = "mistralai/mistral-small"  # Use a reliable model
+OUTPUT_VIDEO_FILENAME_BASE = "ai_docu_video"
 USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
+# Optional: Set ImageMagick binary if moviepy doesn't find it automatically
+# try:
+#     # Check common paths or allow environment variable override
+#     imagemagick_path = os.environ.get("IMAGEMAGICK_BINARY", "/usr/bin/convert")  # Example path
+#     if os.path.exists(imagemagick_path):
+#         mpy_config.change_settings({"IMAGEMAGICK_BINARY": imagemagick_path})
+#         print(f"ImageMagick binary set to: {imagemagick_path}")
+#     else:
+#         print(f"Warning: ImageMagick binary not found at {imagemagick_path}. Text rendering might use defaults.")
+# except Exception as e:
+#     print(f"Warning: Error configuring ImageMagick: {e}")
+
+# --- Helper Functions (Refactored for Parameters & Temp Dir) ---
-# ---------------- Helper Functions ---------------- #
-# (Your existing helper functions remain unchanged: generate_script, parse_script,
-# search_pexels_videos, search_pexels_images, search_google_images, download_image,
-# download_video, generate_media, generate_tts, apply_kenburns_effect,
-# resize_to_fill, find_mp3_files, add_background_music, create_clip,
-# fix_imagemagick_policy)
+def fix_imagemagick_policy():
+    """Attempts to fix common ImageMagick security policy issues for caption rendering."""
+    # This function might require sudo privileges and is OS-dependent. Use with caution.
+    policy_paths = [
+        "/etc/ImageMagick-6/policy.xml",
+        "/etc/ImageMagick-7/policy.xml",
+        "/etc/ImageMagick/policy.xml",
+        "/usr/local/etc/ImageMagick-7/policy.xml"
+    ]
+    found_policy = next((path for path in policy_paths if os.path.exists(path)), None)
+    if not found_policy:
+        print("ImageMagick policy.xml not found in common locations. Skipping policy fix.")
+        return False
-
-# Define these globally as they were in your original code but will be set per run
-TARGET_RESOLUTION = None
-CAPTION_COLOR = None
-TEMP_FOLDER = None
+
+    print(f"Attempting to modify ImageMagick policy at: {found_policy}")
+    print("NOTE: This may require administrative privileges (sudo).")
+    # Use simpler patterns that are more likely to work across versions
+    commands = [
+        f"sudo sed -i.bak 's/rights=\"none\" pattern=\"PS\"/rights=\"read|write\" pattern=\"PS\"/' {found_policy}",
+        f"sudo sed -i 's/rights=\"none\" pattern=\"EPS\"/rights=\"read|write\" pattern=\"EPS\"/' {found_policy}",
+        f"sudo sed -i 's/rights=\"none\" pattern=\"PDF\"/rights=\"read|write\" pattern=\"PDF\"/' {found_policy}",
+        f"sudo sed -i 's/rights=\"none\" pattern=\"XPS\"/rights=\"read|write\" pattern=\"XPS\"/' {found_policy}",
+        # Allow reading/writing paths - adjust pattern if needed
+        f"sudo sed -i 's/rights=\"none\" pattern=\"@\\*\"/rights=\"read|write\" pattern=\"@\\*\"/' {found_policy}",
+        # Less aggressive version for path, allows reading system paths if needed by fonts etc.
+        f"sudo sed -i 's/domain=\"path\" rights=\"none\"/domain=\"path\" rights=\"read\"/' {found_policy}"
+    ]
+    success = True
+    for cmd in commands:
+        print(f"Executing: {cmd}")
+        try:
+            # Use os.system - requires user interaction for sudo password if needed
+            # Consider subprocess for better control if running non-interactively
+            exit_code = os.system(cmd)
+            if exit_code != 0:
+                print(f"Command failed with exit code {exit_code}. Policy might not be fully updated.")
+                # Don't necessarily set success to False, some might fail harmlessly
+        except Exception as e:
+            print(f"Error executing command: {e}")
+            success = False  # Mark as failed if command execution raises error
+
+    if success:
+        print("ImageMagick policy modifications attempted. Restart application if issues persist.")
+    else:
+        print("Some policy modification commands failed. Text rendering might be affected.")
+    return success
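A minimal sketch of the subprocess-based alternative that the comments inside
fix_imagemagick_policy() suggest for non-interactive runs. The helper name
run_policy_command is illustrative, not part of the original code:

def run_policy_command(cmd):
    """Run one policy-edit command; return True on success."""
    import subprocess
    try:
        result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            print(f"Command failed ({result.returncode}): {result.stderr.strip()}")
        return result.returncode == 0
    except subprocess.TimeoutExpired:
        print("Command timed out (sudo may be waiting for a password).")
        return False

Unlike os.system, this captures stderr for logging and bounds how long a stalled
sudo prompt can block the app.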
 
 def generate_script(user_input):
-    """Generate documentary script with proper OpenRouter handling."""
+    """Generate documentary script using OpenRouter API."""
+    print("[Function] generate_script called")
     headers = {
         'Authorization': f'Bearer {OPENROUTER_API_KEY}',
-        'HTTP-Referer': 'https://your-domain.com',
-        'X-Title': 'AI Documentary Maker'
+        'HTTP-Referer': 'https://your-app-domain.com',  # Replace with your actual domain/app name
+        'X-Title': 'AI Documentary Maker - Dynamic'
     }
-
-    prompt = f"""Short Documentary Script GeneratorInstructions:
+    # Use the detailed prompt provided by the user
+    prompt = f"""Short Documentary Script Generator Instructions:
 
 If I say "use this," just output the script exactly as I gave it.
 If I only give topics, generate a script based on them.
-If I provide a full script, rewrite it without any changes. Make everything short simple and humarous funny and act as serious but humarous. And don't say anything off topic. Also alway say a funny statement to subscribe based on the video topic at the end. Use normal conversational text like a normal person talking and avoid AI phase make the statements humanize and normal conversational
-And always talk negative views don't really wants any positive statement and the full script will be one video about one topic so keep in mind
+If I provide a full script, rewrite it without any changes. Make everything short simple and humorous funny and act as serious but humorous. And don't say anything off topic. Also always say a funny statement to subscribe based on the video topic at the end. Use normal conversational text like a normal person talking and avoid AI phase make the statements humanize and normal conversational
+And always talk negative views don't really want any positive statement and the full script will be one video about one topic so keep in mind
 
 Formatting Rules:
 
-
-Title in Square Brackets:
-
-
-Each section starts with a one-word title inside [ ] (max two words if necessary).
-This title will be used as a search term for Pexels footage.
-
-
-
-Casual & Funny Narration:
-
-
-Each section has 5-10 words of narration.
-Keep it natural, funny, and unpredictable (not robotic, poetic, or rhythmic).
-
-
-
-No Special Formatting:
-
-
-No bold, italics, or special characters. You are a assistant AI your task is to create script. You aren't a chatbot. So, don't write extra text
-
-
-
-Generalized Search Terms:
-
-
-If a term is too specific, make it more general for Pexels search.
-
-
-
-Scene-Specific Writing:
-
-
-Each section describes only what should be shown in the video.
-
-
-
-Output Only the Script, and also make it funny and humarous and helirous and also add to subscribe with a funny statement like subscribe now or .....
-
-
+Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage.
+Casual & Funny Narration: Each section has 5-10 words of narration. Keep it natural, funny, and unpredictable (not robotic, poetic, or rhythmic).
+No Special Formatting: No bold, italics, or special characters. You are an assistant AI your task is to create script. You aren't a chatbot. So, don't write extra text
+Generalized Search Terms: If a term is too specific, make it more general for Pexels search.
+Scene-Specific Writing: Each section describes only what should be shown in the video.
+Output Only the Script, and also make it funny and humorous and hilarious and also add to subscribe with a funny statement like subscribe now or ..... No extra text, just the script. - - Example Output: [North Korea] - Top 5 unknown facts about North Korea. - [Invisibility] - North Korea’s internet speed is so fast… it doesn’t exist. - [Leadership] - Kim Jong-un once won an election with 100% votes… against himself. - [Magic] - North Korea discovered time travel. That’s why their news is always from the past. - [Warning] - Subscribe now, or Kim Jong-un will send you a free one-way ticket… to North Korea. - [Freedom] - North Korean citizens can do anything… as long as it's government-approved. -Now here is the Topic/scrip: {user_input} + +Now here is the Topic/script: {user_input} """ data = { 'model': OPENROUTER_MODEL, 'messages': [{'role': 'user', 'content': prompt}], - 'temperature': 0.4, - 'max_tokens': 5000 + 'temperature': 0.5, # Slightly increased for more creative/funny results + 'max_tokens': 1024 # Increased max tokens for potentially longer scripts } - try: response = requests.post( 'https://openrouter.ai/api/v1/chat/completions', - headers=headers, - json=data, - timeout=30 + headers=headers, json=data, timeout=60 # Increased timeout ) - - if response.status_code == 200: - response_data = response.json() - if 'choices' in response_data and len(response_data['choices']) > 0: - return response_data['choices'][0]['message']['content'] - else: - print("Unexpected response format:", response_data) - return None + response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) + + response_data = response.json() + if 'choices' in response_data and len(response_data['choices']) > 0: + script_content = response_data['choices'][0]['message']['content'] + # Basic cleanup: remove potential leading/trailing whitespace/newlines + script_content = script_content.strip() + print("--- Generated Script ---") + print(script_content) + print("-----------------------") + # Check if script seems empty or just contains formatting noise + if not script_content or len(script_content) < 10 or script_content.count('[') == 0: + print("Warning: Generated script seems empty or invalid.") + return f"Error: Generated script was empty or invalid. Raw response: {script_content}" + return script_content else: - print(f"API Error {response.status_code}: {response.text}") - return None - + print("API Error: Unexpected response format:", response_data) + return "Error: Could not generate script (unexpected format)." + + except requests.exceptions.RequestException as e: + print(f"API Request failed: {str(e)}") + # Provide more specific error if possible + error_message = f"Error: Could not generate script (Request failed: {e})." + if isinstance(e, requests.exceptions.Timeout): + error_message = "Error: Could not generate script (API request timed out)." + elif isinstance(e, requests.exceptions.HTTPError): + error_message = f"Error: Could not generate script (API Error {e.response.status_code}: {e.response.text})." + return error_message except Exception as e: - print(f"Request failed: {str(e)}") - return None + print(f"Unexpected error during script generation: {e}") + print(traceback.format_exc()) + return f"Error: An unexpected error occurred during script generation: {e}" def parse_script(script_text): - """ - Parse the generated script into a list of elements. - For each section, create two elements: - - A 'media' element using the section title as the visual prompt. 
- - A 'tts' element with the narration text, voice info, and computed duration. - """ - sections = {} + """Parse the generated script into media and TTS elements with segment IDs.""" + print("[Function] parse_script called") + elements = [] + segment_id_counter = 0 current_title = None - current_text = "" - - try: - for line in script_text.splitlines(): - line = line.strip() - if line.startswith("[") and "]" in line: - bracket_start = line.find("[") - bracket_end = line.find("]", bracket_start) - if bracket_start != -1 and bracket_end != -1: - if current_title is not None: - sections[current_title] = current_text.strip() - current_title = line[bracket_start+1:bracket_end] - current_text = line[bracket_end+1:].strip() - elif current_title: - current_text += line + " " - - if current_title: - sections[current_title] = current_text.strip() - - elements = [] - for title, narration in sections.items(): - if not title or not narration: - continue - - media_element = {"type": "media", "prompt": title, "effects": "fade-in"} - words = narration.split() - duration = max(3, len(words) * 0.5) - tts_element = {"type": "tts", "text": narration, "voice": "en", "duration": duration} - elements.append(media_element) - elements.append(tts_element) - - return elements - except Exception as e: - print(f"Error parsing script: {e}") - return [] + current_narration = "" -def search_pexels_videos(query, pexels_api_key): - """Search for a video on Pexels by query and return a random HD video.""" - headers = {'Authorization': pexels_api_key} - base_url = "https://api.pexels.com/videos/search" - num_pages = 3 - videos_per_page = 15 - - max_retries = 3 - retry_delay = 1 + lines = script_text.splitlines() - search_query = query - all_videos = [] + for i, line in enumerate(lines): + line = line.strip() + if not line: # Skip empty lines + continue - for page in range(1, num_pages + 1): - for attempt in range(max_retries): - try: - params = {"query": search_query, "per_page": videos_per_page, "page": page} - response = requests.get(base_url, headers=headers, params=params, timeout=10) - - if response.status_code == 200: - data = response.json() - videos = data.get("videos", []) - - if not videos: - print(f"No videos found on page {page}.") - break - - for video in videos: - video_files = video.get("video_files", []) - for file in video_files: - if file.get("quality") == "hd": - all_videos.append(file.get("link")) - break - - break - - elif response.status_code == 429: - print(f"Rate limit hit (attempt {attempt+1}/{max_retries}). 
Retrying in {retry_delay} seconds...") - time.sleep(retry_delay) - retry_delay *= 2 - else: - print(f"Error fetching videos: {response.status_code} {response.text}") - if attempt < max_retries - 1: - print(f"Retrying in {retry_delay} seconds...") - time.sleep(retry_delay) - retry_delay *= 2 - else: - break - - except requests.exceptions.RequestException as e: - print(f"Request exception: {e}") - if attempt < max_retries - 1: - print(f"Retrying in {retry_delay} seconds...") - time.sleep(retry_delay) - retry_delay *= 2 - else: - break + match = re.match(r'^\[(.*?)\](.*)', line) # Match [Title] Optional Text + + if match: + # If we have a pending title/narration, save it before starting new one + if current_title is not None and current_narration: + segment_id = f"seg_{segment_id_counter}" + media_element = {"type": "media", "prompt": current_title, "segment_id": segment_id} + # Estimate duration based on narration words + words = current_narration.split() + duration = max(2.0, min(15.0, len(words) * 0.45 + 0.5)) # Base duration + per word, capped + tts_element = {"type": "tts", "text": current_narration, "duration": duration, "segment_id": segment_id} + elements.append(media_element) + elements.append(tts_element) + segment_id_counter += 1 + print(f" -> Parsed segment {segment_id}: '{current_title}' / '{current_narration[:30]}...'") + + # Start the new segment + current_title = match.group(1).strip() + current_narration = match.group(2).strip() # Text on the same line + + elif current_title is not None: + # This line is part of the narration for the current title + current_narration += (" " + line) if current_narration else line # Add space only if needed + + # Add the very last segment after the loop finishes + if current_title is not None and current_narration: + segment_id = f"seg_{segment_id_counter}" + media_element = {"type": "media", "prompt": current_title, "segment_id": segment_id} + words = current_narration.split() + duration = max(2.0, min(15.0, len(words) * 0.45 + 0.5)) + tts_element = {"type": "tts", "text": current_narration, "duration": duration, "segment_id": segment_id} + elements.append(media_element) + elements.append(tts_element) + print(f" -> Parsed segment {segment_id}: '{current_title}' / '{current_narration[:30]}...'") - if all_videos: - random_video = random.choice(all_videos) - print(f"Selected random video from {len(all_videos)} HD videos") - return random_video + if not elements: + print("Warning: Script parsing resulted in zero elements.") else: - print("No suitable videos found after searching all pages.") - return None - -def search_pexels_images(query, pexels_api_key): - """Search for an image on Pexels by query.""" - headers = {'Authorization': pexels_api_key} - url = "https://api.pexels.com/v1/search" - params = {"query": query, "per_page": 5, "orientation": "landscape"} - + print(f"Parsed into {len(elements)} elements ({len(elements)//2} segments)") + return elements + +def search_pexels(query, api_key, search_type="videos", per_page=10, orientation="landscape"): + """Search Pexels API for videos or photos.""" + base_url = f"https://api.pexels.com/{search_type}/search" + headers = {'Authorization': api_key} + params = {"query": query, "per_page": per_page, "orientation": orientation} max_retries = 3 retry_delay = 1 + print(f"Searching Pexels {search_type} for: '{query}' (Orientation: {orientation})") + for attempt in range(max_retries): try: - response = requests.get(url, headers=headers, params=params, timeout=10) - - if response.status_code == 200: - 
data = response.json() - photos = data.get("photos", []) - if photos: - photo = random.choice(photos[:min(5, len(photos))]) - img_url = photo.get("src", {}).get("original") - return img_url - else: - print(f"No images found for query: {query}") - return None - - elif response.status_code == 429: - print(f"Rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...") - time.sleep(retry_delay) - retry_delay *= 2 + response = requests.get(base_url, headers=headers, params=params, timeout=15) + response.raise_for_status() # Check for HTTP errors + + data = response.json() + results = data.get("videos" if search_type == "videos" else "photos", []) + + if not results: + print(f"No Pexels {search_type} found for '{query}' on attempt {attempt+1}.") + # Optionally try modifying query slightly on retries (e.g., remove pluralization) + # if attempt == 0 and query.endswith('s'): params['query'] = query[:-1] + # else: break # Stop if no results after modification or first try + break # Keep it simple: stop if no results + + print(f"Found {len(results)} Pexels {search_type} results.") + return results # Return the list of results + + except requests.exceptions.HTTPError as e: + if e.response.status_code == 429: # Rate limit + print(f"Pexels API rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s...") + time.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + elif e.response.status_code == 400 and 'invalid query' in e.response.text.lower(): + print(f"Pexels API Error: Invalid query '{query}'. Skipping.") + return [] # Return empty list for invalid query else: - print(f"Error fetching images: {response.status_code} {response.text}") - if attempt < max_retries - 1: - print(f"Retrying in {retry_delay} seconds...") - time.sleep(retry_delay) - retry_delay *= 2 - + print(f"Pexels API HTTP Error {e.response.status_code}: {e.response.text} (attempt {attempt+1}/{max_retries})") + if attempt < max_retries - 1: + time.sleep(retry_delay) + retry_delay *= 2 + else: + print("Max retries reached for HTTP error.") + return [] # Failed after retries except requests.exceptions.RequestException as e: - print(f"Request exception: {e}") + print(f"Pexels API Request Exception: {e} (attempt {attempt+1}/{max_retries})") if attempt < max_retries - 1: - print(f"Retrying in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 2 + else: + print("Max retries reached for request exception.") + return [] # Failed after retries - print(f"No Pexels images found for query: {query} after all attempts") - return None + print(f"Pexels search failed for '{query}' after {max_retries} attempts.") + return [] # Return empty list if search fails completely + +def search_google_images(query, temp_dir): + """Search Google Images and attempt to download the first few valid results.""" + print(f"Searching Google Images for: '{query}'") + search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch&safe=active" # Added safe search + headers = {"User-Agent": USER_AGENT} + downloaded_path = None -def search_google_images(query): - """Search for images on Google Images (for news-related queries)""" try: - search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch" - headers = {"User-Agent": USER_AGENT} response = requests.get(search_url, headers=headers, timeout=10) + response.raise_for_status() soup = BeautifulSoup(response.text, "html.parser") img_tags = soup.find_all("img") image_urls = [] for img in img_tags: - src = img.get("src", "") - if 
src.startswith("http") and "gstatic" not in src: + src = img.get("src") or img.get("data-src") # Try both src and data-src + if src and src.startswith("http") and "gstatic.com/images" not in src: # Filter out base64/gstatic image_urls.append(src) - if image_urls: - return random.choice(image_urls[:5]) if len(image_urls) >= 5 else image_urls[0] - else: - print(f"No Google Images found for query: {query}") - return None + print(f"Found {len(image_urls)} potential image URLs from Google.") + + # Try downloading the first few valid URLs + for i, url in enumerate(image_urls[:5]): # Try top 5 + safe_prompt = re.sub(r'[^\w\s-]', '', query).strip().replace(' ', '_') + filename = os.path.join(temp_dir, f"gimg_{safe_prompt}_{uuid.uuid4().hex[:6]}.jpg") + print(f"Attempting download from Google Images URL #{i+1}: {url[:80]}...") + downloaded_path = download_image(url, filename) + if downloaded_path: + print(f"Successfully downloaded Google Image to: {os.path.basename(downloaded_path)}") + return downloaded_path # Return the first one that works + else: + print("Download/validation failed for this URL.") + time.sleep(0.2) # Small delay before next attempt + + except requests.exceptions.RequestException as e: + print(f"Error during Google Images search request: {e}") except Exception as e: - print(f"Error in Google Images search: {e}") - return None + print(f"Error parsing Google Images results or downloading: {e}") + print(traceback.format_exc()) + + print(f"Google Images search/download failed for query: {query}") + return None -def download_image(image_url, filename): - """Download an image from a URL to a local file with enhanced error handling.""" +def download_media(url, filename, media_type="image"): + """Download image or video from URL with error handling and validation.""" + print(f"Downloading {media_type}: {url[:80]}... 
-> {os.path.basename(filename)}") + headers = {"User-Agent": USER_AGENT} try: - headers = {"User-Agent": USER_AGENT} - print(f"Downloading image from: {image_url} to {filename}") - response = requests.get(image_url, headers=headers, stream=True, timeout=15) + response = requests.get(url, headers=headers, stream=True, timeout=30) # Increased timeout for videos response.raise_for_status() with open(filename, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) + print(f"{media_type.capitalize()} downloaded successfully.") - print(f"Image downloaded successfully to: {filename}") - - try: - img = Image.open(filename) - img.verify() - img = Image.open(filename) - if img.mode != 'RGB': - img = img.convert('RGB') - img.save(filename) - print(f"Image validated and processed: {filename}") - return filename - except Exception as e_validate: - print(f"Downloaded file is not a valid image: {e_validate}") - if os.path.exists(filename): - os.remove(filename) - return None + # Validate image files + if media_type == "image": + try: + with Image.open(filename) as img: + img.verify() # Check if Pillow can read metadata + # Re-open to check format and convert if necessary + with Image.open(filename) as img: + if img.format in ['JPEG', 'PNG', 'WEBP']: # Common formats + if img.mode != 'RGB': + print(f"Converting image {os.path.basename(filename)} to RGB") + # Create a new filename for the converted image + name, ext = os.path.splitext(filename) + rgb_filename = f"{name}_rgb{ext}" + img.convert('RGB').save(rgb_filename) + # Optionally remove the original non-RGB file + # try: os.remove(filename) except OSError: pass + filename = rgb_filename # Use the new RGB file path + print(f"Image validated ({img.format}, {img.mode}). Path: {os.path.basename(filename)}") + return filename + else: + print(f"Warning: Downloaded image format ({img.format}) might not be ideal. Attempting conversion.") + name, ext = os.path.splitext(filename) + jpg_filename = f"{name}_converted.jpg" + try: + img.convert('RGB').save(jpg_filename) + # try: os.remove(filename) except OSError: pass + filename = jpg_filename + print(f"Image converted to JPG. Path: {os.path.basename(filename)}") + return filename + except Exception as conv_err: + print(f"Error converting image to JPG: {conv_err}. Keeping original.") + # Fallback: Try returning original if conversion failed but it opened + return filename + + except (IOError, SyntaxError, Exception) as e_validate: + print(f"Downloaded file is not a valid image or processing failed: {e_validate}") + try: os.remove(filename) # Clean up invalid file + except OSError: pass + return None + elif media_type == "video": + # Basic video validation (can be expanded with ffprobe if needed) + if os.path.getsize(filename) < 1024: # Check if file size is suspiciously small + print("Warning: Downloaded video file is very small. May be invalid.") + # Keep it for now, moviepy will likely fail later if invalid + print(f"Video downloaded. 
Path: {os.path.basename(filename)}")
+            return filename  # Assume valid for now
 
     except requests.exceptions.RequestException as e_download:
-        print(f"Image download error: {e_download}")
-        if os.path.exists(filename):
-            os.remove(filename)
+        print(f"{media_type.capitalize()} download error: {e_download}")
+        if os.path.exists(filename):
+            try: os.remove(filename)
+            except OSError: pass
         return None
     except Exception as e_general:
-        print(f"General error during image processing: {e_general}")
-        if os.path.exists(filename):
-            os.remove(filename)
+        print(f"General error during {media_type} download/processing: {e_general}")
+        print(traceback.format_exc())
+        if os.path.exists(filename):
+            try: os.remove(filename)
+            except OSError: pass
         return None
 
-def download_video(video_url, filename):
-    """Download a video from a URL to a local file."""
-    try:
-        response = requests.get(video_url, stream=True, timeout=30)
-        response.raise_for_status()
-        with open(filename, 'wb') as f:
-            for chunk in response.iter_content(chunk_size=8192):
-                f.write(chunk)
-        print(f"Video downloaded successfully to: {filename}")
-        return filename
-    except Exception as e:
-        print(f"Video download error: {e}")
-        if os.path.exists(filename):
-            os.remove(filename)
-        return None
+    return None  # Should not be reached ideally
 
-def generate_media(prompt, user_image=None, current_index=0, total_segments=1):
-    """
-    Generate a visual asset by first searching for a video or using a specific search strategy.
-    For news-related queries, use Google Images.
-    Returns a dict: {'path': , 'asset_type': 'video' or 'image'}.
-    """
-    safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
-    if "news" in prompt.lower():
-        print(f"News-related query detected: {prompt}. Using Google Images...")
-        image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_news.jpg")
-        image_url = search_google_images(prompt)
-        if image_url:
-            downloaded_image = download_image(image_url, image_file)
-            if downloaded_image:
-                print(f"News image saved to {downloaded_image}")
-                return {"path": downloaded_image, "asset_type": "image"}
-        else:
-            print(f"Google Images search failed for prompt: {prompt}")
-
-    if random.random() < 0.25:
-        video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4")
-        video_url = search_pexels_videos(prompt, PEXELS_API_KEY)
-        if video_url:
-            downloaded_video = download_video(video_url, video_file)
-            if downloaded_video:
-                print(f"Video asset saved to {downloaded_video}")
-                return {"path": downloaded_video, "asset_type": "video"}
-        else:
-            print(f"Pexels video search failed for prompt: {prompt}")
-
-    image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg")
-    image_url = search_pexels_images(prompt, PEXELS_API_KEY)
-    if image_url:
-        downloaded_image = download_image(image_url, image_file)
-        if downloaded_image:
-            print(f"Image asset saved to {downloaded_image}")
-            return {"path": downloaded_image, "asset_type": "image"}
-    else:
-        print(f"Pexels image download failed for prompt: {prompt}")
-
-    fallback_terms = ["nature", "people", "landscape", "technology", "business"]
-    for term in fallback_terms:
-        print(f"Trying fallback image search with term: {term}")
-        fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg")
-        fallback_url = search_pexels_images(term, PEXELS_API_KEY)
-        if fallback_url:
-            downloaded_fallback = download_image(fallback_url,
fallback_file) - if downloaded_fallback: - print(f"Fallback image saved to {downloaded_fallback}") - return {"path": downloaded_fallback, "asset_type": "image"} - else: - print(f"Fallback image download failed for term: {term}") +def download_video(url, filename): + return download_media(url, filename, media_type="video") + + +def generate_media(prompt, temp_dir, video_preference_ratio=0.3, is_news=False): + """Generate a visual asset (video or image) based on prompt and preferences.""" + safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_') + print(f"\nGenerating media for prompt: '{prompt}', Video Pref: {video_preference_ratio}, News: {is_news}") + + # --- Strategy --- + # 1. If News: Prioritize Google Images. + # 2. If Video Preferred: Try Pexels Video first. + # 3. Try Pexels Image. + # 4. If Image failed or Video not preferred initially: Try Pexels Video. + # 5. Fallback: Generic Pexels Image search. + + # 1. News Strategy + if is_news: + print("News strategy: Trying Google Images first.") + gimg_path = search_google_images(prompt, temp_dir) + if gimg_path: + return {"path": gimg_path, "asset_type": "image", "source": "google"} else: - print(f"Fallback image search failed for term: {term}") + print("Google Images failed for news prompt, continuing with Pexels...") + + # 2. Video Preferred Strategy + if random.random() < video_preference_ratio: + print("Video preference: Trying Pexels Video first.") + pexel_videos = search_pexels(prompt, PEXELS_API_KEY, search_type="videos") + if pexel_videos: + selected_video_info = random.choice(pexel_videos) + # Find HD link if possible, otherwise take highest quality available + hd_link = next((f['link'] for f in selected_video_info.get('video_files', []) if f.get('quality') == 'hd'), None) + if not hd_link: + best_link = max(selected_video_info.get('video_files', []), key=lambda x: x.get('width', 0) * x.get('height', 0), default=None) + hd_link = best_link['link'] if best_link else None + + if hd_link: + video_file = os.path.join(temp_dir, f"vid_{safe_prompt}_{uuid.uuid4().hex[:6]}.mp4") + downloaded_video = download_video(hd_link, video_file) + if downloaded_video: + return {"path": downloaded_video, "asset_type": "video", "source": "pexels"} + print("Pexels Video (first attempt) failed or no suitable link found.") + + + # 3. Pexels Image Strategy + print("Trying Pexels Image search.") + pexel_images = search_pexels(prompt, PEXELS_API_KEY, search_type="photos") + if pexel_images: + selected_photo_info = random.choice(pexel_images) + # Prefer 'large' or 'original' size + img_url = selected_photo_info.get('src', {}).get('large', selected_photo_info.get('src', {}).get('original')) + if img_url: + image_file = os.path.join(temp_dir, f"img_{safe_prompt}_{uuid.uuid4().hex[:6]}.jpg") + downloaded_image = download_image(img_url, image_file) + if downloaded_image: + return {"path": downloaded_image, "asset_type": "image", "source": "pexels"} + print("Pexels Image search failed.") + + + # 4. 
Pexels Video (Second Attempt if not tried first or if Image failed)
+    # A fresh random roll here cannot tell us whether a video was already tried
+    # above, so simply attempt the video search again whenever we get this far.
+    print("Trying Pexels Video (second attempt).")
+    pexel_videos = search_pexels(prompt, PEXELS_API_KEY, search_type="videos")
+    if pexel_videos:
+        selected_video_info = random.choice(pexel_videos)
+        hd_link = next((f['link'] for f in selected_video_info.get('video_files', []) if f.get('quality') == 'hd'), None)
+        if not hd_link:
+            best_link = max(selected_video_info.get('video_files', []), key=lambda x: x.get('width', 0) * x.get('height', 0), default=None)
+            hd_link = best_link['link'] if best_link else None
+
+        if hd_link:
+            video_file = os.path.join(temp_dir, f"vid_{safe_prompt}_{uuid.uuid4().hex[:6]}.mp4")
+            downloaded_video = download_video(hd_link, video_file)
+            if downloaded_video:
+                return {"path": downloaded_video, "asset_type": "video", "source": "pexels"}
+    print("Pexels Video (second attempt) failed.")
+
+
+    # 5. Fallback Image Strategy
+    print("All primary searches failed. Trying fallback Pexels image search...")
+    fallback_terms = ["abstract", "texture", "background", "technology", "nature"]
+    random.shuffle(fallback_terms)  # Try in random order
+    for term in fallback_terms:
+        print(f"  Fallback search term: '{term}'")
+        fallback_images = search_pexels(term, PEXELS_API_KEY, search_type="photos", per_page=5)
+        if fallback_images:
+            selected_photo_info = random.choice(fallback_images)
+            img_url = selected_photo_info.get('src', {}).get('large', selected_photo_info.get('src', {}).get('original'))
+            if img_url:
+                fallback_file = os.path.join(temp_dir, f"fallback_{term}_{uuid.uuid4().hex[:6]}.jpg")
+                downloaded_fallback = download_image(img_url, fallback_file)
+                if downloaded_fallback:
+                    print(f"Using fallback image '{term}'.")
+                    return {"path": downloaded_fallback, "asset_type": "image", "source": "pexels_fallback"}
+        time.sleep(0.5)  # Avoid hitting rate limits rapidly on fallback
+
+    print(f"ERROR: Failed to generate any visual asset for prompt: {prompt}")
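The HD-link selection above duplicates the block used for the first video
attempt earlier in generate_media. A minimal sketch of a shared helper that
could replace both inline blocks; the name pick_pexels_video_link is
illustrative, not part of the original code:

def pick_pexels_video_link(video_info):
    """Return the HD file link if present, else the highest-resolution link."""
    files = video_info.get('video_files', [])
    hd = next((f['link'] for f in files if f.get('quality') == 'hd'), None)
    if hd:
        return hd
    best = max(files, key=lambda f: f.get('width', 0) * f.get('height', 0), default=None)
    return best['link'] if best else None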
+    # Create a placeholder black image as a last resort?
+    try:
+        placeholder_path = os.path.join(temp_dir, f"placeholder_{uuid.uuid4().hex[:6]}.png")
+        img = Image.new('RGB', (640, 360), color='black')  # Small black image
+        draw = ImageDraw.Draw(img)
+        draw.text((10, 10), f"Media Failed\n'{prompt[:50]}...'", fill="white")
+        img.save(placeholder_path)
+        print("Using placeholder black image.")
+        return {"path": placeholder_path, "asset_type": "image", "source": "placeholder"}
+    except Exception as placeholder_err:
+        print(f"Failed to create placeholder image: {placeholder_err}")
+    return None  # Absolute failure
-    print(f"Failed to generate visual asset for prompt: {prompt}")
-    return None
 
-def generate_silent_audio(duration, sample_rate=24000):
-    """Generate a silent WAV audio file lasting 'duration' seconds."""
-    num_samples = int(duration * sample_rate)
-    silence = np.zeros(num_samples, dtype=np.float32)
-    silent_path = os.path.join(TEMP_FOLDER, f"silent_{int(time.time())}.wav")
-    sf.write(silent_path, silence, sample_rate)
-    print(f"Silent audio generated: {silent_path}")
-    return silent_path
-
-def generate_tts(text, voice):
-    """
-    Generate TTS audio using Kokoro, falling back to gTTS or silent audio if needed.
-    """
-    safe_text = re.sub(r'[^\w\s-]', '', text[:10]).strip().replace(' ', '_')
-    file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text}.wav")
-
-    if os.path.exists(file_path):
-        print(f"Using cached TTS for text '{text[:10]}...'")
-        return file_path
+
+def generate_tts(text, temp_dir, voice='en', use_kokoro=True):
+    """Generate TTS using Kokoro, falling back to gTTS or silence."""
+    safe_text = re.sub(r'[^\w\s-]', '', text[:20]).strip().replace(' ', '_')
+    file_path = os.path.join(temp_dir, f"tts_{safe_text}_{uuid.uuid4().hex[:6]}.wav")
+    print(f"Generating TTS for: '{text[:40]}...'")
+
+    # Try Kokoro first if enabled and available
+    if use_kokoro and pipeline:
+        try:
+            # Assuming 'af_heart' or similar is the desired American English voice from Kokoro
+            kokoro_voice_code = 'af_heart'  # Adjust if your Kokoro setup uses different codes
+            generator = pipeline(text, voice=kokoro_voice_code, speed=0.95, split_pattern=r'\n+')  # Adjust speed as needed
+            audio_segments = [audio for _, _, audio in generator]
+            if not audio_segments:
+                raise ValueError("Kokoro TTS returned no audio segments.")
+
+            # Concatenate segments if multiple, ensure numpy array
+            full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
+            if not isinstance(full_audio, np.ndarray):
+                raise ValueError("Kokoro output is not a numpy array.")
+
+            # Ensure audio is float32 for soundfile writing if needed
+            if full_audio.dtype != np.float32:
+                full_audio = full_audio.astype(np.float32)
+                # Normalize if necessary after type conversion (e.g., if it was int16)
+                max_val = np.max(np.abs(full_audio))
+                if max_val > 1.0: full_audio /= max_val
+
+            sf.write(file_path, full_audio, 24000)  # Kokoro default sample rate is often 24000
+            print(f"TTS audio saved to {os.path.basename(file_path)} (Kokoro)")
+            return file_path
+        except Exception as e:
+            print(f"Error with Kokoro TTS: {e}. 
Trying gTTS fallback.") + print(traceback.format_exc()) # Print full traceback for Kokoro errors + # Fall through to gTTS + # Fallback to gTTS try: - kokoro_voice = 'af_heart' if voice == 'en' else voice - generator = pipeline(text, voice=kokoro_voice, speed=0.9, split_pattern=r'\n+') - audio_segments = [] - for i, (gs, ps, audio) in enumerate(generator): - audio_segments.append(audio) - full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0] - sf.write(file_path, full_audio, 24000) - print(f"TTS audio saved to {file_path} (Kokoro)") + print("Using gTTS fallback...") + tts = gTTS(text=text, lang='en', slow=False) + # Save as mp3 first, then convert to wav + mp3_path = os.path.join(temp_dir, f"tts_{safe_text}_{uuid.uuid4().hex[:6]}.mp3") + tts.save(mp3_path) + + # Convert mp3 to wav using pydub + audio = AudioSegment.from_mp3(mp3_path) + # Set sample rate to match Kokoro if possible, otherwise use a standard rate + audio = audio.set_frame_rate(24000) + # Export as WAV + audio.export(file_path, format="wav") + os.remove(mp3_path) # Clean up the temporary mp3 file + print(f"Fallback TTS saved to {os.path.basename(file_path)} (gTTS)") return file_path + except ImportError as ie: + print(f"Error: gTTS or its dependency (pydub/ffmpeg) not installed? {ie}") + print("Skipping gTTS fallback.") + # Fall through to silence + except Exception as fallback_error: + print(f"gTTS fallback also failed: {fallback_error}") + print(traceback.format_exc()) + # Fall through to silence + + # Fallback to silent audio if all TTS methods fail + print("All TTS methods failed. Generating silent audio.") + duration = max(1.0, len(text.split()) * 0.4) # Estimate duration for silence + return generate_silent_audio(duration, temp_dir, sample_rate=24000) + +def generate_silent_audio(duration, temp_dir, sample_rate=24000): + """Generate a silent WAV audio file.""" + try: + num_samples = int(duration * sample_rate) + silence = np.zeros(num_samples, dtype=np.float32) + silent_path = os.path.join(temp_dir, f"silent_{uuid.uuid4().hex[:6]}.wav") + sf.write(silent_path, silence, sample_rate) + print(f"Silent audio generated: {os.path.basename(silent_path)} for {duration:.2f}s") + return silent_path except Exception as e: - print(f"Error with Kokoro TTS: {e}") - try: - print("Falling back to gTTS...") - tts = gTTS(text=text, lang='en') - mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text}.mp3") - tts.save(mp3_path) - audio = AudioSegment.from_mp3(mp3_path) - audio.export(file_path, format="wav") - os.remove(mp3_path) - print(f"Fallback TTS saved to {file_path} (gTTS)") - return file_path - except Exception as fallback_error: - print(f"Both TTS methods failed: {fallback_error}") - return generate_silent_audio(duration=max(3, len(text.split()) * 0.5)) + print(f"Error generating silent audio: {e}") + return None -def apply_kenburns_effect(clip, target_resolution, effect_type=None): - """Apply a smooth Ken Burns effect with a single movement pattern.""" +def apply_kenburns_effect(clip, target_resolution, effect_type="random"): + """Apply Ken Burns effect to an ImageClip.""" + print(f"Applying Ken Burns effect: {effect_type} to image clip") target_w, target_h = target_resolution - clip_aspect = clip.w / clip.h - target_aspect = target_w / target_h - if clip_aspect > target_aspect: - new_height = target_h - new_width = int(new_height * clip_aspect) - else: - new_width = target_w - new_height = int(new_width / clip_aspect) + # Ensure clip has dimensions, default if not (shouldn't happen with 
ImageClip) + clip_w = getattr(clip, 'w', target_w) + clip_h = getattr(clip, 'h', target_h) + if clip_w <= 0 or clip_h <= 0: + print(f"Warning: Invalid clip dimensions ({clip_w}x{clip_h}) for Ken Burns. Using target.") + clip_w, clip_h = target_w, target_h - clip = clip.resize(newsize=(new_width, new_height)) - base_scale = 1.15 - new_width = int(new_width * base_scale) - new_height = int(new_height * base_scale) - clip = clip.resize(newsize=(new_width, new_height)) + clip_aspect = clip_w / clip_h + target_aspect = target_w / target_h + + # --- Resize to cover target aspect ratio --- + if clip_aspect > target_aspect: # Image wider than target + new_h = target_h + new_w = int(new_h * clip_aspect) + else: # Image taller than target + new_w = target_w + new_h = int(new_w / clip_aspect) + # Ensure dimensions are at least target size + new_w = max(new_w, target_w) + new_h = max(new_h, target_h) + clip = clip.resize(newsize=(new_w, new_h)) + + # --- Further scale up for movement room --- + scale_factor = 1.15 # How much bigger to make it for panning/zooming + scaled_w = int(new_w * scale_factor) + scaled_h = int(new_h * scale_factor) + # Use ANTIALIAS for potentially better quality on downscale during resize + try: + clip = clip.resize(newsize=(scaled_w, scaled_h)) #, resample=Image.Resampling.LANCZOS) # Check Pillow version for LANCZOS + except Exception as resize_err: + print(f"Warning: High-quality resize failed ({resize_err}). Using default.") + clip = clip.resize(newsize=(scaled_w, scaled_h)) - max_offset_x = new_width - target_w - max_offset_y = new_height - target_h - available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "up-left"] - if effect_type is None or effect_type == "random": + # --- Define effect parameters --- + available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "diag-tl-br", "diag-tr-bl", "static"] + if effect_type == "random" or effect_type not in available_effects: effect_type = random.choice(available_effects) + print(f" -> Selected effect: {effect_type}") + + if effect_type == "static": # Option for no movement + return clip.resize((target_w, target_h)) # Just resize and center + + + zoom_amount = 0.10 # Percentage zoom + start_zoom, end_zoom = 1.0, 1.0 + # Start/end centers relative to the scaled image + start_cx, start_cy = scaled_w / 2, scaled_h / 2 + end_cx, end_cy = start_cx, start_cy if effect_type == "zoom-in": - start_zoom = 0.9 - end_zoom = 1.1 - start_center = (new_width / 2, new_height / 2) - end_center = start_center - elif effect_type == "zoom-out": - start_zoom = 1.1 - end_zoom = 0.9 - start_center = (new_width / 2, new_height / 2) - end_center = start_center - elif effect_type == "pan-left": start_zoom = 1.0 + end_zoom = 1.0 + zoom_amount + elif effect_type == "zoom-out": + start_zoom = 1.0 + zoom_amount end_zoom = 1.0 - start_center = (max_offset_x + target_w / 2, (max_offset_y // 2) + target_h / 2) - end_center = (target_w / 2, (max_offset_y // 2) + target_h / 2) + elif effect_type == "pan-left": + start_cx = scaled_w - target_w / 2 # Start right edge + end_cx = target_w / 2 # End left edge elif effect_type == "pan-right": - start_zoom = 1.0 - end_zoom = 1.0 - start_center = (target_w / 2, (max_offset_y // 2) + target_h / 2) - end_center = (max_offset_x + target_w / 2, (max_offset_y // 2) + target_h / 2) - elif effect_type == "up-left": - start_zoom = 1.0 - end_zoom = 1.0 - start_center = (max_offset_x + target_w / 2, max_offset_y + target_h / 2) - end_center = (target_w / 2, target_h / 2) - else: - 
raise ValueError(f"Unsupported effect_type: {effect_type}") - + start_cx = target_w / 2 # Start left edge + end_cx = scaled_w - target_w / 2 # End right edge + elif effect_type == "pan-up": + start_cy = scaled_h - target_h / 2 # Start bottom edge + end_cy = target_h / 2 # End top edge + elif effect_type == "pan-down": + start_cy = target_h / 2 # Start top edge + end_cy = scaled_h - target_h / 2 # End bottom edge + elif effect_type == "diag-tl-br": # Top-Left to Bottom-Right + start_cx, start_cy = target_w / 2, target_h / 2 + end_cx, end_cy = scaled_w - target_w / 2, scaled_h - target_h / 2 + elif effect_type == "diag-tr-bl": # Top-Right to Bottom-Left + start_cx, start_cy = scaled_w - target_w / 2, target_h / 2 + end_cx, end_cy = target_w / 2, scaled_h - target_h / 2 + + # --- Define the frame transformation function --- def transform_frame(get_frame, t): - frame = get_frame(t) + frame = get_frame(t) # Get the frame (full scaled image for ImageClip) + if not isinstance(frame, np.ndarray): frame = np.array(frame) # Ensure numpy array + if frame.ndim == 2: frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) # Ensure 3 channels + + # Smooth interpolation (ease-in-out) ratio = t / clip.duration if clip.duration > 0 else 0 - ratio = 0.5 - 0.5 * math.cos(math.pi * ratio) - current_zoom = start_zoom + (end_zoom - start_zoom) * ratio + smooth_ratio = 0.5 - 0.5 * math.cos(math.pi * ratio) + + current_zoom = start_zoom + (end_zoom - start_zoom) * smooth_ratio + current_zoom = max(0.1, current_zoom) # Prevent zero/negative zoom + + # Calculate crop size based on target resolution and current zoom crop_w = int(target_w / current_zoom) crop_h = int(target_h / current_zoom) - current_center_x = start_center[0] + (end_center[0] - start_center[0]) * ratio - current_center_y = start_center[1] + (end_center[1] - start_center[1]) * ratio - min_center_x = crop_w / 2 - max_center_x = new_width - crop_w / 2 - min_center_y = crop_h / 2 - max_center_y = new_height - crop_h / 2 - current_center_x = max(min_center_x, min(current_center_x, max_center_x)) - current_center_y = max(min_center_y, min(current_center_y, max_center_y)) - cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (current_center_x, current_center_y)) - resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) - return resized_frame + # Ensure crop dimensions are valid and within the scaled image bounds + if crop_w <= 0 or crop_h <= 0 or crop_w > scaled_w or crop_h > scaled_h: + # Fallback: Center crop with no zoom if calculated size is invalid + print(f"Warning: Invalid Ken Burns crop size ({crop_w}x{crop_h}) at t={t:.2f}. 
Using fallback.") + crop_w = min(target_w, scaled_w) + crop_h = min(target_h, scaled_h) + center_x = scaled_w / 2 + center_y = scaled_h / 2 + else: + # Interpolate center position + current_cx = start_cx + (end_cx - start_cx) * smooth_ratio + current_cy = start_cy + (end_cy - start_cy) * smooth_ratio + + # Clamp center position to keep the crop box within the scaled image bounds + min_cx, max_cx = crop_w / 2, scaled_w - crop_w / 2 + min_cy, max_cy = crop_h / 2, scaled_h - crop_h / 2 + center_x = max(min_cx, min(current_cx, max_cx)) + center_y = max(min_cy, min(current_cy, max_cy)) + + # Extract the sub-pixel rectangle and resize + try: + # Use cv2.getRectSubPix for potentially smoother results + cropped_frame = cv2.getRectSubPix(frame, (int(round(crop_w)), int(round(crop_h))), (center_x, center_y)) + # Resize to the final target resolution using high-quality interpolation + resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) + return resized_frame + except cv2.error as cv_err: + print(f"OpenCV Error during Ken Burns transform: {cv_err}") + print(f" Frame shape: {frame.shape}, crop_w/h: {crop_w}/{crop_h}, center: {center_x},{center_y}") + # Fallback: Simple resize of the original frame (might look jumpy) + return cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LINEAR) + except Exception as e: + print(f"Unexpected error during Ken Burns transform: {e}") + print(traceback.format_exc()) + return cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LINEAR) # Fallback + + # Apply the transformation return clip.fl(transform_frame) + def resize_to_fill(clip, target_resolution): - """Resize and crop a clip to fill the target resolution while maintaining aspect ratio.""" + """Resize and crop a video clip to fill the target resolution.""" target_w, target_h = target_resolution - clip_aspect = clip.w / clip.h + print(f"Resizing/cropping video clip to fill {target_w}x{target_h}") + + # Ensure clip has dimensions + if not hasattr(clip, 'w') or not hasattr(clip, 'h') or clip.w <= 0 or clip.h <= 0: + print("Error: Cannot resize video clip without valid dimensions.") + # Attempt to get dimensions from the first frame + try: + frame = clip.get_frame(0) + h, w = frame.shape[:2] + clip.w, clip.h = w, h + print(f"Manually set video clip dimensions to {w}x{h} for resize.") + if w <= 0 or h <= 0: raise ValueError("Invalid dimensions from frame.") + except Exception as e: + print(f"Failed to get/set dimensions: {e}. Returning original clip.") + return clip + + clip_w, clip_h = clip.w, clip.h + clip_aspect = clip_w / clip_h target_aspect = target_w / target_h - if clip_aspect > target_aspect: - clip = clip.resize(height=target_h) - crop_amount = (clip.w - target_w) / 2 - clip = clip.crop(x1=crop_amount, x2=clip.w - crop_amount, y1=0, y2=clip.h) - else: - clip = clip.resize(width=target_w) - crop_amount = (clip.h - target_h) / 2 - clip = clip.crop(x1=0, x2=clip.w, y1=crop_amount, y2=clip.h - crop_amount) - - return clip - -def find_mp3_files(): - """Search for any MP3 files in the current directory and subdirectories.""" + if abs(clip_aspect - target_aspect) < 0.01: # If aspect ratios are very close + print("Aspect ratios match. Resizing directly.") + return clip.resize((target_w, target_h)) + + elif clip_aspect > target_aspect: # Clip is wider than target + print("Clip is wider. 
Resizing height and cropping width.") + resized_clip = clip.resize(height=target_h) + # Calculate crop amount (total pixels to remove, divided by 2 for each side) + crop_x = (resized_clip.w - target_w) / 2 + if crop_x < 0: crop_x = 0 # Safety check + print(f"Cropping width: x1={crop_x:.1f}, x2={resized_clip.w - crop_x:.1f}") + final_clip = resized_clip.crop(x1=crop_x, x2=resized_clip.w - crop_x) + else: # Clip is taller than target + print("Clip is taller. Resizing width and cropping height.") + resized_clip = clip.resize(width=target_w) + # Calculate crop amount + crop_y = (resized_clip.h - target_h) / 2 + if crop_y < 0: crop_y = 0 # Safety check + print(f"Cropping height: y1={crop_y:.1f}, y2={resized_clip.h - crop_y:.1f}") + final_clip = resized_clip.crop(y1=crop_y, y2=resized_clip.h - crop_y) + + # Final check on dimensions (floating point issues might cause slight differences) + if final_clip.w != target_w or final_clip.h != target_h: + print(f"Warning: Final clip dimensions ({final_clip.w}x{final_clip.h}) after crop don't exactly match target ({target_w}x{target_h}). Forcing resize.") + final_clip = final_clip.resize((target_w, target_h)) + + return final_clip + +def find_mp3_files(start_dir='.'): + """Search for MP3 files recursively.""" mp3_files = [] - for root, dirs, files in os.walk('.'): + print(f"Searching for MP3 files in '{start_dir}' and subdirectories...") + for root, dirs, files in os.walk(start_dir): for file in files: - if file.endswith('.mp3'): + if file.lower().endswith('.mp3'): mp3_path = os.path.join(root, file) mp3_files.append(mp3_path) - print(f"Found MP3 file: {mp3_path}") + print(f"Found MP3: {mp3_path}") return mp3_files[0] if mp3_files else None -def add_background_music(final_video, bg_music_volume=0.08): - """Add background music to the final video using any MP3 file found.""" - try: - bg_music_path = find_mp3_files() - if bg_music_path and os.path.exists(bg_music_path): - print(f"Adding background music from: {bg_music_path}") - bg_music = AudioFileClip(bg_music_path) - if bg_music.duration < final_video.duration: - loops_needed = math.ceil(final_video.duration / bg_music.duration) - bg_segments = [bg_music] * loops_needed - bg_music = concatenate_audioclips(bg_segments) - bg_music = bg_music.subclip(0, final_video.duration) - bg_music = bg_music.volumex(bg_music_volume) - video_audio = final_video.audio - mixed_audio = CompositeAudioClip([video_audio, bg_music]) +def add_background_music(final_video, bg_music_path=None, bg_music_volume=0.08): + """Add background music to the final video.""" + print("Attempting to add background music...") + music_to_use = bg_music_path + + # If no path provided via upload, search for local MP3s + if not music_to_use or not os.path.exists(music_to_use): + print(f"BG music path '{bg_music_path}' not found or not given. Searching for local MP3s...") + music_to_use = find_mp3_files() # Search in current dir and subdirs + + if music_to_use and os.path.exists(music_to_use): + print(f"Using background music: {os.path.basename(music_to_use)}") + try: + bg_clip = AudioFileClip(music_to_use) + video_duration = final_video.duration + + # Ensure video has an audio track to mix with, create silent if not + if final_video.audio is None: + print("Warning: Input video has no audio track. 
Creating silent track for mixing.") + # Create a silent audio clip matching video duration + silent_audio = AudioSegment.silent(duration=int(video_duration * 1000), frame_rate=44100) + silent_path = os.path.join(os.path.dirname(music_to_use), f"temp_silent_{uuid.uuid4().hex[:6]}.wav") # Save near music + silent_audio.export(silent_path, format="wav") + video_audio_clip = AudioFileClip(silent_path) + final_video = final_video.set_audio(video_audio_clip) + # Clean up temporary silent file? Or leave it in temp dir. + # os.remove(silent_path) # Be careful if temp dir is cleaned later + else: + video_audio_clip = final_video.audio + + + # Loop or trim BG music to match video duration + if bg_clip.duration < video_duration: + loops_needed = math.ceil(video_duration / bg_clip.duration) + print(f"Looping background music {loops_needed} times.") + bg_clip = concatenate_audioclips([bg_clip] * loops_needed) + + # Trim precisely to video duration + bg_clip = bg_clip.subclip(0, video_duration) + + # Apply volume adjustment + bg_clip = bg_clip.volumex(bg_music_volume) + + # Mix audio tracks + mixed_audio = CompositeAudioClip([video_audio_clip, bg_clip]) final_video = final_video.set_audio(mixed_audio) - print("Background music added successfully") - else: - print("No MP3 files found, skipping background music") - return final_video - except Exception as e: - print(f"Error adding background music: {e}") - print("Continuing without background music") - return final_video + print(f"Background music added successfully (Volume: {bg_music_volume:.2f})") + + # Close the audio file clip resources + bg_clip.close() + if video_audio_clip != final_video.audio: # Close original video audio if replaced + video_audio_clip.close() + -def create_clip(media_path, asset_type, tts_path, duration=None, effects=None, narration_text=None, segment_index=0): - """Create a video clip with synchronized subtitles and narration.""" + except Exception as e: + print(f"Error processing or adding background music: {e}") + print(traceback.format_exc()) + print("Continuing without background music due to error.") + # Return the original video if mixing failed + return final_video + else: + print("No suitable background music file found or provided. Skipping.") + + return final_video + +def create_caption_clip(text, duration, target_resolution, options): + """Creates a moviepy TextClip for captions with styling.""" + target_w, target_h = target_resolution + settings = { + "fontsize": 40, + "font": 'Arial-Bold', # Ensure font is available on the system + "color": 'white', + "bg_color": 'rgba(0, 0, 0, 0.5)', # Semi-transparent black background + "stroke_color": 'black', + "stroke_width": 1.5, + "align": 'center', + "method": 'caption', # Use caption for automatic line breaking + "size": (target_w * 0.85, None), # Limit width to 85% of screen + "position": "bottom", # Default position keyword + **(options or {}) # Override defaults with provided options + } + + # Convert position keyword to coordinates + pos_keyword = settings["position"] + if pos_keyword == 'bottom': + # Position slightly above the bottom edge + y_pos = target_h * 0.90 - settings["fontsize"] # Adjust based on font size? 
+ settings["position"] = ('center', y_pos) + elif pos_keyword == 'center': + settings["position"] = ('center', 'center') + elif pos_keyword == 'top': + y_pos = target_h * 0.10 + settings["position"] = ('center', y_pos) + # Allow specific tuple coordinates too + elif not isinstance(pos_keyword, (tuple, list)): + print(f"Warning: Unknown caption position keyword '{pos_keyword}'. Defaulting to bottom.") + settings["position"] = ('center', target_h * 0.90 - settings["fontsize"]) + + + print(f"Creating caption: '{text[:30]}...', Size: {settings['fontsize']}, Color: {settings['color']}, Pos: {settings['position']}") + + # Attempt to create the TextClip try: - print(f"Creating clip #{segment_index} with asset_type: {asset_type}, media_path: {media_path}") - if not os.path.exists(media_path) or not os.path.exists(tts_path): - print("Missing media or TTS file") - return None + # Remove position from settings dict before passing to TextClip if it's handled separately or method='caption' handles it + text_clip_args = {k: v for k, v in settings.items() if k != 'position'} + + txt_clip = TextClip(text, **text_clip_args) + txt_clip = txt_clip.set_position(settings["position"]) + txt_clip = txt_clip.set_duration(duration) + print("Caption TextClip created.") + return txt_clip + + except Exception as e: + print(f"ERROR creating TextClip: {e}") + # This often relates to ImageMagick issues (policy, installation, font availability) + print(" >> Check ImageMagick installation and policy configuration.") + print(" >> Ensure the specified font ('{settings['font']}') is installed and accessible.") + print(traceback.format_exc()) + # Return a dummy/empty clip or None to indicate failure + return None - audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2) + +def create_clip(segment_data, temp_dir, target_resolution, caption_options=None, kenburns_effect="random"): + """Create a single video clip segment from state data.""" + idx = segment_data["index"] + media_path = segment_data["media_path"] + asset_type = segment_data["media_type"] + narration = segment_data["narration"] + segment_id = segment_data["segment_id"] + + print(f"\n--- Creating Clip {idx+1} ({segment_id}) ---") + print(f" Media: {os.path.basename(media_path)} ({asset_type})") + print(f" Narration: '{narration[:50]}...'") + + # Validate inputs + if not media_path or not os.path.exists(media_path): + print(f"Error: Media file not found for segment {idx+1}: {media_path}") + return None + if not narration: + print(f"Warning: Empty narration for segment {idx+1}. Generating silent TTS.") + # Fall through, generate_tts will handle empty string + + # 1. Generate TTS Audio + tts_path = generate_tts(narration, temp_dir) + if not tts_path: + print(f"Error: Failed to generate TTS for segment {idx+1}. Skipping clip.") + return None + + try: + audio_clip = AudioFileClip(tts_path) + # Add slight fade out to prevent abrupt cuts + audio_clip = audio_clip.audio_fadeout(0.1) audio_duration = audio_clip.duration - target_duration = audio_duration + 0.2 + # Ensure minimum clip duration (e.g., 1 second), add buffer for fades + target_duration = max(1.5, audio_duration + 0.2) + print(f" Audio duration: {audio_duration:.2f}s, Target clip duration: {target_duration:.2f}s") + except Exception as audio_err: + print(f"Error loading audio clip {os.path.basename(tts_path)}: {audio_err}") + return None # Cannot proceed without audio + + # 2. 
Create Video/Image Base Clip + base_clip = None + try: if asset_type == "video": - clip = VideoFileClip(media_path) - clip = resize_to_fill(clip, TARGET_RESOLUTION) + print(" Processing video asset...") + # target_resolution passed as (w, h), but VideoFileClip might expect (h, w) sometimes? Check docs. + # Let's assume target_resolution is (width, height) consistently. + clip = VideoFileClip(media_path) #, target_resolution=target_resolution[::-1]) # Try passing target res here? + clip = resize_to_fill(clip, target_resolution) # Resize/crop to fit target + + # Loop or trim video if clip.duration < target_duration: + print(f" Looping video (duration {clip.duration:.2f}s) to fit {target_duration:.2f}s") clip = clip.loop(duration=target_duration) else: - clip = clip.subclip(0, target_duration) + # Start from beginning for simplicity, could add random start + start_time = 0 + clip = clip.subclip(start_time, start_time + target_duration) + + # Apply fade in/out for smoother transitions between video clips + base_clip = clip.fadein(0.2).fadeout(0.2) + elif asset_type == "image": - img = Image.open(media_path) - if img.mode != 'RGB': - with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp: - img.convert('RGB').save(temp.name) - media_path = temp.name - img.close() + print(" Processing image asset...") + # ImageClip needs RGB, download_media should have handled conversion clip = ImageClip(media_path).set_duration(target_duration) - clip = apply_kenburns_effect(clip, TARGET_RESOLUTION) - clip = clip.fadein(0.3).fadeout(0.3) + # Apply Ken Burns effect + clip = apply_kenburns_effect(clip, target_resolution, effect_type=kenburns_effect) + # Apply fade in/out (can be longer for images) + base_clip = clip.fadein(0.4).fadeout(0.4) else: + print(f"Error: Unknown asset type '{asset_type}' for segment {idx+1}") + audio_clip.close() # Close audio resource return None - if narration_text and CAPTION_COLOR != "transparent": - try: - words = narration_text.split() - chunks = [] - current_chunk = [] - for word in words: - current_chunk.append(word) - if len(current_chunk) >= 5: - chunks.append(' '.join(current_chunk)) - current_chunk = [] - if current_chunk: - chunks.append(' '.join(current_chunk)) - - chunk_duration = audio_duration / len(chunks) - subtitle_clips = [] - subtitle_y_position = int(TARGET_RESOLUTION[1] * 0.70) - - for i, chunk_text in enumerate(chunks): - start_time = i * chunk_duration - end_time = (i + 1) * chunk_duration - txt_clip = TextClip( - chunk_text, - fontsize=45, - font='Arial-Bold', - color=CAPTION_COLOR, - bg_color='rgba(0, 0, 0, 0.25)', - method='caption', - align='center', - stroke_width=2, - stroke_color=CAPTION_COLOR, - size=(TARGET_RESOLUTION[0] * 0.8, None) - ).set_start(start_time).set_end(end_time) - txt_clip = txt_clip.set_position(('center', subtitle_y_position)) - subtitle_clips.append(txt_clip) - - clip = CompositeVideoClip([clip] + subtitle_clips) - except Exception as sub_error: - print(f"Subtitle error: {sub_error}") - txt_clip = TextClip( - narration_text, - fontsize=28, - color=CAPTION_COLOR, - align='center', - size=(TARGET_RESOLUTION[0] * 0.7, None) - ).set_position(('center', int(TARGET_RESOLUTION[1] / 3))).set_duration(clip.duration) - clip = CompositeVideoClip([clip, txt_clip]) - - clip = clip.set_audio(audio_clip) - print(f"Clip created: {clip.duration:.1f}s") - return clip - except Exception as e: - print(f"Error in create_clip: {str(e)}") + if base_clip is None: # Should not happen if logic is correct + raise ValueError("Base clip creation 
failed unexpectedly.") + + # Ensure base_clip has the target dimensions after processing + if base_clip.w != target_resolution[0] or base_clip.h != target_resolution[1]: + print(f"Warning: Base clip dimensions ({base_clip.w}x{base_clip.h}) don't match target after processing. Forcing resize.") + base_clip = base_clip.resize(target_resolution) + + + except Exception as visual_err: + print(f"Error processing visual media {os.path.basename(media_path)}: {visual_err}") + print(traceback.format_exc()) + audio_clip.close() + if base_clip: base_clip.close() # Close if partially created return None -def fix_imagemagick_policy(): - """Fix ImageMagick security policies.""" + # 3. Add Captions (if enabled and text exists) + final_clip = base_clip + if caption_options and caption_options.get("enabled", "No") == "Yes" and narration: + print(" Adding captions...") + caption_clip = create_caption_clip(narration, base_clip.duration, target_resolution, caption_options) + if caption_clip: + # Composite the base video/image and the caption text + final_clip = CompositeVideoClip([base_clip, caption_clip]) + print(" Captions added successfully.") + else: + print(" Warning: Failed to create caption clip. Proceeding without captions for this segment.") + final_clip = base_clip # Use the base clip without captions + + # 4. Set Audio try: - print("Attempting to fix ImageMagick security policies...") - policy_paths = [ - "/etc/ImageMagick-6/policy.xml", - "/etc/ImageMagick-7/policy.xml", - "/etc/ImageMagick/policy.xml", - "/usr/local/etc/ImageMagick-7/policy.xml" - ] - found_policy = next((path for path in policy_paths if os.path.exists(path)), None) - if not found_policy: - print("No policy.xml found. Using alternative subtitle method.") - return False - print(f"Modifying policy file at {found_policy}") - os.system(f"sudo cp {found_policy} {found_policy}.bak") - os.system(f"sudo sed -i 's/rights=\"none\"/rights=\"read|write\"/g' {found_policy}") - os.system(f"sudo sed -i 's/]*>/]*>//g' {found_policy}") - print("ImageMagick policies updated successfully.") - return True - except Exception as e: - print(f"Error fixing policies: {e}") - return False + final_clip = final_clip.set_audio(audio_clip) + print(f"Clip {idx+1} created successfully. 
Duration: {final_clip.duration:.2f}s") + return final_clip + except Exception as set_audio_err: + print(f"Error setting audio for clip {idx+1}: {set_audio_err}") + # Clean up resources + final_clip.close() + audio_clip.close() + return None + + +# --- Gradio UI and State Management --- + +# Helper to create the UI row for editing a single segment +def create_segment_editor_row(segment_data, temp_dir): + """Creates Gradio components for one segment editor row.""" + idx = segment_data["index"] + segment_id = segment_data["segment_id"] + is_video = segment_data["media_type"] == "video" + media_label = f"Segment {idx+1}: Media ({segment_data['media_type']})" + + with gr.Blocks(): # Use Blocks to encapsulate the row structure + with gr.Row(variant="panel", elem_id=f"segment-row-{segment_id}"): + with gr.Column(scale=2): # Media preview and upload + # Use Video component for videos, Image for images + if is_video: + media_preview = gr.Video(label=media_label, value=segment_data["media_path"], interactive=False, height=200) + else: + media_preview = gr.Image(label=media_label, value=segment_data["media_path"], interactive=False, height=200, type="filepath") + + upload_btn = gr.UploadButton("Change Media", file_types=["image", "video"], scale=1) + + with gr.Column(scale=3): # Narration editor + narration_editor = gr.Textbox( + label=f"Segment {idx+1}: Narration (Prompt: '{segment_data['prompt']}')", + value=segment_data["narration"], + lines=5, + interactive=True, + elem_id=f"narration-edit-{segment_id}" # Unique ID for updates + ) + # Display original prompt for reference + # gr.Markdown(f"Original Prompt: `{segment_data['prompt']}`") + + # Return the interactive components and preview component for potential updates + return narration_editor, media_preview, upload_btn + + +# --- Main Gradio App Definition --- +with gr.Blocks(theme=gr.themes.Soft(), title="AI Documentary Generator") as demo: + # --- State Variables --- + # app_state: Holds the list of segment dictionaries + # [ { "segment_id": "...", "index": ..., "prompt": ..., "narration": ..., ... }, ... ] + app_state = gr.State([]) + # temp_dir_state: Holds the path to the temporary directory for the current run + temp_dir_state = gr.State(None) + # ui_state: Holds references to dynamic UI components if needed for updates (Advanced) + # ui_state = gr.State({}) # { segment_id: {"narration_comp": ..., "media_comp": ...}, ... } + + # --- UI Layout --- + gr.Markdown("# AI Documentary Video Generator (Enhanced)") + gr.Markdown("Create humorous documentary-style videos. Enter a concept, edit the AI's script & visuals, customize, and generate!") + + with gr.Row(): + # --- Left Column: Inputs & Controls --- + with gr.Column(scale=1): + gr.Markdown("## 1. Concept & Script") + concept_input = gr.Textbox( + label="Video Concept / Topic", + placeholder="e.g., funny facts about cats, the history of pizza, why squirrels are plotting world domination", + lines=2 + ) + video_ratio_slider = gr.Slider( + 0, 1, value=0.3, step=0.05, + label="Video Clip Preference", + info="Influences % of clips attempted as video vs. image (0=images only, 1=videos only)" + ) + generate_script_btn = gr.Button("Generate Script & Visuals", variant="primary", icon="✨") + + gr.Markdown("## 3. 
Customization") + with gr.Accordion("Video & Audio Settings", open=False): + resolution_input = gr.Radio( + ["Full HD (1920x1080)", "Vertical Short (1080x1920)"], + label="Target Resolution", value="Full HD (1920x1080)" + ) + kenburns_select = gr.Dropdown( + ["random", "zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "diag-tl-br", "diag-tr-bl", "static"], + label="Image Movement (Ken Burns)", value="random" + ) + bg_music_upload = gr.File(label="Optional Background Music (MP3/WAV)", file_types=[".mp3", ".wav", ".aac", ".ogg"]) + bg_music_volume = gr.Slider(0, 0.5, value=0.08, step=0.01, label="BG Music Volume") + + with gr.Accordion("Caption Settings", open=False): + caption_enable_radio = gr.Radio(["Yes", "No"], label="Enable Captions", value="Yes") + caption_font_size = gr.Slider(12, 80, value=44, step=1, label="Font Size") + caption_font_color = gr.ColorPicker(label="Font Color", value="#FFFFFF") # White + caption_bg_color = gr.ColorPicker(label="Background Color (RGBA)", value="#00000080") # Black 50% alpha + caption_position = gr.Dropdown(["bottom", "center", "top"], label="Vertical Position", value="bottom") + # Advanced: Font selection (requires knowing available fonts) + # caption_font = gr.Dropdown(["Arial-Bold", "Impact", "Comic Sans MS"], label="Font", value="Arial-Bold") # Example + + generate_video_btn = gr.Button("Generate Final Video", variant="primary", icon="🎬", interactive=False) # Disabled initially + + + # --- Right Column: Status, Editors, Output --- + with gr.Column(scale=2): + status_update = gr.Markdown("Status: Waiting for concept...") + + gr.Markdown("## 2. Edit Segments") + gr.Markdown("Review the AI-generated narration and visuals below. Edit text directly and use 'Change Media' to upload your own image or video for any segment.") + # This column will be populated dynamically with segment editors + segment_editors_area = gr.Column(elem_id="segment-editors-area") + + gr.Markdown("## 4. Output") + final_video_output = gr.Video(label="Generated Video", interactive=False) + cleanup_message = gr.Markdown("") + + + # --- Event Handlers --- + + # Function triggered by "Generate Script & Visuals" button + def handle_script_generation(concept, video_ratio, current_temp_dir): + print("\n--- Step 1: Generating Script & Initial Visuals ---") + if not concept: + return { status_update: gr.update(value="Status: Please enter a video concept.") } + + # Clean up previous run's temp dir if it exists + if current_temp_dir and os.path.isdir(current_temp_dir): + print(f"Cleaning up previous temporary directory: {current_temp_dir}") + shutil.rmtree(current_temp_dir, ignore_errors=True) + + # Create a new unique temporary directory for this run + temp_dir = tempfile.mkdtemp(prefix="aivideo_") + print(f"Created temporary directory: {temp_dir}") + + status_msg = "Status: Generating script..." + yield { + status_update: gr.update(value=status_msg), + segment_editors_area: gr.update(value=None), # Clear previous editors + final_video_output: gr.update(value=None), # Clear previous video + cleanup_message: gr.update(value=""), + generate_video_btn: gr.update(interactive=False) # Disable final generate btn + } + + script_text = generate_script(concept) + if not script_text or script_text.startswith("Error:"): + shutil.rmtree(temp_dir, ignore_errors=True) # Clean up failed run temp dir + yield { + status_update: gr.update(value=f"Status: Script Generation Failed. {script_text}"), + temp_dir_state: None + } + return + + status_msg = "Status: Script generated. 
Parsing segments..." + yield { status_update: gr.update(value=status_msg) } + + elements = parse_script(script_text) + if not elements: + shutil.rmtree(temp_dir, ignore_errors=True) + yield { + status_update: gr.update(value="Status: Error parsing script. No segments found."), + temp_dir_state: None + } + return + + num_segments = len(elements) // 2 + status_msg = f"Status: Parsed {num_segments} segments. Generating initial media previews (this may take a while)..." + yield { status_update: gr.update(value=status_msg) } + + # --- Create Initial State (Generate media for each segment) --- + initial_state = [] + segment_map = {} + for elem in elements: # Group by segment_id + s_id = elem.get("segment_id") + if s_id: + if s_id not in segment_map: segment_map[s_id] = {} + segment_map[s_id][elem["type"]] = elem + + processed_segments = 0 + for idx, (s_id, types) in enumerate(segment_map.items()): + if "media" in types and "tts" in types: + media_elem = types["media"] + tts_elem = types["tts"] + prompt = media_elem['prompt'] + + # Simple check for news-related prompts + is_news_prompt = any(kw in prompt.lower() for kw in ["news", "breaking", "report", "update"]) + + # Generate initial media suggestion + media_asset = generate_media(prompt, temp_dir, video_ratio, is_news=is_news_prompt) + + if media_asset: + segment_data = { + "segment_id": s_id, + "index": idx, + "prompt": prompt, + "narration": tts_elem["text"], + "original_narration": tts_elem["text"], + "duration": tts_elem["duration"], # Keep initial estimate, recalculate if needed + "media_path": media_asset["path"], + "media_type": media_asset["asset_type"], + "original_media_path": media_asset["path"], + "user_uploaded": False, + "source": media_asset.get("source", "unknown") # Track where media came from + } + initial_state.append(segment_data) + processed_segments += 1 + status_msg = f"Status: Generated media for segment {processed_segments}/{num_segments} ('{prompt[:20]}...')" + yield { status_update: gr.update(value=status_msg) } + else: + print(f"Warning: Failed to get initial media for segment {idx+1} (Prompt: {prompt}). Skipping segment.") + status_msg = f"Status: Failed media for segment {idx+1}/{num_segments}. Skipping." + yield { status_update: gr.update(value=status_msg) } + time.sleep(0.5) # Pause briefly on failure -# ---------------- Main Function with Gradio Integration ---------------- # -def generate_video(user_input, resolution, caption_option): - """Generate a video based on user input via Gradio.""" - global TARGET_RESOLUTION, CAPTION_COLOR, TEMP_FOLDER - import shutil - - # Set resolution - if resolution == "Full": - TARGET_RESOLUTION = (1920, 1080) - elif resolution == "Short": - TARGET_RESOLUTION = (1080, 1920) - else: - TARGET_RESOLUTION = (1920, 1080) # Default + else: + print(f"Warning: Incomplete segment data for {s_id}. Skipping.") + + + if not initial_state: + shutil.rmtree(temp_dir, ignore_errors=True) + yield { + status_update: gr.update(value="Status: Error generating initial media or no valid segments found. Please try a different concept."), + temp_dir_state: None + } + return + + + # --- Dynamically Create Editor UI --- + print(f"Creating UI for {len(initial_state)} segments...") + # We need to build the UI components *within* this function context + # so we can wire them up correctly. 
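+        # Caveat (assumption): replacing the contents of an existing Column via
+        # gr.update(value=<Blocks>) is not a generally supported Gradio pattern;
+        # newer Gradio releases expose @gr.render, which rebuilds components from
+        # state. A rough sketch of that alternative, reusing names from this file:
+        #
+        #   @gr.render(inputs=app_state)
+        #   def render_segments(segments):
+        #       for seg in segments:
+        #           create_segment_editor_row(seg, temp_dir)
+        #
+        # The sketch is illustrative only; the dynamic-Blocks approach below is
+        # kept as written.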
+ ui_components = {} # To store references if needed, e.g., for updates + with gr.Blocks() as editor_ui_block: # Create a temporary Blocks context to build the UI + for segment_data in initial_state: + s_id = segment_data["segment_id"] + narration_comp, media_comp, upload_comp = create_segment_editor_row(segment_data, temp_dir) + ui_components[s_id] = {"narration": narration_comp, "media": media_comp, "upload": upload_comp} + + # --- Wire up event handlers --- + # Use partial to pass segment_id and potentially component references + # Narration Change: + narration_comp.change( + fn=handle_narration_change, + inputs=[narration_comp, app_state], # Pass the component itself and state + outputs=[app_state], # Output updated state + # Pass segment_id using _js trick or find another way if needed + # This might require restructuring state or using elem_id lookup + # Let's assume handle_narration_change can find the segment by component ref or value for now + # A cleaner way might be adding segment_id as a hidden component in the row + ) + # Media Upload: + upload_comp.upload( + fn=partial(handle_media_upload, segment_id=s_id, temp_dir=temp_dir), # Pass s_id and temp_dir + inputs=[upload_comp, app_state], # Pass upload component and state + outputs=[app_state, media_comp], # Update state AND the preview component + ) + + status_msg = f"Status: Ready for editing. {len(initial_state)} segments loaded." + print("Segment editors created and events wired.") + + # Return the updates: new state, temp_dir, status, the dynamically created UI, and enable final button + yield { + app_state: initial_state, + temp_dir_state: temp_dir, + status_update: gr.update(value=status_msg), + segment_editors_area: gr.update(value=editor_ui_block), # Replace area content with new UI + generate_video_btn: gr.update(interactive=True) # Enable final generation button + } + + + # Handler for narration textbox change + def handle_narration_change(new_narration, current_app_state, evt: gr.EventData): + # --- Find the segment associated with the changed component --- + # This is tricky. Gradio's event data might not easily give the segment_id. + # Option 1: Use elem_id if accessible via evt.target? (Needs verification) + # Option 2: Iterate state and match original narration? (Brittle if user edits slightly) + # Option 3: Add a hidden gr.Textbox(value=segment_id) in the row and include it in inputs. (Most robust) + + # --- Simplified Approach (Less Robust): Assume order or match text --- + # This needs improvement for robustness, using a hidden ID is better. + # Let's *assume* we can get the index or ID somehow. For now, print warning. + print(f"Narration changed to: '{new_narration[:30]}...'") + print("Warning: Linking narration change to specific segment state needs a robust ID mechanism (e.g., hidden component).") + # Placeholder: Find segment by original text (prone to errors) + found_segment = None + for segment in current_app_state: + # This matching is weak. Need a better way. 
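+            # A more robust sketch (assumption: each editor row gains a hidden
+            # gr.Textbox holding its segment_id, passed as an extra input):
+            #
+            #   seg_id_box = gr.Textbox(value=segment_data["segment_id"], visible=False)
+            #   narration_comp.change(
+            #       fn=update_narration_by_id,  # hypothetical helper, not defined here
+            #       inputs=[seg_id_box, narration_comp, app_state],
+            #       outputs=[app_state],
+            #   )
+            #
+            # update_narration_by_id would look the segment up by its explicit id
+            # instead of matching on text, making the weak matching below unnecessary.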
+ if segment["original_narration"] == new_narration: # Unlikely to work well + found_segment = segment + break + + if found_segment: + print(f"Updating narration for segment {found_segment['segment_id']}") + found_segment["narration"] = new_narration + else: + print("Could not reliably link narration change to state segment.") + + + # Return the potentially modified state + return current_app_state + + + # Handler for media upload button + def handle_media_upload(uploaded_file, current_app_state, segment_id, temp_dir): + print(f"\nMedia uploaded for segment: {segment_id}") + if uploaded_file is None: + print("Upload event triggered but file is None.") + # Need to return original state and original media preview value + target_segment = next((s for s in current_app_state if s["segment_id"] == segment_id), None) + original_media_path = target_segment["original_media_path"] if target_segment else None + # Determine if original was video or image to return correct update type + is_video = target_segment["media_type"] == "video" if target_segment else False + return current_app_state, gr.Video.update(value=original_media_path) if is_video else gr.Image.update(value=original_media_path) + + + # Find the segment in the state + target_segment = None + for segment in current_app_state: + if segment["segment_id"] == segment_id: + target_segment = segment + break + + if not target_segment: + print(f"Error: Could not find segment {segment_id} in state to update media.") + # Return original state and no change to media preview (or handle error state) + # This requires knowing the original preview value, which is complex here. + # Simplification: Return state, let UI potentially be out of sync on error. + return current_app_state, gr.update() # No change update + + + # Process the uploaded file + original_file_path = uploaded_file.name # Gradio provides a temp path + file_name = os.path.basename(original_file_path) + file_ext = os.path.splitext(file_name)[1].lower() + save_path = os.path.join(temp_dir, f"user_upload_{segment_id}{file_ext}") + + print(f"Copying uploaded file '{file_name}' to '{os.path.basename(save_path)}'") + try: + shutil.copy(original_file_path, save_path) + except Exception as e: + print(f"Error copying uploaded file: {e}") + # Return original state and no preview update + is_video = target_segment["media_type"] == "video" + return current_app_state, gr.Video.update(value=target_segment["media_path"]) if is_video else gr.Image.update(value=target_segment["media_path"]) + + + # Update the segment state + target_segment["media_path"] = save_path + target_segment["user_uploaded"] = True + if file_ext in ['.mp4', '.mov', '.avi', '.webm', '.mkv']: + target_segment["media_type"] = "video" + print("Media type set to VIDEO") + media_update = gr.Video.update(value=save_path) # Update Video component + elif file_ext in ['.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif']: + target_segment["media_type"] = "image" + print("Media type set to IMAGE") + # Validate/convert uploaded image if necessary + validated_path = download_image(save_path, save_path) # Use download_image for validation/conversion + if validated_path: + target_segment["media_path"] = validated_path # Update path if converted + media_update = gr.Image.update(value=validated_path) # Update Image component + else: + print("Uploaded image failed validation/conversion. 
Reverting state.") + # Revert state changes + target_segment["media_path"] = target_segment["original_media_path"] # Or previous path if edits allowed + target_segment["user_uploaded"] = False + target_segment["media_type"] = "video" if target_segment["original_media_path"].lower().endswith(('.mp4', '.mov')) else "image" + # Return original preview value + is_video = target_segment["media_type"] == "video" + media_update = gr.Video.update(value=target_segment["media_path"]) if is_video else gr.Image.update(value=target_segment["media_path"]) + else: + print(f"Warning: Unknown uploaded file type '{file_ext}'. Assuming image.") + target_segment["media_type"] = "image" # Default assumption + media_update = gr.Image.update(value=save_path) - # Set caption color - CAPTION_COLOR = "white" if caption_option == "Yes" else "transparent" - # Create a unique temporary folder - TEMP_FOLDER = tempfile.mkdtemp() + print(f"Segment {segment_id} state updated with new media: {os.path.basename(target_segment['media_path'])}") - # Fix ImageMagick policy - fix_success = fix_imagemagick_policy() - if not fix_success: - print("Will use alternative methods if needed") + # Return the updated state and the update for the media preview component + return current_app_state, media_update - print("Generating script from API...") - script = generate_script(user_input) - if not script: - print("Failed to generate script.") - shutil.rmtree(TEMP_FOLDER) - return None - print("Generated Script:\n", script) - elements = parse_script(script) - if not elements: - print("Failed to parse script into elements.") - shutil.rmtree(TEMP_FOLDER) - return None - print(f"Parsed {len(elements)//2} script segments.") - paired_elements = [] - for i in range(0, len(elements), 2): - if i + 1 < len(elements): - paired_elements.append((elements[i], elements[i + 1])) + # Handler for "Generate Final Video" button + def handle_final_generation(current_app_state, temp_dir, + resolution_str, kenburns_style, + bg_music_file, bg_volume, + caption_enabled, cap_font_size, cap_font_color, cap_bg_color, cap_position): + print("\n--- Step 3: Generating Final Video ---") + start_time = time.time() - if not paired_elements: - print("No valid script segments found.") - shutil.rmtree(TEMP_FOLDER) - return None + if not current_app_state: + yield { status_update: gr.update(value="Status: No script data loaded. Please generate script first.") } + return + if not temp_dir or not os.path.isdir(temp_dir): + yield { status_update: gr.update(value="Status: Error - Temporary directory missing or invalid.") } + return - clips = [] - for idx, (media_elem, tts_elem) in enumerate(paired_elements): - print(f"\nProcessing segment {idx+1}/{len(paired_elements)} with prompt: '{media_elem['prompt']}'") - media_asset = generate_media(media_elem['prompt'], current_index=idx, total_segments=len(paired_elements)) - if not media_asset: - print(f"Skipping segment {idx+1} due to missing media asset.") - continue - tts_path = generate_tts(tts_elem['text'], tts_elem['voice']) - if not tts_path: - print(f"Skipping segment {idx+1} due to TTS generation failure.") - continue - clip = create_clip( - media_path=media_asset['path'], - asset_type=media_asset['asset_type'], - tts_path=tts_path, - duration=tts_elem['duration'], - effects=media_elem.get('effects', 'fade-in'), - narration_text=tts_elem['text'], - segment_index=idx - ) - if clip: - clips.append(clip) + status_msg = f"Status: Starting final video generation for {len(current_app_state)} segments..." 
+ yield { status_update: gr.update(value=status_msg), final_video_output: gr.update(value=None) } + + # --- Prepare Configuration --- + if "Vertical Short" in resolution_str: + target_resolution = (1080, 1920) else: - print(f"Clip creation failed for segment {idx+1}.") + target_resolution = (1920, 1080) # Default Full HD + + caption_options = { + "enabled": caption_enabled, # "Yes" or "No" + "fontsize": cap_font_size, + "color": cap_font_color, + "bg_color": cap_bg_color, # Pass RGBA string + "position": cap_position, + # Add other fixed options if needed + "font": 'Arial-Bold', # Or make this configurable + "stroke_color": '#000000', # Black stroke + "stroke_width": 1.5, + } + + bg_music_path = bg_music_file.name if bg_music_file else None # Get path from Gradio file object + + # --- Process Clips --- + clips = [] + total_segments = len(current_app_state) + for i, segment_data in enumerate(current_app_state): + status_msg = f"Status: Processing segment {i+1}/{total_segments} ('{segment_data['prompt'][:25]}...')" + yield { status_update: gr.update(value=status_msg) } + + # Pass all necessary data to create_clip + clip = create_clip( + segment_data=segment_data, + temp_dir=temp_dir, + target_resolution=target_resolution, + caption_options=caption_options, + kenburns_effect=kenburns_style + ) + + if clip: + clips.append(clip) + print(f"Segment {i+1} clip added.") + else: + print(f"Warning: Failed to create clip for segment {i+1}. Skipping.") + # Attempt to continue without the failed clip - if not clips: - print("No clips were successfully created.") - shutil.rmtree(TEMP_FOLDER) - return None - print("\nConcatenating clips...") - final_video = concatenate_videoclips(clips, method="compose") - final_video = add_background_music(final_video, bg_music_volume=0.08) - - print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...") - final_video.write_videofile(OUTPUT_VIDEO_FILENAME, codec='libx264', fps=24, preset='veryfast') - print(f"Final video saved as {OUTPUT_VIDEO_FILENAME}") - - # Clean up - print("Cleaning up temporary files...") - shutil.rmtree(TEMP_FOLDER) - print("Temporary files removed.") - - return OUTPUT_VIDEO_FILENAME - -# ---------------- Gradio Interface ---------------- # -iface = gr.Interface( - fn=generate_video, - inputs=[ - gr.Textbox(label="Video Concept", placeholder="Enter your video concept here..."), - gr.Radio(["Full", "Short"], label="Resolution", value="Full"), - gr.Radio(["Yes", "No"], label="Captions", value="Yes") - ], - outputs=gr.Video(label="Generated Video"), - title="AI Documentary Video Generator", - description="Create a funny documentary-style video based on your concept. Note: Generation may take several minutes on CPU." -) + if not clips: + yield { status_update: gr.update(value="Status: Error - No valid clips were created. Video generation failed.") } + # Consider cleanup? + # if temp_dir and os.path.isdir(temp_dir): shutil.rmtree(temp_dir, ignore_errors=True) + # yield { temp_dir_state: None } + return + + # --- Concatenate & Finalize --- + status_msg = f"Status: Concatenating {len(clips)} clips..." + yield { status_update: gr.update(value=status_msg) } -# Launch the interface -iface.launch(share=True) \ No newline at end of file + final_video = None + try: + # Use compose method for potentially better handling of varying clip sizes/fps? Check docs. 
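+            # method="compose" aligns clips of differing sizes on a common canvas,
+            # whereas the default "chain" assumes uniform clips. A defensive sketch
+            # (assumption: pinning a common fps avoids audio/video drift across
+            # mixed image- and video-derived clips):
+            #
+            #   clips = [c.set_fps(24) for c in clips]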
+ final_video = concatenate_videoclips(clips, method="compose") + print("Clips concatenated successfully.") + except Exception as concat_err: + print(f"Error during video concatenation: {concat_err}") + print(traceback.format_exc()) + # Attempt cleanup of individual clips before erroring + for c in clips: + try: c.close() + except: pass + yield { status_update: gr.update(value=f"Status: Error during video concatenation: {concat_err}") } + return # Stop generation + + # Add background music (if provided and concatenation succeeded) + if final_video: + status_msg = "Status: Adding background music (if provided)..." + yield { status_update: gr.update(value=status_msg) } + final_video = add_background_music(final_video, bg_music_path, bg_volume) + + # Export Final Video + timestamp = time.strftime("%Y%m%d_%H%M%S") + output_filename = f"{OUTPUT_VIDEO_FILENAME_BASE}_{timestamp}.mp4" + # Save outside the temp dir for easier user access + final_output_path = os.path.abspath(output_filename) # Save in script's directory + + status_msg = f"Status: Exporting final video to {output_filename} (this may take time)..." + yield { status_update: gr.update(value=status_msg) } + + try: + print(f"Writing final video to: {final_output_path}") + # Use recommended settings for web compatibility & performance + final_video.write_videofile( + final_output_path, + codec='libx264', # Good quality/compatibility codec + audio_codec='aac', # Standard audio codec + temp_audiofile=os.path.join(temp_dir, f'temp_audio_{timestamp}.aac'), # Explicit temp audio file + preset='medium', # 'medium' or 'fast' for balance, 'ultrafast' for speed + ffmpeg_params=[ # Ensure compatibility + '-pix_fmt', 'yuv420p', + '-profile:v', 'high', + '-level', '4.0', # Broad compatibility level + # '-tune', 'fastdecode', # Optional: optimize for playback speed + '-movflags', '+faststart' # Important for web streaming + ], + threads=max(1, os.cpu_count() // 2), # Use multiple threads + logger='bar', # Show progress bar + fps=24 # Standard FPS + ) + final_duration = final_video.duration + print(f"Final video exported successfully ({final_duration:.1f}s).") + + except Exception as write_err: + print(f"Error writing final video file: {write_err}") + print(traceback.format_exc()) + yield { status_update: gr.update(value=f"Status: Error writing video file: {write_err}") } + # Don't delete temp dir on write error, user might want intermediate files + return # Stop generation + finally: + # --- Resource Cleanup --- + print("Closing video clips...") + if final_video: + try: final_video.close() + except: pass + for c in clips: + try: c.close() + except: pass + # Close any opened audio files explicitly if needed (AudioFileClip handles this mostly) + + + end_time = time.time() + total_time = end_time - start_time + status_msg = f"Status: Video generation complete! Saved as {output_filename} ({final_duration:.1f}s). Total time: {total_time:.1f}s." + cleanup_msg_text = f"Temporary files are in: {temp_dir}\n(You can manually delete this folder later)" + + # --- Optional: Auto-cleanup --- + # print(f"Cleaning up temporary directory: {temp_dir}") + # shutil.rmtree(temp_dir, ignore_errors=True) + # cleanup_msg_text = "Temporary files automatically cleaned up." 
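+        # A safer auto-cleanup sketch (assumption: registering once means the temp
+        # directory is removed even if generation crashes midway):
+        #
+        #   import atexit
+        #   atexit.register(lambda: shutil.rmtree(temp_dir, ignore_errors=True))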
+        # temp_dir_state_update = None  # Clear state if cleaned up
+
+        yield {
+            status_update: gr.update(value=status_msg),
+            final_video_output: gr.update(value=final_output_path),  # Show the final video
+            cleanup_message: gr.update(value=cleanup_msg_text),
+            # temp_dir_state: temp_dir_state_update  # Update temp dir state if cleaned
+        }
+
+
+    # --- Wire UI component events to handler functions ---
+
+    # Generate Script Button
+    generate_script_btn.click(
+        fn=handle_script_generation,
+        inputs=[concept_input, video_ratio_slider, temp_dir_state],  # Pass current temp_dir for cleanup
+        # The outputs list must cover every component the generator yields,
+        # including final_video_output and cleanup_message (both cleared on restart).
+        outputs=[app_state, temp_dir_state, status_update, segment_editors_area,
+                 generate_video_btn, final_video_output, cleanup_message]
+    )
+
+    # Final Generate Button
+    generate_video_btn.click(
+        fn=handle_final_generation,
+        inputs=[
+            app_state, temp_dir_state,
+            resolution_input, kenburns_select,
+            bg_music_upload, bg_music_volume,
+            # The component is caption_bg_color; the handler's parameter is cap_bg_color.
+            caption_enable_radio, caption_font_size, caption_font_color, caption_bg_color, caption_position
+        ],
+        outputs=[status_update, final_video_output, cleanup_message]  # Update status, output video, cleanup msg
+    )
+
+    # Note: Event handlers for dynamic components (narration change, media upload)
+    # are wired inside `handle_script_generation` when the components are created.
+
+
+# --- Optional: Attempt ImageMagick Policy Fix on Startup ---
+# Run this only if you consistently have caption rendering issues.
+# It may require running the script with sudo or executing the commands manually.
+# fix_imagemagick_policy()
+
+
+# --- Launch the Gradio App ---
+if __name__ == "__main__":
+    print("Launching Gradio App...")
+    # share=True exposes the app publicly via Gradio's tunneling service; remove it if running locally only.
+    # debug=True prints more detailed error messages to the console.
+    demo.launch(share=True, debug=True)
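+    # Note: interim "Status: ..." yields from the generator handlers typically
+    # require the queue to be enabled, depending on the Gradio version. Sketch:
+    #
+    # demo.queue().launch(share=True, debug=True)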