Spaces:
Running
Running
import os | |
import re | |
import random | |
import requests | |
import gradio as gr | |
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip | |
from moviepy.audio.fx.all import audio_loop | |
import edge_tts | |
import asyncio | |
from datetime import datetime | |
from pathlib import Path | |
from transformers import pipeline | |
from sentence_transformers import SentenceTransformer | |
from sklearn.metrics.pairwise import cosine_similarity | |
import numpy as np | |
import logging | |
from typing import List, Optional, Tuple | |
# Logging: timestamped, level-tagged messages at INFO and above.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# The Pexels API key is read from the environment. A missing key is only
# logged here; execution continues.
PEXELS_API_KEY = os.environ.get("PEXELS_API_KEY")
if not PEXELS_API_KEY:
    logger.error("PEXELS_API_KEY no encontrada en variables de entorno")

# Load the AI models once at import time. A load failure is logged and then
# re-raised, so the module does not finish importing without its models.
logger.info("Cargando modelos de IA...")
try:
    # Text-generation pipeline used to draft scripts.
    text_generator = pipeline("text-generation", model="facebook/mbart-large-50", device="cpu")
    # Sentence-embedding model used to match video titles against the script.
    semantic_model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
except Exception as load_error:
    logger.error(f"Error cargando modelos de IA: {load_error}")
    raise
else:
    logger.info("Modelos de IA cargados exitosamente")
# Semantic video search: query Pexels, then rank the results against the script
def fetch_semantic_videos(query: str, script: str, num_videos: int = 5) -> List[str]:
    """Search Pexels for stock videos and rank them by similarity to the script.

    Args:
        query: Search terms sent to the Pexels video search endpoint.
        script: Full script text; every candidate's title is compared against
            the embedding of this text.
        num_videos: Maximum number of video URLs to return.

    Returns:
        Download links of the best-matching videos, most similar first.
        An empty list is returned when the API key is missing, when
        ``num_videos`` is not positive, when nothing passes the quality
        filter, or when any error occurs (errors are logged, not raised).
    """
    logger.info(f"Buscando videos semánticos para: '{query}'")
    if num_videos <= 0:
        return []
    if not PEXELS_API_KEY:
        logger.error("PEXELS_API_KEY no encontrada en variables de entorno")
        return []

    headers = {"Authorization": PEXELS_API_KEY}
    # Request twice as many results as needed so the quality filter has room
    # to discard some; Pexels caps per_page at 80.
    params = {"query": query, "per_page": min(num_videos * 2, 80)}
    try:
        response = requests.get(
            "https://api.pexels.com/videos/search",
            headers=headers,
            params=params,  # requests URL-encodes spaces and accented characters
            timeout=15,
        )
        response.raise_for_status()

        candidates = []
        for video in response.json().get("videos", []):
            # Per the Pexels API reference, `duration` is a field of the video
            # object; the entries of `video_files` do not carry one.
            if (video.get("duration") or 0) < 5:
                continue
            # Keep only renditions at least 1280 px wide; width may be null.
            hd_files = [
                vf for vf in video.get("video_files", [])
                if (vf.get("width") or 0) >= 1280 and vf.get("link")
            ]
            if not hd_files:
                continue
            best_file = max(hd_files, key=lambda vf: vf.get("width") or 0)
            video_title = video.get("alt", "") or video.get("url", "")
            candidates.append((best_file["link"], video_title))

        ranked = []
        if candidates:
            # Encode the script once and all titles in a single batch.
            script_embedding = np.asarray(semantic_model.encode(script)).reshape(1, -1)
            title_embeddings = np.asarray(
                semantic_model.encode([title for _, title in candidates])
            )
            scores = cosine_similarity(script_embedding, title_embeddings)[0]
            ranked = [
                (link, float(score), title)
                for (link, title), score in zip(candidates, scores)
            ]

        # Highest similarity first, then keep the requested amount.
        ranked.sort(key=lambda item: item[1], reverse=True)
        selected_videos = ranked[:num_videos]

        logger.info("Videos encontrados (relevancia):")
        for idx, (_, score, title) in enumerate(selected_videos, 1):
            logger.info(f"{idx}. {title} (score: {score:.2f})")
        return [link for link, _, _ in selected_videos]
    except Exception as e:
        logger.error(f"Error en búsqueda semántica: {e}")
        return []
# Script generation: user-supplied text takes precedence over AI generation
def generate_script(prompt: str, custom_text: Optional[str] = None) -> str:
    """Return the narration script for a video.

    Args:
        prompt: Topic of the video; used to build the generation prompt.
        custom_text: Optional script written by the user. When it contains
            non-whitespace text it is returned (stripped) and no model is
            called.

    Returns:
        The script text. When neither input has content, the string
        ``"Error: Proporciona un tema o guion"`` is returned. When generation
        fails or produces nothing, a short placeholder script built from
        ``prompt`` is returned instead.
    """
    if custom_text and custom_text.strip():
        return custom_text.strip()
    if not prompt or not prompt.strip():
        return "Error: Proporciona un tema o guion"

    fallback = f"Top 10 sobre {prompt}: [ejemplo 1] Descripción breve..."
    # Instruction prompt asking for a numbered list with bracketed visual hints.
    context_prompt = (
        "\n"
        f"Genera un guion detallado para un video sobre '{prompt}'.\n"
        "El formato debe ser:\n"
        "1. [Concepto 1]: Descripción breve (15-25 palabras)\n"
        "2. [Concepto 2]: Descripción breve\n"
        "...\n"
        "Incluye detalles visuales entre [] para ayudar a seleccionar imágenes.\n"
        "Ejemplo: [playa con palmeras] o [ciudad moderna con rascacielos]\n"
    )
    try:
        generated = text_generator(
            context_prompt,
            max_length=400,
            num_return_sequences=1,
            do_sample=True,
            temperature=0.7,
            top_k=50,
            top_p=0.9,
            # By default the pipeline returns prompt + continuation; without
            # this the instructions above would become part of the script.
            return_full_text=False,
        )[0]['generated_text']
        # Post-processing: drop tag-like markup and collapse blank lines.
        cleaned = re.sub(r"<.*?>", "", generated)
        cleaned = re.sub(r"\n+", "\n", cleaned).strip()
    except Exception as e:
        logger.error(f"Error generando script: {e}")
        return fallback
    # An empty generation would otherwise yield an empty narration.
    return cleaned or fallback
# Video download: fetch one remote video and re-encode its opening segment
def download_video_segment(url: str, duration: float, output_path: str) -> bool:
    """Download a video and write its first ``duration`` seconds to ``output_path``.

    Args:
        url: Direct link to the video file.
        duration: Desired segment length in seconds; must be positive. The
            segment is shortened when the source is not long enough.
        output_path: Where the re-encoded segment is written.

    Returns:
        True when the segment was written, False on any failure (the error
        is logged and a partially written output file is removed).
    """
    import tempfile

    # A unique temp file per call: a short random suffix in the working
    # directory can collide when several requests run at the same time.
    fd, temp_path = tempfile.mkstemp(prefix="temp_", suffix=".mp4")
    os.close(fd)
    try:
        if duration <= 0:
            # A non-positive end time would be interpreted by subclip() as an
            # offset from the end of the clip.
            raise ValueError("Duración de segmento inválida")

        # Stream the download to disk in 1 MiB chunks.
        with requests.get(url, stream=True, timeout=20) as r:
            r.raise_for_status()
            with open(temp_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)

        with VideoFileClip(temp_path) as clip:
            if clip.duration < 2:
                raise ValueError("Video demasiado corto")
            # Stop slightly before the end of the source to stay inside it.
            end_time = min(duration, clip.duration - 0.1)
            subclip = clip.subclip(0, end_time)
            subclip.write_videofile(
                output_path,
                codec="libx264",
                audio_codec="aac",
                fps=24,
                threads=4,
                preset='fast',
                ffmpeg_params=[
                    '-max_muxing_queue_size', '1024',
                    '-crf', '23',
                    '-movflags', '+faststart'
                ]
            )
        return True
    except Exception as e:
        logger.error(f"Error procesando video: {e}")
        # Do not leave a truncated segment behind for the caller to pick up.
        try:
            if os.path.exists(output_path):
                os.remove(output_path)
        except OSError as cleanup_error:
            logger.error(f"Error procesando video: {cleanup_error}")
        return False
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)
# Main pipeline: script -> stock footage -> narration -> final render
#
# NOTE(review): the original body called extract_keywords(), run_async() and
# generate_voice(), none of which is defined in the visible part of this file
# (while edge_tts and asyncio are imported but unused). The private helpers
# below provide those steps; confirm against any definitions kept elsewhere.
def _extract_keywords(script: str, max_keywords: int = 3) -> List[str]:
    """Pick search terms: bracketed visual hints first, else the most frequent long words."""
    from collections import Counter

    hints = [hint.strip() for hint in re.findall(r"\[([^\[\]]+)\]", script)]
    hints = list(dict.fromkeys(hint for hint in hints if hint))  # de-duplicate, keep order
    if hints:
        return hints[:max_keywords]
    words = re.findall(r"\w{5,}", script.lower())
    return [word for word, _ in Counter(words).most_common(max_keywords)]


async def _generate_voice(text: str, output_file: str, voice: str = "es-ES-ElviraNeural") -> bool:
    """Synthesize ``text`` to ``output_file`` with edge-tts; return True on success."""
    try:
        await edge_tts.Communicate(text, voice).save(output_file)
    except Exception as e:
        logger.error(f"Error generando voz: {e}")
        return False
    return os.path.exists(output_file) and os.path.getsize(output_file) > 0


def _run_async(coro):
    """Run a coroutine to completion from synchronous code and return its result."""
    # Assumes no event loop is running in the calling thread — TODO confirm
    # for the way this handler is invoked.
    return asyncio.run(coro)


def create_contextual_video(prompt: str, custom_text: Optional[str] = None, music_file: Optional[str] = None) -> str:
    """Build a narrated video whose footage is matched to the script.

    Args:
        prompt: Topic of the video; also the fallback search query.
        custom_text: Optional user-written script used instead of generation.
        music_file: Optional background music. A file path string is
            accepted, as is an object exposing the path as ``.name``.

    Returns:
        Path of the rendered MP4 on success. On failure, a message starting
        with ``"Error:"`` is returned instead of raising.
    """
    import shutil
    import tempfile

    # Reject empty input up front so the error text is never narrated.
    has_custom_text = bool(custom_text and custom_text.strip())
    has_prompt = bool(prompt and prompt.strip())
    if not has_custom_text and not has_prompt:
        return "Error: Proporciona un tema o guion"

    # 1. Script
    script = generate_script(prompt, custom_text)
    logger.info(f"Script generado:\n{script}")

    # 2. Semantic footage search
    search_query = " ".join(_extract_keywords(script)) or prompt
    video_urls = fetch_semantic_videos(search_query, script)
    if not video_urls:
        return "Error: No se encontraron videos relevantes. Intenta con otro tema."

    # Intermediate files live in a per-call directory so simultaneous
    # requests cannot overwrite or delete each other's segments.
    work_dir = tempfile.mkdtemp(prefix="video_job_")
    opened = []  # every clip opened below; all are closed in `finally`
    try:
        # 3. Narration
        voice_file = os.path.join(work_dir, "voice.mp3")
        if not _run_async(_generate_voice(script, voice_file)):
            return "Error: No se pudo generar la narración."

        # 4. Download and prepare the segments
        output_dir = "output_videos"
        os.makedirs(output_dir, exist_ok=True)
        output_path = f"{output_dir}/video_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.mp4"

        audio_clip = AudioFileClip(voice_file)
        opened.append(audio_clip)
        # Each video gets an equal share of the narration length.
        segment_duration = audio_clip.duration / len(video_urls)

        clips = []
        for idx, url in enumerate(video_urls):
            clip_path = os.path.join(work_dir, f"segment_{idx}.mp4")
            if download_video_segment(url, segment_duration, clip_path):
                segment = VideoFileClip(clip_path)
                opened.append(segment)
                clips.append(segment)
        if not clips:
            return "Error: No se pudieron procesar los videos."

        # 5. Final assembly
        final_video = concatenate_videoclips(clips, method="compose")
        opened.append(final_video)

        # Optional background music, looped to the narration length at 20% volume.
        music_path = music_file if isinstance(music_file, str) else getattr(music_file, "name", None)
        if music_path and os.path.exists(music_path):
            music_source = AudioFileClip(music_path)
            opened.append(music_source)
            music = audio_loop(music_source, duration=audio_clip.duration)
            final_audio = CompositeAudioClip([audio_clip, music.volumex(0.2)])
        else:
            final_audio = audio_clip
        final_video = final_video.set_audio(final_audio)

        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            fps=24,
            threads=6,
            preset='fast',
            bitrate="5000k"
        )
        return output_path
    except Exception as e:
        logger.error(f"Error crítico al crear video: {e}")
        return f"Error: Fallo en la creación del video - {str(e)}"
    finally:
        # Close clips before deleting their files; open handles can block removal.
        for resource in reversed(opened):
            try:
                resource.close()
            except Exception as close_error:
                logger.warning(f"No se pudo cerrar un clip: {close_error}")
        shutil.rmtree(work_dir, ignore_errors=True)
# Gradio user interface
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🎬 Generador de Videos con IA Semántica
    **Crea videos donde las imágenes coinciden perfectamente con tu texto**
    """)

    # The examples widget in the left column fills the prompt textbox, which
    # is laid out in the right column. The textbox therefore has to exist
    # before gr.Examples is built: create it un-rendered here and place it
    # with .render() further down.
    prompt = gr.Textbox(
        label="Tema principal del video",
        placeholder="Ej: 'Top 5 innovaciones tecnológicas de 2024'",
        max_lines=2,
        render=False
    )

    with gr.Row():
        # Left column: illustration, usage tips and clickable example prompts.
        with gr.Column(scale=1):
            gr.Image("https://i.imgur.com/7X8P5R8.png", label="Ejemplo Visual")
            with gr.Accordion("📌 Consejos para mejores resultados", open=False):
                # Each tip reads "better > worse"; the third one was inverted.
                gr.Markdown("""
                - **Describe tu tema con detalles**: "Playas del Caribe con arena blanca" en vez de solo "playas"
                - **Usa sustantivos concretos**: "Animales de la selva amazónica" > "naturaleza"
                - **Sé específico**: "Avances en inteligencia artificial 2024" > "Tecnología 2024"
                """)
            gr.Examples(
                examples=[
                    ["Lugares históricos de Europa con arquitectura medieval"],
                    ["Tecnologías emergentes en inteligencia artificial para 2024"],
                    ["Recetas tradicionales mexicanas con ingredientes autóctonos"]
                ],
                inputs=[prompt],
                label="Ejemplos de prompts efectivos"
            )

        # Right column: inputs, trigger button and the resulting video.
        with gr.Column(scale=2):
            prompt.render()
            custom_text = gr.TextArea(
                label="O escribe tu propio guion (opcional)",
                placeholder="Ej: 1. [Robot humanoide] Avances en robótica...",
                lines=6
            )
            music_file = gr.File(
                label="Música de fondo (opcional - MP3)",
                type="filepath",
                file_types=[".mp3"]
            )
            submit = gr.Button("🚀 Generar Video", variant="primary")
            output = gr.Video(
                label="Video Generado",
                format="mp4",
                interactive=False
            )

    # Wire the button to the pipeline; also exposed as the "generate_video" API route.
    submit.click(
        fn=create_contextual_video,
        inputs=[prompt, custom_text, music_file],
        outputs=output,
        api_name="generate_video"
    )
# Script entry point: start the Gradio server only when this file is run directly.
if __name__ == "__main__":
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
        "debug": True,
    }
    demo.launch(**launch_options)