# Page-capture residue ("Spaces: Sleeping") from the hosting site; not part of the program.
import asyncio
import json
import logging
import math
import os
import re
import shutil
import tempfile
from collections import Counter
from datetime import datetime

import edge_tts
import gradio as gr
import requests
import torch
from keybert import KeyBERT
from moviepy.editor import (
    AudioFileClip,
    CompositeAudioClip,
    VideoFileClip,
    concatenate_audioclips,
    concatenate_videoclips,
)
from pydub import AudioSegment
from transformers import GPT2LMHeadModel, GPT2Tokenizer
# Logging setup: timestamped INFO-level messages through the root handler.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)

# Pexels API key, read from the environment (None when the variable is unset).
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
# Search Pexels for stock videos through its REST API.
def buscar_videos_pexels(query, api_key, per_page=5):
    """Search the Pexels video API and return the matching video entries.

    Args:
        query: Search term sent as the ``query`` request parameter.
        api_key: Pexels API key, sent in the ``Authorization`` header.
        per_page: Number of results requested per page.

    Returns:
        The list stored under ``"videos"`` in the JSON response, or an empty
        list when the query is blank, the key is missing, the request fails
        or the response cannot be parsed. Failures are logged, not raised.
    """
    # A blank query cannot match anything, so skip the network round trip.
    if not query or not str(query).strip():
        return []
    if not api_key:
        logger.error("Falta la clave API de Pexels (PEXELS_API_KEY)")
        return []

    headers = {"Authorization": api_key}
    params = {
        "query": query,
        "per_page": per_page,
        "orientation": "landscape",
        "size": "medium",
    }
    try:
        response = requests.get(
            "https://api.pexels.com/videos/search",
            headers=headers,
            params=params,
            timeout=20,
        )
        response.raise_for_status()
        try:
            data = response.json()
        except ValueError:
            # ValueError is the common base of every JSON decoding error
            # that response.json() raises across requests versions.
            logger.error("Respuesta JSON inválida de Pexels")
            return []
        videos = data.get('videos') if isinstance(data, dict) else None
        # Normalise a missing or null "videos" entry to an empty list.
        return videos or []
    except requests.exceptions.RequestException as e:
        logger.error(f"Error de conexión con Pexels: {e}")
    except Exception as e:
        logger.error(f"Error inesperado: {e}")
    return []
# Model initialisation. Each model is optional: when loading fails, the
# corresponding global is left as None and the failure is logged.
MODEL_NAME = "datificate/gpt2-small-spanish"

tokenizer = None
model = None
try:
    tokenizer = GPT2Tokenizer.from_pretrained(MODEL_NAME)
    model = GPT2LMHeadModel.from_pretrained(MODEL_NAME).eval()
    # Reuse the end-of-sequence token for padding when none is defined.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    logger.info("Modelo GPT-2 en español cargado")
except Exception as load_error:
    logger.error(f"Error al cargar modelo GPT-2: {load_error}")
    # A partial load (tokenizer without model) is treated as no model at all.
    tokenizer = None
    model = None

kw_model = None
try:
    kw_model = KeyBERT('distilbert-base-multilingual-cased')
    logger.info("KeyBERT cargado")
except Exception as load_error:
    logger.error(f"Error al cargar KeyBERT: {load_error}")
    kw_model = None
# Script generation with the Spanish GPT-2 model.
def generate_script(prompt, max_length=150):
    """Generate a short, one-sentence script that starts from ``prompt``.

    The model is given an instruction-style prefix followed by ``prompt``.
    Only the newly generated continuation is decoded, so the instruction
    prefix never appears in the returned script.

    Args:
        prompt: Topic text supplied by the user.
        max_length: Maximum number of tokens to generate after the prompt.

    Returns:
        ``prompt`` followed by the first generated sentence. The original
        ``prompt`` is returned unchanged when the model or tokenizer is not
        loaded, when nothing usable is generated, or when generation fails.
    """
    if tokenizer is None or model is None:
        return prompt  # Fall back to the original prompt.
    try:
        enhanced_prompt = f"Escribe un guion corto y coherente sobre: {prompt}"
        inputs = tokenizer(enhanced_prompt, return_tensors="pt", truncation=True, max_length=512)
        prompt_token_count = inputs["input_ids"].shape[1]
        outputs = model.generate(
            **inputs,
            # Budget applies to new tokens only, so a long prompt cannot
            # consume the whole generation allowance.
            max_new_tokens=max_length,
            do_sample=True,
            top_p=0.9,
            top_k=40,
            temperature=0.7,
            repetition_penalty=1.5,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )
        # The output sequence starts with the prompt tokens; skip them.
        continuation = tokenizer.decode(outputs[0][prompt_token_count:], skip_special_tokens=True)
        # Drop any leftover markup-like tokens.
        continuation = re.sub(r'<[^>]+>', '', continuation)
        # Keep only the first generated sentence.
        first_sentence = continuation.split('.', 1)[0].rstrip()
        script = f"{prompt}{first_sentence}".strip()
        if not script:
            return prompt
        if script[-1] not in ".!?":
            script += "."
        return script
    except Exception as e:
        logger.error(f"Error generando guion: {e}")
        return prompt  # Fall back to the original prompt.
# Speech synthesis.
async def text_to_speech(text, output_path, voice="es-ES-ElviraNeural"):
    """Synthesize ``text`` with edge-tts and save the audio to ``output_path``.

    Args:
        text: Text to be spoken.
        output_path: Destination path passed to ``Communicate.save``.
        voice: edge-tts voice name.

    Returns:
        True when saving completed, False when any error occurred (the
        error is logged).
    """
    try:
        await edge_tts.Communicate(text, voice).save(output_path)
    except Exception as tts_error:
        logger.error(f"Error en TTS: {tts_error}")
        return False
    return True
# Video download.
def download_video_file(url, temp_dir):
    """Download ``url`` into ``temp_dir`` and return the local file path.

    Args:
        url: Direct link to the video file; a falsy value is rejected.
        temp_dir: Existing directory in which the file is created.

    Returns:
        Path of the downloaded ``.mp4`` file, or None when ``url`` is empty
        or the download fails (the failure is logged).
    """
    if not url:
        return None
    output_path = None
    try:
        # The context manager releases the streamed connection even when
        # the transfer is interrupted.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            file_name = f"video_{datetime.now().strftime('%H%M%S%f')}.mp4"
            output_path = os.path.join(temp_dir, file_name)
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # Skip keep-alive chunks.
                        f.write(chunk)
        return output_path
    except Exception as e:
        logger.error(f"Error descargando video: {e}")
        # Do not leave a truncated file behind for later steps to open.
        if output_path and os.path.exists(output_path):
            try:
                os.remove(output_path)
            except OSError:
                pass
        return None
# Audio looping.
def loop_audio_to_length(audio_clip, target_duration):
    """Return ``audio_clip`` trimmed or repeated to last ``target_duration``.

    Args:
        audio_clip: Audio clip exposing ``duration`` and ``subclip``.
        target_duration: Desired length in the same unit as ``duration``.

    Returns:
        A clip cut to ``target_duration``: a sub-clip of the original when it
        is already long enough, otherwise the original repeated back to back
        and then cut.

    Raises:
        ValueError: If the clip is too short and has a non-positive duration,
            so repeating it can never reach the target.
    """
    duration = audio_clip.duration
    if duration >= target_duration:
        return audio_clip.subclip(0, target_duration)
    if duration <= 0:
        raise ValueError("El clip de audio no tiene duración")
    loops = int(target_duration / duration) + 1
    # Audio clips must be joined with the audio concatenation helper; the
    # video one expects clips that have a frame size.
    looped = concatenate_audioclips([audio_clip] * loops)
    return looped.subclip(0, target_duration)
# Keyword extraction for the stock-footage search.
# Spanish stop words shared by both extraction strategies. An explicit list
# is required because the vectorizer behind KeyBERT only ships a built-in
# list for English.
_SPANISH_STOP_WORDS = [
    "el", "la", "los", "las", "de", "en", "y", "a", "que", "es", "un", "una", "con",
    "del", "al", "lo", "se", "su", "sus", "por", "para", "como", "pero", "o", "no",
    "este", "esta", "estos", "estas", "sobre", "entre", "cuando", "donde",
    "porque", "hasta", "desde", "todo", "todos", "también",
]


def extract_visual_keywords_from_script(script_text):
    """Pick up to three search keywords from ``script_text``.

    KeyBERT is tried first when it is loaded; if it is unavailable, fails or
    yields nothing, the most frequent words longer than three characters
    that are not stop words are used instead.

    Args:
        script_text: Script from which keywords are extracted.

    Returns:
        A list of up to three keywords with spaces replaced by ``+``. When no
        candidate word is found, the default list
        ``["naturaleza", "ciudad", "paisaje"]`` is returned.
    """
    # Lower-case the text and strip punctuation.
    clean_text = re.sub(r'[^\w\sáéíóúñ]', '', script_text.lower())

    # Strategy 1: KeyBERT, when available.
    if kw_model:
        try:
            keywords = kw_model.extract_keywords(
                clean_text,
                keyphrase_ngram_range=(1, 1),
                stop_words=_SPANISH_STOP_WORDS,
                top_n=3,
            )
            if keywords:
                return [kw[0].replace(" ", "+") for kw in keywords]
        except Exception as e:
            logger.warning(f"KeyBERT falló: {e}")

    # Strategy 2: word frequency (fallback).
    stop_words = set(_SPANISH_STOP_WORDS)
    candidates = [
        word for word in clean_text.split()
        if len(word) > 3 and word not in stop_words
    ]
    if not candidates:
        logger.warning("Usando palabras clave predeterminadas")
        return ["naturaleza", "ciudad", "paisaje"]

    # Keep the three most frequent candidates.
    word_counts = Counter(candidates)
    return [word.replace(" ", "+") for word, _ in word_counts.most_common(3)]
# Main pipeline: script -> narration -> stock footage -> rendered video.
def crear_video(prompt_type, input_text, musica_file=None):
    """Build a narrated stock-footage video and return the path of the MP4.

    Steps: obtain the script (generated when ``prompt_type`` is
    "Generar Guion con IA", otherwise ``input_text`` as given), synthesize
    the narration, derive search keywords from the script, download matching
    Pexels videos, assemble them to the narration length, optionally mix in
    background music, and render the result.

    Args:
        prompt_type: Input mode label selected in the UI.
        input_text: Topic (AI mode) or complete script (manual mode).
        musica_file: Optional path of a background-music audio file.

    Returns:
        Path of the rendered MP4. The file is moved out of the working
        directory before cleanup so it still exists when this function
        returns; the caller is responsible for it afterwards.

    Raises:
        ValueError: If the script is empty, narration cannot be generated,
            or no video can be found, downloaded or used.
        Exception: Any other failure is logged and re-raised.
    """
    logger.info(f"Iniciando creación de video: {prompt_type}")

    # 1. Generate the script or use the supplied one.
    if prompt_type == "Generar Guion con IA":
        guion = generate_script(input_text)
    else:
        guion = input_text
    if not guion or not guion.strip():
        raise ValueError("El guion está vacío")
    logger.info(f"Guion: {guion[:100]}...")

    # Working directory for every intermediate file.
    temp_dir = tempfile.mkdtemp()
    # Clips that hold open readers; closed in the finally block.
    open_clips = []
    try:
        # 2. Generate the narration audio.
        voz_path = os.path.join(temp_dir, "voz.mp3")
        if not asyncio.run(text_to_speech(guion, voz_path)):
            raise ValueError("Error generando voz")
        audio_tts = AudioFileClip(voz_path)
        open_clips.append(audio_tts)
        audio_duration = audio_tts.duration

        # 3. Extract keywords, with a fixed fallback.
        try:
            keywords = extract_visual_keywords_from_script(guion)
        except Exception as e:
            logger.error(f"Error extrayendo palabras clave: {e}")
            keywords = ["naturaleza", "paisaje"]
        logger.info(f"Palabras clave: {keywords}")

        # 4. Search for videos, then download them.
        videos_data = []
        for keyword in keywords:
            try:
                videos = buscar_videos_pexels(keyword, PEXELS_API_KEY, per_page=3)
                if videos:
                    videos_data.extend(videos)
                    logger.info(f"Encontrados {len(videos)} videos para '{keyword}'")
            except Exception as e:
                logger.warning(f"Error buscando videos para '{keyword}': {e}")

        # Nothing found: retry with generic keywords.
        if not videos_data:
            logger.warning("Usando palabras clave genéricas como respaldo")
            for keyword in ["naturaleza", "ciudad", "paisaje"]:
                videos = buscar_videos_pexels(keyword, PEXELS_API_KEY, per_page=3)
                if videos:
                    videos_data.extend(videos)
        if not videos_data:
            raise ValueError("No se encontraron videos en Pexels para ninguna palabra clave")

        video_paths = []
        for video in videos_data:
            video_files = video.get('video_files')
            if not video_files:
                continue
            try:
                # Pick the rendition with the largest frame area; a missing
                # or null dimension counts as zero instead of raising.
                best_quality = max(
                    video_files,
                    key=lambda x: (x.get('width') or 0) * (x.get('height') or 0),
                )
                if 'link' in best_quality:
                    path = download_video_file(best_quality['link'], temp_dir)
                    if path:
                        video_paths.append(path)
                        logger.info(f"Video descargado: {best_quality['link']}")
            except Exception as e:
                logger.warning(f"Error procesando video: {e}")
        if not video_paths:
            raise ValueError("No se pudo descargar ningún video")

        # 5. Assemble footage until it covers the narration.
        clips = []
        current_duration = 0
        for path in video_paths:
            if current_duration >= audio_duration:
                break
            try:
                clip = VideoFileClip(path)
                open_clips.append(clip)
                usable_duration = min(clip.duration, 10)
                if usable_duration > 1:  # Ignore very short clips.
                    clips.append(clip.subclip(0, usable_duration))
                    current_duration += usable_duration
                    logger.info(f"Añadido clip de {usable_duration:.1f}s (total: {current_duration:.1f}/{audio_duration:.1f}s)")
            except Exception as e:
                logger.warning(f"Error procesando video {path}: {e}")
        if not clips:
            raise ValueError("No hay clips válidos para crear el video")

        video_base = concatenate_videoclips(clips, method="compose")
        # Match the footage length to the narration: repeat when too short,
        # then trim any excess so the video does not outlast the audio.
        if video_base.duration < audio_duration:
            num_repeats = int(audio_duration / video_base.duration) + 1
            video_base = concatenate_videoclips([video_base] * num_repeats)
        if video_base.duration > audio_duration:
            video_base = video_base.subclip(0, audio_duration)

        # 6. Optional background music.
        final_audio = audio_tts
        if musica_file:
            try:
                music_path = os.path.join(temp_dir, "musica.mp3")
                shutil.copyfile(musica_file, music_path)
                musica_audio = AudioFileClip(music_path)
                open_clips.append(musica_audio)
                # Fit the music to the narration in both directions: loop it
                # when shorter, cut it when longer.
                musica_audio = loop_audio_to_length(musica_audio, audio_duration)
                final_audio = CompositeAudioClip([
                    musica_audio.volumex(0.3),  # Music at 30% volume.
                    audio_tts.volumex(1.0),     # Narration at full volume.
                ])
                logger.info("Música de fondo añadida")
            except Exception as e:
                # Music is best-effort: continue with narration only.
                final_audio = audio_tts
                logger.warning(f"Error procesando música: {e}")

        # 7. Render the final video.
        video_final = video_base.set_audio(final_audio)
        rendered_path = os.path.join(temp_dir, "final_video.mp4")
        video_final.write_videofile(
            rendered_path,
            fps=24,
            threads=4,
            codec="libx264",
            audio_codec="aac",
            preset="medium",
            logger=None,
        )

        # Move the result out of temp_dir, which is deleted below; returning
        # a path inside it would hand the caller a file that no longer exists.
        fd, output_path = tempfile.mkstemp(prefix="video_final_", suffix=".mp4")
        os.close(fd)
        shutil.move(rendered_path, output_path)
        logger.info(f"Video creado: {output_path}")
        return output_path
    except Exception as e:
        logger.error(f"Error creando video: {e}")
        raise
    finally:
        # Cleanup: release the media readers, then drop the working directory.
        for clip in open_clips:
            try:
                clip.close()
            except Exception as close_error:
                logger.warning(f"Error cerrando clip: {close_error}")
        shutil.rmtree(temp_dir, ignore_errors=True)
# UI callback that runs the pipeline.
def run_app(prompt_type, prompt_ia, prompt_manual, musica_file):
    """Validate the UI inputs, create the video and report the outcome.

    Args:
        prompt_type: Selected input mode label.
        prompt_ia: Topic text used when the mode is "Generar Guion con IA".
        prompt_manual: Full script used for any other mode.
        musica_file: Optional path of the background-music file.

    Returns:
        A ``(video_path, status_message)`` tuple. ``video_path`` is None when
        the input is missing or blank, or when video creation fails; errors
        are reported in the status message rather than raised.
    """
    input_text = prompt_ia if prompt_type == "Generar Guion con IA" else prompt_manual
    # Treat an absent value (None) the same as blank text.
    if not input_text or not input_text.strip():
        return None, "Por favor ingresa texto"
    try:
        video_path = crear_video(prompt_type, input_text, musica_file)
    except ValueError as ve:
        return None, f"⚠️ {ve}"
    except Exception as e:
        return None, f"❌ Error: {str(e)}"
    return video_path, "✅ Video generado exitosamente"
# Gradio interface.
# NOTE(review): the original indentation was lost in this capture; the
# component nesting below is reconstructed from the statement order, and the
# placement of the instructions at the Blocks level is a best guess.
_APP_CSS = """
.gradio-container {max-width: 800px; margin: auto;}
h1 {text-align: center;}
"""

_INSTRUCTIONS_MD = """
1. **Selecciona el tipo de entrada**:
- "Generar Guion con IA": Describe un tema
- "Usar Mi Guion": Escribe tu guion completo
2. **Sube música** (opcional): Selecciona un archivo de audio
3. **Haz clic en Generar Video**
4. Espera a que se procese el video (puede tomar varios minutos)
"""


def _toggle_input_columns(selected_mode):
    """Return visibility updates for the AI-topic and manual-script columns."""
    show_ai = gr.update(visible=selected_mode == "Generar Guion con IA")
    show_manual = gr.update(visible=selected_mode == "Usar Mi Guion")
    return show_ai, show_manual


def _show_processing_status():
    """Clear the video output and display the in-progress status message."""
    return None, "⏳ Procesando... (esto puede tomar 2-5 minutos)"


with gr.Blocks(title="Generador de Videos con IA", theme=gr.themes.Soft(), css=_APP_CSS) as app:
    gr.Markdown("# 🎬 Generador Automático de Videos con IA")
    with gr.Row():
        # Left column: inputs.
        with gr.Column():
            prompt_type = gr.Radio(
                ["Generar Guion con IA", "Usar Mi Guion"],
                label="Método de Entrada",
                value="Generar Guion con IA",
            )
            with gr.Column(visible=True) as ia_guion_column:
                prompt_ia = gr.Textbox(
                    label="Tema para IA",
                    lines=2,
                    placeholder="Ej: Un paisaje natural con montañas y ríos...",
                    max_lines=4,
                )
            with gr.Column(visible=False) as manual_guion_column:
                prompt_manual = gr.Textbox(
                    label="Tu Guion Completo",
                    lines=5,
                    placeholder="Ej: En este video exploraremos los misterios del océano...",
                    max_lines=10,
                )
            musica_input = gr.Audio(
                label="Música de fondo (opcional)",
                type="filepath",
                interactive=True,
            )
            generate_btn = gr.Button("✨ Generar Video", variant="primary")
        # Right column: outputs.
        with gr.Column():
            video_output = gr.Video(
                label="Video Generado",
                interactive=False,
                height=400,
            )
            status_output = gr.Textbox(
                label="Estado",
                interactive=False,
                show_label=False,
                placeholder="Esperando acción...",
            )

    # Show only the input column that matches the selected mode.
    prompt_type.change(
        _toggle_input_columns,
        inputs=prompt_type,
        outputs=[ia_guion_column, manual_guion_column],
    )

    # Generation: first show the in-progress status, then run the pipeline.
    status_event = generate_btn.click(
        _show_processing_status,
        outputs=[video_output, status_output],
        queue=False,
    )
    status_event.then(
        run_app,
        inputs=[prompt_type, prompt_ia, prompt_manual, musica_input],
        outputs=[video_output, status_output],
    )

    gr.Markdown("### Instrucciones:")
    gr.Markdown(_INSTRUCTIONS_MD)

if __name__ == "__main__":
    app.launch(server_name="0.0.0.0", server_port=7860)