INVIDEO_BASIC / app.py
gnosticdev's picture
Update app.py
711e3d2 verified
raw
history blame
33.2 kB
import os
import asyncio
import logging
import tempfile
import requests
from datetime import datetime
import edge_tts
from gtts import gTTS
import gradio as gr
import torch
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from keybert import KeyBERT
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip, concatenate_audioclips, AudioClip
import re
import math
import shutil
import json
from collections import Counter
import time
# Logging setup: DEBUG and above go to both the console and a UTF-8 log file.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_LOG_HANDLERS = [
    logging.StreamHandler(),
    logging.FileHandler('video_generator_full.log', encoding='utf-8'),
]
logging.basicConfig(level=logging.DEBUG, format=_LOG_FORMAT, handlers=_LOG_HANDLERS)
logger = logging.getLogger(__name__)

# Startup banner so the beginning of each run is easy to spot in the log.
_BANNER = "=" * 80
logger.info(_BANNER)
logger.info("INICIO DE EJECUCIÓN - GENERADOR DE VIDEOS")
logger.info(_BANNER)
# Available TTS voices, grouped by language/region.
# Outer keys are the region labels shown to the user; each inner dict maps a
# voice identifier (the value handed to the TTS engine) to a human-readable
# description. Insertion order matters: get_voice_choices() flattens this
# mapping in order, and the first entry is used as a fallback default voice.
VOCES_DISPONIBLES = {
    # Spanish (Spain)
    "Español (España)": {
        "es-ES-JuanNeural": "Juan (España) - Masculino",
        "es-ES-ElviraNeural": "Elvira (España) - Femenino",
        "es-ES-AlvaroNeural": "Álvaro (España) - Masculino",
        "es-ES-AbrilNeural": "Abril (España) - Femenino",
        "es-ES-ArnauNeural": "Arnau (España) - Masculino",
        "es-ES-DarioNeural": "Darío (España) - Masculino",
        "es-ES-EliasNeural": "Elías (España) - Masculino",
        "es-ES-EstrellaNeural": "Estrella (España) - Femenino",
        "es-ES-IreneNeural": "Irene (España) - Femenino",
        "es-ES-LaiaNeural": "Laia (España) - Femenino",
        "es-ES-LiaNeural": "Lía (España) - Femenino",
        "es-ES-NilNeural": "Nil (España) - Masculino",
        "es-ES-SaulNeural": "Saúl (España) - Masculino",
        "es-ES-TeoNeural": "Teo (España) - Masculino",
        "es-ES-TrianaNeural": "Triana (España) - Femenino",
        "es-ES-VeraNeural": "Vera (España) - Femenino"
    },
    # Spanish (Mexico)
    "Español (México)": {
        "es-MX-JorgeNeural": "Jorge (México) - Masculino",
        "es-MX-DaliaNeural": "Dalia (México) - Femenino",
        "es-MX-BeatrizNeural": "Beatriz (México) - Femenino",
        "es-MX-CandelaNeural": "Candela (México) - Femenino",
        "es-MX-CarlotaNeural": "Carlota (México) - Femenino",
        "es-MX-CecilioNeural": "Cecilio (México) - Masculino",
        "es-MX-GerardoNeural": "Gerardo (México) - Masculino",
        "es-MX-LarissaNeural": "Larissa (México) - Femenino",
        "es-MX-LibertoNeural": "Liberto (México) - Masculino",
        "es-MX-LucianoNeural": "Luciano (México) - Masculino",
        "es-MX-MarinaNeural": "Marina (México) - Femenino",
        "es-MX-NuriaNeural": "Nuria (México) - Femenino",
        "es-MX-PelayoNeural": "Pelayo (México) - Masculino",
        "es-MX-RenataNeural": "Renata (México) - Femenino",
        "es-MX-YagoNeural": "Yago (México) - Masculino"
    },
    # Spanish (Argentina)
    "Español (Argentina)": {
        "es-AR-TomasNeural": "Tomás (Argentina) - Masculino",
        "es-AR-ElenaNeural": "Elena (Argentina) - Femenino"
    },
    # Spanish (Colombia)
    "Español (Colombia)": {
        "es-CO-GonzaloNeural": "Gonzalo (Colombia) - Masculino",
        "es-CO-SalomeNeural": "Salomé (Colombia) - Femenino"
    },
    # Spanish (Chile)
    "Español (Chile)": {
        "es-CL-LorenzoNeural": "Lorenzo (Chile) - Masculino",
        "es-CL-CatalinaNeural": "Catalina (Chile) - Femenino"
    },
    # Spanish (Peru)
    "Español (Perú)": {
        "es-PE-AlexNeural": "Alex (Perú) - Masculino",
        "es-PE-CamilaNeural": "Camila (Perú) - Femenino"
    },
    # Spanish (Venezuela)
    "Español (Venezuela)": {
        "es-VE-PaolaNeural": "Paola (Venezuela) - Femenino",
        "es-VE-SebastianNeural": "Sebastián (Venezuela) - Masculino"
    },
    # Spanish (United States)
    "Español (Estados Unidos)": {
        "es-US-AlonsoNeural": "Alonso (Estados Unidos) - Masculino",
        "es-US-PalomaNeural": "Paloma (Estados Unidos) - Femenino"
    }
}
def get_voice_choices():
    """Flatten ``VOCES_DISPONIBLES`` into a list suitable for a dropdown.

    Returns:
        A list of ``(label, voice_id)`` tuples, where ``label`` is
        ``"<voice description> (<region>)"``. Entries keep the order in which
        regions and voices are declared in ``VOCES_DISPONIBLES``.
    """
    return [
        (f"{voice_name} ({region})", voice_id)
        for region, voices in VOCES_DISPONIBLES.items()
        for voice_id, voice_name in voices.items()
    ]
# Resolve the list of voices once, at import time.
AVAILABLE_VOICES = get_voice_choices()
DEFAULT_VOICE_ID = "es-MX-DaliaNeural"  # Switched to a more stable voice

# Label of the first catalogue entry matching the preferred voice (None if absent).
_default_label = next(
    (label for label, candidate_id in AVAILABLE_VOICES if candidate_id == DEFAULT_VOICE_ID),
    None,
)
if _default_label is not None:
    DEFAULT_VOICE_NAME = _default_label
elif AVAILABLE_VOICES:
    # Preferred voice is not in the catalogue: fall back to the first entry.
    DEFAULT_VOICE_NAME, DEFAULT_VOICE_ID = AVAILABLE_VOICES[0]
else:
    # Empty catalogue: keep the hard-coded identifier and description.
    DEFAULT_VOICE_ID = "es-MX-DaliaNeural"
    DEFAULT_VOICE_NAME = "Dalia (México) - Femenino"
logger.info(f"Voz por defecto seleccionada (ID): {DEFAULT_VOICE_ID}")
# Pexels API key, read from the environment. When it is missing the app still
# starts, but buscar_videos_pexels() returns no results.
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
if not PEXELS_API_KEY:
    logger.critical("NO SE ENCONTRÓ PEXELS_API_KEY EN VARIABLES DE ENTORNO")
# Model initialization. Both models are optional: on any load failure the
# corresponding globals are left as None and callers fall back to simpler logic.
MODEL_NAME = "datificate/gpt2-small-spanish"


def _load_gpt2(model_name):
    """Load the GPT-2 tokenizer and model; return ``(None, None)`` on failure."""
    logger.info(f"Inicializando modelo GPT-2: {model_name}")
    try:
        loaded_tokenizer = GPT2Tokenizer.from_pretrained(model_name)
        loaded_model = GPT2LMHeadModel.from_pretrained(model_name).eval()
        # Reuse EOS as the padding token when the tokenizer defines none.
        if loaded_tokenizer.pad_token is None:
            loaded_tokenizer.pad_token = loaded_tokenizer.eos_token
        logger.info(f"Modelo GPT-2 cargado | Vocabulario: {len(loaded_tokenizer)} tokens")
    except Exception as e:
        logger.error(f"FALLA CRÍTICA al cargar GPT-2: {str(e)}", exc_info=True)
        return None, None
    return loaded_tokenizer, loaded_model


def _load_keybert():
    """Load the multilingual KeyBERT model; return ``None`` on failure."""
    logger.info("Cargando modelo KeyBERT...")
    try:
        loaded_kw_model = KeyBERT('distilbert-base-multilingual-cased')
        logger.info("KeyBERT inicializado correctamente")
    except Exception as e:
        logger.error(f"FALLA al cargar KeyBERT: {str(e)}", exc_info=True)
        return None
    return loaded_kw_model


tokenizer, model = _load_gpt2(MODEL_NAME)
kw_model = _load_keybert()
def buscar_videos_pexels(query, api_key, per_page=5):
    """Search the Pexels video API and return the raw video entries.

    Args:
        query: Search term sent to Pexels.
        api_key: Pexels API key, sent in the ``Authorization`` header. When
            falsy, no request is made.
        per_page: Maximum number of results requested.

    Returns:
        The list found under the ``videos`` key of the JSON response, or an
        empty list when the key is missing/null or any error occurs. This
        function never raises; every failure is logged.
    """
    if not api_key:
        logger.warning("No se puede buscar en Pexels: API Key no configurada.")
        return []
    logger.debug(f"Buscando en Pexels: '{query}' | Resultados: {per_page}")
    headers = {"Authorization": api_key}
    params = {
        "query": query,
        "per_page": per_page,
        "orientation": "landscape",
        "size": "medium"
    }
    try:
        response = requests.get(
            "https://api.pexels.com/videos/search",
            headers=headers,
            params=params,
            timeout=20
        )
        response.raise_for_status()
        # Decode inside its own try: recent versions of requests raise a JSON
        # error that is *also* a RequestException, so a sibling handler placed
        # after the RequestException one would never run. Every JSON decode
        # error (stdlib, simplejson, requests) derives from ValueError.
        try:
            data = response.json()
        except ValueError:
            logger.error(f"Pexels: JSON inválido recibido | Status: {response.status_code}")
            return []
        # Treat a null "videos" value the same as a missing key.
        videos = data.get('videos') or []
        logger.info(f"Pexels: {len(videos)} videos encontrados para '{query}'")
        return videos
    except requests.exceptions.RequestException as e:
        logger.error(f"Error de conexión Pexels para '{query}': {str(e)}")
        return []
    except Exception as e:
        # Best-effort search: an unexpected payload shape must not abort the caller.
        logger.error(f"Error inesperado Pexels para '{query}': {str(e)}")
        return []
def generate_script(prompt, max_length=150):
    """Generate a short script about ``prompt`` with the GPT-2 model.

    The prompt is prefixed with a fixed Spanish instruction, sampled through
    the model, and the output is reduced to at most its first two sentences.

    Args:
        prompt: Topic text supplied by the user.
        max_length: Number added to the prompt's token count to form the
            ``max_length`` passed to ``model.generate``.

    Returns:
        The generated script, or ``prompt.strip()`` when the models are not
        loaded or generation fails.
    """
    logger.info(f"Generando guión | Prompt: '{prompt[:50]}...' | Longitud máxima: {max_length}")
    if not (tokenizer and model):
        logger.warning("Modelos GPT-2 no disponibles - Usando prompt original como guion.")
        return prompt.strip()

    instruction_phrase_start = "Escribe un guion corto, interesante y coherente sobre:"
    ai_prompt = f"{instruction_phrase_start} {prompt}"
    try:
        encoded = tokenizer(ai_prompt, return_tensors="pt", truncation=True, max_length=512)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)
        encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
        # Sequence length of the first encoded tensor, i.e. the prompt size.
        prompt_token_count = next(iter(encoded.values())).size(1)
        generated = model.generate(
            **encoded,
            max_length=max_length + prompt_token_count,
            do_sample=True,
            top_p=0.9,
            top_k=40,
            temperature=0.7,
            repetition_penalty=1.2,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            no_repeat_ngram_size=3
        )
        text = tokenizer.decode(generated[0], skip_special_tokens=True)

        # Drop the echoed prompt: prefer cutting right after the user's prompt,
        # otherwise after the instruction phrase, otherwise keep everything.
        prompt_pos = text.lower().find(prompt.lower())
        if prompt_pos >= 0:
            cleaned_text = text[prompt_pos + len(prompt):].strip()
            logger.debug("Texto limpiado tomando parte después del prompt original.")
        else:
            instruction_pos = text.find(instruction_phrase_start)
            if instruction_pos < 0:
                logger.warning("No se pudo identificar el inicio del guión generado.")
                cleaned_text = text.strip()
            else:
                cleaned_text = text[instruction_pos + len(instruction_phrase_start):].strip()
                logger.debug("Texto limpiado tomando parte después de la frase de instrucción base.")

        # Remove tag-like fragments and leading punctuation left by the cut.
        cleaned_text = re.sub(r'<[^>]+>', '', cleaned_text).strip()
        cleaned_text = cleaned_text.lstrip(':').lstrip('.').strip()

        sentences = cleaned_text.split('.')
        first_sentence = sentences[0].strip()
        if not first_sentence:
            logger.info(f"Guion generado final (sin oraciones completas detectadas): '{cleaned_text[:100]}...'")
            return cleaned_text.strip()

        final_text = first_sentence + '.'
        has_second_sentence = len(sentences) > 1 and sentences[1].strip()
        # Append the second sentence only while the script is still short.
        if has_second_sentence and len(final_text.split()) < max_length * 0.7:
            final_text += " " + sentences[1].strip() + "."
        final_text = final_text.replace("..", ".")
        logger.info(f"Guion generado final (Truncado a 100 chars): '{final_text[:100]}...'")
        return final_text.strip()
    except Exception as e:
        logger.error(f"Error generando guion con GPT-2: {str(e)}")
        return prompt.strip()
async def text_to_speech(text, output_path, voice):
    """Synthesize ``text`` to an audio file, trying edge_tts and then gTTS.

    Args:
        text: Text to speak. Empty or whitespace-only text is rejected.
        output_path: Destination file path for the audio.
        voice: Voice identifier passed to edge_tts (gTTS always uses ``'es'``).

    Returns:
        True when either engine wrote a file larger than 100 bytes, else False.
    """
    logger.info(f"Convirtiendo texto a voz | Caracteres: {len(text)} | Voz: {voice}")
    if not (text and text.strip()):
        logger.warning("Texto vacío para TTS")
        return False

    def _audio_written():
        # A file of 100 bytes or less is treated as a failed synthesis.
        return os.path.exists(output_path) and os.path.getsize(output_path) > 100

    # First attempt: edge_tts with the requested voice.
    try:
        await edge_tts.Communicate(text, voice).save(output_path)
        if _audio_written():
            logger.info(f"Audio guardado exitosamente con edge_tts en: {output_path}")
            return True
        logger.warning("edge_tts falló, intentando gTTS...")
    except Exception as edge_error:
        logger.error(f"Error en edge_tts con voz '{voice}': {str(edge_error)}")

    # Fallback: gTTS in Spanish.
    try:
        gTTS(text=text, lang='es').save(output_path)
        if not _audio_written():
            logger.error(f"gTTS falló o archivo vacío en: {output_path}")
            return False
        logger.info(f"Audio guardado exitosamente con gTTS en: {output_path}")
        return True
    except Exception as gtts_error:
        logger.error(f"Error en gTTS: {str(gtts_error)}")
        return False
def _remove_file_quietly(path):
    """Delete ``path`` if it exists; log (never raise) when removal fails."""
    if not path:
        return
    try:
        if os.path.exists(path):
            os.remove(path)
    except OSError as e:
        logger.warning(f"No se pudo eliminar {path}: {str(e)}")


def download_video_file(url, temp_dir):
    """Stream a video from ``url`` into ``temp_dir``.

    Args:
        url: Direct URL of the video file. A falsy value returns ``None``.
        temp_dir: Directory for the download; created if missing.

    Returns:
        Path of the downloaded file when it is larger than 1000 bytes,
        otherwise ``None``. This function never raises. Any partially written
        file is removed on failure, including when the transfer is interrupted.
    """
    if not url:
        logger.warning("URL de video no proporcionada")
        return None
    output_path = None
    try:
        logger.info(f"Descargando video desde: {url[:80]}...")
        os.makedirs(temp_dir, exist_ok=True)
        # Microsecond timestamp keeps names distinct across consecutive downloads.
        file_name = f"video_dl_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.mp4"
        output_path = os.path.join(temp_dir, file_name)
        with requests.get(url, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(output_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        if os.path.exists(output_path) and os.path.getsize(output_path) > 1000:
            logger.info(f"Video descargado exitosamente: {output_path}")
            return output_path
        logger.warning(f"Descarga parece incompleta o vacía: {output_path}")
    except requests.exceptions.RequestException as e:
        logger.error(f"Error de descarga para {url[:80]}...: {str(e)}")
    except Exception as e:
        logger.error(f"Error inesperado descargando {url[:80]}...: {str(e)}")
    # Every failure path ends here: do not leave a truncated file behind.
    _remove_file_quietly(output_path)
    return None
def loop_audio_to_length(audio_clip, target_duration):
    """Return ``audio_clip`` trimmed or looped to last ``target_duration`` seconds.

    Args:
        audio_clip: MoviePy audio clip. May be ``None`` or have a missing or
            non-positive duration, in which case silence is returned.
        target_duration: Desired duration in seconds.

    Returns:
        An audio clip of ``target_duration`` seconds: a subclip when the input
        is long enough, a looped-and-trimmed clip when it is shorter, or a
        silent clip for invalid input. If looping fails, the input trimmed to
        at most ``target_duration`` is returned instead.
    """
    # Validate BEFORE touching ``audio_clip.duration``: logging it first would
    # raise on a None clip or a None duration and make this guard unreachable.
    if audio_clip is None or audio_clip.duration is None or audio_clip.duration <= 0:
        logger.warning("Input audio clip is invalid")
        # Fall back to 44100 Hz when the clip is missing or carries no fps.
        sr = (getattr(audio_clip, 'fps', None) if audio_clip is not None else None) or 44100
        return AudioClip(lambda t: 0, duration=target_duration, fps=sr)
    logger.debug(f"Ajustando audio | Duración actual: {audio_clip.duration:.2f}s | Objetivo: {target_duration:.2f}s")
    if audio_clip.duration >= target_duration:
        logger.debug("Audio clip ya es suficientemente largo. Recortando.")
        return audio_clip.subclip(0, target_duration)
    loops = math.ceil(target_duration / audio_clip.duration)
    logger.debug(f"Creando {loops} loops de audio")
    try:
        looped_audio = concatenate_audioclips([audio_clip] * loops)
        # The looped clip is deliberately not closed here: the returned subclip
        # is derived from it, and the underlying reader belongs to
        # ``audio_clip``, which the caller is responsible for closing.
        return looped_audio.subclip(0, target_duration)
    except Exception as e:
        logger.error(f"Error concatenando audio: {str(e)}")
        return audio_clip.subclip(0, min(audio_clip.duration, target_duration))
# Spanish function words ignored during keyword extraction. Used both as the
# explicit stop-word list for KeyBERT and as the filter of the simple
# frequency-based fallback.
_SPANISH_STOP_WORDS = frozenset({
    "el", "la", "los", "las", "de", "en", "y", "a", "que", "es", "un", "una",
    "con", "para", "del", "al", "por", "su", "sus", "se", "lo", "le", "me",
    "te", "nos", "os", "les", "mi", "tu",
    "o", "u", "e", "ni", "no", "si", "sí", "ya", "más", "muy", "pero", "como",
    "cómo", "cuando", "donde", "sobre", "entre", "hasta", "desde", "sin",
    "este", "esta", "estos", "estas", "ese", "esa", "esos", "esas", "esto",
    "eso", "ser", "son", "fue", "era", "está", "están", "hay", "han", "ha",
    "he", "también", "porque", "todo", "toda", "todos", "todas", "otro",
    "otra", "otros", "otras", "cada", "uno", "unos", "unas", "ellos", "ellas",
    "él", "ella", "nosotros", "yo", "tú", "qué", "quien", "quién", "cual",
    "cuál", "mis", "tus", "nuestro", "nuestra",
})


def extract_visual_keywords_from_script(script_text):
    """Extract up to five search keywords from a script.

    KeyBERT is used when it is loaded (single words and two-word phrases,
    ranked by score). Otherwise, or if KeyBERT fails or yields nothing, the
    most frequent words longer than three characters are used.

    Args:
        script_text: The script to analyse.

    Returns:
        A list of lowercase keywords. Spaces inside two-word phrases are
        replaced by ``+``. Fixed default lists are returned when the script is
        empty or no valid word is found.
    """
    logger.info("Extrayendo palabras clave del guion")
    if not script_text or not script_text.strip():
        logger.warning("Guion vacío")
        return ["naturaleza", "ciudad", "paisaje"]
    clean_text = re.sub(r'[^\w\sáéíóúñÁÉÍÓÚÑ]', '', script_text)
    if kw_model is not None:
        try:
            # scikit-learn only ships a built-in 'english' stop list; passing
            # the string 'spanish' raises, which made this branch always fall
            # through to the simple method. Supply an explicit list instead.
            stop_words = sorted(_SPANISH_STOP_WORDS)
            keywords1 = kw_model.extract_keywords(clean_text, keyphrase_ngram_range=(1, 1), stop_words=stop_words, top_n=5)
            keywords2 = kw_model.extract_keywords(clean_text, keyphrase_ngram_range=(2, 2), stop_words=stop_words, top_n=3)
            all_keywords = keywords1 + keywords2
            all_keywords.sort(key=lambda item: item[1], reverse=True)
            keywords_list = []
            seen_keywords = set()
            for keyword, _ in all_keywords:
                # NOTE(review): "+" joins two-word phrases; confirm this is the
                # form the Pexels query is meant to receive.
                formatted_keyword = keyword.lower().replace(" ", "+")
                if formatted_keyword and formatted_keyword not in seen_keywords:
                    keywords_list.append(formatted_keyword)
                    seen_keywords.add(formatted_keyword)
                    if len(keywords_list) >= 5:
                        break
            if keywords_list:
                logger.debug(f"Palabras clave extraídas por KeyBERT: {keywords_list}")
                return keywords_list
        except Exception as e:
            logger.warning(f"KeyBERT falló: {str(e)}. Usando método simple.")
    logger.debug("Extrayendo palabras clave con método simple...")
    words = clean_text.lower().split()
    valid_words = [word for word in words if len(word) > 3 and word not in _SPANISH_STOP_WORDS]
    if not valid_words:
        logger.warning("No se encontraron palabras clave válidas.")
        return ["espiritual", "terror", "matrix", "arcontes", "galaxia"]
    word_counts = Counter(valid_words)
    # Words come from str.split(), so they never contain spaces to replace.
    top_keywords = [word for word, _ in word_counts.most_common(5)]
    logger.info(f"Palabras clave finales: {top_keywords}")
    return top_keywords
def _close_clip_quietly(clip):
    """Close a MoviePy clip as best-effort cleanup; log instead of raising."""
    if clip is None:
        return
    try:
        clip.close()
    except Exception as e:
        logger.debug(f"No se pudo cerrar un clip: {str(e)}")


async def crear_video_async(prompt_type, input_text, selected_voice, musica_file=None):
    """Build a narrated stock-footage video from a topic or a script.

    Pipeline: obtain the script, synthesize the narration, extract keywords,
    search and download Pexels videos, cut and concatenate them to the
    narration length, optionally mix background music, render the result and
    move it to ``/data``.

    Args:
        prompt_type: ``"Generar Guion con IA"`` to generate the script from
            ``input_text``; any other value uses ``input_text`` as the script.
        input_text: Topic or full script.
        selected_voice: Voice identifier tried first for TTS.
        musica_file: Optional background music, either a path or an object
            with a ``name`` attribute holding the path.

    Returns:
        A ``(persistent_path, download_url)`` tuple for the rendered video.

    Raises:
        ValueError: For controlled failures (empty script, TTS failure, no
            usable videos, invalid audio/video).
        Exception: Any other error is logged and re-raised.
    """
    logger.info("="*80)
    logger.info(f"INICIANDO CREACIÓN DE VIDEO | Tipo: {prompt_type}")
    logger.debug(f"Input: '{input_text[:100]}...'")
    logger.info(f"Voz seleccionada: {selected_voice}")
    start_time = datetime.now()
    temp_dir_intermediate = tempfile.mkdtemp(prefix="video_gen_intermediate_")
    logger.info(f"Directorio temporal creado: {temp_dir_intermediate}")
    temp_intermediate_files = []
    audio_tts_original = None
    musica_audio_original = None
    audio_tts = None
    musica_audio = None
    video_base = None
    video_final = None
    # Bound up front because the ``finally`` block reads it: leaving it unbound
    # until the render step made every earlier failure raise UnboundLocalError
    # during cleanup, hiding the real error and skipping temp-dir removal.
    persistent_path = None
    source_clips = []
    clips_to_concatenate = []
    try:
        # 1. Generate the script or use the supplied one
        guion = generate_script(input_text) if prompt_type == "Generar Guion con IA" else input_text.strip()
        logger.info(f"Guion final ({len(guion)} chars): '{guion[:100]}...'")
        if not guion.strip():
            raise ValueError("El guion está vacío.")

        # 2. Generate the narration audio
        voz_path = os.path.join(temp_dir_intermediate, "voz.mp3")
        # De-duplicate so the fallback voice is not retried when it is also
        # the selected voice (order is preserved).
        tts_voices_to_try = list(dict.fromkeys([selected_voice, "es-MX-DaliaNeural"]))
        tts_success = False
        max_chunk_length = 1000
        text_chunks = [guion[i:i + max_chunk_length] for i in range(0, len(guion), max_chunk_length)]
        logger.info(f"Texto dividido en {len(text_chunks)} fragmentos para TTS")
        for current_voice in tts_voices_to_try:
            logger.info(f"Intentando TTS con voz: {current_voice}")
            try:
                temp_audio_files = []
                for i, chunk in enumerate(text_chunks):
                    temp_path = os.path.join(temp_dir_intermediate, f"voz_chunk_{i}.mp3")
                    chunk_ok = await text_to_speech(chunk, temp_path, current_voice)
                    if chunk_ok and os.path.exists(temp_path) and os.path.getsize(temp_path) > 100:
                        temp_audio_files.append(temp_path)
                    else:
                        logger.warning(f"TTS falló para fragmento {i} con voz: {current_voice}")
                        break
                # Only a complete set of chunks is merged into the final voice file.
                if len(temp_audio_files) == len(text_chunks):
                    audio_clips = [AudioFileClip(f) for f in temp_audio_files]
                    concatenated_audio = concatenate_audioclips(audio_clips)
                    concatenated_audio.write_audiofile(voz_path, codec='mp3')
                    concatenated_audio.close()
                    for clip in audio_clips:
                        clip.close()
                    tts_success = os.path.exists(voz_path) and os.path.getsize(voz_path) > 100
                    temp_intermediate_files.extend(temp_audio_files)
                    if tts_success:
                        logger.info(f"TTS exitoso con voz: {current_voice}")
                        break
            except Exception as e:
                logger.error(f"Error en TTS con voz '{current_voice}': {str(e)}")
        if not tts_success or not os.path.exists(voz_path) or os.path.getsize(voz_path) <= 100:
            raise ValueError(f"Error generando voz. Intentos con {tts_voices_to_try} y gTTS fallaron.")
        temp_intermediate_files.append(voz_path)
        audio_tts_original = AudioFileClip(voz_path)
        if audio_tts_original.duration is None or audio_tts_original.duration <= 0:
            raise ValueError("Audio de voz generado es inválido.")
        audio_tts = audio_tts_original
        audio_duration = audio_tts_original.duration
        logger.info(f"Duración audio voz: {audio_duration:.2f} segundos")
        if audio_duration < 1.0:
            raise ValueError("Audio de voz demasiado corto.")

        # 3. Extract keywords
        keywords = extract_visual_keywords_from_script(guion)
        if not keywords:
            keywords = ["video", "background"]
        logger.info(f"Palabras clave: {keywords}")

        # 4. Search and download videos
        videos_data = []
        total_desired_videos = 10
        per_page_per_keyword = max(1, total_desired_videos // len(keywords))
        for keyword in keywords:
            if len(videos_data) >= total_desired_videos:
                break
            videos = buscar_videos_pexels(keyword, PEXELS_API_KEY, per_page=per_page_per_keyword)
            videos_data.extend(videos)
        # Too few results: top up with generic search terms.
        if len(videos_data) < total_desired_videos / 2:
            generic_keywords = ["mystery", "alien", "ufo", "conspiracy", "paranormal"]
            for keyword in generic_keywords:
                if len(videos_data) >= total_desired_videos:
                    break
                videos = buscar_videos_pexels(keyword, PEXELS_API_KEY, per_page=2)
                videos_data.extend(videos)
        if not videos_data:
            raise ValueError("No se encontraron videos en Pexels.")
        video_paths = []
        for video in videos_data:
            if 'video_files' not in video or not video['video_files']:
                continue
            # ``or 0`` guards against width/height being present but null,
            # which would make the multiplication raise TypeError.
            best_quality = max(
                video['video_files'],
                key=lambda x: (x.get('width') or 0) * (x.get('height') or 0),
                default=None
            )
            if best_quality and 'link' in best_quality:
                path = download_video_file(best_quality['link'], temp_dir_intermediate)
                if path:
                    video_paths.append(path)
                    temp_intermediate_files.append(path)
        if not video_paths:
            raise ValueError("No se descargaron videos utilizables.")

        # 5. Cut and concatenate the video clips
        current_duration = 0
        min_clip_duration = 0.5
        max_clip_segment = 10.0
        for path in video_paths:
            if current_duration >= audio_duration + max_clip_segment:
                break
            try:
                clip = VideoFileClip(path)
                source_clips.append(clip)
                if clip.duration is None or clip.duration <= 0:
                    continue
                remaining_needed = audio_duration - current_duration
                segment_duration = min(clip.duration, max_clip_segment, remaining_needed + min_clip_duration)
                if segment_duration >= min_clip_duration:
                    sub = clip.subclip(0, segment_duration)
                    clips_to_concatenate.append(sub)
                    current_duration += sub.duration
            except Exception as e:
                logger.warning(f"Error procesando video {path}: {str(e)}")
        if not clips_to_concatenate:
            raise ValueError("No hay segmentos de video válidos.")
        video_base = concatenate_videoclips(clips_to_concatenate, method="chain")
        if video_base.duration is None or video_base.duration <= 0:
            raise ValueError("Video base inválido.")

        # Match the video duration to the narration: repeat it when too short,
        # trim it when too long.
        if video_base.duration < audio_duration:
            num_full_repeats = int(audio_duration // video_base.duration)
            remaining_duration = audio_duration % video_base.duration
            repeated_clips_list = [video_base] * num_full_repeats
            if remaining_duration > 0:
                remaining_clip = video_base.subclip(0, remaining_duration)
                repeated_clips_list.append(remaining_clip)
            video_base = concatenate_videoclips(repeated_clips_list, method="chain")
        elif video_base.duration > audio_duration:
            video_base = video_base.subclip(0, audio_duration)

        # 6. Background music (optional; failures fall back to voice only)
        final_audio = audio_tts
        if musica_file:
            try:
                music_path = os.path.join(temp_dir_intermediate, "musica_bg.mp3")
                shutil.copyfile(musica_file.name if hasattr(musica_file, 'name') else musica_file, music_path)
                temp_intermediate_files.append(music_path)
                musica_audio_original = AudioFileClip(music_path)
                if musica_audio_original.duration > 0:
                    musica_audio = loop_audio_to_length(musica_audio_original, video_base.duration)
                    final_audio = CompositeAudioClip([
                        musica_audio.volumex(0.2),
                        audio_tts.volumex(1.0)
                    ])
            except Exception as e:
                logger.warning(f"Error procesando música: {str(e)}")
                final_audio = audio_tts
        if abs(final_audio.duration - video_base.duration) > 0.2:
            final_audio = final_audio.subclip(0, video_base.duration)

        # 7. Combine audio and video, render, and move to persistent storage
        video_final = video_base.set_audio(final_audio)
        output_filename = f"video_{int(datetime.now().timestamp())}.mp4"
        output_path = os.path.join(temp_dir_intermediate, output_filename)
        persistent_dir = "/data"
        os.makedirs(persistent_dir, exist_ok=True)
        persistent_path = os.path.join(persistent_dir, output_filename)
        video_final.write_videofile(
            output_path,
            fps=24,
            threads=2,
            codec="libx264",
            audio_codec="aac",
            preset="medium",
            ffmpeg_params=['-vf', 'scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:-1:-1:color=black', '-crf', '23'],
            logger='bar'
        )
        shutil.move(output_path, persistent_path)
        download_url = f"https://gnosticdev-invideo-basic.hf.space/file={persistent_path}"
        logger.info(f"Video guardado en: {persistent_path}")
        logger.info(f"URL de descarga: {download_url}")
        total_time = (datetime.now() - start_time).total_seconds()
        logger.info(f"Video generado en {total_time:.2f}s")
        return persistent_path, download_url
    except ValueError as ve:
        logger.error(f"Error controlado: {str(ve)}")
        raise
    except Exception as e:
        logger.critical(f"Error crítico: {str(e)}")
        raise
    finally:
        # Best-effort cleanup: nothing here may raise, or it would replace the
        # exception (or return value) of the ``try`` block.
        for clip in source_clips + clips_to_concatenate:
            _close_clip_quietly(clip)
        for clip in (audio_tts_original, musica_audio, musica_audio_original, video_base, video_final):
            _close_clip_quietly(clip)
        for path in temp_intermediate_files:
            if os.path.isfile(path) and path != persistent_path:
                try:
                    os.remove(path)
                except Exception:
                    logger.warning(f"No se pudo eliminar {path}")
        try:
            if os.path.exists(temp_dir_intermediate):
                shutil.rmtree(temp_dir_intermediate)
        except Exception:
            logger.warning(f"No se pudo eliminar directorio temporal {temp_dir_intermediate}")
async def run_app_async(prompt_type, prompt_ia, prompt_manual, musica_file, selected_voice):
    """Validate the UI inputs, run the video pipeline and build the UI outputs.

    Args:
        prompt_type: Selected input method; ``"Generar Guion con IA"`` picks
            ``prompt_ia``, anything else picks ``prompt_manual``.
        prompt_ia: Topic text for AI script generation.
        prompt_manual: Script typed by the user.
        musica_file: Optional background music passed through to the pipeline.
        selected_voice: Voice identifier; replaced by ``DEFAULT_VOICE_ID`` when
            it is not one of ``AVAILABLE_VOICES``.

    Returns:
        A 3-tuple ``(video, file, status)``. On empty input this is
        ``(None, None, <status update>)``; otherwise the video path (or None),
        a ``gr.File`` carrying the same path, and a status update. Errors from
        the pipeline are reported through the status message, not raised.
    """
    logger.info("="*80)
    logger.info("SOLICITUD RECIBIDA EN INTERFAZ")

    using_ai_script = prompt_type == "Generar Guion con IA"
    input_text = prompt_ia if using_ai_script else prompt_manual
    output_video = None
    output_file = None
    status_msg = gr.update(value="⏳ Procesando... Esto puede tomar hasta 1 hora.")

    # Guard clause: nothing to do without text.
    if not (input_text and input_text.strip()):
        logger.warning("Texto de entrada vacío.")
        return None, None, gr.update(value="⚠️ Ingresa texto para el guion o tema.")

    # Fall back to the default voice when the selection is not in the catalogue.
    voice_is_known = any(known_id == selected_voice for _, known_id in AVAILABLE_VOICES)
    if not voice_is_known:
        logger.warning(f"Voz inválida: '{selected_voice}'. Usando voz por defecto: {DEFAULT_VOICE_ID}")
        selected_voice = DEFAULT_VOICE_ID

    try:
        logger.info("Iniciando generación de video...")
        video_path, download_url = await crear_video_async(prompt_type, input_text, selected_voice, musica_file)
        generated = bool(video_path) and os.path.exists(video_path)
        if not generated:
            status_msg = gr.update(value="❌ Error: Falló la generación del video.")
            logger.error("No se generó video_path válido.")
        else:
            output_video = video_path
            output_file = video_path
            status_msg = gr.update(value=f"✅ Video generado exitosamente. Descarga: {download_url}")
            logger.info(f"Retornando video_path: {video_path}, URL: {download_url}")
    except ValueError as validation_error:
        logger.warning(f"Error de validación: {str(validation_error)}")
        status_msg = gr.update(value=f"⚠️ Error: {str(validation_error)}")
    except Exception as unexpected_error:
        logger.critical(f"Error crítico: {str(unexpected_error)}")
        status_msg = gr.update(value=f"❌ Error inesperado: {str(unexpected_error)}")
    finally:
        logger.info("Finalizando run_app_async")

    download_component = gr.File(value=output_file, label="Descargar Video")
    return output_video, download_component, status_msg
def run_app(prompt_type, prompt_ia, prompt_manual, musica_file, selected_voice):
    """Synchronous wrapper around ``run_app_async``.

    Runs the coroutine to completion with ``asyncio.run`` and returns its
    result unchanged. Parameters are forwarded as-is.
    """
    pending = run_app_async(
        prompt_type, prompt_ia, prompt_manual, musica_file, selected_voice
    )
    return asyncio.run(pending)
# Gradio interface: inputs in the left column, results in the right column.
with gr.Blocks(title="Generador de Videos con IA", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🎬 Generador Automático de Videos con IA")
    gr.Markdown("Genera videos cortos a partir de un tema o guion, usando imágenes de archivo de Pexels y voz generada.")
    with gr.Row():
        with gr.Column():
            prompt_type = gr.Radio(
                ["Generar Guion con IA", "Usar Mi Guion"],
                label="Método de Entrada",
                value="Generar Guion con IA"
            )
            # Only one of the next two columns is visible at a time; the
            # radio's change handler below toggles them.
            with gr.Column(visible=True) as ia_guion_column:
                prompt_ia = gr.Textbox(
                    label="Tema para IA",
                    lines=2,
                    placeholder="Ej: Un paisaje natural con montañas y ríos al amanecer...",
                    max_lines=4
                )
            with gr.Column(visible=False) as manual_guion_column:
                prompt_manual = gr.Textbox(
                    label="Tu Guion Completo",
                    lines=5,
                    placeholder="Ej: En este video exploraremos los misterios del océano...",
                    max_lines=10
                )
            musica_input = gr.Audio(
                label="Música de fondo (opcional)",
                type="filepath",
                interactive=True
            )
            voice_dropdown = gr.Dropdown(
                label="Seleccionar Voz para Guion",
                choices=AVAILABLE_VOICES,
                value=DEFAULT_VOICE_ID,
                interactive=True
            )
            generate_btn = gr.Button("✨ Generar Video", variant="primary")
        with gr.Column():
            video_output = gr.Video(
                label="Previsualización del Video Generado",
                interactive=False,
                height=400
            )
            # Hidden until a video has been produced.
            file_output = gr.File(
                label="Descargar Archivo de Video",
                interactive=False,
                visible=False
            )
            status_output = gr.Textbox(
                label="Estado",
                interactive=False,
                placeholder="Esperando acción...",
                value="Esperando entrada..."
            )
    # Show the input box that matches the selected input method.
    prompt_type.change(
        fn=lambda x: (gr.update(visible=x == "Generar Guion con IA"), gr.update(visible=x == "Usar Mi Guion")),
        inputs=prompt_type,
        outputs=[ia_guion_column, manual_guion_column]
    )
    # Three chained steps: reset the outputs, run the generation, then reveal
    # the download component when a file was produced.
    generate_btn.click(
        fn=lambda: (None, None, gr.update(value="⏳ Procesando... Esto puede tomar hasta 1 hora.")),
        outputs=[video_output, file_output, status_output]
    ).then(
        fn=run_app,
        inputs=[prompt_type, prompt_ia, prompt_manual, musica_input, voice_dropdown],
        outputs=[video_output, file_output, status_output],
        queue=True
    ).then(
        # Event handlers receive each input component's *value*, not the
        # component itself, so the file argument is the file value or None.
        # Reading ``.value`` on it raised AttributeError on every run.
        fn=lambda video_path, file_value, status_msg: gr.update(visible=file_value is not None),
        inputs=[video_output, file_output, status_output],
        outputs=[file_output]
    )
    gr.Markdown("### Instrucciones:")
    gr.Markdown("""
1. Configura la variable de entorno `PEXELS_API_KEY`.
2. Selecciona el tipo de entrada: "Generar Guion con IA" o "Usar Mi Guion".
3. Sube música (opcional).
4. Selecciona la voz.
5. Haz clic en "✨ Generar Video".
6. Revisa el estado. Si el video se genera, estará disponible en /data.
7. Consulta `video_generator_full.log` para detalles.
""")
if __name__ == "__main__":
    # Startup check: import MoviePy and build/close a tiny clip before serving.
    logger.info("Verificando dependencias...")
    try:
        from moviepy.editor import ColorClip
        probe_clip = ColorClip((100, 100), color=(255, 0, 0), duration=0.1)
        probe_clip.close()
        logger.info("MoviePy y FFmpeg accesibles.")
    except Exception as dependency_error:
        logger.critical(f"Fallo en dependencias: {dependency_error}")
        raise

    os.environ['GRADIO_SERVER_TIMEOUT'] = '3600'
    logger.info("Iniciando aplicación Gradio...")
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    try:
        app.launch(**launch_options)
    except Exception as launch_error:
        logger.critical(f"No se pudo iniciar la app: {str(launch_error)}")
        raise