# Import necessary libraries (ensure all are installed: moviepy, soundfile, torch,
# pydub, requests, pillow, numpy, beautifulsoup4, gtts, gradio, kokoro, opencv-python)
import os
import io
import re
import time
import math
import base64
import random
import shutil  # Needed for temp folder cleanup
import tempfile
import requests
import torch
import cv2
import numpy as np
import soundfile as sf
from urllib.parse import quote
from kokoro import KPipeline
# ColorClip and concatenate_audioclips are used below, so they must be imported here
from moviepy.editor import (
    VideoFileClip, concatenate_videoclips, concatenate_audioclips, AudioFileClip,
    ImageClip, CompositeVideoClip, TextClip, CompositeAudioClip, ColorClip
)
import moviepy.video.fx.all as vfx
import moviepy.config as mpy_config
from pydub import AudioSegment
from pydub.generators import Sine
from PIL import Image, ImageDraw, ImageFont
from bs4 import BeautifulSoup
# pysrt is imported but not used in the provided code; kept here for reference
# import pysrt
from gtts import gTTS
import gradio as gr
# Initialize the Kokoro TTS pipeline (American English). If the required voice
# models are not available, initialization fails and we fall back to gTTS.
# lang_code='a' selects American English; its default voice is 'af_heart'.
try:
    pipeline = KPipeline(lang_code='a')
    print("Kokoro TTS pipeline initialized.")
except Exception as e:
    print(f"Warning: Could not initialize Kokoro TTS pipeline: {e}. Will rely on gTTS.")
    pipeline = None  # Fall back to gTTS when Kokoro is unavailable

# Point MoviePy at the ImageMagick binary (adjust the path for your system).
# TextClip requires ImageMagick; if it fails, check the installation and
# policy.xml (see fix_imagemagick_policy below).
mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"})
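# A minimal, hedged alternative to hard-coding the path above: probe the PATH
# with shutil.which, checking "magick" (ImageMagick 7) before "convert"
# (ImageMagick 6). This sketch is not part of the original app, so it is left
# commented out.
# im_binary = shutil.which("magick") or shutil.which("convert")
# if im_binary:
#     mpy_config.change_settings({"IMAGEMAGICK_BINARY": im_binary})
# else:
#     print("Warning: ImageMagick not found on PATH; TextClip will fail.")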
# ---------------- Global Configuration ---------------- #
# Some of these now serve only as defaults/initial values.
# Replace with your actual keys, preferably via environment variables.
PEXELS_API_KEY = os.environ.get('PEXELS_API_KEY', 'YOUR_PEXELS_API_KEY')
OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY', 'YOUR_OPENROUTER_API_KEY')
OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free"  # Or another preferred model
OUTPUT_VIDEO_FILENAME = "final_video.mp4"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"

# Maximum number of script segments exposed for editing in the UI
MAX_SEGMENTS_FOR_EDITING = 15

# Global placeholder for the temporary folder; a fresh one is created per run
TEMP_FOLDER = None

# ---------------- Helper Functions ---------------- #
# Most helpers are unchanged, but some take passed-in parameters instead of
# global state (e.g., resolution); create_clip and add_background_music were
# updated accordingly.
def generate_script(user_input, api_key, model):
    """Generate a documentary script via OpenRouter, with proper error handling."""
    if not api_key or api_key == 'YOUR_OPENROUTER_API_KEY':
        print("OpenRouter API key not set. Skipping script generation.")
        return "[Error] API key not configured."
    headers = {
        'Authorization': f'Bearer {api_key}',
        'HTTP-Referer': 'https://your-domain.com',  # Replace with your app's domain
        'X-Title': 'AI Documentary Maker'
    }
    # Prompt tuned for the desired tone and style
    prompt = f"""Short Documentary Script Generator
Instructions:
If I say "use this," output the script exactly as given.
If I only give topics, generate a script based on them.
If I provide a full script, rewrite it without any changes.
Make everything short, simple, and funny; act serious but be humorous.
Don't say anything off topic.
Always end with a funny call to subscribe based on the video topic.
Use normal conversational text, avoid AI phrases, humanize the statements.
Always focus on negative or cynical views, avoid positive statements.
The full script should be for a single video about one topic.
Formatting Rules:
Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage.
Casual & Funny Narration: Each section has 5-15 words of narration. Keep it natural, funny, and unpredictable.
No Special Formatting: No bold, italics, or special characters.
Generalized Search Terms: If a term is too specific, make it more general for Pexels search.
Scene-Specific Writing: Each section describes only what should be shown in the video.
Output Only the Script, make it funny/humorous/hilarious, and add a funny subscribe statement.
No extra text, just the script.
Example Output:
[Cats]
They plot world domination while napping.
[Dogs]
Loyalty is just a bribe for snacks.
[Humans]
The only species that pays to live on a planet they destroy.
[Future]
It looks suspiciously like the present, but with more screens.
[Warning]
Subscribe or a cat will steal your bandwidth.
Now here is the Topic/script: {user_input}
"""
    data = {
        'model': model,
        'messages': [{'role': 'user', 'content': prompt}],
        'temperature': 0.7,  # Slightly higher temperature for more unpredictable humor
        'max_tokens': 500    # Cap the response to keep scripts short
    }
    try:
        response = requests.post(
            'https://openrouter.ai/api/v1/chat/completions',
            headers=headers,
            json=data,
            timeout=45
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        response_data = response.json()
        if 'choices' in response_data and len(response_data['choices']) > 0:
            script_text = response_data['choices'][0]['message']['content']
            # Strip a surrounding markdown code fence, if present
            if script_text.startswith("```") and script_text.endswith("```"):
                script_text = script_text[script_text.find('\n')+1:script_text.rfind('\n')].strip()
            return script_text
        else:
            print("Unexpected response format:", response_data)
            return "[Error] Unexpected API response format."
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {str(e)}")
        return f"[Error] API request failed: {str(e)}"
    except Exception as e:
        print(f"An unexpected error occurred during script generation: {e}")
        return f"[Error] An unexpected error occurred: {str(e)}"
def parse_script(script_text):
    """
    Parse the generated script into a list of segment dictionaries.
    Each dictionary holds the original prompt, narration text, estimated
    duration, and a placeholder for user-uploaded media. Handles API errors
    returned as strings.
    """
    if script_text.startswith("[Error]"):
        print(f"Skipping parse due to script generation error: {script_text}")
        return []
    segments = []
    current_title = None
    current_text = ""
    try:
        lines = script_text.strip().splitlines()
        if not lines:
            print("Script text is empty.")
            return []
        for line in lines:
            line = line.strip()
            if line.startswith("[") and "]" in line:
                bracket_start = line.find("[")
                bracket_end = line.find("]", bracket_start)
                if bracket_start != -1 and bracket_end != -1:
                    if current_title is not None and current_text.strip():
                        # Estimate duration from word count (minimum 2s, ~0.4s per word)
                        duration = max(2.0, len(current_text.split()) * 0.4)
                        segments.append({
                            "original_prompt": current_title.strip(),
                            "text": current_text.strip(),
                            "duration": duration,
                            "uploaded_media": None  # Placeholder for a user-uploaded file path
                        })
                    current_title = line[bracket_start+1:bracket_end].strip()
                    current_text = line[bracket_end+1:].strip()
                elif current_title:
                    # Malformed brackets; treat as narration for the current segment
                    current_text += line + " "
            elif current_title:
                # Plain narration line; append to the current segment
                current_text += line + " "
        # Add the last segment
        if current_title is not None and current_text.strip():
            duration = max(2.0, len(current_text.split()) * 0.4)
            segments.append({
                "original_prompt": current_title.strip(),
                "text": current_text.strip(),
                "duration": duration,
                "uploaded_media": None
            })
        # Cap the number of segments exposed for editing
        if len(segments) > MAX_SEGMENTS_FOR_EDITING:
            print(f"Warning: Script generated {len(segments)} segments, limiting to {MAX_SEGMENTS_FOR_EDITING} for editing.")
            segments = segments[:MAX_SEGMENTS_FOR_EDITING]
        print(f"Parsed {len(segments)} segments.")
        return segments
    except Exception as e:
        print(f"Error parsing script: {e}")
        return []
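# Illustrative example (hypothetical input, not produced by the app): given
#   "[Cats]\nThey plot world domination while napping.\n[Dogs]\nLoyalty is just a bribe for snacks."
# parse_script returns two dicts along the lines of:
#   {"original_prompt": "Cats", "text": "They plot world domination while napping.",
#    "duration": 2.4, "uploaded_media": None}, ...
# (6 words * 0.4 s/word = 2.4 s estimated duration for the first segment.)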
# The Pexels and Google Image search/download helpers are unchanged in spirit.
def search_pexels_videos(query, pexels_api_key):
    """Search Pexels videos for a query and return a random HD (or SD fallback) link."""
    if not pexels_api_key or pexels_api_key == 'YOUR_PEXELS_API_KEY':
        print("Pexels API key not set. Skipping video search.")
        return None
    headers = {'Authorization': pexels_api_key}
    base_url = "https://api.pexels.com/videos/search"
    num_pages = 3
    videos_per_page = 15
    max_retries = 2  # Few retries so failures are fast
    retry_delay = 1
    search_query = query
    all_videos = []
    for page in range(1, num_pages + 1):
        videos = []  # Reset per page so the post-loop check below is well-defined
        for attempt in range(max_retries):
            try:
                params = {"query": search_query, "per_page": videos_per_page, "page": page}
                response = requests.get(base_url, headers=headers, params=params, timeout=10)
                if response.status_code == 200:
                    data = response.json()
                    videos = data.get("videos", [])
                    if not videos:
                        break  # No videos on this page
                    for video in videos:
                        video_files = video.get("video_files", [])
                        for file in video_files:
                            # Prefer HD; collect SD links as fallbacks
                            if file.get("quality") == "hd":
                                all_videos.append(file.get("link"))
                                break  # Found HD for this video, move to the next one
                            elif file.get("quality") == "sd":
                                all_videos.append(file.get("link"))  # Keep looking for HD
                    break  # Page fetched successfully; stop retrying
                elif response.status_code == 429:
                    print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s...")
                    time.sleep(retry_delay)
                    retry_delay *= 2
                else:
                    print(f"Pexels video search error {response.status_code}: {response.text}")
                    break  # Non-recoverable error
            except requests.exceptions.RequestException as e:
                print(f"Pexels video request exception (attempt {attempt+1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                    retry_delay *= 2
                else:
                    break  # Too many retries
        if not videos and page > 1:
            break  # No results on a later page; stop paging
    if all_videos:
        # Prefer an HD link if any were collected (heuristic: 'hd' in the URL)
        hd_videos = [link for link in all_videos if link and 'hd' in link.lower()]
        if hd_videos:
            random_video = random.choice(hd_videos)
            print(f"Selected random HD video from {len(hd_videos)} options.")
        else:
            random_video = random.choice(all_videos)
            print(f"Selected random SD video from {len(all_videos)} options (no HD found).")
        return random_video
    else:
        print("No suitable videos found after searching all pages.")
        return None
def search_pexels_images(query, pexels_api_key):
    """Search for an image on Pexels by query."""
    if not pexels_api_key or pexels_api_key == 'YOUR_PEXELS_API_KEY':
        print("Pexels API key not set. Skipping image search.")
        return None
    headers = {'Authorization': pexels_api_key}
    url = "https://api.pexels.com/v1/search"
    params = {"query": query, "per_page": 15, "orientation": "landscape"}
    max_retries = 2
    retry_delay = 1
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, params=params, timeout=10)
            if response.status_code == 200:
                data = response.json()
                photos = data.get("photos", [])
                if photos:
                    # Pick randomly from the top results
                    photo = random.choice(photos[:min(10, len(photos))])
                    img_url = photo.get("src", {}).get("original")
                    print(f"Found {len(photos)} images, selected one.")
                    return img_url
                else:
                    print(f"No images found for query: {query} on Pexels.")
                    return None
            elif response.status_code == 429:
                print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s...")
                time.sleep(retry_delay)
                retry_delay *= 2
            else:
                print(f"Pexels image search error {response.status_code}: {response.text}")
                break  # Non-recoverable error
        except requests.exceptions.RequestException as e:
            print(f"Pexels image request exception (attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
                retry_delay *= 2
            else:
                break  # Too many retries
    print(f"No Pexels images found for query: {query} after all attempts")
    return None
def search_google_images(query):
    """Search Google Images (fallback, useful for news-style queries)."""
    try:
        # Simple HTML-scraping approach; a dedicated image search API is more
        # robust but requires setup. This breaks if Google changes its markup.
        search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch"
        headers = {"User-Agent": USER_AGENT}
        print(f"Searching Google Images for: {query}")
        response = requests.get(search_url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        # Collect candidate URLs from img tags. This parsing is fragile and
        # heuristic; it may grab incorrect URLs.
        img_tags = soup.find_all("img")
        image_urls = []
        for img in img_tags:
            src = img.get("src", "")
            # Keep http(s) URLs that aren't data URIs or encrypted thumbnails
            if src.startswith("http") and "encrypted" not in src and "base64" not in src:
                image_urls.append(src)
            elif img.get("data-src", "").startswith("http"):  # Some results use data-src
                image_urls.append(img.get("data-src", ""))
        # Filter out icons, gstatic thumbnails, and URLs without an image extension
        valid_image_urls = [url for url in image_urls
                            if url and "gstatic" not in url
                            and url.split('.')[-1].lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp']]
        if valid_image_urls:
            print(f"Found {len(valid_image_urls)} potential Google Images, picking one.")
            return random.choice(valid_image_urls[:min(10, len(valid_image_urls))])
        else:
            print(f"No valid Google Images found for query: {query}")
            return None
    except Exception as e:
        print(f"Error in Google Images search: {e}")
        return None
def download_image(image_url, filename):
    """Download an image from a URL to a local file, with validation."""
    if not image_url:
        print("No image URL provided for download.")
        return None
    try:
        headers = {"User-Agent": USER_AGENT}
        print(f"Attempting to download image from: {image_url}")
        response = requests.get(image_url, headers=headers, stream=True, timeout=20)
        response.raise_for_status()
        # Check the content type before saving
        content_type = response.headers.get('Content-Type', '')
        if not content_type.startswith('image/'):
            print(f"URL did not return an image Content-Type ({content_type}). Skipping download.")
            return None
        # Ensure the target directory exists
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"Potential image downloaded to: {filename}")
        # Validate and normalize the image
        try:
            img = Image.open(filename)
            img.verify()                # Confirm it's a readable image file
            img = Image.open(filename)  # Re-open: verify() invalidates the handle
            if img.mode != 'RGB':
                img = img.convert('RGB')
                img.save(filename)
            print(f"Image validated and converted to RGB: {filename}")
            return filename
        except Exception as e_validate:
            print(f"Downloaded file is not a valid image or processing failed: {e_validate}")
            if os.path.exists(filename):
                os.remove(filename)  # Clean up the invalid file
            return None
    except requests.exceptions.RequestException as e_download:
        print(f"Image download error for {image_url}: {e_download}")
        if os.path.exists(filename):
            os.remove(filename)  # Clean up a partial download
        return None
    except Exception as e_general:
        print(f"General error during image download/processing: {e_general}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
def download_video(video_url, filename):
    """Download a video from a URL to a local file."""
    if not video_url:
        print("No video URL provided for download.")
        return None
    try:
        headers = {"User-Agent": USER_AGENT}  # Some hosts block unidentified clients
        print(f"Attempting to download video from: {video_url}")
        # Pass the headers along (the original defined them but never sent them)
        response = requests.get(video_url, headers=headers, stream=True, timeout=45)
        response.raise_for_status()
        # Check the content type
        content_type = response.headers.get('Content-Type', '')
        if not content_type.startswith('video/'):
            print(f"URL did not return a video Content-Type ({content_type}). Skipping download.")
            return None
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"Video downloaded successfully to: {filename}")
        # Sanity check: reject empty or near-empty files (> 1 KB expected)
        if os.path.exists(filename) and os.path.getsize(filename) > 1024:
            return filename
        else:
            print(f"Downloaded video file {filename} is too small or empty.")
            if os.path.exists(filename):
                os.remove(filename)
            return None
    except requests.exceptions.RequestException as e:
        print(f"Video download error for {video_url}: {e}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
    except Exception as e_general:
        print(f"General error during video download: {e_general}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
def generate_media_asset(prompt, uploaded_media_path):
    """
    Generate a visual asset (video or image). Prioritizes a user upload,
    then Pexels video, then Pexels image, then Google Images.
    Returns a dict: {'path': <file_path>, 'asset_type': 'video' or 'image'}.
    """
    safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
    os.makedirs(TEMP_FOLDER, exist_ok=True)  # Ensure the temp folder exists
    # 1. Use user-uploaded media if provided
    if uploaded_media_path and os.path.exists(uploaded_media_path):
        print(f"Using user uploaded media: {uploaded_media_path}")
        file_ext = os.path.splitext(uploaded_media_path)[1].lower()
        asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm'] else 'image'
        # Copy the user file into the temp folder so cleanup stays centralized
        temp_user_path = os.path.join(TEMP_FOLDER, f"user_upload_{os.path.basename(uploaded_media_path)}")
        try:
            shutil.copy2(uploaded_media_path, temp_user_path)
            print(f"Copied user upload to temp: {temp_user_path}")
            return {"path": temp_user_path, "asset_type": asset_type}
        except Exception as e:
            print(f"Error copying user file {uploaded_media_path}: {e}. Falling back to search.")
    # 2. Search Pexels videos (40% chance when there is no user upload)
    if random.random() < 0.4:
        video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4")
        print(f"Attempting Pexels video search for: {prompt}")
        video_url = search_pexels_videos(prompt, PEXELS_API_KEY)
        if video_url:
            downloaded_video = download_video(video_url, video_file)
            if downloaded_video:
                print(f"Pexels video asset saved to {downloaded_video}")
                return {"path": downloaded_video, "asset_type": "video"}
        else:
            print(f"Pexels video search failed or found no video for: {prompt}")
    # 3. Search Pexels images
    image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg")
    print(f"Attempting Pexels image search for: {prompt}")
    image_url = search_pexels_images(prompt, PEXELS_API_KEY)
    if image_url:
        downloaded_image = download_image(image_url, image_file)
        if downloaded_image:
            print(f"Pexels image asset saved to {downloaded_image}")
            return {"path": downloaded_image, "asset_type": "image"}
    else:
        print(f"Pexels image search failed or found no image for: {prompt}")
    # 4. Fallback: Google Images (useful for news or topics Pexels lacks)
    print(f"Attempting Google Images fallback for: {prompt}")
    google_image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google.jpg")
    google_image_url = search_google_images(prompt)
    if google_image_url:
        downloaded_google_image = download_image(google_image_url, google_image_file)
        if downloaded_google_image:
            print(f"Google Image asset saved to {downloaded_google_image}")
            return {"path": downloaded_google_image, "asset_type": "image"}
    else:
        print(f"Google Images fallback failed for: {prompt}")
    # 5. Final fallback: generic stock images
    fallback_terms = ["nature", "city", "abstract", "background"]
    for term in fallback_terms:
        print(f"Trying generic fallback image search with term: {term}")
        fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg")
        fallback_url = search_pexels_images(term, PEXELS_API_KEY)  # Use Pexels for fallbacks
        if fallback_url:
            downloaded_fallback = download_image(fallback_url, fallback_file)
            if downloaded_fallback:
                print(f"Generic fallback image saved to {downloaded_fallback}")
                return {"path": downloaded_fallback, "asset_type": "image"}
            else:
                print(f"Generic fallback image download failed for term: {term}")
        else:
            print(f"Generic fallback image search failed for term: {term}")
    print(f"Failed to generate any visual asset for prompt: {prompt} after all attempts.")
    return None
def generate_silent_audio(duration, sample_rate=24000):
    """Generate a silent WAV file lasting 'duration' seconds."""
    print(f"Generating {duration:.2f}s of silent audio.")
    num_samples = int(duration * sample_rate)
    silence = np.zeros(num_samples, dtype=np.float32)
    # Unique filename to avoid collisions between segments
    silent_path = os.path.join(TEMP_FOLDER, f"silent_{abs(hash(duration)) % (10**8)}_{int(time.time())}.wav")
    try:
        sf.write(silent_path, silence, sample_rate)
        print(f"Silent audio generated: {silent_path}")
        return silent_path
    except Exception as e:
        print(f"Error generating silent audio: {e}")
        return None
def generate_tts(text, voice='en'):
    """
    Generate TTS audio using Kokoro, falling back to gTTS and then to silent
    audio. Ensures the temp folder exists.
    """
    os.makedirs(TEMP_FOLDER, exist_ok=True)
    safe_text_hash = str(abs(hash(text)) % (10**10))  # Hash handles arbitrarily long text
    file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.wav")
    if os.path.exists(file_path):
        print(f"Using cached TTS for text hash '{safe_text_hash}'")
        return file_path
    target_duration = max(2.0, len(text.split()) * 0.4)  # Estimated duration if TTS fails
    if pipeline:
        try:
            print(f"Attempting Kokoro TTS for text: '{text[:50]}...'")
            kokoro_voice = 'af_heart' if voice == 'en' else voice  # Kokoro's default American English voice
            # Kokoro may yield multiple segments for long text
            generator = pipeline(text, voice=kokoro_voice, speed=1.0, split_pattern=r'\n+')
            audio_segments = []
            total_duration = 0
            for i, (gs, ps, audio) in enumerate(generator):
                audio_segments.append(audio)
                total_duration += len(audio) / 24000.0  # Kokoro outputs 24 kHz audio
            if audio_segments:
                full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
                sf.write(file_path, full_audio, 24000)
                print(f"TTS audio saved to {file_path} (Kokoro, {total_duration:.2f}s)")
                return file_path
            else:
                print("Kokoro pipeline returned no audio segments.")
        except Exception as e:
            print(f"Error with Kokoro TTS: {e}")
            # Fall through to the gTTS fallback
    try:
        print(f"Falling back to gTTS for text: '{text[:50]}...'")
        tts = gTTS(text=text, lang='en', slow=False)
        mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.mp3")
        tts.save(mp3_path)
        audio = AudioSegment.from_mp3(mp3_path)
        audio.export(file_path, format="wav")
        os.remove(mp3_path)
        print(f"Fallback TTS saved to {file_path} (gTTS, {audio.duration_seconds:.2f}s)")
        return file_path
    except Exception as fallback_error:
        print(f"Both TTS methods failed for text: '{text[:50]}...'. Error: {fallback_error}")
        # Last resort: silent audio at the estimated duration
        return generate_silent_audio(duration=target_duration)
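# Illustrative usage sketch (assumes a temp folder exists for the run, as the
# Gradio handlers below arrange with tempfile.mkdtemp()):
#   TEMP_FOLDER = tempfile.mkdtemp()
#   wav_path = generate_tts("Cats secretly run the internet.", voice='en')
#   # -> <TEMP_FOLDER>/tts_<hash>.wav via Kokoro, gTTS as fallback,
#   #    or a silent placeholder WAV if both engines fail.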
def apply_kenburns_effect(clip, target_resolution, effect_type=None):
    """Apply a smooth Ken Burns effect with a single movement pattern."""
    target_w, target_h = target_resolution
    clip_aspect = clip.w / clip.h
    target_aspect = target_w / target_h
    # Resize the clip to fill the target resolution (preserving aspect ratio),
    # then scale it up by 15% so there is room to pan and zoom within the frame.
    scale_factor = 1.15
    if clip_aspect > target_aspect:
        # Wider than target: match height, let the width overflow
        clip = clip.resize(height=target_h)
    else:
        # Taller than target: match width, let the height overflow
        clip = clip.resize(width=target_w)
    initial_w, initial_h = clip.size
    new_width = int(initial_w * scale_factor)
    new_height = int(initial_h * scale_factor)
    clip = clip.resize(newsize=(new_width, new_height))
    max_offset_x = new_width - target_w
    max_offset_y = new_height - target_h
    available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "up-left", "down-right"]
    if effect_type is None or effect_type == "random":
        effect_type = random.choice(available_effects)
    # Start/end positions are the top-left corner of the target-resolution
    # window within the scaled image; zoom factors are relative to that window.
    start_x, start_y = 0, 0
    end_x, end_y = 0, 0
    start_zoom_factor = 1.0
    end_zoom_factor = 1.0
    if effect_type == "zoom-in":
        start_zoom_factor = 1.0
        end_zoom_factor = 1.15
        # Stay centered
        start_x = end_x = max_offset_x / 2
        start_y = end_y = max_offset_y / 2
    elif effect_type == "zoom-out":
        start_zoom_factor = 1.15
        end_zoom_factor = 1.0
        # Stay centered
        start_x = end_x = max_offset_x / 2
        start_y = end_y = max_offset_y / 2
    elif effect_type == "pan-left":
        start_x = max_offset_x
        start_y = max_offset_y / 2
        end_x = 0
        end_y = max_offset_y / 2
    elif effect_type == "pan-right":
        start_x = 0
        start_y = max_offset_y / 2
        end_x = max_offset_x
        end_y = max_offset_y / 2
    elif effect_type == "pan-up":
        start_x = max_offset_x / 2
        start_y = max_offset_y
        end_x = max_offset_x / 2
        end_y = 0
    elif effect_type == "pan-down":
        start_x = max_offset_x / 2
        start_y = 0
        end_x = max_offset_x / 2
        end_y = max_offset_y
    elif effect_type == "up-left":
        start_x = max_offset_x
        start_y = max_offset_y
        end_x = 0
        end_y = 0
    elif effect_type == "down-right":
        start_x = 0
        start_y = 0
        end_x = max_offset_x
        end_y = max_offset_y
    else:
        # Default to pan-right if the type is somehow invalid
        effect_type = 'pan-right'
        start_x = 0
        start_y = max_offset_y / 2
        end_x = max_offset_x
        end_y = max_offset_y / 2

    def transform_frame(get_frame, t):
        frame = get_frame(t)
        # Smooth ease-in/ease-out (cosine easing)
        progress = t / clip.duration if clip.duration > 0 else 0
        eased_progress = 0.5 - 0.5 * math.cos(math.pi * progress)
        # Interpolate position
        current_x = start_x + (end_x - start_x) * eased_progress
        current_y = start_y + (end_y - start_y) * eased_progress
        # Interpolate zoom (relative to the scaled-up size)
        current_zoom_factor = start_zoom_factor + (end_zoom_factor - start_zoom_factor) * eased_progress
        # Crop size shrinks as zoom increases
        crop_w = int(target_w / current_zoom_factor)
        crop_h = int(target_h / current_zoom_factor)
        # Center of the crop window, clamped to the scaled image bounds
        center_x = current_x + crop_w / 2
        center_y = current_y + crop_h / 2
        center_x = max(crop_w / 2, min(center_x, new_width - crop_w / 2))
        center_y = max(crop_h / 2, min(center_y, new_height - crop_h / 2))
        # cv2.getRectSubPix crops around a floating-point center; frames from
        # moviepy are already numpy arrays.
        try:
            cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (center_x, center_y))
            # Resize the crop back up to the target resolution
            resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
            return resized_frame
        except Exception as e:
            print(f"Error applying Ken Burns transform at t={t:.2f}s: {e}")
            # Return a black frame as a placeholder on error
            return np.zeros((target_h, target_w, 3), dtype=np.uint8)

    # Return a new clip instance with the per-frame transform applied
    return clip.fl(transform_frame)
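# Sanity-check values for the cosine easing above (illustrative only):
# progress 0.00 -> eased 0.000; 0.25 -> ~0.146; 0.50 -> 0.500; 0.75 -> ~0.854;
# 1.00 -> 1.000. The zero slope at both ends is what makes the pan/zoom start
# and stop gently instead of snapping.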
def resize_to_fill(clip, target_resolution):
    """Resize and center-crop a clip to fill the target resolution, preserving aspect ratio."""
    target_w, target_h = target_resolution
    clip_aspect = clip.w / clip.h
    target_aspect = target_w / target_h
    if clip_aspect > target_aspect:  # Clip is wider than the target
        clip = clip.resize(height=target_h)
        # Crop equally from both sides so the width matches target_w
        crop_amount_x = (clip.w - target_w) / 2
        clip = clip.crop(x1=crop_amount_x, x2=clip.w - crop_amount_x, y1=0, y2=clip.h)
    else:  # Clip is taller than the target (or same aspect)
        clip = clip.resize(width=target_w)
        # Crop equally from top and bottom so the height matches target_h
        crop_amount_y = (clip.h - target_h) / 2
        clip = clip.crop(x1=0, x2=clip.w, y1=crop_amount_y, y2=clip.h - crop_amount_y)
    # Guard against off-by-one sizes after cropping. Compare as tuples:
    # clip.size may be a list, and list != tuple is always True.
    if tuple(clip.size) != tuple(target_resolution):
        print(f"Warning: Clip size {clip.size} after resize_to_fill does not match target {target_resolution}. Resizing again.")
        clip = clip.resize(newsize=target_resolution)
    return clip
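# Worked example (illustrative): a 1280x720 source filling a 1080x1920 portrait
# target is wider than the target aspect (1.78 > 0.56), so it is resized to
# height 1920 (width ~3413) and then cropped ~1167 px from each side down to
# 1080 wide.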
def find_mp3_files():
    """Search the current directory and subdirectories for an MP3 file."""
    mp3_files = []
    for root, dirs, files in os.walk('.'):
        for file in files:
            if file.lower().endswith('.mp3'):
                mp3_path = os.path.join(root, file)
                mp3_files.append(mp3_path)
                print(f"Found MP3 file: {mp3_path}")
    if mp3_files:
        return mp3_files[0]  # Return the first one found
    else:
        print("No MP3 files found in the current directory or subdirectories.")
        return None
def add_background_music(final_video, bg_music_path, bg_music_volume=0.08):
    """Mix background music under the final video's narration."""
    if not bg_music_path or not os.path.exists(bg_music_path):
        print("No valid background music path provided or file not found. Skipping background music.")
        return final_video
    try:
        print(f"Adding background music from: {bg_music_path}")
        bg_music = AudioFileClip(bg_music_path)
        # Loop the music if it's shorter than the video
        if bg_music.duration < final_video.duration:
            loops_needed = math.ceil(final_video.duration / bg_music.duration)
            bg_segments = [bg_music] * loops_needed
            bg_music = concatenate_audioclips(bg_segments)
        # Trim to the video duration and lower the volume
        bg_music = bg_music.subclip(0, final_video.duration)
        bg_music = bg_music.volumex(bg_music_volume)
        # Mix with the existing narration track
        video_audio = final_video.audio
        if video_audio:
            mixed_audio = CompositeAudioClip([video_audio, bg_music])
        else:
            # The video may not have an audio track at all
            mixed_audio = bg_music
            print("Warning: Video had no audio track, only adding background music.")
        final_video = final_video.set_audio(mixed_audio)
        print("Background music added successfully.")
        return final_video
    except Exception as e:
        print(f"Error adding background music: {e}")
        print("Continuing without background music.")
        return final_video
def create_clip(media_asset, tts_path, duration, target_resolution,
                caption_enabled, caption_color, caption_size, caption_position,
                caption_bg_color, caption_stroke_color, caption_stroke_width,
                narration_text, segment_index):
    """Create a video clip with synchronized subtitles and narration."""
    try:
        print(f"Creating clip #{segment_index} from asset: {media_asset.get('path')}, type: {media_asset.get('asset_type')}")
        media_path = media_asset.get('path')
        asset_type = media_asset.get('asset_type')
        if not media_path or not os.path.exists(media_path):
            print(f"Skipping clip {segment_index}: Missing media file {media_path}")
            # Create a black clip with silent audio for this segment's duration
            black_clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=duration)
            silent_audio_path = generate_silent_audio(duration)
            if silent_audio_path and os.path.exists(silent_audio_path):
                silent_audio_clip = AudioFileClip(silent_audio_path)
                if silent_audio_clip.duration < duration:  # Defensive; shouldn't happen
                    # Loop via fx: AudioFileClip has no .loop method of its own
                    silent_audio_clip = silent_audio_clip.fx(vfx.loop, duration=duration)
                black_clip = black_clip.set_audio(silent_audio_clip.subclip(0, duration))
            print(f"Created placeholder black clip for segment {segment_index}")
            # Overlay placeholder text if captions are enabled
            if caption_enabled and narration_text and caption_color != "transparent":
                txt_clip = TextClip(
                    "[Missing Media]\n" + narration_text,  # Flag the missing media
                    fontsize=caption_size,
                    font='Arial-Bold',
                    color=caption_color,
                    bg_color=caption_bg_color,
                    method='caption',
                    align='center',
                    stroke_width=caption_stroke_width,
                    stroke_color=caption_stroke_color,
                    size=(target_resolution[0] * 0.9, None)
                ).set_position('center').set_duration(duration)
                black_clip = CompositeVideoClip([black_clip, txt_clip])
            return black_clip
        # Determine the actual audio duration
        audio_clip = None
        audio_duration = duration        # Default to the estimated duration
        target_clip_duration = duration  # Initialized here so it's defined even when TTS is missing
        if tts_path and os.path.exists(tts_path):
            try:
                audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2)
                audio_duration = audio_clip.duration
                # Make the clip slightly longer than the audio for transitions/padding
                target_clip_duration = audio_duration + 0.3
                print(f"TTS audio duration: {audio_duration:.2f}s. Target clip duration: {target_clip_duration:.2f}s")
            except Exception as e:
                print(f"Error loading TTS audio clip {tts_path}: {e}. Using estimated duration {duration:.2f}s.")
                audio_clip = None
                target_clip_duration = duration
        if asset_type == "video":
            try:
                clip = VideoFileClip(media_path)
                print(f"Loaded video clip with duration {clip.duration:.2f}s")
                clip = resize_to_fill(clip, target_resolution)
                if clip.duration < target_clip_duration:
                    print("Looping video clip")
                    clip = clip.loop(duration=target_clip_duration)
                else:
                    clip = clip.subclip(0, target_clip_duration)
                clip = clip.fadein(0.2).fadeout(0.2)  # Simple transitions
            except Exception as e:
                print(f"Error processing video clip {media_path}: {e}")
                print(f"Creating placeholder black clip instead for segment {segment_index}")
                clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=target_clip_duration)
        elif asset_type == "image":
            try:
                img = Image.open(media_path)
                # Ensure the image is RGB before handing it to ImageClip
                if img.mode != 'RGB':
                    print("Converting image to RGB")
                    img = img.convert('RGB')
                    img_array = np.array(img)  # ImageClip accepts numpy arrays directly
                    img.close()
                    clip = ImageClip(img_array).set_duration(target_clip_duration)
                else:
                    img.close()
                    clip = ImageClip(media_path).set_duration(target_clip_duration)
                print(f"Loaded image clip with duration {clip.duration:.2f}s")
                clip = apply_kenburns_effect(clip, target_resolution, effect_type="random")
                clip = clip.fadein(0.3).fadeout(0.3)  # Simple transitions
            except Exception as e:
                print(f"Error processing image clip {media_path}: {e}")
                print(f"Creating placeholder black clip instead for segment {segment_index}")
                clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=target_clip_duration)
        else:
            print(f"Unknown asset type {asset_type} for segment {segment_index}. Skipping.")
            return None
        # Attach the narration audio if it loaded successfully
        if audio_clip:
            # Match audio duration to the processed video duration
            if abs(audio_clip.duration - clip.duration) > 0.1:  # Allow a slight difference
                print(f"Adjusting audio duration ({audio_clip.duration:.2f}s) to match video duration ({clip.duration:.2f}s)")
                audio_clip = audio_clip.fx(vfx.speedx, factor=audio_clip.duration / clip.duration)
            clip = clip.set_audio(audio_clip)
        else:
            # If TTS failed, attach silent audio so concatenation stays consistent
            print(f"No valid audio for clip {segment_index}. Setting silent audio.")
            silent_audio_path = generate_silent_audio(clip.duration)
            if silent_audio_path and os.path.exists(silent_audio_path):
                silent_audio_clip = AudioFileClip(silent_audio_path)
                if abs(silent_audio_clip.duration - clip.duration) > 0.1:
                    silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration)
                clip = clip.set_audio(silent_audio_clip)
            else:
                clip = clip.set_audio(None)  # No audio if even silence fails
        # Add subtitles if enabled
        if caption_enabled and narration_text and caption_color != "transparent":
            try:
                # Simple word-based chunking; for tighter sync, use a forced
                # aligner or Whisper word timestamps instead.
                words = narration_text.split()
                words_per_second = len(words) / audio_duration if audio_duration > 0 else len(words)
                word_duration = 1.0 / words_per_second if words_per_second > 0 else 0.5
                subtitle_clips = []
                current_time = 0
                chunk_size = 6  # Words per caption chunk (adjust as needed)
                for i in range(0, len(words), chunk_size):
                    chunk_words = words[i:i+chunk_size]
                    chunk_text = ' '.join(chunk_words)
                    # Estimate chunk duration from word count * average word duration
                    estimated_chunk_duration = len(chunk_words) * word_duration
                    start_time = current_time
                    end_time = min(current_time + estimated_chunk_duration, clip.duration)
                    if start_time >= end_time:
                        break  # Avoid zero/negative-duration subtitle clips
                    # Vertical placement
                    if caption_position == "Top":
                        subtitle_y_position = int(target_resolution[1] * 0.1)
                    elif caption_position == "Middle":
                        subtitle_y_position = int(target_resolution[1] * 0.5)
                    else:  # Default to Bottom
                        subtitle_y_position = int(target_resolution[1] * 0.85)
                    txt_clip = TextClip(
                        chunk_text,
                        fontsize=caption_size,
                        font='Arial-Bold',  # Ensure this font is available, or use a common system font
                        color=caption_color,
                        bg_color=caption_bg_color,
                        method='caption',  # Enables text wrapping
                        align='center',
                        stroke_width=caption_stroke_width,
                        stroke_color=caption_stroke_color,
                        size=(target_resolution[0] * 0.9, None)  # Max 90% of video width
                    ).set_start(start_time).set_end(end_time)
                    txt_clip = txt_clip.set_position(('center', subtitle_y_position))
                    subtitle_clips.append(txt_clip)
                    current_time = end_time  # Advance to the end of this chunk
                if subtitle_clips:
                    clip = CompositeVideoClip([clip] + subtitle_clips)
                    print(f"Added {len(subtitle_clips)} subtitle chunks to clip {segment_index}.")
                else:
                    print(f"No subtitle clips generated for segment {segment_index}.")
            except Exception as sub_error:
                print(f"Error adding subtitles for segment {segment_index}: {sub_error}")
                # Fall back to a single static text overlay
                try:
                    txt_clip = TextClip(
                        narration_text,
                        fontsize=caption_size,
                        font='Arial-Bold',
                        color=caption_color,
                        bg_color=caption_bg_color,
                        method='caption',
                        align='center',
                        stroke_width=caption_stroke_width,
                        stroke_color=caption_stroke_color,
                        size=(target_resolution[0] * 0.8, None)
                    ).set_position(('center', int(target_resolution[1] * 0.75))).set_duration(clip.duration)
                    clip = CompositeVideoClip([clip, txt_clip])
                    print(f"Added simple fallback subtitle for segment {segment_index}.")
                except Exception as fallback_sub_error:
                    print(f"Simple fallback subtitle failed for segment {segment_index}: {fallback_sub_error}")
        # Re-assert the final duration (seemingly redundant, but works around
        # occasional duration metadata issues after compositing)
        clip = clip.set_duration(clip.duration)
        print(f"Clip {segment_index} created: {clip.duration:.2f}s")
        return clip
    except Exception as e:
        print(f"Critical error in create_clip for segment {segment_index}: {str(e)}")
        # Build a black error-placeholder clip if anything goes wrong
        error_duration = duration if duration else 3  # Estimated duration or default
        black_clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=error_duration)
        error_text = f"Error in segment {segment_index}"
        if narration_text:
            error_text += f":\n{narration_text[:50]}..."
        error_txt_clip = TextClip(
            error_text,
            fontsize=30,
            color="red",
            align='center',
            size=(target_resolution[0] * 0.9, None)
        ).set_position('center').set_duration(error_duration)
        clip = CompositeVideoClip([black_clip, error_txt_clip])
        silent_audio_path = generate_silent_audio(error_duration)
        if silent_audio_path and os.path.exists(silent_audio_path):
            clip = clip.set_audio(AudioFileClip(silent_audio_path))
        print(f"Created error placeholder clip for segment {segment_index}.")
        return clip
def fix_imagemagick_policy():
    """Attempt to relax the ImageMagick security policies that TextClip needs."""
    print("Attempting to fix ImageMagick security policies...")
    policy_paths = [
        "/etc/ImageMagick-6/policy.xml",
        "/etc/ImageMagick-7/policy.xml",
        "/etc/ImageMagick/policy.xml",              # Common symlink path
        "/usr/local/etc/ImageMagick-7/policy.xml",  # MacPorts/Homebrew path
        "/usr/share/ImageMagick/policy.xml",
        "/usr/share/ImageMagick-6/policy.xml",
        "/usr/share/ImageMagick-7/policy.xml",
        # Add more paths as needed for other installations
    ]
    found_policy = None
    for path in policy_paths:
        if os.path.exists(path):
            found_policy = path
            break
    if not found_policy:
        print("No policy.xml found in common locations. TextClip may fail.")
        print("Consider installing ImageMagick and checking its installation path.")
        return False
    print(f"Attempting to modify policy file at {found_policy}")
    try:
        # Create a backup first
        backup_path = f"{found_policy}.bak_aivgen_{int(time.time())}"
        shutil.copy2(found_policy, backup_path)
        print(f"Created backup at {backup_path}")
        # Read the original policy file
        with open(found_policy, 'r') as f:
            policy_content = f.read()
        # Grant read/write rights for the commonly restricted coders
        modified_content = re.sub(
            r'<policy domain="coder" rights="none" pattern="(PDF|EPS|PS|XPS)"',
            r'<policy domain="coder" rights="read|write" pattern="\1"',
            policy_content
        )
        # Ensure path policies allow reading and writing files
        modified_content = re.sub(
            r'<policy domain="path" pattern="@\*"[^>]*>',
            r'<policy domain="path" pattern="@*" rights="read|write"/>',
            modified_content
        )
        # Broad rights="none" -> read|write replacement; use with care
        modified_content = re.sub(
            r'<policy domain="[^"]+" rights="none"[^>]*>',
            lambda m: m.group(0).replace('rights="none"', 'rights="read|write"'),
            modified_content
        )
        # Write the modified content back; fall back to sudo if the direct write fails
        temp_policy_file = None  # Defined up front so the finally block below is safe
        try:
            with open(found_policy, 'w') as f:
                f.write(modified_content)
            print("ImageMagick policies updated successfully (direct write).")
            return True
        except IOError as e:
            print(f"Direct write failed: {e}. Attempting with sudo...")
            # Requires the user to run sudo without a password prompt
            temp_policy_file = os.path.join(TEMP_FOLDER, "temp_policy.xml")
            with open(temp_policy_file, 'w') as f:
                f.write(modified_content)
            cmd = f"sudo cp {temp_policy_file} {found_policy}"
            print(f"Executing: {cmd}")
            result = os.system(cmd)  # Returns 0 on success
            if result == 0:
                print("ImageMagick policies updated successfully using sudo.")
                return True
            else:
                print(f"Failed to update ImageMagick policies using sudo. Result code: {result}.")
                print("Please manually edit your policy.xml to grant read/write rights for coder and path domains.")
                print("Example: change <policy domain='coder' rights='none' pattern='PDF'> to <policy domain='coder' rights='read|write' pattern='PDF'>")
                return False
        finally:
            # Guarded so a successful direct write doesn't raise NameError here
            if temp_policy_file and os.path.exists(temp_policy_file):
                os.remove(temp_policy_file)
    except Exception as e:
        print(f"Error during ImageMagick policy modification: {e}")
        print("Manual intervention may be required.")
        return False
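# Hedged self-test sketch (illustrative, not part of the app): the most direct
# way to confirm the policy fix took effect is to render a tiny TextClip.
#   try:
#       TextClip("ok", fontsize=20, color="white").close()
#       print("TextClip rendering works.")
#   except Exception as e:
#       print(f"TextClip still failing: {e}")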
# ---------------- Gradio Interface Functions ---------------- #
def generate_script_and_show_editor(user_input, resolution_choice,
                                    caption_enabled_choice, caption_color,
                                    caption_size, caption_position, caption_bg_color,
                                    caption_stroke_color, caption_stroke_width):
    """
    Generate the script, parse it, store the segments in state, and prepare
    the UI updates that reveal the editing interface.
    """
    global TEMP_FOLDER
    # Clean up the previous run's temp folder if it exists
    if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
        print(f"Cleaning up previous temp folder: {TEMP_FOLDER}")
        try:
            shutil.rmtree(TEMP_FOLDER)
        except Exception as e:
            print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}")
    # Create a fresh unique temporary folder for this run
    TEMP_FOLDER = tempfile.mkdtemp()
    print(f"Created new temp folder: {TEMP_FOLDER}")
    # Store the global style choices in state (Gradio State holds a single
    # object, so a dict is convenient)
    run_config = {
        "resolution": (1920, 1080) if resolution_choice == "Full" else (1080, 1920),
        "caption_enabled": caption_enabled_choice == "Yes",
        "caption_color": caption_color,
        "caption_size": caption_size,
        "caption_position": caption_position,
        "caption_bg_color": caption_bg_color,
        "caption_stroke_color": caption_stroke_color,
        "caption_stroke_width": caption_stroke_width,
        "temp_folder": TEMP_FOLDER
    }
    # Every yield must have the same arity, so progress updates are padded
    # with hidden/empty values for the segment-editing components.
    hidden_groups = [gr.update(visible=False)] * MAX_SEGMENTS_FOR_EDITING
    hidden_textboxes = [gr.update(value="", visible=False)] * MAX_SEGMENTS_FOR_EDITING
    hidden_files = [gr.update(value=None, visible=False)] * MAX_SEGMENTS_FOR_EDITING
    yield (run_config, gr.update(value="Generating script...", visible=True), gr.update(visible=False),
           hidden_groups, hidden_textboxes, hidden_files, [])
    script_text = generate_script(user_input, OPENROUTER_API_KEY, OPENROUTER_MODEL)
    if not script_text or script_text.startswith("[Error]"):
        # Yield (not return) the failure state so Gradio actually applies it;
        # a plain `return <tuple>` in a generator is silently discarded.
        yield (run_config, gr.update(value=f"Script generation failed: {script_text}", visible=True),
               gr.update(visible=False), hidden_groups, hidden_textboxes, hidden_files, [])
        return
    yield (run_config, gr.update(value="Parsing script...", visible=True), gr.update(visible=False),
           hidden_groups, hidden_textboxes, hidden_files, [])
    segments = parse_script(script_text)
    if not segments:
        yield (run_config, gr.update(value="Failed to parse script or script is empty.", visible=True),
               gr.update(visible=False), hidden_groups, hidden_textboxes, hidden_files, [])
        return
    # Build per-segment updates for the dynamic editing components: visibility
    # and content for each textbox and file upload in the editing groups.
    textbox_updates = []
    file_updates = []
    group_visibility_updates = []
    for i in range(MAX_SEGMENTS_FOR_EDITING):
        if i < len(segments):
            # Show the group, populate the text, clear any previous upload
            textbox_updates.append(gr.update(value=segments[i]['text'], visible=True))
            file_updates.append(gr.update(value=None, visible=True))
            group_visibility_updates.append(gr.update(visible=True))
        else:
            # Hide unused groups
            textbox_updates.append(gr.update(value="", visible=False))
            file_updates.append(gr.update(value=None, visible=False))
            group_visibility_updates.append(gr.update(visible=False))
    yield (run_config,
           gr.update(value="Script generated. Edit segments below.", visible=True),
           gr.update(visible=True),   # Show the Generate Video button
           group_visibility_updates,  # Group visibility
           textbox_updates,           # Textbox contents
           file_updates,              # File uploads
           segments)                  # Parsed segments into state
| def generate_video_from_edited(run_config, segments_data, segment_texts, segment_uploads): | |
| """ | |
| Takes the edited segment data (text, uploaded files) and configuration, | |
| and generates the final video. | |
| """ | |
| if not segments_data: | |
| yield "No segments to process. Generate script first.", None | |
| return | |
| global TEMP_FOLDER | |
| # Ensure TEMP_FOLDER is correctly set from run_config | |
| TEMP_FOLDER = run_config.get("temp_folder") | |
| if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): | |
| yield "Error: Temporary folder not found. Please regenerate script.", None | |
| return | |
| # Extract config from run_config | |
| TARGET_RESOLUTION = run_config["resolution"] | |
| CAPTION_ENABLED = run_config["caption_enabled"] | |
| CAPTION_COLOR = run_config["caption_color"] | |
| CAPTION_SIZE = run_config["caption_size"] | |
| CAPTION_POSITION = run_config["caption_position"] | |
| CAPTION_BG_COLOR = run_config["caption_bg_color"] | |
| CAPTION_STROKE_COLOR = run_config["caption_stroke_color"] | |
| CAPTION_STROKE_WIDTH = run_config["caption_stroke_width"] | |
| # Update segments_data with potentially edited text and uploaded file paths | |
| # segment_texts and segment_uploads are lists of values from the Gradio components | |
| processed_segments = [] | |
| for i, segment in enumerate(segments_data): | |
| if i < len(segment_texts): # Ensure we have corresponding input values | |
| processed_segment = segment.copy() # Make a copy | |
| processed_segment['text'] = segment_texts[i] # Use the edited text | |
| processed_segment['uploaded_media'] = segment_uploads[i] # Use the uploaded file path (None if not uploaded) | |
| processed_segments.append(processed_segment) | |
| else: | |
| # This shouldn't happen if state and UI updates are in sync, but as a safeguard | |
| print(f"Warning: Missing input value for segment index {i}. Skipping segment.") | |
| # Or perhaps use the original segment data if no edited input? Let's skip for safety. | |
| # processed_segments.append(segment) # Append original if no input? Depends on desired behavior. | |
| if not processed_segments: | |
| yield "No valid segments to process after editing.", None | |
| return | |
| yield "Fixing ImageMagick policy...", None | |
| fix_imagemagick_policy() # Attempt policy fix before creating clips | |
| clips = [] | |
| yield "Generating media and audio for clips...", None | |
| total_segments = len(processed_segments) | |
| for idx, segment in enumerate(processed_segments): | |
| yield f"Processing segment {idx+1}/{total_segments}...", None | |
| print(f"\nProcessing segment {idx+1}/{total_segments}...") | |
| # Determine media source: uploaded or generated | |
| media_asset = None | |
| if segment.get('uploaded_media') and os.path.exists(segment['uploaded_media']): | |
| print(f"Using uploaded media for segment {idx+1}: {segment['uploaded_media']}") | |
| file_ext = os.path.splitext(segment['uploaded_media'])[1].lower() | |
| asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm'] else 'image' | |
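| # Extension sniffing is a heuristic; mimetypes.guess_type() would be a more | |
| # robust alternative, e.g.: | |
| #   import mimetypes | |
| #   mime, _ = mimetypes.guess_type(segment['uploaded_media']) | |
| #   asset_type = 'video' if mime and mime.startswith('video') else 'image' | |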
| # Need to copy the uploaded file to the temp folder if it's not already there | |
| try: | |
| temp_upload_path = os.path.join(TEMP_FOLDER, f"user_upload_{idx}{file_ext}") | |
| shutil.copy2(segment['uploaded_media'], temp_upload_path) | |
| media_asset = {"path": temp_upload_path, "asset_type": asset_type} | |
| except Exception as e: | |
| print(f"Error copying user upload {segment['uploaded_media']}: {e}. Attempting to generate media instead.") | |
| media_asset = generate_media_asset(segment['original_prompt'], None) # Pass None for uploaded_media | |
| else: | |
| print(f"No user upload for segment {idx+1}. Generating media from prompt: '{segment['original_prompt']}'") | |
| media_asset = generate_media_asset(segment['original_prompt'], None) # Pass None for uploaded_media | |
| if not media_asset: | |
| print(f"Failed to generate or use media asset for segment {idx+1}. Creating placeholder.") | |
| # Create a dummy asset dict pointing to a non-existent path so create_clip makes a black clip | |
| media_asset = {"path": os.path.join(TEMP_FOLDER, f"dummy_missing_media_{idx}.txt"), "asset_type": "image"} # Use image as dummy type | |
| # Generate TTS audio | |
| tts_path = generate_tts(segment['text'], voice='en') # Using 'en' voice | |
| # Create the video clip for this segment | |
| clip = create_clip( | |
| media_asset=media_asset, | |
| tts_path=tts_path, | |
| duration=segment['duration'], # Use estimated duration as a fallback reference | |
| target_resolution=TARGET_RESOLUTION, | |
| caption_enabled=CAPTION_ENABLED, | |
| caption_color=CAPTION_COLOR, | |
| caption_size=CAPTION_SIZE, | |
| caption_position=CAPTION_POSITION, | |
| caption_bg_color=CAPTION_BG_COLOR, | |
| caption_stroke_color=CAPTION_STROKE_COLOR, | |
| caption_stroke_width=CAPTION_STROKE_WIDTH, | |
| narration_text=segment['text'], | |
| segment_index=idx+1 | |
| ) | |
| if clip: | |
| clips.append(clip) | |
| else: | |
| print(f"Skipping segment {idx+1} due to clip creation failure.") | |
| # Create a placeholder black clip if create_clip returned None | |
| placeholder_duration = segment.get('duration', 3.0) # Use estimated duration or default | |
| # ColorClip is not among the top-level moviepy imports; import it here so the fallback works. | |
| from moviepy.editor import ColorClip | |
| placeholder_clip = ColorClip(size=TARGET_RESOLUTION, color=(0, 0, 0), duration=placeholder_duration) | |
| silent_audio_path = generate_silent_audio(placeholder_duration) | |
| if silent_audio_path and os.path.exists(silent_audio_path): | |
| placeholder_clip = placeholder_clip.set_audio(AudioFileClip(silent_audio_path)) | |
| error_text = f"Segment {idx+1} Failed" | |
| if segment.get('text'): error_text += f":\n{segment['text'][:50]}..." | |
| error_txt_clip = TextClip(error_text, fontsize=30, color="red", align='center', method='caption', size=(int(TARGET_RESOLUTION[0] * 0.9), None)).set_position('center').set_duration(placeholder_duration) # method='caption' is required for wrapped text in a fixed-width box; size must be integer pixels | |
| placeholder_clip = CompositeVideoClip([placeholder_clip, error_txt_clip]) | |
| clips.append(placeholder_clip) | |
| if not clips: | |
| yield "No clips were successfully created. Video generation failed.", None | |
| # Clean up | |
| if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): | |
| try: | |
| shutil.rmtree(TEMP_FOLDER) | |
| print(f"Cleaned up temp folder: {TEMP_FOLDER}") | |
| except Exception as e: | |
| print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") | |
| TEMP_FOLDER = None # Reset global | |
| return | |
| yield "Concatenating clips...", None | |
| print("\nConcatenating clips...") | |
| final_video = concatenate_videoclips(clips, method="compose") | |
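| # method="compose" composites clips of differing sizes onto a common canvas | |
| # instead of requiring identical dimensions for every clip. | |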
| yield "Adding background music...", None | |
| bg_music_path = find_mp3_files() # Find background music | |
| # Fall back to the default volume if the slider value was not stored in run_config. | |
| final_video = add_background_music(final_video, bg_music_path, bg_music_volume=run_config.get("bg_music_volume", 0.08)) | |
| yield f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...", None | |
| print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...") | |
| try: | |
| # Use a temporary output file first for safety | |
| temp_output_filename = os.path.join(TEMP_FOLDER, f"temp_{OUTPUT_VIDEO_FILENAME}") | |
| final_video.write_videofile(temp_output_filename, codec='libx264', fps=24, preset='veryfast') | |
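| # x264 presets trade encode speed against compression efficiency; | |
| # 'veryfast' favors speed at the cost of somewhat larger files. | |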
| # Move the final file to the intended location after successful export | |
| shutil.move(temp_output_filename, OUTPUT_VIDEO_FILENAME) | |
| print(f"Final video saved as {OUTPUT_VIDEO_FILENAME}") | |
| output_path = OUTPUT_VIDEO_FILENAME | |
| except Exception as e: | |
| print(f"Error exporting video: {e}") | |
| output_path = None | |
| yield f"Video export failed: {e}", None # Provide error message in status | |
| # Clean up temporary folder | |
| yield "Cleaning up temporary files...", output_path # Update status before cleanup | |
| if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): | |
| try: | |
| shutil.rmtree(TEMP_FOLDER) | |
| print(f"Cleaned up temp folder: {TEMP_FOLDER}") | |
| except Exception as e: | |
| print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") | |
| TEMP_FOLDER = None # Reset global | |
| yield "Done!", output_path # Final status update | |
| # ---------------- Gradio Interface Definition (Blocks) ---------------- # | |
| # Need lists to hold the dynamic UI components for segments | |
| segment_editing_groups = [] | |
| segment_text_inputs = [] | |
| segment_file_inputs = [] | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# 🤖 AI Documentary Video Generator 🎬") | |
| gr.Markdown("Enter a concept to generate a funny documentary script. You can then edit the script text and replace the suggested media for each segment before generating the final video.") | |
| # --- Global Settings --- | |
| with gr.Accordion("Global Settings", open=True): | |
| user_concept_input = gr.Textbox(label="Video Concept", placeholder="e.g., The secret life of pigeons, Why socks disappear in the laundry, The futility of alarm clocks...") | |
| with gr.Row(): | |
| resolution_radio = gr.Radio(["Full (1920x1080)", "Short (1080x1920)"], label="Video Resolution", value="Full (1920x1080)") | |
| bg_music_volume_slider = gr.Slider(minimum=0, maximum=1.0, value=0.08, step=0.01, label="Background Music Volume") | |
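| # Note: this slider is not passed to the script-generation handler below, so | |
| # its value never reaches run_config; until it is wired in, the video | |
| # generator falls back to the 0.08 default volume. | |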
| # --- Caption Settings --- | |
| with gr.Accordion("Caption Settings", open=False): | |
| caption_enabled_radio = gr.Radio(["Yes", "No"], label="Show Captions?", value="Yes") | |
| caption_color_picker = gr.ColorPicker(label="Caption Text Color", value="#FFFFFF") # Default white | |
| caption_bg_color_picker = gr.ColorPicker(label="Caption Background Color (with transparency)", value="rgba(0, 0, 0, 0.25)") # Default semi-transparent black | |
| caption_size_slider = gr.Slider(minimum=20, maximum=100, value=45, step=1, label="Caption Font Size") | |
| caption_position_radio = gr.Radio(["Top", "Middle", "Bottom"], label="Caption Position", value="Bottom") | |
| caption_stroke_color_picker = gr.ColorPicker(label="Caption Stroke Color", value="#000000") # Default black stroke | |
| caption_stroke_width_slider = gr.Slider(minimum=0, maximum=5, value=2, step=0.5, label="Caption Stroke Width") | |
| generate_script_btn = gr.Button("Generate Script", variant="primary") | |
| # --- Status and Script Output --- | |
| status_output = gr.Label(label="Status", value="") | |
| script_preview_markdown = gr.Markdown("### Generated Script Preview\n\nScript will appear here...") # Optional raw script preview (not currently updated by the handlers below) | |
| # --- State to hold parsed segments data and run config --- | |
| segments_state = gr.State([]) # List of segment dictionaries | |
| run_config_state = gr.State({}) # Dictionary for run configuration | |
| # --- Dynamic Editing Area (Initially hidden) --- | |
| # We create MAX_SEGMENTS_FOR_EDITING groups, and show/hide them dynamically | |
| with gr.Column(visible=False) as editing_area: | |
| gr.Markdown("### Edit Script Segments") | |
| gr.Markdown("Review the AI-generated text and media suggestions below. Edit the text and/or upload your own image/video for any segment. If no file is uploaded, AI will fetch media based on the original prompt.") | |
| for i in range(MAX_SEGMENTS_FOR_EDITING): | |
| with gr.Group(visible=False) as segment_group: # Each group represents one segment | |
| segment_editing_groups.append(segment_group) | |
| gr.Markdown(f"**Segment {i+1}** (Prompt: <span id='segment-prompt-{i}'></span>)") # Placeholder for prompt text | |
| # Using JS to update prompt text because Textbox is used for narration | |
| # Alternatively, could use a non-editable gr.Label or gr.Textbox for prompt | |
| segment_text = gr.Textbox(label="Narration Text", lines=2, interactive=True) | |
| segment_text_inputs.append(segment_text) | |
| segment_file = gr.File(label="Upload Custom Media (Image or Video)", type="filepath", interactive=True) | |
| segment_file_inputs.append(segment_file) | |
| generate_video_btn = gr.Button("Generate Video", variant="primary") | |
| # --- Final Video Output --- | |
| final_video_output = gr.Video(label="Generated Video") | |
| # --- Event Handlers --- | |
| # Generate Script Button Click | |
| generate_script_btn.click( | |
| fn=generate_script_and_show_editor, | |
| inputs=[ | |
| user_concept_input, | |
| resolution_radio, | |
| caption_enabled_radio, | |
| caption_color_picker, | |
| caption_size_slider, | |
| caption_position_radio, | |
| caption_bg_color_picker, | |
| caption_stroke_color_picker, | |
| caption_stroke_width_slider | |
| ], | |
| outputs=[ | |
| run_config_state, | |
| status_output, | |
| editing_area, # Show the editing area | |
| # Outputs to update visibility of segment groups | |
| *segment_editing_groups, | |
| # Outputs to update values of segment textboxes | |
| *segment_text_inputs, | |
| # Outputs to update values (clear) of segment file uploads | |
| *segment_file_inputs, | |
| # Output to update the segments_state | |
| segments_state | |
| ] | |
| ) | |
| # Generate Video Button Click | |
| generate_video_btn.click( | |
| fn=generate_video_from_edited, | |
| inputs=[ | |
| run_config_state, # Pass run config | |
| segments_state, # Pass the original parsed segments data | |
| *segment_text_inputs, # Pass list of edited text values | |
| *segment_file_inputs # Pass list of uploaded file paths | |
| ], | |
| outputs=[status_output, final_video_output] # Yield status updates and final video | |
| ) | |
| # JS to update the segment prompt labels after script generation; this relies | |
| # on the span IDs defined in the segment Markdown above. | |
| demo.load( | |
| None, | |
| None, | |
| None, | |
| _js=f""" | |
| () => {{ | |
| // A bare function declaration here is scoped to this handler and invisible | |
| // elsewhere, so register the helper on window for the change handler below. | |
| window.updateSegmentPrompts = (segments_data) => {{ | |
| if (!segments_data) return; | |
| for (let i = 0; i < segments_data.length; i++) {{ | |
| const promptSpan = document.getElementById('segment-prompt-' + i); | |
| if (promptSpan) {{ | |
| promptSpan.textContent = segments_data[i].original_prompt; | |
| }} | |
| }} | |
| // Clear the prompt spans of unused segment slots | |
| for (let i = segments_data.length; i < {MAX_SEGMENTS_FOR_EDITING}; i++) {{ | |
| const promptSpan = document.getElementById('segment-prompt-' + i); | |
| if (promptSpan) {{ | |
| promptSpan.textContent = ''; | |
| }} | |
| }} | |
| }}; | |
| }} | |
| """ | |
| ) | |
| # Trigger the JS function whenever segments_state changes | |
| segments_state.change( | |
| None, | |
| segments_state, | |
| None, | |
| _js=""" | |
| (segments_data) => { | |
| window.updateSegmentPrompts(segments_data); | |
| } | |
| """ | |
| ) | |
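| # Note: the `_js` keyword used above applies to Gradio 3.x; Gradio 4 renamed | |
| # it to `js`, so adjust these handlers if you upgrade. | |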
| # Launch the interface | |
| if __name__ == "__main__": | |
| # Attempt ImageMagick policy fix on script startup | |
| # This helps but might still require manual sudo depending on system config | |
| fix_imagemagick_policy() | |
| print("Launching Gradio interface...") | |
| # Make sure to set PEXELS_API_KEY and OPENROUTER_API_KEY environment variables | |
| # or replace 'YOUR_PEXELS_API_KEY' and 'YOUR_OPENROUTER_API_KEY' above. | |
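| # Example (shell, before launching): | |
| #   export PEXELS_API_KEY="your-pexels-key" | |
| #   export OPENROUTER_API_KEY="your-openrouter-key" | |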
| if PEXELS_API_KEY == 'YOUR_PEXELS_API_KEY': | |
| print("Warning: PEXELS_API_KEY is not configured. Media search may fail.") | |
| if OPENROUTER_API_KEY == 'YOUR_OPENROUTER_API_KEY': | |
| print("Warning: OPENROUTER_API_KEY is not configured. Script generation will fail.") | |
| demo.launch(share=True) # Set share=True to get a public link |