diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,926 +1,2059 @@ -# Import necessary libraries -from kokoro import KPipeline + +from kokoro import KPipeline # Keep Kokoro separate as it's not from moviepy + import soundfile as sf import torch -import os + +from PIL import Image +import tempfile +import random +import cv2 +import math +import os, requests, io, time, re, random from moviepy.editor import ( - VideoFileClip, concatenate_videoclips, AudioFileClip, ImageClip, - CompositeVideoClip, TextClip, concatenate_audioclips + VideoFileClip, # Corrected typo here + concatenate_videoclips, + AudioFileClip, + ImageClip, + CompositeVideoClip, + TextClip, + CompositeAudioClip, + ColorClip # Included ColorClip in the main import ) -import moviepy.video.fx.all as vfx +import moviepy.video.fx.all as vfx # Keep this separate for fx effects import moviepy.config as mpy_config from pydub import AudioSegment -from PIL import Image, ImageDraw, ImageFont +from pydub.generators import Sine + import numpy as np from bs4 import BeautifulSoup import base64 from urllib.parse import quote -import pysrt +# pysrt is imported but not used in the provided code snippets, keeping for completeness +# import pysrt from gtts import gTTS -import gradio as gr -import tempfile -import random -import cv2 -import math -import requests -import time -import re -import shutil +import gradio as gr # Import Gradio +import shutil # Needed for temp folder cleanup +import subprocess # Needed for sudo commands in fix_imagemagick_policy + # Initialize Kokoro TTS pipeline (using American English) -pipeline = KPipeline(lang_code='a') # Use voice 'af_heart' for American English -# Ensure ImageMagick binary is set -mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"}) +# Ensure you have the required voice models downloaded for Kokoro if needed, +# or it will fall back to gTTS. 'a' for American English uses voice 'af_heart'. +# Add a flag to check if Kokoro initialized successfully +kokoro_initialized = False +pipeline = None # Initialize pipeline to None +try: + # Check if the required voice model is available or if it needs downloading + # Depending on Kokoro version/setup, this might implicitly check/download + # If Kokoro initialization itself is problematic, this try/except will catch it + pipeline = KPipeline(lang_code='a') # 'a' is often mapped to 'af_heart' or similar US voice + kokoro_initialized = True + print("Kokoro TTS pipeline initialized successfully.") +except Exception as e: + print(f"Warning: Could not initialize Kokoro TTS pipeline: {e}. Will rely on gTTS.") + # pipeline remains None + + +# Ensure ImageMagick binary is set (Adjust path as needed for your system) +# This line requires imagemagick to be installed and the path correct. +# If TextClip fails, check ImageMagick installation and policy.xml (handled by fix_imagemagick_policy). 
+# Common paths: "/usr/bin/convert", "/usr/local/bin/convert", "C:\\Program Files\\ImageMagick-X.Y.Z-Q16\\convert.exe" +# You might need to adjust this based on your OS and installation +IMAGICK_BINARY_DEFAULT_PATH = "/usr/bin/convert" # Default path, check your system +# Add more common paths to check +common_imagemagick_paths = [ + "/usr/bin/convert", + "/usr/local/bin/convert", + "/opt/homebrew/bin/convert", # Homebrew on macOS ARM + "/usr/local/opt/imagemagick/bin/convert", # Older Homebrew + "C:\\Program Files\\ImageMagick-X.Y.Z-Q16\\convert.exe", # Windows example, adjust version + # Add other paths as needed for your environment +] + +found_imagemagick_binary = None +for path in common_imagemagick_paths: + # Check if path is not None or empty before checking existence + if path and os.path.exists(path): + found_imagemagick_binary = path + break + +if found_imagemagick_binary: + print(f"Found ImageMagick binary at: {found_imagemagick_binary}") + mpy_config.change_settings({"IMAGEMAGICK_BINARY": found_imagemagick_binary}) +else: + print("Warning: ImageMagick binary 'convert' not found in common locations.") + print("TextClip may fail. Please install ImageMagick or update the IMAGICK_BINARY setting if it's installed elsewhere.") + # Still try to set a default path, though it might be wrong + mpy_config.change_settings({"IMAGEMAGICK_BINARY": IMAGICK_BINARY_DEFAULT_PATH}) + # ---------------- Global Configuration ---------------- # +# Using the user's provided API keys PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna' OPENROUTER_API_KEY = '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' -OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" +OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" # Or another preferred model OUTPUT_VIDEO_FILENAME = "final_video.mp4" USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" -# Global variables to be set per run -TARGET_RESOLUTION = None +# Maximum number of script segments to display for editing +MAX_SEGMENTS_FOR_EDITING = 15 # Limit for UI manageability + +# Global placeholder for the temporary folder, will be created per run TEMP_FOLDER = None # ---------------- Helper Functions ---------------- # -def generate_script(user_input): +# Most helper functions remain the same, but some might need minor adjustments +# to use passed parameters instead of global state (e.g., resolution). +# We'll update create_clip and add_background_music accordingly. + +def generate_script(user_input, api_key, model): + """Generate documentary script with proper OpenRouter handling.""" + if not api_key or api_key.startswith('YOUR_OPENROUTER_API_KEY'): + print("OpenRouter API key not set or is placeholder. Skipping script generation.") + return "[Error] API key not configured." + headers = { - 'Authorization': f'Bearer {OPENROUTER_API_KEY}', - 'HTTP-Referer': 'https://your-domain.com', + 'Authorization': f'Bearer {api_key}', + 'HTTP-Referer': 'https://your-domain.com', # Replace with your app's domain 'X-Title': 'AI Documentary Maker' } - prompt = f"""Short Documentary Script GeneratorInstructions: -If I say "use this," just output the script exactly as I gave it. + + # Updated prompt based on user's specific requirements for tone and style + prompt = f"""Short Documentary Script Generator + +Instructions: +If I say "use this," output the script exactly as given. 
If I only give topics, generate a script based on them. -If I provide a full script, rewrite it without any changes. Make everything short simple and humarous funny and act as serious but humarous. And don't say anything off topic. Also alway say a funny statement to subscribe based on the video topic at the end. Use normal conversational text like a normal person talking and avoid AI phase make the statements humanize and normal conversational -And always talk negative views don't really wants any positive statement and the full script will be one video about one topic so keep in mind +If I provide a full script, rewrite it without any changes. +Make everything short, simple, humorous, funny, and act serious but be humorous. +Don't say anything off topic. +Always end with a funny call to subscribe based on the video topic. +Use normal conversational text, avoid AI phrases, humanize the statements. +Always focus on negative or cynical views, avoid positive statements. +The full script should be for a single video about one topic. + Formatting Rules: -Title in Square Brackets: -Each section starts with a one-word title inside [ ] (max two words if necessary). -This title will be used as a search term for Pexels footage. -Casual & Funny Narration: -Each section has 5-10 words of narration. -Keep it natural, funny, and unpredictable (not robotic, poetic, or rhythmic). -No Special Formatting: -No bold, italics, or special characters. You are a assistant AI your task is to create script. You aren't a chatbot. So, don't write extra text -Generalized Search Terms: -If a term is too specific, make it more general for Pexels search. -Scene-Specific Writing: -Each section describes only what should be shown in the video. -Output Only the Script, and also make it funny and humarous and helirous and also add to subscribe with a funny statement like subscribe now or ..... +Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage. +Casual & Funny Narration: Each section has 5-15 words of narration. Keep it natural, funny, and unpredictable. +No Special Formatting: No bold, italics, or special characters. +Generalized Search Terms: If a term is too specific, make it more general for Pexels search. +Scene-Specific Writing: Each section describes only what should be shown in the video. +Output Only the Script, make it funny/humorous/hilarious, and add a funny subscribe statement. No extra text, just the script. + Example Output: -[North Korea] -Top 5 unknown facts about North Korea. -[Invisibility] -North Korea’s internet speed is so fast… it doesn’t exist. -[Leadership] -Kim Jong-un once won an election with 100% votes… against himself. -[Magic] -North Korea discovered time travel. That’s why their news is always from the past. +[Cats] +They plot world domination while napping. +[Dogs] +Loyalty is just a bribe for snacks. +[Humans] +The only species that pays to live on a planet they destroy. +[Future] +It looks suspiciously like the present, but with more screens. [Warning] -Subscribe now, or Kim Jong-un will send you a free one-way ticket… to North Korea. -[Freedom] -North Korean citizens can do anything… as long as it's government-approved. -Now here is the Topic/scrip: {user_input} +Subscribe or a cat will steal your bandwidth. 
+ +Now here is the Topic/script: {user_input} """ + + data = { - 'model': OPENROUTER_MODEL, + 'model': model, 'messages': [{'role': 'user', 'content': prompt}], - 'temperature': 0.4, - 'max_tokens': 5000 + 'temperature': 0.7, # Increased temperature slightly for more unpredictable humor + 'max_tokens': 500 # Limit token response to keep scripts short } + try: response = requests.post( 'https://openrouter.ai/api/v1/chat/completions', headers=headers, json=data, - timeout=30 + timeout=45 # Increased timeout ) - if response.status_code == 200: - response_data = response.json() - if 'choices' in response_data and len(response_data['choices']) > 0: - return response_data['choices'][0]['message']['content'] - else: - print("Unexpected response format:", response_data) - return None + + response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) + + response_data = response.json() + if 'choices' in response_data and len(response_data['choices']) > 0: + script_text = response_data['choices'][0]['message']['content'] + # Basic post-processing to remove potential markdown code blocks + if script_text.startswith("```") and script_text.endswith("```"): + # Find the first and last ``` lines + first_code_block = script_text.find("```") + last_code_block = script_text.rfind("```") + if first_code_block != -1 and last_code_block != -1 and first_code_block < last_code_block: + # Extract content between the markers, removing the language specifier line if present + content_start = script_text.find('\n', first_code_block) + 1 + content_end = last_code_block + script_text = script_text[content_start:content_end].strip() + else: # Simple case, remove from start and end + script_text = script_text.strip("` \n") + + return script_text else: - print(f"API Error {response.status_code}: {response.text}") - return None + print("Unexpected response format:", response_data) + return "[Error] Unexpected API response format." + + except requests.exceptions.RequestException as e: + print(f"API Request failed: {str(e)}") + return f"[Error] API request failed: {str(e)}" except Exception as e: - print(f"Request failed: {str(e)}") - return None + print(f"An unexpected error occurred during script generation: {e}") + return f"[Error] An unexpected error occurred: {str(e)}" + def parse_script(script_text): - sections = {} + """ + Parse the generated script into a list of segment dictionaries. + Each dictionary includes original prompt, narration text, estimated duration, and placeholder for uploaded media. + Handles potential API errors returned as strings. 
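+
+    Illustrative example of one returned element (hypothetical values; the
+    duration comes from max(2.0, word_count * 0.4)):
+        {"original_prompt": "Cats",
+         "text": "They plot world domination while napping.",
+         "duration": 2.4,  # 6 words * 0.4s/word
+         "uploaded_media": None}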
+ """ + if script_text.startswith("[Error]"): + print(f"Skipping parse due to script generation error: {script_text}") + return [] + + segments = [] current_title = None current_text = "" + try: - for line in script_text.splitlines(): + lines = script_text.strip().splitlines() + if not lines: + print("Script text is empty.") + return [] + + for line in lines: line = line.strip() if line.startswith("[") and "]" in line: bracket_start = line.find("[") - bracket_end = line.find("]", bracket_start) + bracket_end = line.find("]", bracket_start) # Corrected line here if bracket_start != -1 and bracket_end != -1: - if current_title is not None: - sections[current_title] = current_text.strip() - current_title = line[bracket_start+1:bracket_end] + # Add previous segment if title and text are found + if current_title is not None and current_text.strip(): + # Estimate duration based on word count (adjust factor as needed) + duration = max(2.0, len(current_text.split()) * 0.4) # Minimum 2s, approx 0.4s per word + segments.append({ + "original_prompt": current_title.strip(), + "text": current_text.strip(), + "duration": duration, + "uploaded_media": None # Placeholder for user uploaded file path + }) + current_title = line[bracket_start+1:bracket_end].strip() current_text = line[bracket_end+1:].strip() - elif current_title: + elif current_title: # Append text if no new title found but currently parsing a segment + current_text += line + " " + elif current_title: # Append text to the current segment current_text += line + " " - if current_title: - sections[current_title] = current_text.strip() - elements = [] - for title, narration in sections.items(): - if not title or not narration: - continue - media_element = {"type": "media", "prompt": title, "effects": "fade-in"} - words = narration.split() - duration = max(3, len(words) * 0.5) - tts_element = {"type": "tts", "text": narration, "voice": "en", "duration": duration} - elements.append(media_element) - elements.append(tts_element) - return elements + # Ignore lines before the first [Title] + + # Add the last segment + if current_title is not None and current_text.strip(): + duration = max(2.0, len(current_text.split()) * 0.4) + segments.append({ + "original_prompt": current_title.strip(), + "text": current_text.strip(), + "duration": duration, + "uploaded_media": None + }) + + # Limit segments to MAX_SEGMENTS_FOR_EDITING + if len(segments) > MAX_SEGMENTS_FOR_EDITING: + print(f"Warning: Script generated {len(segments)} segments, limiting to {MAX_SEGMENTS_FOR_EDITING} for editing.") + segments = segments[:MAX_SEGMENTS_FOR_EDITING] + + print(f"Parsed {len(segments)} segments.") + return segments except Exception as e: print(f"Error parsing script: {e}") return [] -def search_pexels_videos(query, pexels_api_key): - headers = {'Authorization': pexels_api_key} +# Pexels and Google Image search and download functions remain unchanged +# Using the global PEXELS_API_KEY directly now. +def search_pexels_videos(query): + """Search for a video on Pexels by query and return a random HD video.""" + if not PEXELS_API_KEY or PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'): + print("Pexels API key not set or is placeholder. 
Skipping video search.") + return None + headers = {'Authorization': PEXELS_API_KEY} base_url = "https://api.pexels.com/videos/search" num_pages = 3 videos_per_page = 15 - max_retries = 3 + max_retries = 2 # Reduced retries for faster failure retry_delay = 1 + search_query = query all_videos = [] + for page in range(1, num_pages + 1): for attempt in range(max_retries): try: - params = {"query": search_query, "per_page": videos_per_page, "page": page} + params = {"query": search_query, "per_page": videos_per_page, "page": page, "orientation": "landscape"} # Added orientation response = requests.get(base_url, headers=headers, params=params, timeout=10) + if response.status_code == 200: data = response.json() videos = data.get("videos", []) - if not videos: - print(f"No videos found on page {page}.") - break + + # Filter for HD videos first, then fallback to other qualities + hd_videos_on_page = [] + other_videos_on_page = [] for video in videos: video_files = video.get("video_files", []) - for file in video_files: - if file.get("quality") == "hd": - all_videos.append(file.get("link")) - break - break + # Sort video files by quality preference if possible + video_files_sorted = sorted(video_files, key=lambda x: {'hd': 0, 'sd': 1}.get(x.get('quality'), 2)) + + for file in video_files_sorted: + link = file.get("link") + quality = file.get("quality") + if link: + if quality == "hd": + hd_videos_on_page.append(link) + break # Found the best quality for this video entry + else: + other_videos_on_page.append(link) + # Don't break, keep looking for HD for this video entry + + all_videos.extend(hd_videos_on_page) # Add HD videos found + if not hd_videos_on_page: # If no HD found on this page, add other videos found on this page + all_videos.extend(other_videos_on_page) + + if not videos: + print(f"No videos found on page {page} for query '{query}'.") + break # No videos on this page or subsequent ones + + + break # Success for this page attempt + elif response.status_code == 429: - print(f"Rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...") + print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). 
Retrying in {retry_delay}s for query '{query}'...")
                     time.sleep(retry_delay)
                     retry_delay *= 2
                 else:
-                    print(f"Error fetching videos: {response.status_code} {response.text}")
-                    if attempt < max_retries - 1:
-                        print(f"Retrying in {retry_delay} seconds...")
-                        time.sleep(retry_delay)
-                        retry_delay *= 2
-                    else:
-                        break
+                    print(f"Pexels video search error {response.status_code}: {response.text} for query '{query}'")
+                    break # Non-recoverable error or too many retries
+
             except requests.exceptions.RequestException as e:
-                print(f"Request exception: {e}")
+                print(f"Pexels video request exception (attempt {attempt+1}/{max_retries}) for query '{query}': {e}")
                 if attempt < max_retries - 1:
-                    print(f"Retrying in {retry_delay} seconds...")
                     time.sleep(retry_delay)
                     retry_delay *= 2
                 else:
-                    break
+                    break # Too many retries
+
+        # Stop searching further pages once the collected results are still empty;
+        # check all_videos here because `videos` is only bound when a request
+        # succeeds and would raise NameError if every attempt for this page failed.
+        if not all_videos and page > 1:
+            print(f"Stopping Pexels video search for '{query}' as no videos were found by page {page}.")
+            break
+
+
     if all_videos:
-        random_video = random.choice(all_videos)
-        print(f"Selected random video from {len(all_videos)} HD videos")
+        # Prioritize picking an HD video if any were collected
+        hd_options = [link for link in all_videos if 'hd' in link.lower()] # Simple check, might not be perfect
+        if hd_options:
+            random_video = random.choice(hd_options)
+            print(f"Selected random HD video from {len(hd_options)} options for query '{query}'.")
+        else:
+            # If no HD options, pick from the entire list (which includes SD and potentially others)
+            random_video = random.choice(all_videos)
+            print(f"Selected random video (likely SD or other quality) from {len(all_videos)} options for query '{query}' (no HD found).")
         return random_video
     else:
-        print("No suitable videos found after searching all pages.")
+        print(f"No suitable videos found after searching all pages for query '{query}'.")
         return None

-def search_pexels_images(query, pexels_api_key):
-    headers = {'Authorization': pexels_api_key}
+
+def search_pexels_images(query):
+    """Search for an image on Pexels by query."""
+    if not PEXELS_API_KEY or PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'):
+        print("Pexels API key not set or is placeholder. Skipping image search.")
+        return None
+    headers = {'Authorization': PEXELS_API_KEY}
     url = "https://api.pexels.com/v1/search"
-    params = {"query": query, "per_page": 5, "orientation": "landscape"}
-    max_retries = 3
+    params = {"query": query, "per_page": 15, "orientation": "landscape"} # Increased per_page
+
+    max_retries = 2
     retry_delay = 1
+
     for attempt in range(max_retries):
         try:
             response = requests.get(url, headers=headers, params=params, timeout=10)
+
             if response.status_code == 200:
                 data = response.json()
                 photos = data.get("photos", [])
                 if photos:
-                    photo = random.choice(photos[:min(5, len(photos))])
+                    # Choose from the top results
+                    photo = random.choice(photos[:min(10, len(photos))])
                     img_url = photo.get("src", {}).get("original")
+                    print(f"Found {len(photos)} images on Pexels for query '{query}', selected one.")
                     return img_url
                 else:
-                    print(f"No images found for query: {query}")
+                    print(f"No images found for query: {query} on Pexels.")
                     return None
+
             elif response.status_code == 429:
-                print(f"Rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...")
+                print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). 
Retrying in {retry_delay}s for query '{query}'...") time.sleep(retry_delay) retry_delay *= 2 else: - print(f"Error fetching images: {response.status_code} {response.text}") - if attempt < max_retries - 1: - print(f"Retrying in {retry_delay} seconds...") - time.sleep(retry_delay) - retry_delay *= 2 + print(f"Pexels image search error {response.status_code}: {response.text} for query '{query}'") + break # Non-recoverable error or too many retries + except requests.exceptions.RequestException as e: - print(f"Request exception: {e}") + print(f"Pexels image request exception (attempt {attempt+1}/{max_retries}) for query '{query}': {e}") if attempt < max_retries - 1: - print(f"Retrying in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 2 - print(f"No Pexels images found for query: {query} after all attempts") + else: + break # Too many retries + + print(f"No Pexels images found for query: {query} after all attempts.") return None def search_google_images(query): + """Search for images on Google Images (fallback/news)""" try: + # Using a simple text search method; dedicated Google Image Search APIs are better but may require setup. + # This is prone to breaking if Google changes its HTML structure. search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch" headers = {"User-Agent": USER_AGENT} - response = requests.get(search_url, headers=headers, timeout=10) + print(f"Searching Google Images for: {query}") + response = requests.get(search_url, headers=headers, timeout=15) + response.raise_for_status() soup = BeautifulSoup(response.text, "html.parser") + + # Find img tags, look for src attributes + # This is a very fragile parsing method, might need adjustment img_tags = soup.find_all("img") image_urls = [] + # Look for src attributes that start with http and aren't data URIs or specific gstatic patterns + # This is a heuristic and might grab incorrect URLs for img in img_tags: src = img.get("src", "") - if src.startswith("http") and "gstatic" not in src: - image_urls.append(src) - if image_urls: - return random.choice(image_urls[:5]) if len(image_urls) >= 5 else image_urls[0] + if src.startswith("http") and "encrypted" not in src and "base64" not in src: # Basic filtering + image_urls.append(src) + elif img.get("data-src", "").startswith("http"): # Some sites use data-src + image_urls.append(img.get("data-src", "")) + + + # Filter out potential tiny icons or invalid URLs + valid_image_urls = [url for url in image_urls if url and "gstatic" not in url and url.split('.')[-1].lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp']] + + if valid_image_urls: + print(f"Found {len(valid_image_urls)} potential Google Images for query '{query}', picking one.") + return random.choice(valid_image_urls[:min(10, len(valid_image_urls))]) else: - print(f"No Google Images found for query: {query}") + print(f"No valid Google Images found for query: {query}") return None except Exception as e: - print(f"Error in Google Images search: {e}") + print(f"Error in Google Images search for query '{query}': {e}") return None + def download_image(image_url, filename): + """Download an image from a URL to a local file with enhanced error handling.""" + if not image_url: + print("No image URL provided for download.") + return None + try: headers = {"User-Agent": USER_AGENT} - print(f"Downloading image from: {image_url} to {filename}") - response = requests.get(image_url, headers=headers, stream=True, timeout=15) + # print(f"Attempting to download image from: {image_url}") # Keep less noisy + response = 
requests.get(image_url, headers=headers, stream=True, timeout=20) # Increased timeout response.raise_for_status() + + # Check content type before saving + content_type = response.headers.get('Content-Type', '') + if not content_type.startswith('image/'): + print(f"URL did not return an image Content-Type ({content_type}). Skipping download.") + return None + + # Ensure the directory exists + os.makedirs(os.path.dirname(filename), exist_ok=True) + with open(filename, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) - print(f"Image downloaded successfully to: {filename}") + + # print(f"Potential image downloaded to: {filename}") # Keep less noisy + + # Validate and process the image try: img = Image.open(filename) - img.verify() - img = Image.open(filename) + img.verify() # Verify it's an image file + img = Image.open(filename) # Re-open after verify if img.mode != 'RGB': + # print("Converting image to RGB") # Keep less noisy img = img.convert('RGB') img.save(filename) - print(f"Image validated and processed: {filename}") + # print(f"Image validated and converted to RGB: {filename}") # Keep less noisy return filename except Exception as e_validate: - print(f"Downloaded file is not a valid image: {e_validate}") + print(f"Downloaded file is not a valid image or processing failed for {filename}: {e_validate}") if os.path.exists(filename): - os.remove(filename) + os.remove(filename) # Clean up invalid file return None + except requests.exceptions.RequestException as e_download: - print(f"Image download error: {e_download}") + print(f"Image download error for {image_url}: {e_download}") if os.path.exists(filename): - os.remove(filename) + os.remove(filename) # Clean up partially downloaded file return None except Exception as e_general: - print(f"General error during image processing: {e_general}") + print(f"General error during image download/processing for {filename}: {e_general}") if os.path.exists(filename): - os.remove(filename) + os.remove(filename) # Clean up if needed return None + def download_video(video_url, filename): + """Download a video from a URL to a local file.""" + if not video_url: + print("No video URL provided for download.") + return None try: - response = requests.get(video_url, stream=True, timeout=30) + headers = {"User-Agent": USER_AGENT} # Some sites block direct downloads + print(f"Attempting to download video from: {video_url}") + response = requests.get(video_url, stream=True, timeout=45) # Increased timeout for videos response.raise_for_status() + + # Check content type + content_type = response.headers.get('Content-Type', '') + if not content_type.startswith('video/'): + print(f"URL did not return a video Content-Type ({content_type}). 
Skipping download.") + return None + + os.makedirs(os.path.dirname(filename), exist_ok=True) + + # Use smaller chunk size for potentially large files + chunk_size = 4096 + downloaded_size = 0 + total_size = int(response.headers.get('content-length', 0)) + with open(filename, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - print(f"Video downloaded successfully to: {filename}") - return filename - except Exception as e: - print(f"Video download error: {e}") + for chunk in response.iter_content(chunk_size=chunk_size): + f.write(chunk) + downloaded_size += len(chunk) + # Optional: Add progress updates if needed, but noisy for console + + print(f"Video downloaded successfully to: {filename} ({downloaded_size} bytes)") + # Basic check if the file seems valid (not just 0 bytes) + if os.path.exists(filename) and os.path.getsize(filename) > 1024: # Check for > 1KB + return filename + else: + print(f"Downloaded video file {filename} is too small or empty ({os.path.getsize(filename)} bytes).") + if os.path.exists(filename): + os.remove(filename) + return None + + except requests.exceptions.RequestException as e: + print(f"Video download error for {video_url}: {e}") if os.path.exists(filename): os.remove(filename) return None + except Exception as e_general: + print(f"General error during video download for {filename}: {e_general}") + if os.path.exists(filename): + os.remove(filename) + return None + -def generate_media(prompt, temp_folder, user_upload=None): +def generate_media_asset(prompt, uploaded_media_path): + """ + Generate a visual asset (video or image). Prioritizes user upload, + then searches Pexels video, then Pexels image, then Google Image. + Returns a dict: {'path': , 'asset_type': 'video' or 'image'}. + Ensures the returned path is within the TEMP_FOLDER. + """ safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_') - if user_upload: - file_ext = os.path.splitext(user_upload)[1].lower() - if file_ext in ['.mp4', '.mov', '.avi']: - shutil.copy(user_upload, os.path.join(temp_folder, f"{safe_prompt}{file_ext}")) - return {"path": os.path.join(temp_folder, f"{safe_prompt}{file_ext}"), "asset_type": "video"} - elif file_ext in ['.jpg', '.jpeg', '.png']: - shutil.copy(user_upload, os.path.join(temp_folder, f"{safe_prompt}.jpg")) - return {"path": os.path.join(temp_folder, f"{safe_prompt}.jpg"), "asset_type": "image"} - if "news" in prompt.lower(): - print(f"News-related query detected: {prompt}. Using Google Images...") - image_file = os.path.join(temp_folder, f"{safe_prompt}_news.jpg") - image_url = search_google_images(prompt) - if image_url: - downloaded_image = download_image(image_url, image_file) - if downloaded_image: - print(f"News image saved to {downloaded_image}") - return {"path": downloaded_image, "asset_type": "image"} - if random.random() < 0.25: - video_file = os.path.join(temp_folder, f"{safe_prompt}_video.mp4") - video_url = search_pexels_videos(prompt, PEXELS_API_KEY) + if not TEMP_FOLDER: + print("Error: TEMP_FOLDER not set for generate_media_asset.") + return None + + os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists + + # 1. 
Use user uploaded media if provided + if uploaded_media_path and os.path.exists(uploaded_media_path): + print(f"Using user uploaded media: {uploaded_media_path}") + file_ext = os.path.splitext(uploaded_media_path)[1].lower() + asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm', '.mkv'] else 'image' + # Copy the user file to temp folder to manage cleanup + temp_user_path = os.path.join(TEMP_FOLDER, f"user_upload_{os.path.basename(uploaded_media_path)}") + try: + # Use copy2 to preserve metadata like modification time + shutil.copy2(uploaded_media_path, temp_user_path) + print(f"Copied user upload to temp: {temp_user_path}") + return {"path": temp_user_path, "asset_type": asset_type} + # Handle case where source and destination might be the same (e.g., user uploads from temp) + except shutil.SameFileError: + print(f"User upload is already in temp folder: {uploaded_media_path}") + return {"path": uploaded_media_path, "asset_type": asset_type} + except Exception as e: + print(f"Error copying user file {uploaded_media_path}: {e}. Falling back to search.") + + + # 2. Search Pexels Videos (Increased chance) + # Let's slightly increase video search preference when available + if random.random() < 0.4: # Increase video search chance + video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4") + print(f"Attempting Pexels video search for: '{prompt}'") + video_url = search_pexels_videos(prompt) # Use global API key if video_url: downloaded_video = download_video(video_url, video_file) if downloaded_video: - print(f"Video asset saved to {downloaded_video}") + print(f"Pexels video asset saved to {downloaded_video}") return {"path": downloaded_video, "asset_type": "video"} - image_file = os.path.join(temp_folder, f"{safe_prompt}.jpg") - image_url = search_pexels_images(prompt, PEXELS_API_KEY) + else: + print(f"Pexels video search failed or found no video for: '{prompt}'") + + # 3. Search Pexels Images + image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg") + print(f"Attempting Pexels image search for: '{prompt}'") + image_url = search_pexels_images(prompt) # Use global API key if image_url: downloaded_image = download_image(image_url, image_file) if downloaded_image: - print(f"Image asset saved to {downloaded_image}") + print(f"Pexels image asset saved to {downloaded_image}") return {"path": downloaded_image, "asset_type": "image"} - fallback_terms = ["nature", "people", "landscape", "technology", "business"] + else: + print(f"Pexels image search failed or found no image for: '{prompt}'") + + # 4. Fallback: Search Google Images (especially useful for news/specific things Pexels might not have) + print(f"Attempting Google Images fallback for: '{prompt}'") + google_image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google.jpg") + google_image_url = search_google_images(prompt) + if google_image_url: + downloaded_google_image = download_image(google_image_url, google_image_file) + if downloaded_google_image: + print(f"Google Image asset saved to {downloaded_google_image}") + return {"path": downloaded_google_image, "asset_type": "image"} + else: + print(f"Google Images fallback failed for: '{prompt}'") + + + # 5. 
Final Fallback: Generic Images if specific search failed + fallback_terms = ["nature", "city", "abstract", "background"] # More generic fallbacks for term in fallback_terms: - print(f"Trying fallback image search with term: {term}") - fallback_file = os.path.join(temp_folder, f"fallback_{term}.jpg") - fallback_url = search_pexels_images(term, PEXELS_API_KEY) + print(f"Trying generic fallback image search with term: '{term}'") + fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg") + fallback_url = search_pexels_images(term) # Use Pexels for fallbacks, global API key if fallback_url: downloaded_fallback = download_image(fallback_url, fallback_file) if downloaded_fallback: - print(f"Fallback image saved to {downloaded_fallback}") + print(f"Generic fallback image saved to {downloaded_fallback}") return {"path": downloaded_fallback, "asset_type": "image"} - print(f"Failed to generate visual asset for prompt: {prompt}") + else: + print(f"Generic fallback image download failed for term: '{term}'") + else: + print(f"Generic fallback image search failed for term: '{term}'") + + + print(f"Failed to generate any visual asset for prompt: '{prompt}' after all attempts.") return None -def generate_tts(text, voice, temp_folder): - safe_text = re.sub(r'[^\w\s-]', '', text[:10]).strip().replace(' ', '_') - file_path = os.path.join(temp_folder, f"tts_{safe_text}.wav") +def generate_silent_audio(duration, sample_rate=24000): + """Generate a silent WAV audio file lasting 'duration' seconds.""" + print(f"Generating {duration:.2f}s of silent audio.") + num_samples = int(duration * sample_rate) + silence = np.zeros(num_samples, dtype=np.float32) + # Use unique filename to avoid conflicts + # Ensure TEMP_FOLDER exists before generating path + if not TEMP_FOLDER: + print("Error: TEMP_FOLDER not set for generate_silent_audio.") + return None + os.makedirs(TEMP_FOLDER, exist_ok=True) + + silent_path = os.path.join(TEMP_FOLDER, f"silent_{abs(hash(duration)) % (10**8)}_{int(time.time())}.wav") + try: + sf.write(silent_path, silence, sample_rate) + print(f"Silent audio generated: {silent_path}") + return silent_path + except Exception as e: + print(f"Error generating silent audio to {silent_path}: {e}") + return None + + +def generate_tts(text, voice='en'): + """ + Generate TTS audio using Kokoro, falling back to gTTS or silent audio if needed. + Ensures temp folder exists. + """ + if not text or not text.strip(): + print("TTS text is empty. 
Generating silent audio.") + return generate_silent_audio(duration=2.0) # Default silence for empty text + + if not TEMP_FOLDER: + print("Error: TEMP_FOLDER not set for generate_tts.") + return generate_silent_audio(duration=max(2.0, len(text.split()) * 0.4)) + + os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists + safe_text_hash = str(abs(hash(text)) % (10**10)) # Use a hash for potentially long text + file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.wav") + if os.path.exists(file_path): - print(f"Using cached TTS for text '{text[:10]}...'") + # print(f"Using cached TTS for text hash '{safe_text_hash}'") # Keep less noisy return file_path + + # Estimate duration based on word count (adjust factor as needed), used if TTS fails + target_duration_fallback = max(2.0, len(text.split()) * 0.4) + + # Use the global kokoro_initialized flag + if kokoro_initialized and pipeline: + try: + print(f"Attempting Kokoro TTS for text: '{text[:50]}...'") + kokoro_voice = 'af_heart' if voice == 'en' else voice # Kokoro default American English voice + # Kokoro pipeline might return multiple segments for long text + generator = pipeline(text, voice=kokoro_voice, speed=1.0, split_pattern=r'\n+') # Use speed 1.0 + audio_segments = [] + total_kokoro_duration = 0 # Track actual generated audio duration + + # Some text might result in many small segments, let's limit total time spent on Kokoro + max_kokoro_total_time = 60 # seconds + + start_time = time.time() # Start time for total timeout check + + for i, (gs, ps, audio) in enumerate(generator): + if time.time() - start_time > max_kokoro_total_time: + print(f"Kokoro TTS total time exceeded {max_kokoro_total_time}s.") + break # Exit loop on total timeout + + audio_segments.append(audio) + segment_duration = len(audio) / 24000.0 # Assuming 24000 Hz sample rate + total_kokoro_duration += segment_duration + + if audio_segments: + full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0] + # Calculate actual duration + total_kokoro_duration = len(full_audio) / 24000.0 # Assuming 24000 Hz sample rate + sf.write(file_path, full_audio, 24000) # Use 24000Hz standard + # print(f"TTS audio saved to {file_path} (Kokoro, {total_kokoro_duration:.2f}s)") # Keep less noisy + return file_path + else: + print("Kokoro pipeline returned no audio segments.") + + except Exception as e: + print(f"Error with Kokoro TTS: {e}") + # Continue to gTTS fallback + try: - kokoro_voice = 'af_heart' if voice == 'en' else voice - generator = pipeline(text, voice=kokoro_voice, speed=0.9, split_pattern=r'\n+') - audio_segments = [] - for i, (gs, ps, audio) in enumerate(generator): - audio_segments.append(audio) - full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0] - sf.write(file_path, full_audio, 24000) - print(f"TTS audio saved to {file_path} (Kokoro)") + print(f"Falling back to gTTS for text: '{text[:50]}...'") + tts = gTTS(text=text, lang='en', slow=False) # Use standard speed + mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.mp3") + tts.save(mp3_path) + audio = AudioSegment.from_mp3(mp3_path) + audio.export(file_path, format="wav") + if os.path.exists(mp3_path): + os.remove(mp3_path) # Clean up intermediate mp3 + # print(f"Fallback TTS saved to {file_path} (gTTS, {audio.duration_seconds:.2f}s)") # Keep less noisy return file_path - except Exception as e: - print(f"Error with Kokoro TTS: {e}") - try: - print("Falling back to gTTS...") - tts = gTTS(text=text, lang='en') - 
mp3_path = os.path.join(temp_folder, f"tts_{safe_text}.mp3") - tts.save(mp3_path) - audio = AudioSegment.from_mp3(mp3_path) - audio.export(file_path, format="wav") - os.remove(mp3_path) - print(f"Fallback TTS saved to {file_path} (gTTS)") - return file_path - except Exception as fallback_error: - print(f"Both TTS methods failed: {fallback_error}") - return generate_silent_audio(max(3, len(text.split()) * 0.5), temp_folder) - -def generate_silent_audio(duration, temp_folder, sample_rate=24000): - num_samples = int(duration * sample_rate) - silence = np.zeros(num_samples, dtype=np.float32) - silent_path = os.path.join(temp_folder, f"silent_{int(time.time())}.wav") - sf.write(silent_path, silence, sample_rate) - print(f"Silent audio generated: {silent_path}") - return silent_path + except Exception as fallback_error: + print(f"Both TTS methods failed for text: '{text[:50]}...'. Error: {fallback_error}") + # Use the estimated duration for silent audio + print(f"Generating silent audio of estimated duration {target_duration_fallback:.2f}s.") + return generate_silent_audio(duration=target_duration_fallback) def apply_kenburns_effect(clip, target_resolution, effect_type=None): + """Apply a smooth Ken Burns effect with a single movement pattern.""" target_w, target_h = target_resolution clip_aspect = clip.w / clip.h target_aspect = target_w / target_h + + # Resize clip to fill target resolution while maintaining aspect ratio, then scale up + # This ensures the image covers the whole frame even after scaling and panning if clip_aspect > target_aspect: - new_height = target_h - new_width = int(new_height * clip_aspect) + # Wider than target: match height, scale width + clip = clip.resize(height=target_h) else: - new_width = target_w - new_height = int(new_width / clip_aspect) - clip = clip.resize(newsize=(new_width, new_height)) - base_scale = 1.15 - new_width = int(new_width * base_scale) - new_height = int(new_height * base_scale) + # Taller than target: match width, scale height + clip = clip.resize(width=target_w) + + # Now scale the resized clip up for the Ken Burns movement margin + initial_w, initial_h = clip.size + scale_factor = 1.15 # Scale up by 15% + new_width = int(initial_w * scale_factor) + new_height = int(initial_h * scale_factor) clip = clip.resize(newsize=(new_width, new_height)) + max_offset_x = new_width - target_w max_offset_y = new_height - target_h - available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "up-left"] + + available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "up-left", "down-right"] if effect_type is None or effect_type == "random": effect_type = random.choice(available_effects) + + # Define start and end positions of the top-left corner of the target_resolution window + start_x, start_y = 0, 0 + end_x, end_y = 0, 0 + start_zoom_relative = 1.0 # Relative to target_resolution size + end_zoom_relative = 1.0 + + # Set start/end positions and zoom based on effect type. + # Positions are top-left corner of the target frame within the scaled image coordinates (new_width, new_height). 
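+    # Worked example (illustrative numbers, assuming the media aspect already
+    # matches the target): for a 1920x1080 target and scale_factor 1.15 the
+    # padded frame is 2208x1242, so max_offset_x = 288 and max_offset_y = 162;
+    # "pan-left" then slides the crop window's x from 288 down to 0 while y
+    # stays centered at 81 and the zoom stays at 1.0.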
if effect_type == "zoom-in": - start_zoom = 0.9 - end_zoom = 1.1 - start_center = (new_width / 2, new_height / 2) - end_center = start_center + start_zoom_relative = 1.0 # Start covering target_resolution size + end_zoom_relative = scale_factor # End covering target_resolution / scale_factor size (zoomed in) + # Stay centered in the *scaled* image + start_x = max_offset_x / 2 + start_y = max_offset_y / 2 + end_x = max_offset_x / 2 + end_y = max_offset_y / 2 + elif effect_type == "zoom-out": - start_zoom = 1.1 - end_zoom = 0.9 - start_center = (new_width / 2, new_height / 2) - end_center = start_center + start_zoom_relative = scale_factor # Start zoomed in + end_zoom_relative = 1.0 # End at target_resolution size + # Stay centered in the *scaled* image + start_x = max_offset_x / 2 + start_y = max_offset_y / 2 + end_x = max_offset_x / 2 + end_y = max_offset_y / 2 + + # For pan effects, the crop size is constant (target_resolution, which corresponds to zoom_relative=1.0) elif effect_type == "pan-left": - start_zoom = 1.0 - end_zoom = 1.0 - start_center = (max_offset_x + target_w / 2, (max_offset_y // 2) + target_h / 2) - end_center = (target_w / 2, (max_offset_y // 2) + target_h / 2) + start_x = max_offset_x + start_y = max_offset_y / 2 + end_x = 0 + end_y = max_offset_y / 2 elif effect_type == "pan-right": - start_zoom = 1.0 - end_zoom = 1.0 - start_center = (target_w / 2, (max_offset_y // 2) + target_h / 2) - end_center = (max_offset_x + target_w / 2, (max_offset_y // 2) + target_h / 2) + start_x = 0 + start_y = max_offset_y / 2 + end_x = max_offset_x + end_y = max_offset_y / 2 + elif effect_type == "pan-up": + start_x = max_offset_x / 2 + start_y = max_offset_y + end_x = max_offset_x / 2 + end_y = 0 + elif effect_type == "pan-down": + start_x = max_offset_x / 2 + start_y = 0 + end_x = max_offset_x / 2 + end_y = max_offset_y elif effect_type == "up-left": - start_zoom = 1.0 - end_zoom = 1.0 - start_center = (max_offset_x + target_w / 2, max_offset_y + target_h / 2) - end_center = (target_w / 2, target_h / 2) + start_x = max_offset_x + start_y = max_offset_y + end_x = 0 + end_y = 0 + elif effect_type == "down-right": + start_x = 0 + start_y = 0 + end_x = max_offset_x + end_y = max_offset_y else: - raise ValueError(f"Unsupported effect_type: {effect_type}") + # Default to pan-right if type is random but somehow invalid (shouldn't happen with random.choice) + effect_type = 'pan-right' + start_x = 0 + start_y = max_offset_y / 2 + end_x = max_offset_x + end_y = max_offset_y / 2 + print(f"Warning: Unexpected effect type '{effect_type}'. 
Defaulting to 'pan-right'.") + + def transform_frame(get_frame, t): frame = get_frame(t) - ratio = t / clip.duration if clip.duration > 0 else 0 - ratio = 0.5 - 0.5 * math.cos(math.pi * ratio) - current_zoom = start_zoom + (end_zoom - start_zoom) * ratio - crop_w = int(target_w / current_zoom) - crop_h = int(target_h / current_zoom) - current_center_x = start_center[0] + (end_center[0] - start_center[0]) * ratio - current_center_y = start_center[1] + (end_center[1] - start_center[1]) * ratio - min_center_x = crop_w / 2 - max_center_x = new_width - crop_w / 2 - min_center_y = crop_h / 2 - max_center_y = new_height - crop_h / 2 - current_center_x = max(min_center_x, min(current_center_x, max_center_x)) - current_center_y = max(min_center_y, min(current_center_y, max_center_y)) - cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (current_center_x, current_center_y)) - resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) - return resized_frame + # Use a smooth ease-in/ease-out function + progress = t / clip.duration if clip.duration > 0 else 0 + eased_progress = 0.5 - 0.5 * math.cos(math.pi * progress) # Cosine easing + + # Interpolate zoom relative to target_resolution + current_zoom_relative = start_zoom_relative + (end_zoom_relative - start_zoom_relative) * eased_progress + + # Calculate crop size based on current zoom relative to target resolution + # If zoom_relative is 1, crop size is target_resolution. If zoom_relative is scale_factor, crop size is target_resolution/scale_factor + crop_w = int(target_w / current_zoom_relative) + crop_h = int(target_h / current_zoom_relative) + + # Interpolate position (top-left corner of the target frame within the scaled image) + current_x = start_x + (end_x - start_x) * eased_progress + current_y = start_y + (end_y - start_y) * eased_progress + + # Calculate the center point for cv2.getRectSubPix + center_x = current_x + crop_w / 2 + center_y = current_y + crop_h / 2 + + # Ensure center stays within the bounds of the scaled image (new_width, new_height) + center_x = max(crop_w / 2.0, min(center_x, new_width - crop_w / 2.0)) # Use float division + center_y = max(crop_h / 2.0, min(center_y, new_height - crop_h / 2.0)) + + + try: + # Perform the crop using cv2.getRectSubPix (expects floating point center) + # Ensure frame is a numpy array (moviepy returns numpy arrays) + # Clamp center coordinates just in case, although max/min should handle it + center_x = np.clip(center_x, 0, new_width) + center_y = np.clip(center_y, 0, new_height) + + # Ensure crop dimensions are positive integers + crop_w = max(1, crop_w) + crop_h = max(1, crop_h) + + # Handle cases where crop dimensions might exceed frame dimensions (shouldn't happen with correct logic) + crop_w = min(crop_w, frame.shape[1]) + crop_h = min(crop_h, frame.shape[0]) + + # Ensure crop size is not zero or negative + if crop_w <= 0 or crop_h <= 0: + print(f"Warning: Calculated crop size is non-positive ({crop_w}, {crop_h}) at t={t:.2f}s. 
Skipping crop/resize.") + return np.zeros((target_h, target_w, 3), dtype=np.uint8) # Return black frame + + + cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (center_x, center_y)) + # Resize the cropped frame back to the target resolution + resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) + + return resized_frame + except Exception as e: + # Log details helpful for debugging Ken Burns issues + frame_shape_info = frame.shape if frame is not None else 'None' + print(f"Error applying Ken Burns transform at t={t:.2f}s: {e}. Frame shape: {frame_shape_info}, Scaled Size: ({new_width}, {new_height}), Center: ({center_x:.2f}, {center_y:.2f}), Crop Size: ({crop_w}, {crop_h}), Target Size: ({target_w}, {target_h})") + # Return a black frame or placeholder in case of error + return np.zeros((target_h, target_w, 3), dtype=np.uint8) + + + # Apply the transformation function return clip.fl(transform_frame) + def resize_to_fill(clip, target_resolution): + """Resize and crop a clip to fill the target resolution while maintaining aspect ratio.""" target_w, target_h = target_resolution clip_aspect = clip.w / clip.h target_aspect = target_w / target_h - if clip_aspect > target_aspect: + + # print(f"Resizing clip {clip.size} to fill target {target_resolution}") + + if clip_aspect > target_aspect: # Clip is wider than target clip = clip.resize(height=target_h) - crop_amount = (clip.w - target_w) / 2 - clip = clip.crop(x1=crop_amount, x2=clip.w - crop_amount, y1=0, y2=clip.h) - else: + # Calculate crop amount to make width match target_w + crop_amount_x = max(0.0, (clip.w - target_w) / 2.0) # Use float division + # Ensure crop coordinates are integers + x1 = int(crop_amount_x) + x2 = int(clip.w - crop_amount_x) + # Handle potential edge cases with integer rounding + x2 = max(x1 + 1, x2) # Ensure at least 1 pixel width if needed + # Ensure crop region is within bounds + x1 = max(0, x1) + x2 = min(clip.w, x2) + + clip = clip.crop(x1=x1, x2=x2, y1=0, y2=clip.h) + else: # Clip is taller than target or same aspect clip = clip.resize(width=target_w) - crop_amount = (clip.h - target_h) / 2 - clip = clip.crop(x1=0, x2=clip.w, y1=crop_amount, y2=clip.h - crop_amount) + # Calculate crop amount to make height match target_h + crop_amount_y = max(0.0, (clip.h - target_h) / 2.0) # Use float division + # Ensure crop coordinates are integers + y1 = int(crop_amount_y) + y2 = int(clip.h - crop_amount_y) + # Handle potential edge cases with integer rounding + y2 = max(y1 + 1, y2) # Ensure at least 1 pixel height if needed + # Ensure crop region is within bounds + y1 = max(0, y1) + y2 = min(clip.h, y2) + + clip = clip.crop(x1=0, x2=clip.w, y1=y1, y2=y2) + + # Final check and resize if dimensions are slightly off due to rounding + if clip.size != target_resolution: + print(f"Warning: Clip size {clip.size} after resize_to_fill does not match target {target_resolution}. 
Resizing again.") + clip = clip.resize(newsize=target_resolution) + + + # print(f"Clip resized to {clip.size}") return clip def find_mp3_files(): + """Search for any MP3 files in the current directory and subdirectories.""" mp3_files = [] + # Check relative paths first for root, dirs, files in os.walk('.'): for file in files: - if file.endswith('.mp3'): + if file.lower().endswith('.mp3'): mp3_path = os.path.join(root, file) - mp3_files.append(mp3_path) - print(f"Found MP3 file: {mp3_path}") - return mp3_files[0] if mp3_files else None + # Exclude files that are likely temporary or part of internal libraries + if not any(keyword in mp3_path.lower() for keyword in ['temp', '.gradio', 'site-packages', 'dist-packages', 'venv', 'tmp']): # Added 'tmp' + mp3_files.append(mp3_path) + print(f"Found MP3 file: {mp3_path}") + + if mp3_files: + return mp3_files[0] # Return the first one found that isn't excluded + else: + # print("No user-provided MP3 files found in the current directory or subdirectories.") # Keep less noisy + return None + + +def add_background_music(final_video, bg_music_path, bg_music_volume=0.08): + """Add background music to the final video.""" + if not bg_music_path or not os.path.exists(bg_music_path): + print("No valid background music path provided or file not found. Skipping background music.") + return final_video -def add_background_music(final_video, bg_music_volume=0.08): try: - bg_music_path = find_mp3_files() - if bg_music_path and os.path.exists(bg_music_path): - print(f"Adding background music from: {bg_music_path}") - bg_music = AudioFileClip(bg_music_path) - if bg_music.duration < final_video.duration: - loops_needed = math.ceil(final_video.duration / bg_music.duration) - bg_segments = [bg_music] * loops_needed - bg_music = concatenate_audioclips(bg_segments) - bg_music = bg_music.subclip(0, final_video.duration) - bg_music = bg_music.volumex(bg_music_volume) - video_audio = final_video.audio + print(f"Adding background music from: {bg_music_path} with volume {bg_music_volume}") + bg_music = AudioFileClip(bg_music_path) + + # Loop background music if shorter than video + if bg_music.duration < final_video.duration: + loops_needed = math.ceil(final_video.duration / bg_music.duration) + bg_segments = [bg_music.copy() for _ in range(loops_needed)] # Use copy to avoid issues + bg_music = concatenate_audioclips(bg_segments) + # print(f"Looped background music to {bg_music.duration:.2f}s") # Keep less noisy + + # Subclip background music to match video duration + bg_music = bg_music.subclip(0, final_video.duration) + # print(f"Subclipped background music to {bg_music.duration:.2f}s") # Keep less noisy + + # Adjust volume + bg_music = bg_music.volumex(bg_music_volume) + # print(f"Set background music volume to {bg_music_volume}") # Keep less noisy + + # Composite audio + video_audio = final_video.audio + if video_audio: + # Ensure video audio matches video duration before compositing + if abs(video_audio.duration - final_video.duration) > 0.1: + print(f"Adjusting video audio duration ({video_audio.duration:.2f}s) to match video duration ({final_video.duration:.2f}s) for final mix") + try: + video_audio = video_audio.fx(vfx.speedx, factor=video_audio.duration / final_video.duration) + except Exception as e: + print(f"Error adjusting final video audio speed: {e}. 
Using original audio.") + pass # Proceed with original audio if speedx fails + mixed_audio = CompositeAudioClip([video_audio, bg_music]) - final_video = final_video.set_audio(mixed_audio) - print("Background music added successfully") + # print("Composited video audio and background music") # Keep less noisy else: - print("No MP3 files found, skipping background music") + # Handle case where video might not have audio track initially + mixed_audio = bg_music + print("Warning: Video had no original audio track, only adding background music.") + + final_video = final_video.set_audio(mixed_audio) + print("Background music added successfully.") return final_video except Exception as e: print(f"Error adding background music: {e}") - print("Continuing without background music") + print("Continuing without background music.") return final_video -def create_clip(media_path, asset_type, tts_path, duration, customizations, target_resolution, temp_folder): + +def create_clip(media_asset, tts_path, estimated_duration, target_resolution, + caption_enabled, caption_color, caption_size, caption_position, + caption_bg_color, caption_stroke_color, caption_stroke_width, + narration_text, segment_index): + """Create a video clip with synchronized subtitles and narration.""" try: - print(f"Creating clip with asset_type: {asset_type}, media_path: {media_path}") - if not os.path.exists(media_path) or not os.path.exists(tts_path): - print("Missing media or TTS file") - return None - audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2) - audio_duration = audio_clip.duration - target_duration = audio_duration + 0.2 - if asset_type == "video": - clip = VideoFileClip(media_path) - clip = resize_to_fill(clip, target_resolution) - if clip.duration < target_duration: - clip = clip.loop(duration=target_duration) + print(f"Creating clip #{segment_index} from asset: {media_asset.get('path')}, type: {media_asset.get('asset_type')}") + media_path = media_asset.get('path') + asset_type = media_asset.get('asset_type') + + # Determine actual audio duration + audio_clip = None + audio_duration = estimated_duration # Default to estimated duration + target_clip_duration = estimated_duration # Default target duration + + if tts_path and os.path.exists(tts_path): + try: + audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2) # Fade out TTS slightly + audio_duration = audio_clip.duration + # Ensure clip duration is slightly longer than audio for transitions/padding + target_clip_duration = audio_duration + 0.3 # Add a small buffer after TTS ends + # Ensure target duration is not excessively long + target_clip_duration = min(target_clip_duration, estimated_duration * 3 + 5) # Prevent very long clips if TTS audio is unexpectedly long + # Also ensure a minimum duration even if TTS is very short + target_clip_duration = max(target_clip_duration, 2.0) # Minimum clip duration 2 seconds + + + print(f"TTS audio duration: {audio_duration:.2f}s. Target clip duration: {target_clip_duration:.2f}s (estimated {estimated_duration:.2f}s)") + except Exception as e: + print(f"Error loading TTS audio clip {tts_path}: {e}. 
Using estimated duration {estimated_duration:.2f}s for clip.") + audio_clip = None # Ensure audio_clip is None if loading fails + target_clip_duration = estimated_duration # Fallback to estimated duration + target_clip_duration = max(target_clip_duration, 2.0) # Ensure minimum duration + + else: + # If no TTS path, use estimated duration as target, ensure minimum + target_clip_duration = max(estimated_duration, 2.0) + + + # Handle missing or invalid media first + if not media_path or not os.path.exists(media_path): + print(f"Skipping clip {segment_index}: Missing or invalid media file {media_path}") + # Create a black clip with silent audio for the target duration + clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) + print(f"Created placeholder black clip for segment {segment_index}") + # Add placeholder text if captions are enabled and text exists + if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): + txt_clip = TextClip( + "[Missing Media]\n" + narration_text, # Indicate missing media + fontsize=caption_size, + font='Arial-Bold', # Ensure this font is available + color=caption_color, + bg_color=caption_bg_color, + method='caption', + align='center', + stroke_width=caption_stroke_width, + stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(target_clip_duration) # Duration matches black clip + clip = CompositeVideoClip([clip, txt_clip]) + + # Add silent audio to the placeholder clip + silent_audio_path = generate_silent_audio(target_clip_duration) + if silent_audio_path and os.path.exists(silent_audio_path): + try: + silent_audio_clip = AudioFileClip(silent_audio_path) + # Ensure silent audio duration matches video clip duration + if abs(silent_audio_clip.duration - clip.duration) > 0.1: + silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration) + clip = clip.set_audio(silent_audio_clip) + except Exception as e: + print(f"Error setting silent audio to placeholder clip {segment_index}: {e}") + clip = clip.set_audio(None) # Set audio to None if silent audio fails loading else: - clip = clip.subclip(0, target_duration) - if customizations.get('video_brightness', 1.0) != 1.0: - clip = clip.fx(vfx.colorx, customizations['video_brightness']) - if customizations.get('video_contrast', 1.0) != 1.0: - clip = clip.fx(vfx.contrast, customizations['video_contrast']) - if customizations.get('video_speed', 1.0) != 1.0: - clip = clip.fx(vfx.speedx, customizations['video_speed']) + clip = clip.set_audio(None) # Set audio to None if silent audio generation fails + + return clip # Return the placeholder clip + + # Process media if path is valid + if asset_type == "video": + try: + clip = VideoFileClip(media_path) + print(f"Loaded video clip from {media_path} with duration {clip.duration:.2f}s") + clip = resize_to_fill(clip, target_resolution) + if clip.duration < target_clip_duration: + print("Looping video clip") + # Loop the video to match the target duration + clip = clip.loop(duration=target_clip_duration) + else: + # Subclip the video to match the target duration + clip = clip.subclip(0, target_clip_duration) + clip = clip.fadein(0.2).fadeout(0.2) # Add simple transitions + print(f"Video clip processed to duration {clip.duration:.2f}s") + + except Exception as e: + print(f"Error processing video clip {media_path} for segment {segment_index}: {e}") + # Fallback to a black clip if video processing 
fails + print(f"Creating placeholder black clip instead for segment {segment_index}") + clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) + if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): + txt_clip = TextClip( + "[Video Error]\n" + narration_text, # Indicate video error + fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center', + stroke_width=caption_stroke_width, stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(target_clip_duration) + clip = CompositeVideoClip([clip, txt_clip]) + + elif asset_type == "image": - img = Image.open(media_path) - if img.mode != 'RGB': - with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False, dir=temp_folder) as temp: - img.convert('RGB').save(temp.name) - media_path = temp.name - img.close() - clip = ImageClip(media_path).set_duration(target_duration) - clip = apply_kenburns_effect(clip, target_resolution, customizations.get('kenburns_effect', 'random')) - clip = clip.fadein(customizations.get('fade_in_duration', 0.3)).fadeout(customizations.get('fade_out_duration', 0.3)) - if customizations.get('image_brightness', 1.0) != 1.0: - clip = clip.fx(vfx.colorx, customizations['image_brightness']) - if customizations.get('image_contrast', 1.0) != 1.0: - clip = clip.fx(vfx.contrast, customizations['image_contrast']) + try: + img = Image.open(media_path) + # Ensure image is in RGB format before passing to ImageClip + if img.mode != 'RGB': + print("Converting image to RGB") + img = img.convert('RGB') + # ImageClip accepts numpy arrays + img_array = np.array(img) + img.close() # Close the PIL image + clip = ImageClip(img_array).set_duration(target_clip_duration) + else: + img.close() # Close the PIL image + clip = ImageClip(media_path).set_duration(target_clip_duration) + + # print(f"Loaded image clip from {media_path} with duration {clip.duration:.2f}s") # Keep less noisy + clip = apply_kenburns_effect(clip, target_resolution) # Ken Burns with random effect + clip = clip.fadein(0.3).fadeout(0.3) # Add simple transitions + # print(f"Image clip processed to duration {clip.duration:.2f}s with Ken Burns") # Keep less noisy + + + except Exception as e: + print(f"Error processing image clip {media_path} for segment {segment_index}: {e}") + # Fallback to a black clip if image processing fails + print(f"Creating placeholder black clip instead for segment {segment_index}") + clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) + if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): + txt_clip = TextClip( + "[Image Error]\n" + narration_text, # Indicate image error + fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center', + stroke_width=caption_stroke_width, stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(target_clip_duration) + clip = CompositeVideoClip([clip, txt_clip]) + else: - return None - if customizations.get('text_color') != "transparent": + print(f"Unknown asset type '{asset_type}' for segment {segment_index}. 
Creating placeholder.") + # Create a placeholder black clip + clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) + if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): + txt_clip = TextClip( + "[Unknown Media Type Error]\n" + narration_text, # Indicate unknown type error + fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center', + stroke_width=caption_stroke_width, stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(target_clip_duration) + clip = CompositeVideoClip([clip, txt_clip]) + + + # Set the audio for the clip + if audio_clip: + # Ensure audio clip duration matches video clip duration after processing + if abs(audio_clip.duration - clip.duration) > 0.1: # Allow slight difference (e.g., 100ms) + print(f"Adjusting audio duration ({audio_clip.duration:.2f}s) to match video duration ({clip.duration:.2f}s) for segment {segment_index}") + try: + audio_clip = audio_clip.fx(vfx.speedx, factor=audio_clip.duration / clip.duration) + except Exception as e: + print(f"Error adjusting audio speed for segment {segment_index}: {e}. Using original audio duration.") + # If speeding fails, maybe just loop or subclip the audio? Or regenerate silent audio. + # For now, if speedx fails, let's just attach the original audio and hope for the best timing wise. + pass # Keep the original audio_clip if speedx fails + + clip = clip.set_audio(audio_clip) + else: + # If TTS failed or audio loading failed, ensure video clip has no audio or silent audio + print(f"No valid audio for clip {segment_index}. Setting silent audio.") + silent_audio_path = generate_silent_audio(clip.duration) # Generate silent audio matching the clip's final duration + if silent_audio_path and os.path.exists(silent_audio_path): + try: + silent_audio_clip = AudioFileClip(silent_audio_path) + # Should match duration, but double check + if abs(silent_audio_clip.duration - clip.duration) > 0.1: + silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration) + clip = clip.set_audio(silent_audio_clip) + except Exception as e: + print(f"Error setting silent audio for segment {segment_index}: {e}") + clip = clip.set_audio(None) # Set audio to None if silent audio fails loading + else: + clip = clip.set_audio(None) # Set audio to None if silent audio generation fails + + + # Add subtitles if enabled and text exists + if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): try: - words = customizations['text'].split() - chunks = [] - current_chunk = [] - for word in words: - current_chunk.append(word) - if len(current_chunk) >= customizations.get('text_words_per_chunk', 5): - chunks.append(' '.join(current_chunk)) - current_chunk = [] - if current_chunk: - chunks.append(' '.join(current_chunk)) - chunk_duration = audio_duration / len(chunks) if chunks else audio_duration - subtitle_clips = [] - subtitle_y = target_resolution[1] * customizations.get('text_position_y', 0.70) - for i, chunk_text in enumerate(chunks): - start_time = i * chunk_duration - end_time = (i + 1) * chunk_duration + # Determine total audio duration (using actual if available, else estimated) + # Use clip.duration for subtitle timing as the clip's duration is final + actual_clip_duration_for_subtitles = clip.duration + if actual_clip_duration_for_subtitles <= 0: + print(f"Clip 
duration is zero or negative for segment {segment_index}, cannot add subtitles.") + else: + # Simple word-based chunking for subtitles + words = narration_text.split() + # Calculate average word duration based on clip duration and word count + total_words = len(words) + average_word_duration = actual_clip_duration_for_subtitles / total_words if total_words > 0 else 0.5 # Default if no words + + subtitle_clips = [] + current_time = 0 + chunk_size = 6 # Words per caption chunk (adjust as needed for readability) + + for i in range(0, total_words, chunk_size): + chunk_words = words[i:i+chunk_size] + chunk_text = ' '.join(chunk_words) + # Estimate chunk duration based on word count * average word duration + estimated_chunk_duration = len(chunk_words) * average_word_duration + + start_time = current_time + # Ensure end time doesn't exceed the *clip* duration + end_time = min(current_time + estimated_chunk_duration, clip.duration) + # Ensure minimal duration for a chunk + if end_time - start_time < 0.1 and i + chunk_size < total_words: + end_time = min(start_time + 0.1, clip.duration) # Give it at least 0.1s + + if start_time >= end_time: break # Avoid 0 or negative duration clips + + + # Determine vertical position + if caption_position == "Top": + subtitle_y_position = int(target_resolution[1] * 0.05) # Slightly lower than top edge + elif caption_position == "Middle": + # Calculate vertical center, then subtract half the estimated text height + # Estimate text height based on font size and number of lines (adjust factor as needed) + estimated_text_lines = max(1, math.ceil(len(chunk_words) / chunk_size)) # Crude estimate, at least 1 line + estimated_total_text_height = estimated_text_lines * caption_size * 1.2 # 1.2 is line spacing approx + subtitle_y_position = int(target_resolution[1] * 0.5) - int(estimated_total_text_height / 2) + # Ensure position is not off-screen (allow negative slightly for vertical alignment) + # subtitle_y_position = max(0, subtitle_y_position) # Don't clamp to 0 for Middle, let moviepy handle it + + else: # Default to Bottom + # Position from the bottom edge + # positioning the top-left of the text box at 85% of height often looks good for bottom captions. 
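+                        # Worked example (illustrative): on a 1080x1920 "Short" frame,
+                        # Bottom yields int(1920 * 0.85) = 1632 px for the top of the
+                        # text box, leaving roughly 288 px for the wrapped caption.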
+ subtitle_y_position = int(target_resolution[1] * 0.85) # Top-left of text box is at 85% height + + + txt_clip = TextClip( + chunk_text, + fontsize=caption_size, + font='Arial-Bold', # Ensure this font is available or use a common system font + color=caption_color, + bg_color=caption_bg_color, # Use background color + method='caption', # Enables text wrapping + align='center', + stroke_width=caption_stroke_width, # Use stroke + stroke_color=caption_stroke_color, # Use stroke color + size=(target_resolution[0] * 0.9, None) # Caption width max 90% of video width + ).set_start(start_time).set_end(end_time) + + # Position is tuple ('center', y_position) + txt_clip = txt_clip.set_position(('center', subtitle_y_position)) + subtitle_clips.append(txt_clip) + current_time = end_time # Move to the end of the current chunk + + if subtitle_clips: + clip = CompositeVideoClip([clip] + subtitle_clips) + # print(f"Added {len(subtitle_clips)} subtitle chunks to clip {segment_index}.") # Keep less noisy + # else: + # print(f"No subtitle clips generated for segment {segment_index} (might be due to text/duration issues).") # Keep less noisy + + + except Exception as sub_error: + print(f"Error adding subtitles for segment {segment_index}: {sub_error}") + # Fallback to a single centered text overlay if detailed subtitling fails + try: txt_clip = TextClip( - chunk_text, - fontsize=customizations.get('text_size', 45), - font=customizations.get('text_font', 'Arial-Bold'), - color=customizations.get('text_color', 'white'), - bg_color=customizations.get('bg_color', 'rgba(0, 0, 0, 0.25)'), + narration_text, + fontsize=caption_size, + font='Arial-Bold', + color=caption_color, + bg_color=caption_bg_color, method='caption', - align=customizations.get('text_alignment', 'center'), - stroke_width=customizations.get('text_stroke_width', 2), - stroke_color=customizations.get('text_stroke_color', 'white'), - size=(target_resolution[0] * customizations.get('text_width_ratio', 0.8), None) - ).set_start(start_time).set_end(end_time) - txt_clip = txt_clip.set_position((customizations.get('text_alignment', 'center'), subtitle_y)) - subtitle_clips.append(txt_clip) - clip = CompositeVideoClip([clip] + subtitle_clips) - except Exception as sub_error: - print(f"Subtitle error: {sub_error}") - txt_clip = TextClip( - customizations['text'], - fontsize=customizations.get('text_size', 45), - font=customizations.get('text_font', 'Arial-Bold'), - color=customizations.get('text_color', 'white'), - align=customizations.get('text_alignment', 'center'), - size=(target_resolution[0] * customizations.get('text_width_ratio', 0.8), None) - ).set_position((customizations.get('text_alignment', 'center'), int(target_resolution[1] / 3))).set_duration(clip.duration) - clip = CompositeVideoClip([clip, txt_clip]) - clip = clip.set_audio(audio_clip) - print(f"Clip created: {clip.duration:.1f}s") + align='center', + stroke_width=caption_stroke_width, + stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.8, None) + ).set_position(('center', int(target_resolution[1] * 0.75))).set_duration(clip.duration) # Position slightly above bottom + clip = CompositeVideoClip([clip, txt_clip]) + print(f"Added simple fallback subtitle for segment {segment_index}.") + except Exception as fallback_sub_error: + print(f"Simple fallback subtitle failed for segment {segment_index}: {fallback_sub_error}") + + + # Ensure final clip duration is explicitly set (already done earlier based on audio) + # clip = clip.set_duration(clip.duration) + + # print(f"Clip 
{segment_index} created successfully: {clip.duration:.2f}s") # Keep less noisy return clip except Exception as e: - print(f"Error in create_clip: {str(e)}") - return None + print(f"Critical error in create_clip for segment {segment_index}: {str(e)}") + # Create a black clip with error message if anything goes wrong during the main process + # Use a safe duration if previous duration calculation also failed + error_duration = target_clip_duration if 'target_clip_duration' in locals() and target_clip_duration > 0 else (estimated_duration if estimated_duration > 0 else 3.0) + print(f"Creating error placeholder black clip for segment {segment_index} with duration {error_duration:.2f}s.") + black_clip = ColorClip(size=target_resolution, color=(0,0,0), duration=error_duration) + error_text = f"Error in segment {segment_index}" + if narration_text: error_text += f":\n{narration_text[:50]}..." + error_txt_clip = TextClip( + error_text, + fontsize=30, + color="red", + align='center', + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(error_duration) + clip = CompositeVideoClip([black_clip, error_txt_clip]) + silent_audio_path = generate_silent_audio(error_duration) + if silent_audio_path and os.path.exists(silent_audio_path): + try: + clip = clip.set_audio(AudioFileClip(silent_audio_path)) + except Exception as audio_e: + print(f"Error setting silent audio for error clip {segment_index}: {audio_e}") + clip = clip.set_audio(None) + else: + clip = clip.set_audio(None) + return clip + def fix_imagemagick_policy(): - try: - print("Attempting to fix ImageMagick security policies...") - policy_paths = [ + """Attempt to fix ImageMagick security policies required by TextClip.""" + print("Attempting to fix ImageMagick security policies...") + + # Use the found binary path if available, otherwise use default list + if found_imagemagick_binary: + # Assuming policy.xml is relative to the binary path or in a standard location + # This is a heuristic, may need manual path depending on installation + # Normalize binary path to handle symlinks etc. + real_imagemagick_binary_path = os.path.realpath(found_imagemagick_binary) + binary_dir = os.path.dirname(real_imagemagick_binary_path) + policy_paths_to_check = [ + os.path.join(binary_dir, '..', 'etc', 'ImageMagick-7', 'policy.xml'), + os.path.join(binary_dir, '..', 'etc', 'ImageMagick-6', 'policy.xml'), + os.path.join(binary_dir, '..', 'etc', 'ImageMagick', 'policy.xml'), + os.path.join(binary_dir, '..', 'share', 'ImageMagick-7', 'policy.xml'), + os.path.join(binary_dir, '..', 'share', 'ImageMagick-6', 'policy.xml'), + os.path.join(binary_dir, '..', 'share', 'ImageMagick', 'policy.xml'), + # Add more paths relative to binary if needed + ] + # Add standard system paths as fallbacks + policy_paths_to_check.extend([ "/etc/ImageMagick-6/policy.xml", "/etc/ImageMagick-7/policy.xml", "/etc/ImageMagick/policy.xml", - "/usr/local/etc/ImageMagick-7/policy.xml" - ] - found_policy = next((path for path in policy_paths if os.path.exists(path)), None) - if not found_policy: - print("No policy.xml found. 
Using alternative subtitle method.") - return False - print(f"Modifying policy file at {found_policy}") - os.system(f"sudo cp {found_policy} {found_policy}.bak") - os.system(f"sudo sed -i 's/rights=\"none\"/rights=\"read|write\"/g' {found_policy}") - os.system(f"sudo sed -i 's/]*>/]*>//g' {found_policy}") - print("ImageMagick policies updated successfully.") - return True - except Exception as e: - print(f"Error fixing policies: {e}") + "/usr/local/etc/ImageMagick-7/policy.xml", # macports/homebrew path + "/usr/share/ImageMagick/policy.xml", + "/usr/share/ImageMagick-6/policy.xml", + "/usr/share/ImageMagick-7/policy.xml", + os.path.join(os.environ.get('MAGICK_HOME', '') if os.environ.get('MAGICK_HOME') else '.', 'policy.xml'), # Check MAGICK_HOME + ]) + else: + # Only check standard system paths if binary wasn't found + policy_paths_to_check = [ + "/etc/ImageMagick-6/policy.xml", + "/etc/ImageMagick-7/policy.xml", + "/etc/ImageMagick/policy.xml", + "/usr/local/etc/ImageMagick-7/policy.xml", # macports/homebrew path + "/usr/share/ImageMagick/policy.xml", + "/usr/share/ImageMagick-6/policy.xml", + "/usr/share/ImageMagick-7/policy.xml", + os.path.join(os.environ.get('MAGICK_HOME', '') if os.environ.get('MAGICK_HOME') else '.', 'policy.xml'), # Check MAGICK_HOME + ] + + + # Filter out empty paths and check existence, prioritize unique paths + existing_policy_paths = [] + seen_paths = set() + for path in policy_paths_to_check: + if path and os.path.exists(path) and path not in seen_paths: + existing_policy_paths.append(path) + seen_paths.add(path) + + + found_policy = None + if existing_policy_paths: + found_policy = existing_policy_paths[0] # Use the first unique one found + + if not found_policy: + print("No policy.xml found in common locations. TextClip may fail.") + print("Consider installing ImageMagick and checking its installation path and policy.xml location.") return False -# ---------------- Gradio Interface Logic ---------------- # -def generate_initial_clips(user_input, resolution): - global TARGET_RESOLUTION, TEMP_FOLDER - if resolution == "Full": - TARGET_RESOLUTION = (1920, 1080) - elif resolution == "Short": - TARGET_RESOLUTION = (1080, 1920) - else: - TARGET_RESOLUTION = (1920, 1080) - TEMP_FOLDER = tempfile.mkdtemp() + print(f"Attempting to modify policy file at {found_policy}") + try: + # Create a backup - use a unique name + backup_path = f"{found_policy}.bak_aivgen_{int(time.time())}" + if os.path.exists(found_policy): + shutil.copy2(found_policy, backup_path) + print(f"Created backup at {backup_path}") + else: + print(f"Warning: Policy file {found_policy} not found at copy stage, cannot create backup.") + + + # Read the original policy file (handle potential permission issues) + policy_content = None + try: + with open(found_policy, 'r') as f: + policy_content = f.read() + except Exception as e: + print(f"Error reading policy file {found_policy}: {e}. Attempting with sudo cat...") + try: + # Use sudo cat to read if direct read fails + process = subprocess.Popen(['sudo', 'cat', found_policy], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + if process.returncode == 0: + policy_content = stdout.decode('utf-8') + print("Read policy file content using sudo.") + else: + print(f"Failed to read policy file using sudo cat. Result code: {process.returncode}. Error: {stderr.decode('utf-8')}") + print("Manual intervention may be required.") + return False + except FileNotFoundError: + print(f"sudo command not found. 
Cannot read policy file with sudo.")
+                return False
+            except Exception as e_sudo_read:
+                print(f"Error executing sudo cat: {e_sudo_read}")
+                print("Manual intervention may be required.")
+                return False
+
+        if policy_content is None:
+            print("Failed to read policy file content.")
+            return False
+
+        # Use regex to find and replace the specific policy lines
+        # Allow read and write rights for PDF, EPS, PS, etc. potentially restricted formats
+        # Also ensure path policies allow reading/writing files
+        # Be more specific with replacements to avoid unintended side effects
+        modified_content = re.sub(
+            r'<policy domain="coder" rights="none"(.*?)/?>', # /? also matches self-closing tags
+            r'<policy domain="coder" rights="read|write"\1/>', # Ensure it ends with a self-closing tag
+            policy_content
+        )
+
+        # Also handle a more general case if the above didn't match, but with caution
+        # This attempts to change any 'rights="none"' on 'coder' or 'path' domains
+        # if the specific patterns weren't matched.
+        def _replace_none_rights(match):
+            domain = match.group(1)
+            rest = match.group(2).replace('rights="none"', 'rights="read|write"')
+            # Only replace if rights is currently "none"
+            if 'rights="none"' in match.group(0):
+                print(f"Applying general policy fix for domain '{domain}'")
+                return f'<policy domain="{domain}"{rest}/>'
+            return match.group(0) # Return original if no "none" rights found
+
+        modified_content = re.sub(
+            r'<policy domain="(coder|path)"([^>]*?)/?>',
+            _replace_none_rights,
+            modified_content
+        )
+
+
+        # Write the modified content back (handle potential permission issues)
+        try:
+            with open(found_policy, 'w') as f:
+                f.write(modified_content)
+            print("ImageMagick policies updated successfully (direct write).")
+            return True
+        except IOError as e:
+            print(f"Direct write failed: {e}. Attempting with sudo tee...")
+            # Fall back to sudo tee if direct write fails.
+            # This requires the user to be able to run sudo commands without a password prompt for the script's execution
+            # and tee needs to be available.
+            # Using subprocess is safer than os.system for piping
+            try:
+                # Write modified content to a temporary file first
+                # Ensure TEMP_FOLDER is set before creating a temp file path
+                if not TEMP_FOLDER:
+                    print("Error: TEMP_FOLDER not set for sudo write fallback.")
+                    return False
+                os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists
+
+                temp_policy_file = os.path.join(TEMP_FOLDER, "temp_policy_modified.xml")
+                with open(temp_policy_file, 'w') as f:
+                    f.write(modified_content)
+
+                # Use sudo tee to overwrite the original file
+                # sudo tee <policy path> < temp_file
+                cmd = ['sudo', 'tee', found_policy]
+                print(f"Executing: {' '.join(cmd)} < {temp_policy_file}")
+
+                # Using subprocess with stdin redirection
+                with open(temp_policy_file, 'rb') as f_in: # Open in binary mode for input
+                    process = subprocess.Popen(cmd, stdin=f_in, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+                    stdout, stderr = process.communicate()
+
+                if process.returncode == 0:
+                    print("ImageMagick policies updated successfully using sudo tee.")
+                    return True
+                else:
+                    print(f"Failed to update ImageMagick policies using sudo tee. Result code: {process.returncode}. Error: {stderr.decode('utf-8')}")
+                    print("Please manually edit your policy.xml to grant read/write rights for coder and path domains.")
+                    print('Example: Change <policy domain="coder" rights="none" pattern="PDF" /> to <policy domain="coder" rights="read|write" pattern="PDF" />')
+                    return False
+            except FileNotFoundError:
+                print(f"sudo or tee command not found. 
Cannot write policy file with sudo.") + return False + except Exception as e_sudo_write: + print(f"Error executing sudo tee process: {e_sudo_write}") + print("Manual intervention may be required.") + return False + finally: + # Clean up the temporary file + if 'temp_policy_file' in locals() and os.path.exists(temp_policy_file): + os.remove(temp_policy_file) + + + except Exception as e_general: + print(f"General error during ImageMagick policy modification: {e_general}") + print("Manual intervention may be required.") + return False + + +# ---------------- Gradio Interface Functions ---------------- # + +def generate_script_and_show_editor(user_input, resolution_choice, + caption_enabled_choice, caption_color, + caption_size, caption_position, caption_bg_color, + caption_stroke_color, caption_stroke_width): + """ + Generates the script, parses it, stores segments in state, + and prepares the UI updates to show the editing interface. + Uses yield to update status. + """ + global TEMP_FOLDER + # Clean up previous run's temp folder if it exists + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + print(f"Cleaning up previous temp folder: {TEMP_FOLDER}") + try: + # Use onerror to log errors during cleanup + def onerror(func, path, exc_info): + print(f"Error cleaning up {path}: {exc_info[1]}") + shutil.rmtree(TEMP_FOLDER, onerror=onerror) + except Exception as e: + print(f"Error starting cleanup of temp folder {TEMP_FOLDER}: {e}") + + # Create a new unique temporary folder for this run + # Add a suffix based on time to minimize collision risk if cleanup fails + TEMP_FOLDER = tempfile.mkdtemp(prefix="aivgen_") + print(f"Created new temp folder: {TEMP_FOLDER}") + + # Store global style choices in state or use them directly (let's store in state) + # Gradio State can hold a single object. Let's use a dict. 
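+    # Illustrative note: gr.State round-trips one Python object per user session, so a
+    # plain dict is enough to hand these settings to the video-generation handler, e.g.
+    #   run_config["resolution"]      -> (1920, 1080) or (1080, 1920)
+    #   run_config.get("temp_folder") -> this run's scratch directory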
+ run_config = { + "resolution": (1920, 1080) if resolution_choice == "Full (1920x1080)" else (1080, 1920), + "caption_enabled": caption_enabled_choice == "Yes", + "caption_color": caption_color, + "caption_size": caption_size, + "caption_position": caption_position, + "caption_bg_color": caption_bg_color, + "caption_stroke_color": caption_stroke_color, + "caption_stroke_width": caption_stroke_width, + "temp_folder": TEMP_FOLDER # Store temp folder path + } + + # Initial status update and hide editing/video areas + # Yielding multiple updates in a list/tuple works for simultaneous updates + # The outputs need to match the order specified in the .click() outputs list + # Outputs list: 0=run_config_state, 1=status_output, 2=editing_area, 3=final_video_output, 4=script_preview_markdown, + # 5..5+MAX-1=segment_text_inputs, 5+MAX..5+2MAX-1=segment_file_inputs, 5+2MAX..5+3MAX-1=segment_editing_groups, + # 5+3MAX=segments_state + num_dynamic_outputs = MAX_SEGMENTS_FOR_EDITING * 3 # Textbox, File, Group per segment + + # Prepare initial updates for all dynamic components to be hidden + initial_textbox_updates = [gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)] + initial_file_updates = [gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)] + initial_group_visibility_updates = [gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)] + initial_label_updates = [gr.update(value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)] # Clear prompt labels initially + + + yield (run_config, # 0 + gr.update(value="Generating script...", visible=True), # 1 + gr.update(visible=False), # 2 editing area + gr.update(value=None, visible=False), # 3 video output + gr.update(visible=False, value="### Generated Script Preview\n\nGenerating script..."), # 4 raw script preview + # Outputs for dynamic components (initially hide/clear all) - Indices 5 onwards + *initial_textbox_updates, # segment_text_inputs + *initial_file_updates, # segment_file_inputs + *initial_group_visibility_updates, # segment_editing_groups + *initial_label_updates, # segment_prompt_labels - MUST BE INCLUDED IF JS UPDATES THEM VIA OUTPUTS LIST + [], # segments_state - This is the LAST element updated + ) + + + script_text = generate_script(user_input, OPENROUTER_API_KEY, OPENROUTER_MODEL) + + # Determine raw script preview content + raw_script_preview_content = f"### Generated Script Preview\n\n```\n{script_text}\n```" if script_text and not script_text.startswith("[Error]") else f"### Generated Script Preview\n\n{script_text}" + + if not script_text or script_text.startswith("[Error]"): + # Update status and keep editing/video areas hidden + yield (run_config, + gr.update(value=f"Script generation failed: {script_text}", visible=True), + gr.update(visible=False), + gr.update(value=None, visible=False), + gr.update(visible=True, value=raw_script_preview_content), # Show raw script preview on error + # Outputs for dynamic components (all hidden) + *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], # Clear prompt labels + [], # segments_state remains empty + ) + return # Stop execution + + + yield (run_config, + gr.update(value="Parsing script...", visible=True), + gr.update(visible=False), + gr.update(value=None, visible=False), + 
gr.update(visible=True, value=raw_script_preview_content), # Show raw script preview + *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], # Clear prompt labels + [], # segments_state will be updated next + ) + + + segments = parse_script(script_text) + + # Prepare updates for dynamic editing components based on parsed segments + textbox_updates = [] + file_updates = [] + group_visibility_updates = [] + label_updates = [] # Updates for prompt labels + + for i in range(MAX_SEGMENTS_FOR_EDITING): + if i < len(segments): + # Show group, populate text, clear file upload, set prompt label + textbox_updates.append(gr.update(value=segments[i]['text'], visible=True)) + file_updates.append(gr.update(value=None, visible=True)) # Clear previous uploads + group_visibility_updates.append(gr.update(visible=True)) + label_updates.append(gr.update(value=f"Segment {i+1} (Prompt: {segments[i]['original_prompt']})", visible=True)) # Set label value and show + else: + # Hide unused groups and clear their values + textbox_updates.append(gr.update(value="", visible=False)) + file_updates.append(gr.update(value=None, visible=False)) + group_visibility_updates.append(gr.update(visible=False)) + label_updates.append(gr.update(value="", visible=False)) # Clear label value and hide + + + # Final yield to update UI: show editing area, populate fields, update state + yield (run_config, # 0 + gr.update(value=f"Script generated with {len(segments)} segments. Edit segments below.", visible=True), # 1 + gr.update(visible=True), # 2 Show Editing area + gr.update(value=None, visible=False), # 3 Ensure video output is hidden and cleared + gr.update(visible=True, value=raw_script_preview_content), # 4 Show raw script preview + # Dynamic outputs - Indices 5 onwards + *textbox_updates, # 5 Update textboxes (visibility and value) + *file_updates, # 6 Update file uploads (visibility and value) + *group_visibility_updates, # 7 Update visibility of groups + *label_updates, # 8 Update prompt labels (visibility and value) + segments, # 9 Update the state with parsed segments - This is the LAST element updated + ) + + +def generate_video_from_edited(run_config, segments_data, segment_texts, segment_uploads, bg_music_volume): + """ + Takes the edited segment data (text, uploaded files) and configuration, + and generates the final video. + Uses yield to update status. + """ + if not segments_data: + yield "No segments to process. Generate script first.", None + return + + global TEMP_FOLDER + # Ensure TEMP_FOLDER is correctly set from run_config + TEMP_FOLDER = run_config.get("temp_folder") + if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): + yield "Error: Temporary folder not found from run config. 
Please regenerate script.", None + # Attempt cleanup just in case temp folder existed but was invalid + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + try: + shutil.rmtree(TEMP_FOLDER) + except Exception as e: + print(f"Error cleaning up invalid temp folder {TEMP_FOLDER}: {e}") + TEMP_FOLDER = None # Reset global + return + + # Extract config from run_config + TARGET_RESOLUTION = run_config.get("resolution", (1920, 1080)) # Default if missing + CAPTION_ENABLED = run_config.get("caption_enabled", True) # Default if missing + CAPTION_COLOR = run_config.get("caption_color", "#FFFFFF") # Default if missing + CAPTION_SIZE = run_config.get("caption_size", 45) # Default if missing + CAPTION_POSITION = run_config.get("caption_position", "Bottom") # Default if missing + CAPTION_BG_COLOR = run_config.get("caption_bg_color", "rgba(0, 0, 0, 0.4)") # Default if missing + CAPTION_STROKE_COLOR = run_config.get("caption_stroke_color", "#000000") # Default if missing + CAPTION_STROKE_WIDTH = run_config.get("caption_stroke_width", 2) # Default if missing + + + # Update segments_data with potentially edited text and uploaded file paths + # segment_texts and segment_uploads are lists of values from the Gradio components + processed_segments = [] + # Iterate up to the minimum of state segments and provided inputs + num_segments_to_process = min(len(segments_data), len(segment_texts), len(segment_uploads), MAX_SEGMENTS_FOR_EDITING) + + if num_segments_to_process == 0: + yield "No segments to process after reading editor inputs. Script might be empty or inputs missing.", None + # Clean up + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + try: + shutil.rmtree(TEMP_FOLDER) + print(f"Cleaned up temp folder: {TEMP_FOLDER}") + except Exception as e: + print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") + TEMP_FOLDER = None # Reset global + return + + + for i in range(num_segments_to_process): + segment = segments_data[i] # Get original segment data + processed_segment = segment.copy() # Make a copy + # Use edited text, strip whitespace + processed_segment['text'] = segment_texts[i].strip() if segment_texts[i] is not None else segment.get('text', '').strip() + # Use uploaded media path (will be None if nothing uploaded) + processed_segment['uploaded_media'] = segment_uploads[i] + processed_segments.append(processed_segment) + + + yield "Fixing ImageMagick policy...", None + # Call fix_imagemagick_policy again just before video generation as a safeguard + # This might require the user to enter a password if sudo is needed. + # It's better to handle permissions manually or configure sudoers. 
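+    # Hypothetical sudoers rule (user and paths are placeholders -- verify with visudo)
+    # that would let the sudo cat/tee fallbacks run without a password prompt:
+    #   youruser ALL=(root) NOPASSWD: /usr/bin/tee, /usr/bin/cat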
fix_imagemagick_policy() - print("Generating script from API...") - script = generate_script(user_input) - if not script: - print("Failed to generate script.") - shutil.rmtree(TEMP_FOLDER) - return None, [], "" - print("Generated Script:\n", script) - elements = parse_script(script) - if not elements: - print("Failed to parse script into elements.") - shutil.rmtree(TEMP_FOLDER) - return None, [], "" - print(f"Parsed {len(elements)//2} script segments.") - paired_elements = [] - for i in range(0, len(elements), 2): - if i + 1 < len(elements): - paired_elements.append((elements[i], elements[i + 1])) - if not paired_elements: - print("No valid script segments found.") - shutil.rmtree(TEMP_FOLDER) - return None, [], "" - clips_data = [] - for idx, (media_elem, tts_elem) in enumerate(paired_elements): - print(f"\nProcessing segment {idx+1}/{len(paired_elements)} with prompt: '{media_elem['prompt']}'") - media_asset = generate_media(media_elem['prompt'], TEMP_FOLDER) - if not media_asset: - print(f"Skipping segment {idx+1} due to missing media asset.") - continue - tts_path = generate_tts(tts_elem['text'], tts_elem['voice'], TEMP_FOLDER) - if not tts_path: - print(f"Skipping segment {idx+1} due to TTS generation failure.") - continue - clips_data.append({ - 'media_path': media_asset['path'], - 'asset_type': media_asset['asset_type'], - 'tts_path': tts_path, - 'text': tts_elem['text'], - 'customizations': { - 'text_color': '#FFFFFF', - 'text_size': 45, - 'text_font': 'Arial-Bold', - 'text_alignment': 'center', - 'text_position_y': 0.70, - 'text_width_ratio': 0.8, - 'text_words_per_chunk': 5, - 'text_stroke_width': 2, - 'text_stroke_color': '#FFFFFF', - 'bg_color': 'rgba(0, 0, 0, 0.25)', - 'video_brightness': 1.0, - 'video_contrast': 1.0, - 'video_speed': 1.0, - 'image_brightness': 1.0, - 'image_contrast': 1.0, - 'kenburns_effect': 'random', - 'fade_in_duration': 0.3, - 'fade_out_duration': 0.3 - } - }) - if not clips_data: - print("No clips were successfully created.") - shutil.rmtree(TEMP_FOLDER) - return None, [], "" - return clips_data, [clip['text'] for clip in clips_data], script - -def update_clips(clips_data, text_inputs, media_uploads, global_customizations, per_clip_customizations): - for i, (text, upload, per_clip_cust) in enumerate(zip(text_inputs, media_uploads, per_clip_customizations)): - if i < len(clips_data): - clips_data[i]['text'] = text - if upload: - media_asset = generate_media(clips_data[i]['customizations']['prompt'] if 'prompt' in clips_data[i]['customizations'] else f"clip_{i}", TEMP_FOLDER, upload) - if media_asset: - clips_data[i]['media_path'] = media_asset['path'] - clips_data[i]['asset_type'] = media_asset['asset_type'] - # Update customizations: per-clip overrides global - clips_data[i]['customizations'] = {**global_customizations, **per_clip_cust} - clips_data[i]['customizations']['text'] = text - return clips_data - -def generate_final_video(clips_data): + clips = [] - for clip_data in clips_data: + yield "Generating media and audio for clips...", None + + total_segments = len(processed_segments) + for idx, segment in enumerate(processed_segments): + yield f"Processing segment {idx+1}/{total_segments}...", None + print(f"\nProcessing segment {idx+1}/{total_segments} (Prompt: '{segment.get('original_prompt', 'N/A')[:30]}...')") + + # Determine media source: uploaded or generated + media_asset = generate_media_asset( + segment.get('original_prompt', 'background'), # Use original prompt for search if available, else a generic term + segment.get('uploaded_media') # 
Pass uploaded media path + ) + + # Generate TTS audio + tts_path = generate_tts(segment.get('text', '')) # Use edited text, default to empty string if None/missing + + # Create the video clip for this segment clip = create_clip( - media_path=clip_data['media_path'], - asset_type=clip_data['asset_type'], - tts_path=clip_data['tts_path'], - duration=AudioFileClip(clip_data['tts_path']).duration, - customizations=clip_data['customizations'], + media_asset=media_asset if media_asset else {"path": None, "asset_type": None}, # Pass dummy if generate_media_asset failed + tts_path=tts_path, + estimated_duration=segment.get('duration', 3.0), # Use estimated duration as a fallback reference target_resolution=TARGET_RESOLUTION, - temp_folder=TEMP_FOLDER + caption_enabled=CAPTION_ENABLED, + caption_color=CAPTION_COLOR, + caption_size=CAPTION_SIZE, + caption_position=CAPTION_POSITION, + caption_bg_color=CAPTION_BG_COLOR, + caption_stroke_color=CAPTION_STROKE_COLOR, + caption_stroke_width=CAPTION_STROKE_WIDTH, + narration_text=segment.get('text', ''), # Pass narration text for captions + segment_index=idx+1 ) + if clip: clips.append(clip) else: - print(f"Clip creation failed for segment.") + print(f"Skipping segment {idx+1} due to clip creation failure.") + # If create_clip returns None (shouldn't happen with fallback logic, but as safety) + # Add a placeholder black clip + placeholder_duration = segment.get('duration', 3.0) # Use estimated duration or default + placeholder_clip = ColorClip(size=TARGET_RESOLUTION, color=(0,0,0), duration=placeholder_duration) + silent_audio_path = generate_silent_audio(placeholder_duration) + if silent_audio_path and os.path.exists(silent_audio_path): + placeholder_clip = placeholder_clip.set_audio(AudioFileClip(silent_audio_path)) + error_text = f"Segment {idx+1} Failed" + if segment.get('text'): error_text += f":\n{segment['text'][:50]}..." + error_txt_clip = TextClip(error_text, fontsize=30, color="red", align='center', size=(TARGET_RESOLUTION[0] * 0.9, None)).set_position('center').set_duration(placeholder_duration) + placeholder_clip = CompositeVideoClip([placeholder_clip, error_txt_clip]) + clips.append(placeholder_clip) + + if not clips: - print("No clips were successfully created.") - shutil.rmtree(TEMP_FOLDER) - return None + yield "No clips were successfully created. 
Video generation failed.", None
+        # Clean up
+        if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
+            try:
+                shutil.rmtree(TEMP_FOLDER)
+                print(f"Cleaned up temp folder: {TEMP_FOLDER}")
+            except Exception as e:
+                print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}")
+        TEMP_FOLDER = None # Reset global
+        return
+
+
+    yield "Concatenating clips...", None
     print("\nConcatenating clips...")
-    final_video = concatenate_videoclips(clips, method="compose")
-    final_video = add_background_music(final_video, bg_music_volume=0.08)
+    try:
+        final_video = concatenate_videoclips(clips, method="compose")
+    except Exception as e:
+        print(f"Error concatenating clips: {e}")
+        yield f"Error concatenating clips: {e}", None
+        # Clean up
+        if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
+            try:
+                shutil.rmtree(TEMP_FOLDER)
+                print(f"Cleaned up temp folder: {TEMP_FOLDER}")
+            except Exception as e:
+                print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}")
+        TEMP_FOLDER = None # Reset global
+        return
+
+
+    yield "Adding background music...", None
+    bg_music_path = find_mp3_files() # Find background music
+    final_video = add_background_music(final_video, bg_music_path, bg_music_volume=bg_music_volume) # Use volume from input
+
+
+    yield f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...", None
     print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...")
-    final_video.write_videofile(OUTPUT_VIDEO_FILENAME, codec='libx264', fps=24, preset='veryfast')
-    print(f"Final video saved as {OUTPUT_VIDEO_FILENAME}")
-    print("Cleaning up temporary files...")
-    shutil.rmtree(TEMP_FOLDER)
-    print("Temporary files removed.")
-    return OUTPUT_VIDEO_FILENAME
-
-# ---------------- Gradio Interface ---------------- #
-with gr.Blocks() as iface:
-    gr.Markdown("# Highly Customizable AI Video Generator")
-    gr.Markdown("Input a concept, edit AI-generated clips, customize extensively, and generate your video!")
-
-    # Step 1: Prompt Input and Script Generation
-    with gr.Row():
-        user_input = gr.Textbox(label="Video Concept", placeholder="Enter your video concept here...")
-        resolution = gr.Radio(["Full", "Short"], label="Resolution", value="Full")
-    generate_script_btn = gr.Button("Generate Script and Clips")
-
-    # Display Generated Script
-    script_output = gr.Textbox(label="Generated Script", interactive=False)
-
-    # Global Customization Options
-    with gr.Group():
-        gr.Markdown("## Global Customization Options")
-        with gr.Row():
-            global_text_color = gr.ColorPicker(label="Text Color", value="#FFFFFF")
-            global_text_size = gr.Slider(20, 100, step=1, label="Text Size", value=45)
-            global_text_font = gr.Dropdown(["Arial-Bold", "Times-Roman", "Courier"], label="Text Font", value="Arial-Bold")
-        with gr.Row():
-            global_text_alignment = gr.Dropdown(["center", "left", "right"], label="Text Alignment", value="center")
-            global_text_position_y = gr.Slider(0.1, 0.9, step=0.05, label="Text Y Position", value=0.70)
-            global_text_width_ratio = gr.Slider(0.5, 1.0, step=0.05, label="Text Width Ratio", value=0.8)
+    output_path = None
+    try:
+        # Use a temporary output file first for safety, within TEMP_FOLDER
+        temp_output_filename = os.path.join(TEMP_FOLDER, f"temp_final_video_{int(time.time())}.mp4")
+        final_video.write_videofile(temp_output_filename, codec='libx264', fps=24, preset='veryfast')
+
+        # Ensure the destination directory for the final output exists (current dir)
+        os.makedirs(os.path.dirname(OUTPUT_VIDEO_FILENAME) or '.', exist_ok=True)
+
+        # Move the final file to the intended location after successful export
+        final_output_path = OUTPUT_VIDEO_FILENAME
+        try:
+            shutil.move(temp_output_filename, final_output_path)
+            print(f"Final video saved as {final_output_path}")
+            output_path = final_output_path
+        except shutil.SameFileError:
+            print(f"Output path is the same as temp path, no move needed: {temp_output_filename}")
+            output_path = temp_output_filename
+        except Exception as e:
+            print(f"Error moving temporary file {temp_output_filename} to final destination {final_output_path}: {e}")
+            # If move fails, return the temp file path or None
+            output_path = temp_output_filename # Return temp path so user can access it
+            print(f"Returning video from temporary path: {output_path}")
+
+
+    except Exception as e:
+        print(f"Error exporting video: {e}")
+        output_path = None
+        yield f"Video export failed: {e}", None # Provide error message in status
+
+    # Clean up temporary folder
+    yield "Cleaning up temporary files...", output_path # Update status before cleanup
+    if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
+        try:
+            # Use onerror to log errors during cleanup
+            def onerror(func, path, exc_info):
+                print(f"Error cleaning up {path}: {exc_info[1]}")
+            shutil.rmtree(TEMP_FOLDER, onerror=onerror)
+            print(f"Cleaned up temp folder: {TEMP_FOLDER}")
+        except Exception as e:
+            print(f"Error starting cleanup of temp folder {TEMP_FOLDER}: {e}")
+    TEMP_FOLDER = None # Reset global
+
+    yield "Done!", output_path # Final status update
+
+
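+# Illustrative sketch (not used below): the positional-output convention both click
+# handlers rely on. Every yield must emit the fixed outputs first, then exactly
+# MAX_SEGMENTS_FOR_EDITING updates per dynamic component list, then the state last.
+def _demo_flatten_outputs(fixed_updates, textboxes, files, groups, labels, segments):
+    """Flatten grouped per-segment updates into the flat tuple Gradio expects."""
+    assert all(len(lst) == MAX_SEGMENTS_FOR_EDITING for lst in (textboxes, files, groups, labels))
+    return (*fixed_updates, *textboxes, *files, *groups, *labels, segments)
+
+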
+# ---------------- Gradio Interface Definition (Blocks) ---------------- #
+
+# Need lists to hold the dynamic UI components for segments
+segment_editing_groups = []
+segment_prompt_labels = [] # List to hold the prompt Labels
+segment_text_inputs = []
+segment_file_inputs = []
+
+with gr.Blocks() as demo:
+    gr.Markdown("# 🤖 AI Documentary Video Generator 🎬")
+    gr.Markdown("Enter a concept to generate a funny documentary script. 
You can then edit the script text and replace the suggested media for each segment before generating the final video.") + + # --- Global Settings --- + with gr.Accordion("Global Settings", open=True): + user_concept_input = gr.Textbox(label="Video Concept", placeholder="e.g., The secret life of pigeons, Why socks disappear in the laundry, The futility of alarm clocks...") with gr.Row(): - global_text_words_per_chunk = gr.Slider(3, 10, step=1, label="Words per Subtitle Chunk", value=5) - global_text_stroke_width = gr.Slider(0, 5, step=1, label="Text Stroke Width", value=2) - global_text_stroke_color = gr.ColorPicker(label="Text Stroke Color", value="#FFFFFF") + resolution_radio = gr.Radio(["Full (1920x1080)", "Short (1080x1920)"], label="Video Resolution", value="Full (1920x1080)") + bg_music_volume_slider = gr.Slider(minimum=0, maximum=0.5, value=0.08, step=0.01, label="Background Music Volume", info="Lower volume keeps narration clear.") # Adjusted max volume + + + # --- Caption Settings --- + with gr.Accordion("Caption Settings", open=False): + caption_enabled_radio = gr.Radio(["Yes", "No"], label="Show Captions?", value="Yes") with gr.Row(): - global_bg_color = gr.ColorPicker(label="Background Color", value="rgba(0, 0, 0, 0.25)") - global_video_brightness = gr.Slider(0.5, 1.5, step=0.05, label="Video Brightness", value=1.0) - global_video_contrast = gr.Slider(0.5, 1.5, step=0.05, label="Video Contrast", value=1.0) + caption_color_picker = gr.ColorPicker(label="Caption Text Color", value="#FFFFFF") # Default white + caption_bg_color_picker = gr.ColorPicker(label="Caption Background Color (with transparency)", value="rgba(0, 0, 0, 0.4)") # Default semi-transparent black, slightly more opaque with gr.Row(): - global_video_speed = gr.Slider(0.5, 2.0, step=0.1, label="Video Speed", value=1.0) - global_image_brightness = gr.Slider(0.5, 1.5, step=0.05, label="Image Brightness", value=1.0) - global_image_contrast = gr.Slider(0.5, 1.5, step=0.05, label="Image Contrast", value=1.0) + caption_size_slider = gr.Slider(minimum=20, maximum=80, value=45, step=1, label="Caption Font Size") # Adjusted max size + caption_stroke_width_slider = gr.Slider(minimum=0, maximum=5, value=2, step=0.5, label="Caption Stroke Width") with gr.Row(): - global_kenburns_effect = gr.Dropdown(["random", "zoom-in", "zoom-out", "pan-left", "pan-right", "up-left"], label="Ken Burns Effect", value="random") - global_fade_in_duration = gr.Slider(0.1, 1.0, step=0.1, label="Fade In Duration", value=0.3) - global_fade_out_duration = gr.Slider(0.1, 1.0, step=0.1, label="Fade Out Duration", value=0.3) - - # Clip Editing Interface - clip_state = gr.State() - text_inputs = gr.State([]) - with gr.Column(): - gr.Markdown("## Edit Clips") - clip_editors = gr.Column() - - def create_clip_editors(clips_data, text_list, script): - if not clips_data: - return gr.update(visible=False), clips_data, text_list - with clip_editors: - editors = [] - text_inputs_list = [] - media_uploads_list = [] - per_clip_customizations = [] - for i, clip in enumerate(clips_data): - with gr.Group(): - gr.Markdown(f"### Clip {i+1}: {clip['customizations'].get('prompt', 'Untitled')}") - text_input = gr.Textbox(label="Narration Text", value=clip['text'], interactive=True) - media_upload = gr.File(label="Upload Image/Video (Optional)", type="filepath") - with gr.Accordion("Advanced Customization", open=False): - text_color = gr.ColorPicker(label="Text Color (Override)", value=clip['customizations']['text_color']) - text_size = gr.Slider(20, 100, step=1, label="Text 
Size (Override)", value=clip['customizations']['text_size']) - text_font = gr.Dropdown(["Arial-Bold", "Times-Roman", "Courier"], label="Text Font (Override)", value=clip['customizations']['text_font']) - text_alignment = gr.Dropdown(["center", "left", "right"], label="Text Alignment (Override)", value=clip['customizations']['text_alignment']) - text_position_y = gr.Slider(0.1, 0.9, step=0.05, label="Text Y Position (Override)", value=clip['customizations']['text_position_y']) - text_width_ratio = gr.Slider(0.5, 1.0, step=0.05, label="Text Width Ratio (Override)", value=clip['customizations']['text_width_ratio']) - text_words_per_chunk = gr.Slider(3, 10, step=1, label="Words per Chunk (Override)", value=clip['customizations']['text_words_per_chunk']) - text_stroke_width = gr.Slider(0, 5, step=1, label="Text Stroke Width (Override)", value=clip['customizations']['text_stroke_width']) - text_stroke_color = gr.ColorPicker(label="Text Stroke Color (Override)", value=clip['customizations']['text_stroke_color']) - bg_color = gr.ColorPicker(label="Background Color (Override)", value=clip['customizations']['bg_color']) - video_brightness = gr.Slider(0.5, 1.5, step=0.05, label="Video Brightness (Override)", value=clip['customizations']['video_brightness']) - video_contrast = gr.Slider(0.5, 1.5, step=0.05, label="Video Contrast (Override)", value=clip['customizations']['video_contrast']) - video_speed = gr.Slider(0.5, 2.0, step=0.1, label="Video Speed (Override)", value=clip['customizations']['video_speed']) - image_brightness = gr.Slider(0.5, 1.5, step=0.05, label="Image Brightness (Override)", value=clip['customizations']['image_brightness']) - image_contrast = gr.Slider(0.5, 1.5, step=0.05, label="Image Contrast (Override)", value=clip['customizations']['image_contrast']) - kenburns_effect = gr.Dropdown(["random", "zoom-in", "zoom-out", "pan-left", "pan-right", "up-left"], label="Ken Burns Effect (Override)", value=clip['customizations']['kenburns_effect']) - fade_in_duration = gr.Slider(0.1, 1.0, step=0.1, label="Fade In Duration (Override)", value=clip['customizations']['fade_in_duration']) - fade_out_duration = gr.Slider(0.1, 1.0, step=0.1, label="Fade Out Duration (Override)", value=clip['customizations']['fade_out_duration']) - editors.append([text_input, media_upload, text_color, text_size, text_font, text_alignment, text_position_y, text_width_ratio, text_words_per_chunk, text_stroke_width, text_stroke_color, bg_color, video_brightness, video_contrast, video_speed, image_brightness, image_contrast, kenburns_effect, fade_in_duration, fade_out_duration]) - text_inputs_list.append(text_input) - media_uploads_list.append(media_upload) - per_clip_customizations.append({ - 'text_color': text_color, - 'text_size': text_size, - 'text_font': text_font, - 'text_alignment': text_alignment, - 'text_position_y': text_position_y, - 'text_width_ratio': text_width_ratio, - 'text_words_per_chunk': text_words_per_chunk, - 'text_stroke_width': text_stroke_width, - 'text_stroke_color': text_stroke_color, - 'bg_color': bg_color, - 'video_brightness': video_brightness, - 'video_contrast': video_contrast, - 'video_speed': video_speed, - 'image_brightness': image_brightness, - 'image_contrast': image_contrast, - 'kenburns_effect': kenburns_effect, - 'fade_in_duration': fade_in_duration, - 'fade_out_duration': fade_out_duration - }) - return gr.update(visible=True), clips_data, text_inputs_list - return gr.update(visible=False), clips_data, text_list - + caption_position_radio = gr.Radio(["Top", "Middle", 
"Bottom"], label="Caption Position", value="Bottom") + caption_stroke_color_picker = gr.ColorPicker(label="Caption Stroke Color", value="#000000") # Default black stroke + + + generate_script_btn = gr.Button("Generate Script", variant="primary") + + # --- Status and Script Output --- + status_output = gr.Label(label="Status", value="", visible=True) # Always visible + # Using Markdown to show raw script content + script_preview_markdown = gr.Markdown("### Generated Script Preview\n\nScript will appear here...", visible=False) # Initially hidden + + # --- State to hold parsed segments data and run config --- + segments_state = gr.State([]) # List of segment dictionaries + run_config_state = gr.State({}) # Dictionary for run configuration + + # --- Dynamic Editing Area (Initially hidden) --- + # We create MAX_SEGMENTS_FOR_EDITING groups, and show/hide them dynamically + with gr.Column(visible=False, elem_id="editing_area_id") as editing_area: # Added elem_id + gr.Markdown("### Edit Script Segments") + gr.Markdown("Review the AI-generated text and media suggestions below. Edit the text and/or upload your own image/video for any segment. If no file is uploaded, AI will fetch media based on the original prompt.") + for i in range(MAX_SEGMENTS_FOR_EDITING): + # Use gr.Group instead of gr.Box for compatibility + with gr.Group(visible=False) as segment_group: # Each group represents one segment + segment_editing_groups.append(segment_group) + # Use a Label to display the original prompt - it's non-interactive text + # The value will be updated by JS or Python outputs + segment_prompt_label = gr.Label( + f"Segment {i+1} Prompt:", # Initial placeholder text, will be overwritten + show_label=False, + #elem_classes="segment-prompt-label" # Add a class if needed + #data_segment_index=i # Custom data attribute not directly supported in gr.Label, use JS to add it later + ) + segment_prompt_labels.append(segment_prompt_label) + + + segment_text = gr.Textbox(label="Narration Text", lines=2, interactive=True) + segment_text_inputs.append(segment_text) + + segment_file = gr.File(label="Upload Custom Media (Image or Video)", type="filepath", interactive=True) + segment_file_inputs.append(segment_file) + + generate_video_btn = gr.Button("Generate Video", variant="primary") + + + # --- Final Video Output --- + final_video_output = gr.Video(label="Generated Video", visible=False) # Initially hidden + + # --- Event Handlers --- + + # Generate Script Button Click + # Outputs list must match the order of components being updated by yield in generate_script_and_show_editor generate_script_btn.click( - fn=generate_initial_clips, - inputs=[user_input, resolution], - outputs=[clip_state, text_inputs, script_output] - ).then( - fn=create_clip_editors, - inputs=[clip_state, text_inputs, script_output], - outputs=[clip_editors, clip_state, text_inputs] + fn=generate_script_and_show_editor, + inputs=[ + user_concept_input, + resolution_radio, + caption_enabled_radio, + caption_color_picker, + caption_size_slider, + caption_position_radio, + caption_bg_color_picker, + caption_stroke_color_picker, + caption_stroke_width_slider + ], + outputs=[ + run_config_state, # 0 + status_output, # 1 + editing_area, # 2 Show/hide editing area column + final_video_output, # 3 Hide and clear video output + script_preview_markdown, # 4 Update raw script preview + # Outputs for dynamic components (visibility and value updates) - Indices 5 onwards + *segment_text_inputs, # 5 ... + *segment_file_inputs, # ... + *segment_editing_groups, # ... 
+
+    # --- Event Handlers ---
+
+    # Generate Script Button Click
+    # Outputs list must match the order of components being updated by yield in generate_script_and_show_editor
     generate_script_btn.click(
-        fn=generate_initial_clips,
-        inputs=[user_input, resolution],
-        outputs=[clip_state, text_inputs, script_output]
-    ).then(
-        fn=create_clip_editors,
-        inputs=[clip_state, text_inputs, script_output],
-        outputs=[clip_editors, clip_state, text_inputs]
+        fn=generate_script_and_show_editor,
+        inputs=[
+            user_concept_input,
+            resolution_radio,
+            caption_enabled_radio,
+            caption_color_picker,
+            caption_size_slider,
+            caption_position_radio,
+            caption_bg_color_picker,
+            caption_stroke_color_picker,
+            caption_stroke_width_slider
+        ],
+        outputs=[
+            run_config_state,                   # 0
+            status_output,                      # 1
+            editing_area,                       # 2  Show/hide editing area column
+            final_video_output,                 # 3  Hide and clear video output
+            script_preview_markdown,            # 4  Update raw script preview
+            # Outputs for dynamic components (visibility and value updates) - indices 5 onwards
+            *segment_text_inputs,
+            *segment_file_inputs,
+            *segment_editing_groups,
+            *segment_prompt_labels,             # Update prompt labels using Python outputs directly
+            segments_state,                     # LAST - Update the state with parsed segments
+        ]
     )
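+
+    # Illustrative only (assumed variable names): every yield from
+    # generate_script_and_show_editor must be a tuple matching the outputs list
+    # above, element for element, e.g.:
+    #
+    #   yield (run_config, "Script ready. Edit segments below.",
+    #          gr.update(visible=True),                  # editing_area
+    #          gr.update(visible=False, value=None),     # final_video_output
+    #          gr.update(value=script_markdown, visible=True),
+    #          *text_updates, *file_updates, *group_updates, *label_updates,
+    #          parsed_segments)                          # segments_state
+    #
+    # where each starred list holds exactly MAX_SEGMENTS_FOR_EDITING updates.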
-
-    # Generate Video Button
-    generate_video_btn = gr.Button("Generate Final Video")
-    video_output = gr.Video(label="Generated Video")
-
-    def gather_inputs_and_generate(clips_data, *args):
-        text_inputs = args[:len(clips_data)]
-        media_uploads = args[len(clips_data):2*len(clips_data)]
-        per_clip_cust_inputs = args[2*len(clips_data):2*len(clips_data) + 18*len(clips_data)]
-        global_cust_inputs = args[2*len(clips_data) + 18*len(clips_data):]
-        per_clip_customizations = []
-        for i in range(len(clips_data)):
-            start_idx = i * 18
-            per_clip_customizations.append({
-                'text_color': per_clip_cust_inputs[start_idx],
-                'text_size': per_clip_cust_inputs[start_idx + 1],
-                'text_font': per_clip_cust_inputs[start_idx + 2],
-                'text_alignment': per_clip_cust_inputs[start_idx + 3],
-                'text_position_y': per_clip_cust_inputs[start_idx + 4],
-                'text_width_ratio': per_clip_cust_inputs[start_idx + 5],
-                'text_words_per_chunk': per_clip_cust_inputs[start_idx + 6],
-                'text_stroke_width': per_clip_cust_inputs[start_idx + 7],
-                'text_stroke_color': per_clip_cust_inputs[start_idx + 8],
-                'bg_color': per_clip_cust_inputs[start_idx + 9],
-                'video_brightness': per_clip_cust_inputs[start_idx + 10],
-                'video_contrast': per_clip_cust_inputs[start_idx + 11],
-                'video_speed': per_clip_cust_inputs[start_idx + 12],
-                'image_brightness': per_clip_cust_inputs[start_idx + 13],
-                'image_contrast': per_clip_cust_inputs[start_idx + 14],
-                'kenburns_effect': per_clip_cust_inputs[start_idx + 15],
-                'fade_in_duration': per_clip_cust_inputs[start_idx + 16],
-                'fade_out_duration': per_clip_cust_inputs[start_idx + 17]
-            })
-        global_customizations = {
-            'text_color': global_cust_inputs[0],
-            'text_size': global_cust_inputs[1],
-            'text_font': global_cust_inputs[2],
-            'text_alignment': global_cust_inputs[3],
-            'text_position_y': global_cust_inputs[4],
-            'text_width_ratio': global_cust_inputs[5],
-            'text_words_per_chunk': global_cust_inputs[6],
-            'text_stroke_width': global_cust_inputs[7],
-            'text_stroke_color': global_cust_inputs[8],
-            'bg_color': global_cust_inputs[9],
-            'video_brightness': global_cust_inputs[10],
-            'video_contrast': global_cust_inputs[11],
-            'video_speed': global_cust_inputs[12],
-            'image_brightness': global_cust_inputs[13],
-            'image_contrast': global_cust_inputs[14],
-            'kenburns_effect': global_cust_inputs[15],
-            'fade_in_duration': global_cust_inputs[16],
-            'fade_out_duration': global_cust_inputs[17]
-        }
-        updated_clips = update_clips(clips_data, text_inputs, media_uploads, global_customizations, per_clip_customizations)
-        return generate_final_video(updated_clips)
-
+    # Generate Video Button Click
     generate_video_btn.click(
-        fn=gather_inputs_and_generate,
-        inputs=[clip_state] + text_inputs.value + [gr.File(type="filepath")] * len(text_inputs.value) + [gr.ColorPicker(), gr.Slider(), gr.Dropdown(), gr.Dropdown(), gr.Slider(), gr.Slider(), gr.Slider(), gr.Slider(), gr.ColorPicker(), gr.ColorPicker(), gr.Slider(), gr.Slider(), gr.Slider(), gr.Slider(), gr.Slider(), gr.Dropdown(), gr.Slider(), gr.Slider()] * len(text_inputs.value) + [global_text_color, global_text_size, global_text_font, global_text_alignment, global_text_position_y, global_text_width_ratio, global_text_words_per_chunk, global_text_stroke_width, global_text_stroke_color, global_bg_color, global_video_brightness, global_video_contrast, global_video_speed, global_image_brightness, global_image_contrast, global_kenburns_effect, global_fade_in_duration, global_fade_out_duration],
-        outputs=video_output
+        fn=generate_video_from_edited,
+        inputs=[
+            run_config_state,           # Pass run config
+            segments_state,             # Pass the original parsed segments data (needed for original_prompt and duration)
+            *segment_text_inputs,       # Pass list of edited text values
+            *segment_file_inputs,       # Pass list of uploaded file paths
+            bg_music_volume_slider      # Pass background music volume
+        ],
+        outputs=[status_output, final_video_output] # Yield status updates and final video
     )
-
-iface.launch(share=True)
\ No newline at end of file
+    # We don't need a segments_state.change JS handler anymore because the prompt labels
+    # are updated directly by the Python function via the outputs list, so that
+    # event listener has been removed entirely.
+
+
+# Launch the interface
+if __name__ == "__main__":
+    # Attempt ImageMagick policy fix on script startup
+    # This helps but might still require manual sudo depending on system config
+    fix_imagemagick_policy()
+
+    print("Launching Gradio interface...")
+
+    # Check if API keys are still placeholders (unlikely with hardcoded keys, but good practice)
+    if PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'):
+        print("Warning: PEXELS_API_KEY is not configured. Media search may fail.")
+    if OPENROUTER_API_KEY.startswith('YOUR_OPENROUTER_API_KEY'):
+        print("Warning: OPENROUTER_API_KEY is not configured. Script generation will fail.")
+
+    demo.launch(share=True) # Set share=True to get a public link
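+
+    # Note (an assumption about this setup, not from the original code): both
+    # click handlers stream progress via `yield`, and generator handlers in
+    # Gradio generally require the request queue to be enabled, e.g.:
+    #
+    #   demo.queue().launch(share=True)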