import os
import re
import sys
import gc
import json
import time
import uuid
import random
import shutil
import tempfile
from datetime import datetime

import requests
import numpy as np
import gradio as gr
from PIL import Image, ImageDraw, ImageFont
from dotenv import load_dotenv

# Importing moviepy.editor also attaches the fx helpers (resize, volumex, ...)
# to the clip classes, which the code below relies on.
from moviepy.editor import (
    AudioFileClip,
    CompositeAudioClip,
    CompositeVideoClip,
    ImageClip,
    concatenate_audioclips,
    concatenate_videoclips,
)
from moviepy.video.fx.all import crop
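
# End-to-end YouTube Shorts generator: an LLM produces the topic, script, and
# metadata; one of several providers generates the images; a TTS engine records
# the voiceover; AssemblyAI supplies word timings for subtitles; and MoviePy
# assembles the final 9:16 video. A Gradio UI (create_interface) drives it all.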

# Some asyncio-based libraries (edge-tts among them) misbehave on the default
# Proactor event loop on Windows, so switch to the selector policy there.
if sys.platform.startswith('win'):
    import asyncio
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

# Load API keys and other settings from a local .env file, if present.
load_dotenv()

# Bundled assets live under static/; generated artifacts go to storage/.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = os.path.join(BASE_DIR, "static")
MUSIC_DIR = os.path.join(STATIC_DIR, "music")
FONTS_DIR = os.path.join(STATIC_DIR, "fonts")
STORAGE_DIR = os.path.join(BASE_DIR, "storage")

os.makedirs(STATIC_DIR, exist_ok=True)
os.makedirs(MUSIC_DIR, exist_ok=True)
os.makedirs(FONTS_DIR, exist_ok=True)
os.makedirs(STORAGE_DIR, exist_ok=True)


def _log_with_level(level, message):
    """Print and return a timestamped, level-tagged log line."""
    timestamp = datetime.now().strftime("%H:%M:%S")
    formatted_message = f"[{timestamp}] [{level}] {message}"
    print(formatted_message)
    return formatted_message


def info(message):
    return _log_with_level("INFO", message)


def success(message):
    return _log_with_level("SUCCESS", message)


def warning(message):
    return _log_with_level("WARNING", message)


def error(message):
    return _log_with_level("ERROR", message)


def get_music_files():
    """Get list of available music files in the music directory."""
    if not os.path.exists(MUSIC_DIR):
        return ["none"]

    music_files = [f for f in os.listdir(MUSIC_DIR) if f.endswith(('.mp3', '.wav'))]
    if not music_files:
        return ["none"]

    return ["random"] + music_files


def get_font_files():
    """Get list of available font names (file basenames) in the fonts directory."""
    if not os.path.exists(FONTS_DIR):
        return ["default"]

    # splitext (rather than split('.')) keeps font names that contain dots intact.
    font_files = [os.path.splitext(f)[0] for f in os.listdir(FONTS_DIR) if f.endswith(('.ttf', '.otf'))]
    if not font_files:
        return ["default"]

    return ["random"] + font_files


def choose_random_music():
    """Select a random music file from the music directory."""
    if not os.path.exists(MUSIC_DIR):
        error(f"Music directory {MUSIC_DIR} does not exist")
        return None

    music_files = [f for f in os.listdir(MUSIC_DIR) if f.endswith(('.mp3', '.wav'))]
    if not music_files:
        warning(f"No music files found in {MUSIC_DIR}")
        return None

    return os.path.join(MUSIC_DIR, random.choice(music_files))


def choose_random_font():
    """Select a random font name from the fonts directory, or "default" if none exist."""
    if not os.path.exists(FONTS_DIR):
        error(f"Fonts directory {FONTS_DIR} does not exist")
        return "default"

    font_files = [f for f in os.listdir(FONTS_DIR) if f.endswith(('.ttf', '.otf'))]
    if not font_files:
        warning(f"No font files found in {FONTS_DIR}")
        return "default"

    return os.path.splitext(random.choice(font_files))[0]
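

# Fonts are referred to by basename throughout the UI; create_subtitle_clip
# resolves a name back to a .ttf/.otf file in FONTS_DIR when rendering.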


class YouTube:
    def __init__(self, niche: str, language: str,
                 text_gen="g4f", text_model="gpt-4",
                 image_gen="g4f", image_model="flux",
                 tts_engine="edge", tts_voice="en-US-AriaNeural",
                 subtitle_font="default", font_size=80,
                 text_color="white", highlight_color="blue",
                 subtitles_enabled=True, highlighting_enabled=True,
                 subtitle_position="bottom", music_file="random",
                 enable_music=True, music_volume=0.1,
                 api_keys=None, progress=gr.Progress()) -> None:
        """Initialize the YouTube Shorts Generator."""
        self.progress = progress
        self.progress(0, desc="Initializing")

        info("Initializing YouTube class")
        self._niche = niche
        self._language = language
        self.text_gen = text_gen
        self.text_model = text_model
        self.image_gen = image_gen
        self.image_model = image_model
        self.tts_engine = tts_engine
        self.tts_voice = tts_voice
        self.subtitle_font = subtitle_font
        self.font_size = font_size
        self.text_color = text_color
        self.highlight_color = highlight_color
        self.subtitles_enabled = subtitles_enabled
        self.highlighting_enabled = highlighting_enabled
        self.subtitle_position = subtitle_position
        self.music_file = music_file
        self.enable_music = enable_music
        self.music_volume = music_volume
        self.api_keys = api_keys or {}
        self.images = []
        self.logs = []

        # Export any user-supplied API keys so the downstream SDKs can find them.
        env_var_names = {
            'gemini': "GEMINI_API_KEY",
            'assemblyai': "ASSEMBLYAI_API_KEY",
            'elevenlabs': "ELEVENLABS_API_KEY",
            'segmind': "SEGMIND_API_KEY",
            'openai': "OPENAI_API_KEY",
        }
        for key, env_var in env_var_names.items():
            if self.api_keys.get(key):
                os.environ[env_var] = self.api_keys[key]

        info(f"Niche: {niche}, Language: {language}")
        self.log(f"Initialized with niche: {niche}, language: {language}")
        self.log(f"Text generator: {text_gen} - Model: {text_model}")
        self.log(f"Image generator: {image_gen} - Model: {image_model}")
        self.log(f"TTS engine: {tts_engine} - Voice: {tts_voice}")
        self.log(f"Subtitles: {'Enabled' if subtitles_enabled else 'Disabled'} - Highlighting: {'Enabled' if highlighting_enabled else 'Disabled'}")
        self.log(f"Music: {music_file}")

    def log(self, message):
        """Append a timestamped message to the in-memory log list."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self.logs.append(log_entry)
        return log_entry

    @property
    def niche(self) -> str:
        return self._niche

    @property
    def language(self) -> str:
        return self._language

    def generate_response(self, prompt: str, model: str = None) -> str:
        """Generate a response using the selected text generation backend."""
        self.log(f"Generating response for prompt: {prompt[:50]}...")

        try:
            if self.text_gen == "gemini":
                self.log("Using Google's Gemini model")
                gemini_api_key = os.environ.get("GEMINI_API_KEY", "")
                if not gemini_api_key:
                    raise ValueError("Gemini API key is not set. Please provide a valid API key.")

                # Provider SDKs are imported lazily so each dependency is only
                # required when its backend is actually selected.
                import google.generativeai as genai
                genai.configure(api_key=gemini_api_key)
                model_to_use = model if model else self.text_model
                genai_model = genai.GenerativeModel(model_to_use)
                response = genai_model.generate_content(prompt).text

            elif self.text_gen == "g4f":
                self.log("Using G4F for text generation")
                import g4f
                model_to_use = model if model else self.text_model
                self.log(f"Using G4F model: {model_to_use}")
                response = g4f.ChatCompletion.create(
                    model=model_to_use,
                    messages=[{"role": "user", "content": prompt}]
                )

            elif self.text_gen == "openai":
                self.log("Using OpenAI for text generation")
                openai_api_key = os.environ.get("OPENAI_API_KEY", "")
                if not openai_api_key:
                    raise ValueError("OpenAI API key is not set. Please provide a valid API key.")

                from openai import OpenAI
                client = OpenAI(api_key=openai_api_key)
                model_to_use = model if model else "gpt-3.5-turbo"

                response = client.chat.completions.create(
                    model=model_to_use,
                    messages=[{"role": "user", "content": prompt}]
                ).choices[0].message.content

            else:
                error_msg = f"Unsupported text generator: {self.text_gen}"
                self.log(error(error_msg))
                raise ValueError(error_msg)

            self.log(f"Response generated successfully, length: {len(response)} characters")
            return response

        except Exception as e:
            error_msg = f"Error generating response: {str(e)}"
            self.log(error(error_msg))
            raise Exception(error_msg)

    def generate_topic(self) -> str:
        """Generate a topic based on the YouTube channel niche."""
        self.progress(0.05, desc="Generating topic")
        self.log("Generating topic based on niche")

        completion = self.generate_response(
            f"Please generate a specific video idea about the following topic: {self.niche}. "
            f"Make it exactly one sentence. Only return the topic, nothing else."
        )

        if not completion:
            self.log(error("Failed to generate topic."))
            raise Exception("Failed to generate a topic. Please try again with a different niche.")

        self.subject = completion
        self.log(success(f"Generated topic: {completion}"))
        return completion

    def generate_script(self) -> str:
        """Generate a script for a video, based on the subject and language."""
        self.progress(0.1, desc="Creating script")
        self.log("Generating script for video")

        prompt = f"""
        Generate a script for a YouTube Shorts video, based on the subject of the video.

        The script is to be returned as a string with the specified number of paragraphs.

        Here is an example of a string:
        "This is an example string."

        Do not under any circumstance reference this prompt in your response.

        Get straight to the point; don't start with unnecessary things like "welcome to this video".

        Obviously, the script should be related to the subject of the video.

        YOU MUST NOT INCLUDE ANY TYPE OF MARKDOWN OR FORMATTING IN THE SCRIPT, NEVER USE A TITLE.
        YOU MUST WRITE THE SCRIPT IN THE LANGUAGE SPECIFIED IN [LANGUAGE].
        ONLY RETURN THE RAW CONTENT OF THE SCRIPT. DO NOT INCLUDE "VOICEOVER", "NARRATOR" OR SIMILAR INDICATORS.

        Subject: {self.subject}
        Language: {self.language}
        """
        completion = self.generate_response(prompt)

        # Strip any stray markdown emphasis the model may have added.
        completion = re.sub(r"\*", "", completion)

        if not completion:
            self.log(error("The generated script is empty."))
            raise Exception("Failed to generate a script. Please try again.")

        if len(completion) > 5000:
            self.log(warning("Generated script is too long."))
            raise ValueError("Generated script exceeds 5000 characters. Please try again.")

        self.script = completion
        self.log(success(f"Generated script ({len(completion)} chars)"))
        return completion

    def generate_metadata(self) -> dict:
        """Generate video metadata (title and description)."""
        self.progress(0.15, desc="Creating title and description")
        self.log("Generating metadata (title and description)")

        title = self.generate_response(
            f"Please generate a YouTube Video Title for the following subject, including hashtags: "
            f"{self.subject}. Only return the title, nothing else. Keep the title under 100 characters."
        )

        if len(title) > 100:
            self.log(warning("Generated title exceeds 100 characters."))
            raise ValueError("Generated title exceeds 100 characters. Please try again.")

        description = self.generate_response(
            f"Please generate a YouTube Video Description for the following script: {self.script}. "
            f"Only return the description, nothing else."
        )

        self.metadata = {
            "title": title,
            "description": description
        }

        self.log(success(f"Generated title: {title}"))
        self.log(success(f"Generated description: {description[:50]}..."))
        return self.metadata

    def generate_prompts(self, count=5) -> list:
        """Generate AI image prompts based on the video script."""
        self.progress(0.2, desc="Creating image prompts")
        self.log(f"Generating {count} image prompts")

        prompt = f"""
        Generate {count} Image Prompts for AI Image Generation,
        depending on the subject of a video.
        Subject: {self.subject}

        The image prompts are to be returned as
        a JSON-Array of strings.

        Each search term should consist of a full sentence,
        always add the main subject of the video.

        Be emotional and use interesting adjectives to make the
        Image Prompt as detailed as possible.

        YOU MUST ONLY RETURN THE JSON-ARRAY OF STRINGS.
        YOU MUST NOT RETURN ANYTHING ELSE.
        YOU MUST NOT RETURN THE SCRIPT.

        The search terms must be related to the subject of the video.
        Here is an example of a JSON-Array of strings:
        ["image prompt 1", "image prompt 2", "image prompt 3"]

        For context, here is the full text:
        {self.script}
        """

        completion = str(self.generate_response(prompt)) \
            .replace("```json", "") \
            .replace("```", "")

        image_prompts = []

        # Models sometimes wrap the array in an object: {{"image_prompts": [...]}}.
        if "image_prompts" in completion:
            try:
                image_prompts = json.loads(completion)["image_prompts"]
            except Exception:
                self.log(warning("Failed to parse 'image_prompts' from JSON response."))

        if not image_prompts:
            try:
                image_prompts = json.loads(completion)
                self.log("Parsed image prompts from JSON response.")
            except Exception:
                self.log(warning("JSON parsing failed. Attempting to extract array using regex..."))

                # Fall back to pulling the first [...] block out of the raw text.
                r = re.compile(r"\[.*\]", re.DOTALL)
                matches = r.findall(completion)
                if len(matches) == 0:
                    self.log(warning("Failed to extract array. Unable to create image prompts."))
                    raise ValueError("Failed to generate valid image prompts. Please try again.")

                try:
                    image_prompts = json.loads(matches[0])
                except Exception:
                    self.log(error("Failed to parse array from regex match."))

                    # Last resort: collect every double-quoted string in the match.
                    string_pattern = r'"([^"]*)"'
                    strings = re.findall(string_pattern, matches[0])
                    if strings:
                        image_prompts = strings
                    else:
                        self.log(error("Failed to extract strings from regex match."))
                        raise ValueError("Failed to parse image prompts. Please try again.")

        if len(image_prompts) < count:
            self.log(warning(f"Received fewer prompts ({len(image_prompts)}) than requested ({count})."))
            raise ValueError(f"Received only {len(image_prompts)} prompts instead of {count}. Please try again.")

        image_prompts = image_prompts[:count]

        self.image_prompts = image_prompts
        self.log(success(f"Generated {len(self.image_prompts)} image prompts"))
        for i, image_prompt in enumerate(self.image_prompts):
            self.log(f"Image Prompt {i + 1}: {image_prompt}")

        return image_prompts

    def generate_image(self, prompt) -> str:
        """Generate an image using the selected image generation provider."""
        self.log(f"Generating image for prompt: {prompt[:50]}...")

        # Save into the per-run generation folder when it exists, else into storage/.
        if hasattr(self, 'generation_folder') and os.path.exists(self.generation_folder):
            image_path = os.path.join(self.generation_folder, f"img_{uuid.uuid4()}_{int(time.time())}.png")
        else:
            image_path = os.path.join(STORAGE_DIR, f"img_{uuid.uuid4()}_{int(time.time())}.png")

        if self.image_gen == "prodia":
            self.log("Using Prodia provider for image generation")
            s = requests.Session()
            headers = {
                "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }

            self.log("Sending generation request to Prodia API")
            resp = s.get(
                "https://api.prodia.com/generate",
                params={
                    "new": "true",
                    "prompt": prompt,
                    "model": self.image_model,
                    "negative_prompt": "verybadimagenegative_v1.3",
                    "steps": "20",
                    "cfg": "7",
                    "seed": random.randint(1, 10000),
                    "sample": "DPM++ 2M Karras",
                    "aspect_ratio": "square"
                },
                headers=headers
            )

            if resp.status_code != 200:
                raise Exception(f"Prodia API error: {resp.text}")

            job_id = resp.json()['job']
            self.log(f"Job created with ID: {job_id}")

            # Poll the job until it succeeds, fails, or times out.
            max_attempts = 30
            attempts = 0
            while attempts < max_attempts:
                attempts += 1
                time.sleep(2)
                status = s.get(f"https://api.prodia.com/job/{job_id}", headers=headers).json()

                if status["status"] == "succeeded":
                    self.log("Image generation successful, downloading result")
                    img_data = s.get(f"https://images.prodia.xyz/{job_id}.png?download=1", headers=headers).content
                    with open(image_path, "wb") as f:
                        f.write(img_data)
                    self.images.append(image_path)
                    self.log(success(f"Image saved to: {image_path}"))
                    return image_path

                elif status["status"] == "failed":
                    raise Exception(f"Prodia job failed: {status.get('error', 'Unknown error')}")

                self.log(f"Still processing, attempt {attempts}/{max_attempts}...")

            raise Exception("Prodia job timed out")

        elif self.image_gen == "hercai":
            self.log("Using Hercai provider for image generation")
            # Quote the prompt so spaces and punctuation survive the query string.
            url = f"https://hercai.onrender.com/{self.image_model}/text2image?prompt={requests.utils.quote(prompt)}"
            r = requests.get(url)

            if r.status_code != 200:
                raise Exception(f"Hercai API error: {r.text}")

            parsed = r.json()
            if "url" in parsed and parsed["url"]:
                self.log("Image URL received from Hercai")
                image_url = parsed["url"]
                img_data = requests.get(image_url).content
                with open(image_path, "wb") as f:
                    f.write(img_data)
                self.images.append(image_path)
                self.log(success(f"Image saved to: {image_path}"))
                return image_path
            else:
                raise Exception("No image URL in Hercai response")

        elif self.image_gen == "g4f":
            self.log("Using G4F provider for image generation")
            from g4f.client import Client
            client = Client()
            response = client.images.generate(
                model=self.image_model,
                prompt=prompt,
                response_format="url"
            )

            if response and response.data and len(response.data) > 0:
                image_url = response.data[0].url
                image_response = requests.get(image_url)

                if image_response.status_code == 200:
                    with open(image_path, "wb") as f:
                        f.write(image_response.content)
                    self.images.append(image_path)
                    self.log(success(f"Image saved to: {image_path}"))
                    return image_path
                else:
                    raise Exception(f"Failed to download image from {image_url}")
            else:
                raise Exception("No image URL received from G4F")

        elif self.image_gen == "segmind":
            self.log("Using Segmind provider for image generation")
            api_key = os.environ.get("SEGMIND_API_KEY", "")
            if not api_key:
                raise ValueError("Segmind API key is not set. Please provide a valid API key.")

            headers = {
                "x-api-key": api_key,
                "Content-Type": "application/json"
            }

            response = requests.post(
                "https://api.segmind.com/v1/sdxl-turbo",
                json={
                    "prompt": prompt,
                    "negative_prompt": "blurry, low quality, distorted face, text, watermark",
                    "samples": 1,
                    "size": "1024x1024",
                    "guidance_scale": 1.0
                },
                headers=headers
            )

            if response.status_code == 200:
                with open(image_path, "wb") as f:
                    f.write(response.content)
                self.images.append(image_path)
                self.log(success(f"Image saved to: {image_path}"))
                return image_path
            else:
                raise Exception(f"Segmind request failed: {response.status_code} {response.text}")

        elif self.image_gen == "pollinations":
            self.log("Using Pollinations provider for image generation")
            # The prompt is URL-quoted; the random suffix acts as a cache-buster.
            response = requests.get(
                f"https://image.pollinations.ai/prompt/{requests.utils.quote(prompt)}{random.randint(1, 10000)}"
            )

            if response.status_code == 200:
                self.log("Image received from Pollinations")
                with open(image_path, "wb") as f:
                    f.write(response.content)
                self.images.append(image_path)
                self.log(success(f"Image saved to: {image_path}"))
                return image_path
            else:
                raise Exception(f"Pollinations request failed with status code: {response.status_code}")

        else:
            error_msg = f"Unsupported image generator: {self.image_gen}"
            self.log(error(error_msg))
            raise ValueError(error_msg)

    def generate_speech(self, text, output_format='mp3') -> str:
        """Generate speech from text using the selected TTS engine."""
        self.progress(0.6, desc="Creating voiceover")
        self.log("Generating speech from text")

        # Strip characters that tend to trip up TTS engines.
        text = re.sub(r'[^\w\s.?!,;:\'"-]', '', text)

        self.log(f"Using TTS Engine: {self.tts_engine}, Voice: {self.tts_voice}")

        if hasattr(self, 'generation_folder') and os.path.exists(self.generation_folder):
            audio_path = os.path.join(self.generation_folder, f"speech_{uuid.uuid4()}_{int(time.time())}.{output_format}")
        else:
            audio_path = os.path.join(STORAGE_DIR, f"speech_{uuid.uuid4()}_{int(time.time())}.{output_format}")

        if self.tts_engine == "elevenlabs":
            self.log("Using ElevenLabs provider for speech generation")
            elevenlabs_api_key = os.environ.get("ELEVENLABS_API_KEY", "")
            if not elevenlabs_api_key:
                raise ValueError("ElevenLabs API key is not set. Please provide a valid API key.")

            headers = {
                "Accept": "audio/mpeg",
                "Content-Type": "application/json",
                "xi-api-key": elevenlabs_api_key
            }

            payload = {
                "text": text,
                "model_id": "eleven_turbo_v2",
                "voice_settings": {
                    "stability": 0.5,
                    "similarity_boost": 0.5,
                    "style": 0.0,
                    "use_speaker_boost": True
                },
                "output_format": "mp3_44100_128",
                "optimize_streaming_latency": 0
            }

            # Map friendly voice names to ElevenLabs voice IDs; anything not in
            # the table is assumed to already be a raw voice ID.
            voice_id_mapping = {
                "Sarah": "21m00Tcm4TlvDq8ikWAM",
                "Brian": "hxppwzoRmvxK7YkDrjhQ",
                "Lily": "p7TAj7L6QVq1fE6XGyjR",
                "Monika Sogam": "Fc3XhIu9tfgOPOsU1hMr",
                "George": "o7lPjDgzlF8ZAeSpqmaN",
                "River": "f0k5evLkhJxrIRJXQJvy",
                "Matilda": "XrExE9yKIg1WjnnlVkGX",
                "Will": "pvKWM1B1sNRNTlEYYAEZ",
                "Jessica": "A5EAMYWMCSsLNL1wYxOv",
                "default": "21m00Tcm4TlvDq8ikWAM"
            }

            voice_id = voice_id_mapping.get(self.tts_voice, self.tts_voice)

            self.log(f"Using ElevenLabs voice: {self.tts_voice} (ID: {voice_id})")

            response = requests.post(
                url=f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}",
                json=payload,
                headers=headers
            )

            if response.status_code == 200:
                with open(audio_path, 'wb') as f:
                    f.write(response.content)
                self.log(success(f"Speech generated successfully using ElevenLabs at {audio_path}"))
            else:
                try:
                    error_data = response.json()
                    detail = error_data.get('detail', {})
                    error_message = detail.get('message', response.text) if isinstance(detail, dict) else str(detail)
                    error_status = error_data.get('status', 'error')
                    raise Exception(f"ElevenLabs API error ({response.status_code}, {error_status}): {error_message}")
                except ValueError:
                    # The error body was not JSON.
                    raise Exception(f"ElevenLabs API error ({response.status_code}): {response.text}")

        elif self.tts_engine == "gtts":
            self.log("Using Google TTS provider for speech generation")
            from gtts import gTTS
            # Map UI language names to the ISO 639-1 codes gTTS expects; naively
            # slicing the name (e.g. "Spanish"[:2] -> "sp") produces invalid codes.
            gtts_lang_codes = {
                "english": "en", "spanish": "es", "french": "fr", "german": "de",
                "italian": "it", "portuguese": "pt", "russian": "ru",
                "japanese": "ja", "chinese": "zh-CN", "hindi": "hi",
            }
            # The gTTS "voice" dropdown already holds language codes; use it when
            # it looks like one, otherwise map the UI language name.
            if self.tts_voice in gtts_lang_codes.values():
                lang_code = self.tts_voice
            else:
                lang_code = gtts_lang_codes.get(self.language.lower(), "en")
            tts = gTTS(text=text, lang=lang_code, slow=False)
            tts.save(audio_path)

        elif self.tts_engine == "openai":
            self.log("Using OpenAI provider for speech generation")
            openai_api_key = os.environ.get("OPENAI_API_KEY", "")
            if not openai_api_key:
                raise ValueError("OpenAI API key is not set. Please provide a valid API key.")

            from openai import OpenAI
            client = OpenAI(api_key=openai_api_key)

            voice = self.tts_voice if self.tts_voice else "alloy"
            response = client.audio.speech.create(
                model="tts-1",
                voice=voice,
                input=text
            )
            response.stream_to_file(audio_path)

        elif self.tts_engine == "edge":
            self.log("Using Edge TTS provider for speech generation")
            import edge_tts
            import asyncio

            voice = self.tts_voice if self.tts_voice else "en-US-AriaNeural"

            async def generate():
                communicate = edge_tts.Communicate(text, voice)
                await communicate.save(audio_path)

            asyncio.run(generate())

        else:
            error_msg = f"Unsupported TTS engine: {self.tts_engine}"
            self.log(error(error_msg))
            raise ValueError(error_msg)

        self.log(success(f"Speech generated and saved to: {audio_path}"))
        self.tts_path = audio_path
        return audio_path

    def generate_subtitles(self, audio_path: str) -> dict:
        """Generate word- and line-level subtitles from audio using AssemblyAI."""
        if not self.subtitles_enabled:
            self.log("Subtitles are disabled, skipping generation")
            return {
                "wordlevel": [],
                "linelevel": [],
                "settings": {
                    "font": self.subtitle_font,
                    "fontsize": self.font_size,
                    "color": self.text_color,
                    "bg_color": self.highlight_color if self.highlighting_enabled else None,
                    "position": self.subtitle_position,
                    "highlighting_enabled": self.highlighting_enabled,
                    "subtitles_enabled": self.subtitles_enabled
                }
            }

        self.log("Generating subtitles from audio")
        try:
            import assemblyai as aai

            aai_api_key = os.environ.get("ASSEMBLYAI_API_KEY", "")
            if not aai_api_key:
                raise ValueError("AssemblyAI API key is not set. Please provide a valid API key.")

            aai.settings.api_key = aai_api_key

            config = aai.TranscriptionConfig(speaker_labels=False, word_boost=[], format_text=True)
            transcriber = aai.Transcriber(config=config)

            self.log("Submitting audio for transcription")
            transcript = transcriber.transcribe(audio_path)

            if not transcript or not transcript.words:
                raise ValueError("Transcription returned no words.")

            # AssemblyAI reports timestamps in milliseconds; convert to seconds.
            wordlevel_info = []
            for word in transcript.words:
                wordlevel_info.append({
                    "word": word.text.strip(),
                    "start": word.start / 1000.0,
                    "end": word.end / 1000.0
                })

            self.log(success(f"Transcription successful. Got {len(wordlevel_info)} words."))

            if self.subtitle_font == "random":
                FONT = choose_random_font()
                self.log(f"Using random font: {FONT}")
            else:
                FONT = self.subtitle_font

            FONTSIZE = self.font_size
            COLOR = self.text_color
            BG_COLOR = self.highlight_color if self.highlighting_enabled else None

            # Group words into subtitle lines: a line is flushed once it exceeds
            # MAX_CHARS characters or MAX_DURATION seconds of speech, or when the
            # gap to the previous word exceeds MAX_GAP seconds.
            MAX_CHARS = 30
            MAX_DURATION = 3.0
            MAX_GAP = 1.5

            subtitles = []
            line = []
            line_duration = 0

            for idx, word_data in enumerate(wordlevel_info):
                start = word_data["start"]
                end = word_data["end"]

                line.append(word_data)
                line_duration += end - start

                temp = " ".join(item["word"] for item in line)
                new_line_chars = len(temp)

                duration_exceeded = line_duration > MAX_DURATION
                chars_exceeded = new_line_chars > MAX_CHARS

                if idx > 0:
                    gap = word_data['start'] - wordlevel_info[idx - 1]['end']
                    maxgap_exceeded = gap > MAX_GAP
                else:
                    maxgap_exceeded = False

                if duration_exceeded or chars_exceeded or maxgap_exceeded:
                    if line:
                        subtitles.append({
                            "text": " ".join(item["word"] for item in line),
                            "start": line[0]["start"],
                            "end": line[-1]["end"],
                            "words": line
                        })
                        line = []
                        line_duration = 0

            # Flush any words remaining in the final line.
            if line:
                subtitles.append({
                    "text": " ".join(item["word"] for item in line),
                    "start": line[0]["start"],
                    "end": line[-1]["end"],
                    "words": line
                })

            self.log(success(f"Generated {len(subtitles)} subtitle lines"))

            return {
                "wordlevel": wordlevel_info,
                "linelevel": subtitles,
                "settings": {
                    "font": FONT,
                    "fontsize": FONTSIZE,
                    "color": COLOR,
                    "bg_color": BG_COLOR,
                    "position": self.subtitle_position,
                    "highlighting_enabled": self.highlighting_enabled,
                    "subtitles_enabled": self.subtitles_enabled
                }
            }

        except Exception as e:
            error_msg = f"Error generating subtitles: {str(e)}"
            self.log(error(error_msg))
            raise Exception(error_msg)
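
    # Shape of the dict returned by generate_subtitles (times in seconds), e.g.:
    #
    #   {
    #       "wordlevel": [{"word": "Hello", "start": 0.0, "end": 0.32}, ...],
    #       "linelevel": [{"text": "Hello there", "start": 0.0, "end": 0.61,
    #                      "words": [...]}, ...],
    #       "settings": {"font": ..., "fontsize": ..., "color": ..., "bg_color": ...,
    #                    "position": ..., "highlighting_enabled": ...,
    #                    "subtitles_enabled": ...}
    #   }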

    def create_subtitle_clip(self, subtitle_data, frame_size):
        """Create subtitle clips for each line of text, with optional word-level highlighting."""
        if not subtitle_data.get("settings", {}).get("subtitles_enabled", True):
            self.log("Subtitles are disabled, skipping subtitle clip creation")
            return []

        settings = subtitle_data["settings"]
        font_name = settings["font"]
        fontsize = settings["fontsize"]
        color = settings["color"]
        bg_color = settings["bg_color"]
        highlighting_enabled = settings["highlighting_enabled"]

        # Resolve the font name to a file in FONTS_DIR (.ttf first, then .otf),
        # falling back to PIL's built-in default font.
        try:
            font_path = os.path.join(FONTS_DIR, f"{font_name}.ttf")
            if not os.path.exists(font_path):
                font_path = os.path.join(FONTS_DIR, f"{font_name}.otf")
            if os.path.exists(font_path):
                pil_font = ImageFont.truetype(font_path, fontsize)
            else:
                self.log(warning(f"Font {font_name} not found, using default"))
                pil_font = ImageFont.load_default()
        except Exception as e:
            self.log(warning(f"Error loading font: {str(e)}"))
            pil_font = ImageFont.load_default()

        # Colors arrive as hex strings from the UI color pickers; anything else
        # falls back to white text on a blue highlight.
        if color.startswith('#'):
            text_color_rgb = tuple(int(color.lstrip('#')[i:i + 2], 16) for i in (0, 2, 4))
        else:
            text_color_rgb = (255, 255, 255)

        if bg_color and bg_color.startswith('#'):
            bg_color_rgb = tuple(int(bg_color.lstrip('#')[i:i + 2], 16) for i in (0, 2, 4))
        else:
            bg_color_rgb = (0, 0, 255)

        # Rendered text images are cached so repeated words/lines are drawn once.
        clip_cache = {}

        def create_text_clip(text, bg_color=None, cache_key=None):
            if cache_key and cache_key in clip_cache:
                return clip_cache[cache_key]

            try:
                text_width, text_height = pil_font.getbbox(text)[2:4]

                padding = 10
                img_width = text_width + padding * 2
                img_height = text_height + padding * 2

                # Opaque background for highlights, transparent otherwise.
                if bg_color:
                    img = Image.new('RGB', (img_width, img_height), color=bg_color_rgb)
                else:
                    img = Image.new('RGBA', (img_width, img_height), color=(0, 0, 0, 0))

                draw = ImageDraw.Draw(img)
                draw.text((padding, padding), text, font=pil_font, fill=text_color_rgb)

                img_array = np.array(img)
                clip = ImageClip(img_array)

                if cache_key:
                    clip_cache[cache_key] = (clip, img_width, img_height)

                return clip, img_width, img_height

            except Exception as e:
                self.log(warning(f"Error creating text clip: {str(e)}"))
                # Fall back to a small gray placeholder clip.
                img = Image.new('RGB', (100, 50), color=(100, 100, 100))
                img_array = np.array(img)
                clip = ImageClip(img_array)
                return clip, 100, 50

        subtitle_clips = []

        # Vertical anchor for the subtitle block.
        if settings["position"] == "top":
            y_buffer = frame_size[1] * 0.1
        elif settings["position"] == "middle":
            y_buffer = frame_size[1] * 0.4
        else:
            y_buffer = frame_size[1] * 0.7

        max_width = frame_size[0] * 0.8

        for line_idx, line in enumerate(subtitle_data["linelevel"]):
            line_start = line["start"]
            line_end = line["end"]
            line_duration = line_end - line_start

            # Wrap the line's words into display rows no wider than max_width.
            lines_data = []
            current_line = []
            current_x = 0

            for word_data in line["words"]:
                word = word_data["word"]
                word_width = pil_font.getbbox(word)[2] + 20
                word_height = pil_font.getbbox(word)[3] + 20

                if current_x + word_width > max_width and current_line:
                    lines_data.append({
                        "words": current_line.copy(),
                        "total_width": current_x,
                        "height": max(w["height"] for w in current_line)
                    })
                    current_line = []
                    current_x = 0

                current_line.append({
                    "word": word,
                    "width": word_width,
                    "height": word_height,
                    "start": word_data["start"],
                    "end": word_data["end"]
                })
                current_x += word_width

            if current_line:
                lines_data.append({
                    "words": current_line,
                    "total_width": current_x,
                    "height": max(w["height"] for w in current_line)
                })

            current_y = y_buffer

            for line_data in lines_data:
                # Center the row horizontally.
                line_width = line_data["total_width"]
                x_center = (frame_size[0] - line_width) / 2

                line_text = " ".join(w["word"] for w in line_data["words"])
                cache_key = f"line_{line_idx}_{line_text}"
                line_clip, _, _ = create_text_clip(line_text, None, cache_key)

                line_clip = line_clip.set_position((x_center, current_y))
                line_clip = line_clip.set_start(line["start"]).set_duration(line_duration)
                subtitle_clips.append(line_clip)

                if highlighting_enabled and bg_color:
                    current_x = x_center

                    # Group words that share identical timing so they can be
                    # highlighted by a single clip.
                    timing_groups = {}
                    for word_info in line_data["words"]:
                        timing_key = f"{word_info['start']:.3f}_{word_info['end']:.3f}"
                        if timing_key not in timing_groups:
                            timing_groups[timing_key] = []
                        timing_groups[timing_key].append((word_info, current_x))
                        current_x += word_info["width"]

                    for timing_key, word_group in timing_groups.items():
                        start_time, end_time = map(float, timing_key.split('_'))

                        if len(word_group) == 1:
                            word_info, x_pos = word_group[0]
                            word = word_info["word"]

                            cache_key = f"word_{word}"
                            highlight_clip, _, _ = create_text_clip(word, bg_color, cache_key)
                            highlight_clip = highlight_clip.set_position((x_pos, current_y))
                            highlight_clip = highlight_clip.set_start(start_time).set_duration(end_time - start_time)
                            subtitle_clips.append(highlight_clip)
                        else:
                            # Merge horizontally adjacent same-timing words into
                            # batches so each batch renders as one highlight clip.
                            continue_batch = True
                            batch_start_idx = 0

                            while continue_batch and batch_start_idx < len(word_group):
                                batch = [word_group[batch_start_idx]]
                                batch_x = word_group[batch_start_idx][1]
                                current_batch_end = batch_start_idx

                                for i in range(batch_start_idx + 1, len(word_group)):
                                    prev_word, prev_x = word_group[i - 1]
                                    curr_word, curr_x = word_group[i]

                                    # Adjacent means the previous word ends within
                                    # 5px of where the current one starts.
                                    if abs(prev_x + prev_word["width"] - curr_x) < 5:
                                        batch.append(word_group[i])
                                        current_batch_end = i
                                    else:
                                        break

                                if len(batch) > 1:
                                    batch_text = " ".join(info[0]["word"] for info in batch)

                                    cache_key = f"batch_{batch_text}"
                                    highlight_clip, _, _ = create_text_clip(batch_text, bg_color, cache_key)
                                    highlight_clip = highlight_clip.set_position((batch_x, current_y))
                                    highlight_clip = highlight_clip.set_start(start_time).set_duration(end_time - start_time)
                                    subtitle_clips.append(highlight_clip)
                                else:
                                    word_info, x_pos = batch[0]
                                    word = word_info["word"]

                                    cache_key = f"word_{word}"
                                    highlight_clip, _, _ = create_text_clip(word, bg_color, cache_key)
                                    highlight_clip = highlight_clip.set_position((x_pos, current_y))
                                    highlight_clip = highlight_clip.set_start(start_time).set_duration(end_time - start_time)
                                    subtitle_clips.append(highlight_clip)

                                batch_start_idx = current_batch_end + 1
                                if batch_start_idx >= len(word_group):
                                    continue_batch = False

                current_y += line_data["height"] + 10

        # Cap the number of clips so compositing stays tractable.
        if len(subtitle_clips) > 200:
            self.log(warning(f"Too many subtitle clips ({len(subtitle_clips)}), limiting to 200 for performance"))
            subtitle_clips = subtitle_clips[:200]

        self.log(f"Created {len(subtitle_clips)} subtitle clips (optimized)")
        return subtitle_clips

    def combine(self) -> str:
        """Combine images, audio, and subtitles into the final video."""
        self.progress(0.8, desc="Creating final video")
        self.log("Combining images and audio into final video")
        try:
            # Scratch directory for intermediate files; removed at the end.
            temp_dir = tempfile.mkdtemp()

            if hasattr(self, 'generation_folder') and os.path.exists(self.generation_folder):
                output_path = os.path.join(self.generation_folder, f"output_{int(time.time())}.mp4")
            else:
                output_path = os.path.join(STORAGE_DIR, f"output_{int(time.time())}.mp4")

            if not self.images:
                raise ValueError("No images available for video creation")

            if not hasattr(self, 'tts_path') or not self.tts_path or not os.path.exists(self.tts_path):
                raise ValueError("No TTS audio file available")

            # The voiceover length dictates the total video duration; each image
            # gets an equal share of it.
            tts_clip = AudioFileClip(self.tts_path)
            max_duration = tts_clip.duration

            num_images = len(self.images)
            req_dur = max_duration / num_images

            self.log("Processing images (optimized)")
            processed_clips = []

            for image_path in self.images:
                if not os.path.exists(image_path):
                    self.log(warning(f"Image not found: {image_path}, skipping"))
                    continue

                try:
                    clip = ImageClip(image_path)
                    clip = clip.set_fps(15)

                    # Center-crop each image to the 9:16 Shorts aspect ratio.
                    aspect_ratio = 9 / 16
                    if clip.w / clip.h < aspect_ratio:
                        # Too tall: keep the width, crop the height.
                        clip = crop(
                            clip,
                            width=clip.w,
                            height=round(clip.w / aspect_ratio),
                            x_center=clip.w / 2,
                            y_center=clip.h / 2
                        )
                    else:
                        # Too wide: keep the height, crop the width.
                        clip = crop(
                            clip,
                            width=round(aspect_ratio * clip.h),
                            height=clip.h,
                            x_center=clip.w / 2,
                            y_center=clip.h / 2
                        )

                    clip = clip.resize((720, 1280))
                    processed_clips.append(clip)
                except Exception as e:
                    self.log(warning(f"Error processing image {image_path}: {str(e)}"))

            if not processed_clips:
                raise ValueError("No valid images could be processed")

            # Cycle through the processed images until the voiceover is covered.
            self.log(f"Creating video sequence from {len(processed_clips)} clips")
            final_clips = []
            tot_dur = 0

            while tot_dur < max_duration:
                for base_clip in processed_clips:
                    duration = min(req_dur, max_duration - tot_dur)
                    if duration <= 0:
                        break

                    final_clips.append(base_clip.set_duration(duration))
                    tot_dur += duration

                    if tot_dur >= max_duration:
                        break

            self.log(f"Concatenating {len(final_clips)} clips")
            final_clip = concatenate_videoclips(final_clips)
            final_clip = final_clip.set_fps(15)

            final_audio = tts_clip

            # Optionally mix background music underneath the voiceover.
            if hasattr(self, 'enable_music') and self.enable_music and self.music_file != "none":
                music_path = None
                if self.music_file == "random":
                    music_path = choose_random_music()
                elif os.path.exists(os.path.join(MUSIC_DIR, self.music_file)):
                    music_path = os.path.join(MUSIC_DIR, self.music_file)

                if music_path and os.path.exists(music_path):
                    self.log(f"Adding background music: {music_path}")
                    try:
                        music_clip = AudioFileClip(music_path)

                        # Loop the track if it is shorter than the video.
                        if music_clip.duration < max_duration:
                            num_loops = int(np.ceil(max_duration / music_clip.duration))
                            music_clip = concatenate_audioclips([music_clip] * num_loops)

                        music_clip = music_clip.subclip(0, max_duration)

                        music_volume = getattr(self, 'music_volume', 0.1)
                        music_clip = music_clip.volumex(music_volume)

                        final_audio = CompositeAudioClip([tts_clip, music_clip])
                    except Exception as e:
                        self.log(warning(f"Error processing music: {str(e)}"))

            final_clip = final_clip.set_audio(final_audio)

            if self.subtitles_enabled and hasattr(self, 'subtitle_data'):
                self.log("Adding subtitles (optimized)")
                subtitle_clips = self.create_subtitle_clip(self.subtitle_data, (720, 1280))
                if subtitle_clips:
                    final_clip = CompositeVideoClip([final_clip] + subtitle_clips)

            # 15 fps, the ultrafast preset, and CRF 28 trade quality for speed.
            self.log("Writing final video file (optimized encoding)")
            final_clip.write_videofile(
                output_path,
                fps=15,
                codec="libx264",
                audio_codec="aac",
                threads=8,
                preset="ultrafast",
                ffmpeg_params=["-crf", "28"]
            )

            shutil.rmtree(temp_dir, ignore_errors=True)

            self.log(success(f"Video saved to: {output_path}"))
            return output_path

        except Exception as e:
            error_msg = f"Error combining video: {str(e)}"
            self.log(error(error_msg))
            raise Exception(error_msg)

    def generate_video(self) -> dict:
        """Generate a complete video with all components."""
        try:
            self.log("Starting video generation process")

            # Each run gets its own folder under storage/, named <n>_<uuid>,
            # where n is one greater than the highest existing run number.
            folder_num = 1

            if os.path.exists(STORAGE_DIR):
                existing_folders = [d for d in os.listdir(STORAGE_DIR) if os.path.isdir(os.path.join(STORAGE_DIR, d))]
                numbered_folders = []
                for folder in existing_folders:
                    try:
                        if "_" in folder:
                            num = int(folder.split("_")[0])
                            numbered_folders.append(num)
                    except (ValueError, IndexError):
                        continue

                if numbered_folders:
                    folder_num = max(numbered_folders) + 1

            folder_id = f"{folder_num}_{str(uuid.uuid4())}"
            self.generation_folder = os.path.join(STORAGE_DIR, folder_id)
            os.makedirs(self.generation_folder, exist_ok=True)
            self.log(f"Created generation folder: {self.generation_folder}")

            try:
                self.log("Generating topic")
                self.generate_topic()

                self.progress(0.1, desc="Creating script")
                self.log("Generating script")
                self.generate_script()

                self.progress(0.2, desc="Creating metadata")
                self.log("Generating metadata")
                self.generate_metadata()

                self.progress(0.3, desc="Creating image prompts")
                self.log("Generating image prompts")
                self.generate_prompts()

                self.progress(0.4, desc="Generating images")
                self.log("Generating images")
                for i, prompt in enumerate(self.image_prompts, 1):
                    self.progress(0.4 + 0.2 * (i / len(self.image_prompts)),
                                  desc=f"Generating image {i}/{len(self.image_prompts)}")
                    self.log(f"Generating image {i}/{len(self.image_prompts)}")
                    self.generate_image(prompt)

                self.progress(0.6, desc="Creating speech")
                self.log("Generating speech")
                self.generate_speech(self.script)

                self.progress(0.7, desc="Generating subtitles")
                if self.subtitles_enabled and hasattr(self, 'tts_path') and os.path.exists(self.tts_path):
                    self.subtitle_data = self.generate_subtitles(self.tts_path)

                    # Persist the raw subtitle timings alongside the video.
                    if self.subtitle_data:
                        try:
                            if 'wordlevel' in self.subtitle_data:
                                word_subtitles_path = os.path.join(self.generation_folder, "word_subtitles.json")
                                with open(word_subtitles_path, 'w') as f:
                                    json.dump(self.subtitle_data['wordlevel'], f, indent=2)
                                self.log(f"Saved word-level subtitles to: {word_subtitles_path}")

                            if 'linelevel' in self.subtitle_data:
                                line_subtitles_path = os.path.join(self.generation_folder, "line_subtitles.json")
                                with open(line_subtitles_path, 'w') as f:
                                    json.dump(self.subtitle_data['linelevel'], f, indent=2)
                                self.log(f"Saved line-level subtitles to: {line_subtitles_path}")
                        except Exception as e:
                            self.log(warning(f"Error saving subtitles to generation folder: {str(e)}"))

                self.progress(0.75, desc="Saving generation data")
                try:
                    content_path = os.path.join(self.generation_folder, "content.txt")
                    with open(content_path, 'w', encoding='utf-8') as f:
                        f.write(f"NICHE: {self.niche}\n\n")
                        f.write(f"LANGUAGE: {self.language}\n\n")
                        f.write(f"GENERATED TOPIC: {self.subject}\n\n")
                        f.write(f"GENERATED SCRIPT:\n{self.script}\n\n")
                        f.write("GENERATED PROMPTS:\n")
                        for i, prompt in enumerate(self.image_prompts, 1):
                            f.write(f"{i}. {prompt}\n")
                        f.write("\n")
                        f.write("GENERATED METADATA:\n")
                        for key, value in self.metadata.items():
                            f.write(f"{key}: {value}\n")
                    self.log(f"Saved content.txt to: {content_path}")
                except Exception as e:
                    self.log(warning(f"Error saving content.txt: {str(e)}"))

                self.progress(0.8, desc="Creating final video")
                self.log("Combining all elements into final video (optimized rendering)")

                # Free memory before the render, which is the peak-usage step.
                gc.collect()

                path = self.combine()

                self.progress(0.95, desc="Finalizing")
                self.log(f"Video generation complete. Files saved in: {self.generation_folder}")

                return {
                    'video_path': path,
                    'generation_folder': self.generation_folder,
                    'title': self.metadata['title'],
                    'description': self.metadata['description'],
                    'subject': self.subject,
                    'script': self.script,
                    'logs': self.logs
                }
            except Exception as e:
                error_msg = f"Error during video generation step: {str(e)}"
                self.log(error(error_msg))
                self.cleanup_resources()
                raise Exception(error_msg)

        except Exception as e:
            error_msg = f"Error during video generation: {str(e)}"
            self.log(error(error_msg))
            raise Exception(error_msg)

    def cleanup_resources(self):
        """Clean up external resources to prevent memory leaks."""
        try:
            # psutil is optional; if present, kill any stray ImageMagick/ffmpeg
            # worker processes left behind by MoviePy.
            import psutil
            for proc in psutil.process_iter():
                try:
                    if 'magick' in proc.name().lower() or 'ffmpeg' in proc.name().lower():
                        proc.kill()
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    pass

            gc.collect()
        except Exception as e:
            self.log(warning(f"Error during resource cleanup: {str(e)}"))
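

# Programmatic usage (sketch): the class can in principle be driven without the
# Gradio UI, assuming the selected providers' dependencies and API keys are set
# up (the default progress=gr.Progress() is only meaningful inside a Gradio event):
#
#   yt = YouTube(niche="Historical Facts", language="English")
#   result = yt.generate_video()
#   print(result["video_path"], result["title"])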


def get_text_generator_models(generator):
    """Get available models for the selected text generator."""
    models = {
        "gemini": [
            "gemini-2.0-flash",
            "gemini-2.0-flash-lite",
            "gemini-1.5-flash",
            "gemini-1.5-flash-8b",
            "gemini-1.5-pro"
        ],
        "g4f": [
            "gpt-4",
            "gpt-4o",
            "gpt-3.5-turbo",
            "llama-3-70b-chat",
            "claude-3-opus-20240229",
            "claude-3-sonnet-20240229",
            "claude-3-haiku-20240307"
        ],
        "openai": [
            "gpt-4o",
            "gpt-4-turbo",
            "gpt-3.5-turbo"
        ]
    }
    return models.get(generator, ["default"])


def get_image_generator_models(generator):
    """Get available models for the selected image generator."""
    models = {
        "prodia": [
            "sdxl",
            "realvisxl",
            "juggernaut",
            "dreamshaper",
            "dalle"
        ],
        "hercai": [
            "v1",
            "v2",
            "v3",
            "lexica"
        ],
        "g4f": [
            "flux",
            "dall-e-3",
            "dall-e-2",
            "midjourney"
        ],
        "segmind": [
            "sdxl-turbo",
            "realistic-vision",
            "sd3"
        ],
        "pollinations": [
            "default"
        ]
    }
    return models.get(generator, ["default"])


def get_tts_voices(engine):
    """Get available voices for the selected TTS engine."""
    voices = {
        "elevenlabs": [
            "Sarah",
            "Brian",
            "Lily",
            "Monika Sogam",
            "George",
            "River",
            "Matilda",
            "Will",
            "Jessica"
        ],
        "openai": [
            "alloy",
            "echo",
            "fable",
            "onyx",
            "nova",
            "shimmer"
        ],
        "edge": [
            "en-US-AriaNeural",
            "en-US-GuyNeural",
            "en-GB-SoniaNeural",
            "en-AU-NatashaNeural"
        ],
        # For gTTS the "voice" is an ISO 639-1 language code.
        "gtts": [
            "en",
            "es",
            "fr",
            "de",
            "it",
            "pt",
            "ru",
            "ja",
            "zh",
            "hi"
        ]
    }
    return voices.get(engine, ["default"])


def create_interface():
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="indigo", radius_size="lg"), title="YouTube Shorts Generator") as demo:
        with gr.Row():
            gr.Markdown(
                """
                # 📱 YouTube Shorts Generator
                Generate engaging YouTube Shorts videos with AI. Just provide a niche and language to get started!
                """
            )

        with gr.Row(equal_height=True):
            # Left column: content and generator settings.
            with gr.Column(scale=2, min_width=500):
                with gr.Group():
                    gr.Markdown("### 📝 Content")
                    niche = gr.Textbox(
                        label="Niche/Topic",
                        placeholder="What's your video about?",
                        value="Historical Facts"
                    )
                    language = gr.Dropdown(
                        choices=["English", "Spanish", "French", "German", "Italian", "Portuguese",
                                 "Russian", "Japanese", "Chinese", "Hindi"],
                        label="Language",
                        value="English"
                    )

                with gr.Group():
                    gr.Markdown("### 🔧 Generator Settings")
                    with gr.Tabs():
                        with gr.TabItem("Text"):
                            text_gen = gr.Dropdown(
                                choices=["g4f", "gemini", "openai"],
                                label="Text Generator",
                                value="g4f"
                            )
                            text_model = gr.Dropdown(
                                choices=get_text_generator_models("g4f"),
                                label="Text Model",
                                value="gpt-4"
                            )

                        with gr.TabItem("Image"):
                            image_gen = gr.Dropdown(
                                choices=["g4f", "prodia", "hercai", "segmind", "pollinations"],
                                label="Image Generator",
                                value="g4f"
                            )
                            image_model = gr.Dropdown(
                                choices=get_image_generator_models("g4f"),
                                label="Image Model",
                                value="flux"
                            )

                        with gr.TabItem("Speech"):
                            tts_engine = gr.Dropdown(
                                choices=["edge", "elevenlabs", "gtts", "openai"],
                                label="Speech Generator",
                                value="edge"
                            )
                            tts_voice = gr.Dropdown(
                                choices=get_tts_voices("edge"),
                                label="Voice",
                                value="en-US-AriaNeural"
                            )

                        with gr.TabItem("Audio"):
                            enable_music = gr.Checkbox(label="Enable Background Music", value=True)

                            music_choices = get_music_files()
                            default_music = "random" if "random" in music_choices else "none"
                            music_file = gr.Dropdown(
                                choices=music_choices,
                                label="Background Music",
                                value=default_music,
                                interactive=True
                            )
                            music_volume = gr.Slider(
                                minimum=0.0,
                                maximum=1.0,
                                value=0.1,
                                step=0.05,
                                label="Background Music Volume"
                            )

                        with gr.TabItem("Subtitles"):
                            subtitles_enabled = gr.Checkbox(label="Enable Subtitles", value=True)
                            highlighting_enabled = gr.Checkbox(label="Enable Word Highlighting", value=True)
                            subtitle_font = gr.Dropdown(
                                choices=get_font_files(),
                                label="Font",
                                value="random"
                            )
                            with gr.Row():
                                font_size = gr.Slider(
                                    minimum=40,
                                    maximum=120,
                                    value=80,
                                    step=5,
                                    label="Font Size"
                                )
                                subtitle_position = gr.Dropdown(
                                    choices=["bottom", "middle", "top"],
                                    label="Position",
                                    value="bottom"
                                )
                            with gr.Row():
                                text_color = gr.ColorPicker(label="Text Color", value="#FFFFFF")
                                highlight_color = gr.ColorPicker(label="Highlight Color", value="#0000FF")

                generate_btn = gr.Button("🎬 Generate Video", variant="primary", size="lg")

            # Right column: outputs, API keys, and the process log.
            with gr.Column(scale=1, min_width=300):
                with gr.Tabs():
                    with gr.TabItem("Video"):
                        video_output = gr.Video(label="Generated Video", height=580, width=330)

                    with gr.TabItem("Metadata"):
                        title_output = gr.Textbox(label="Title", lines=2)
                        description_output = gr.Textbox(label="Description", lines=4)
                        script_output = gr.Textbox(label="Script", lines=8)

                    with gr.TabItem("🔑 API Keys"):
                        gemini_api_key = gr.Textbox(
                            label="Gemini API Key",
                            type="password",
                            value=os.environ.get("GEMINI_API_KEY", "")
                        )
                        assemblyai_api_key = gr.Textbox(
                            label="AssemblyAI API Key",
                            type="password",
                            value=os.environ.get("ASSEMBLYAI_API_KEY", "")
                        )
                        elevenlabs_api_key = gr.Textbox(
                            label="ElevenLabs API Key",
                            type="password",
                            value=os.environ.get("ELEVENLABS_API_KEY", "")
                        )
                        segmind_api_key = gr.Textbox(
                            label="Segmind API Key",
                            type="password",
                            value=os.environ.get("SEGMIND_API_KEY", "")
                        )
                        openai_api_key = gr.Textbox(
                            label="OpenAI API Key",
                            type="password",
                            value=os.environ.get("OPENAI_API_KEY", "")
                        )

                    with gr.TabItem("Log"):
                        log_output = gr.Textbox(label="Process Log", lines=15, max_lines=100)

        # Keep the model/voice dropdowns in sync with the selected provider.
        def update_text_models(generator):
            return gr.Dropdown(choices=get_text_generator_models(generator))

        def update_image_models(generator):
            return gr.Dropdown(choices=get_image_generator_models(generator))

        def update_tts_voices(engine):
            return gr.Dropdown(choices=get_tts_voices(engine))

        text_gen.change(fn=update_text_models, inputs=text_gen, outputs=text_model)
        image_gen.change(fn=update_image_models, inputs=image_gen, outputs=image_model)
        tts_engine.change(fn=update_tts_voices, inputs=tts_engine, outputs=tts_voice)

        def generate_youtube_short(niche, language, text_gen, text_model, image_gen, image_model,
                                   tts_engine, tts_voice, subtitles_enabled, highlighting_enabled,
                                   subtitle_font, font_size, subtitle_position,
                                   text_color, highlight_color, music_file,
                                   enable_music, music_volume,
                                   gemini_api_key, assemblyai_api_key,
                                   elevenlabs_api_key, segmind_api_key, openai_api_key,
                                   progress=gr.Progress()):
            if not niche.strip():
                return {
                    video_output: None,
                    title_output: "ERROR: Please enter a niche/topic",
                    description_output: "",
                    script_output: "",
                    log_output: "Error: Niche/Topic is required. Please enter a valid topic and try again."
                }

            api_keys = {
                'gemini': gemini_api_key,
                'assemblyai': assemblyai_api_key,
                'elevenlabs': elevenlabs_api_key,
                'segmind': segmind_api_key,
                'openai': openai_api_key
            }

            try:
                yt = YouTube(
                    niche=niche,
                    language=language,
                    text_gen=text_gen,
                    text_model=text_model,
                    image_gen=image_gen,
                    image_model=image_model,
                    tts_engine=tts_engine,
                    tts_voice=tts_voice,
                    subtitle_font=subtitle_font,
                    font_size=font_size,
                    text_color=text_color,
                    highlight_color=highlight_color,
                    subtitles_enabled=subtitles_enabled,
                    highlighting_enabled=highlighting_enabled,
                    subtitle_position=subtitle_position,
                    music_file=music_file,
                    enable_music=enable_music,
                    music_volume=music_volume,
                    api_keys=api_keys,
                    progress=progress
                )

                result = yt.generate_video()

                if not result or not result.get('video_path') or not os.path.exists(result.get('video_path', '')):
                    return {
                        video_output: None,
                        title_output: "ERROR: Video generation failed",
                        description_output: "",
                        script_output: "",
                        log_output: "\n".join(yt.logs)
                    }

                return {
                    video_output: result['video_path'],
                    title_output: result['title'],
                    description_output: result['description'],
                    script_output: result['script'],
                    log_output: "\n".join(result['logs'])
                }

            except Exception as e:
                import traceback
                error_details = f"Error: {str(e)}\n\n{traceback.format_exc()}"
                return {
                    video_output: None,
                    title_output: f"ERROR: {str(e)}",
                    description_output: "",
                    script_output: "",
                    log_output: error_details
                }

        generate_btn.click(
            fn=generate_youtube_short,
            inputs=[
                niche, language, text_gen, text_model, image_gen, image_model,
                tts_engine, tts_voice, subtitles_enabled, highlighting_enabled,
                subtitle_font, font_size, subtitle_position, text_color, highlight_color, music_file,
                enable_music, music_volume, gemini_api_key, assemblyai_api_key, elevenlabs_api_key, segmind_api_key, openai_api_key
            ],
            outputs=[video_output, title_output, description_output, script_output, log_output]
        )

        music_choices = get_music_files()
        default_music = "random" if "random" in music_choices else "none"

        gr.Examples(
            [
                ["Historical Facts", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-AriaNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#0000FF", default_music, True, 0.1],
                ["Cooking Tips", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-AriaNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#FF0000", default_music, True, 0.1],
                ["Technology News", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-GuyNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#00FF00", default_music, True, 0.1],
            ],
            [niche, language, text_gen, text_model, image_gen, image_model, tts_engine, tts_voice,
             subtitles_enabled, highlighting_enabled, subtitle_font, font_size,
             subtitle_position, text_color, highlight_color, music_file, enable_music, music_volume],
            label="Quick Start Templates"
        )

    return demo


if __name__ == "__main__":
    # Ensure the expected directory layout exists before launching the UI.
    os.makedirs(STATIC_DIR, exist_ok=True)
    os.makedirs(MUSIC_DIR, exist_ok=True)
    os.makedirs(FONTS_DIR, exist_ok=True)
    os.makedirs(STORAGE_DIR, exist_ok=True)

    demo = create_interface()
    demo.launch()