"""Gradio app that turns a topic into a narrated stock-footage video.

Pipeline: build (or accept) a Spanish script, synthesize narration with
edge-tts, look up matching stock clips on Pexels, and mux everything with
moviepy.
"""

# NOTE: the original import order is preserved on purpose; the star import
# from moviepy can shadow names, so reordering around it is not risk-free.
import os
import re
import requests
import gradio as gr
from moviepy.editor import *
import edge_tts
import tempfile
import logging
from datetime import datetime
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
import nltk
import random
from transformers import pipeline
import torch
import asyncio
import functools
import nest_asyncio
from nltk.tokenize import sent_tokenize

# Setup: tokenizer / stop-word data, re-entrant event loop, logging.
# "punkt_tab" is what recent NLTK releases look up for sent_tokenize;
# "stopwords" feeds the Spanish stop-word list used for keyword extraction.
for _nltk_resource in ("punkt", "punkt_tab", "stopwords"):
    nltk.download(_nltk_resource, quiet=True)
nest_asyncio.apply()
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")
MODEL_NAME = "DeepESP/gpt2-spanish"
VOICE_NAMES, VOICES = [], []


async def get_voices():
    """Fetch the edge-tts voice catalogue.

    Returns:
        A ``(voice_names, voces)`` tuple: human-readable labels and the raw
        voice dicts, index-aligned.
    """
    voces = await edge_tts.list_voices()
    voice_names = []
    for v in voces:
        # Accept either key: indexing 'LocaleName' directly raises KeyError
        # when an entry only carries 'Locale', which would discard the list.
        locale = v.get('LocaleName') or v.get('Locale', '')
        voice_names.append(f"{v['Name']} ({v['Gender']}, {locale})")
    return voice_names, voces


async def get_and_set_voices():
    """Populate the module-level voice lists, falling back to one default."""
    global VOICE_NAMES, VOICES
    try:
        VOICE_NAMES, VOICES = await get_voices()
        if not VOICES:
            raise Exception("No se encontraron voces.")
    except Exception as e:
        logger.warning(f"Fallo al cargar voces: {e}")
        VOICE_NAMES = ["Voz Predeterminada (Femenino, es-ES)"]
        VOICES = [{'ShortName': 'es-ES-ElviraNeural'}]


asyncio.get_event_loop().run_until_complete(get_and_set_voices())


def _voz_predeterminada():
    """Return the dropdown label to preselect, preferring a Spanish voice."""
    short_names = [str(v.get('ShortName', '')) for v in VOICES]
    if 'es-ES-ElviraNeural' in short_names:
        return VOICE_NAMES[short_names.index('es-ES-ElviraNeural')]
    for nombre, short_name in zip(VOICE_NAMES, short_names):
        if short_name.startswith('es-'):
            return nombre
    return VOICE_NAMES[0]


@functools.lru_cache(maxsize=1)
def _get_generator():
    """Build the text-generation pipeline once and reuse it across calls."""
    return pipeline(
        "text-generation",
        model=MODEL_NAME,
        device=0 if torch.cuda.is_available() else -1,
    )


def generar_guion_profesional(prompt):
    """Generate a Spanish video script about ``prompt``.

    Args:
        prompt: Topic of the video.

    Returns:
        The generated script, or a fixed five-part outline when generation
        fails or yields fewer than 100 words.
    """
    try:
        generator = _get_generator()
        response = generator(
            f"Escribe un guion profesional para un video de YouTube sobre '{prompt}'. "
            "Incluye introducción, desarrollo en 3 secciones y conclusión:",
            max_length=1000,
            # temperature / top_k / top_p only take effect when sampling.
            do_sample=True,
            temperature=0.7,
            top_k=50,
            top_p=0.95,
            num_return_sequences=1,
            # Keep the instruction itself out of the narrated script.
            return_full_text=False,
        )
        guion = response[0]['generated_text'].strip()
        if len(guion.split()) < 100:
            raise ValueError("Guion demasiado breve")
        return guion
    except Exception as e:
        logger.error(f"Error generando guion: {e}")
        return (
            f"Introducción sobre {prompt}. Sección 1: Orígenes e historia. "
            "Sección 2: Estado actual. Sección 3: Futuro e impacto. \n"
            "Conclusión reflexiva."
        )


@functools.lru_cache(maxsize=1)
def _spanish_stop_words():
    """Return NLTK's Spanish stop words, or None if the corpus is missing."""
    try:
        from nltk.corpus import stopwords
        return stopwords.words("spanish")
    except LookupError:
        return None


def _extraer_palabras_clave(guion, max_palabras=5):
    """Return up to ``max_palabras`` top TF-IDF terms of ``guion``, best first."""
    oraciones = sent_tokenize(guion, language="spanish")
    # scikit-learn only understands the string 'english'; any other language
    # must be supplied as an explicit list (or None for no filtering).
    vectorizer = TfidfVectorizer(stop_words=_spanish_stop_words())
    tfidf = vectorizer.fit_transform(oraciones)
    palabras = vectorizer.get_feature_names_out()
    scores = np.asarray(tfidf.sum(axis=0)).ravel()
    mejores = np.argsort(scores)[::-1][:max_palabras]
    return [str(palabras[i]) for i in mejores]


def buscar_videos_avanzado(prompt, guion, num_videos=5):
    """Search Pexels for stock videos matching the prompt and script.

    Args:
        prompt: Topic typed by the user.
        guion: Script text from which extra keywords are extracted.
        num_videos: Maximum number of results requested from Pexels.

    Returns:
        The list under the ``videos`` key of the Pexels response, or ``[]``
        on any failure (missing API key, empty query, HTTP/network error).
    """
    if not PEXELS_API_KEY:
        logger.error("Error buscando videos: PEXELS_API_KEY no configurada")
        return []

    try:
        palabras_clave = _extraer_palabras_clave(guion)
    except Exception as e:
        # Keyword extraction is best-effort; the prompt alone is still usable.
        logger.warning(f"Error extrayendo palabras clave: {e}")
        palabras_clave = []

    palabras_prompt = re.findall(r'\b\w{4,}\b', prompt.lower())
    # Ordered de-duplication: prompt words first, then script keywords.
    todas = list(dict.fromkeys(palabras_prompt + palabras_clave))[:5]
    if not todas:
        return []

    try:
        response = requests.get(
            "https://api.pexels.com/videos/search",
            # Let requests do the URL encoding (accents, spaces).
            params={"query": " ".join(todas), "per_page": num_videos},
            headers={"Authorization": PEXELS_API_KEY},
            timeout=15,
        )
        response.raise_for_status()
        return response.json().get('videos', [])
    except Exception as e:
        logger.error(f"Error buscando videos: {e}")
        return []


def _descargar_video(url):
    """Download ``url`` into a closed temporary .mp4 file and return its path."""
    with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_video:
        ruta = temp_video.name
        try:
            with requests.get(url, stream=True, timeout=30) as response:
                response.raise_for_status()
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    temp_video.write(chunk)
        except Exception:
            # Do not leave a partial download behind.
            temp_video.close()
            _eliminar_archivo(ruta)
            raise
    return ruta


def _eliminar_archivo(ruta):
    """Remove ``ruta`` if present, ignoring filesystem errors."""
    try:
        if ruta and os.path.exists(ruta):
            os.remove(ruta)
    except OSError as e:
        logger.warning(f"No se pudo eliminar {ruta}: {e}")


async def crear_video_profesional(prompt, custom_script, voz_index, musica=None):
    """Build the final narrated video and return its file path.

    Args:
        prompt: Topic of the video.
        custom_script: Optional user-supplied script; when blank a script is
            generated from ``prompt``.
        voz_index: Index into the module-level ``VOICES`` list.
        musica: Currently unused; kept for interface compatibility.

    Returns:
        Path of the written .mp4 file, or ``None`` if any step failed.
    """
    # NOTE(review): the download and encoding steps below are blocking calls
    # inside a coroutine; consider offloading them if responsiveness matters.
    # NOTE(review): narration and footage lengths are not reconciled here.
    voz_archivo = None
    archivos_temporales = []
    recursos_abiertos = []
    try:
        guion = (custom_script or "").strip()
        if not guion:
            guion = generar_guion_profesional(prompt)
        else:
            guion = custom_script
        voz_seleccionada = VOICES[voz_index]['ShortName'] if VOICES else 'es-ES-ElviraNeural'

        # Generate the narration into a per-request temp file so concurrent
        # requests cannot overwrite each other's audio.
        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_audio:
            voz_archivo = temp_audio.name
        await edge_tts.Communicate(guion, voz_seleccionada).save(voz_archivo)
        audio = AudioFileClip(voz_archivo)
        recursos_abiertos.append(audio)

        # Fetch candidate stock videos.
        videos_data = buscar_videos_avanzado(prompt, guion)
        if not videos_data:
            raise Exception("No se encontraron videos")

        # Download and trim up to three clips.
        clips = []
        for video in videos_data[:3]:
            video_file = next(
                (vf for vf in video['video_files'] if vf['quality'] == 'sd'),
                video['video_files'][0],
            )
            ruta = _descargar_video(video_file['link'])
            archivos_temporales.append(ruta)
            origen = VideoFileClip(ruta)
            recursos_abiertos.append(origen)
            clips.append(origen.subclip(0, min(10, video['duration'])))

        # Assemble and write the final video.
        video_final = concatenate_videoclips(clips)
        recursos_abiertos.append(video_final)
        video_final = video_final.set_audio(audio)
        output_path = f"video_output_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
        video_final.write_videofile(output_path, fps=24, threads=2)
        return output_path
    except Exception as e:
        logger.error(f"Error crítico: {e}")
        return None
    finally:
        # Close readers before deleting the files they hold open.
        for recurso in recursos_abiertos:
            try:
                recurso.close()
            except Exception as e:
                logger.warning(f"Error cerrando recurso: {e}")
        for ruta in archivos_temporales:
            _eliminar_archivo(ruta)
        _eliminar_archivo(voz_archivo)


# Gradio app
with gr.Blocks(title="Generador de Videos") as app:
    with gr.Row():
        with gr.Column():
            prompt = gr.Textbox(label="Tema del video")
            custom_script = gr.TextArea(label="Guión personalizado (opcional)")
            voz = gr.Dropdown(VOICE_NAMES, label="Voz", value=_voz_predeterminada())
            btn = gr.Button("Generar Video", variant="primary")
        with gr.Column():
            output = gr.Video(label="Resultado", format="mp4")

    async def wrapper(p, cs, v):
        """Map the selected voice label to its index and run the pipeline."""
        # An unknown or cleared selection falls back to the first voice.
        indice = VOICE_NAMES.index(v) if v in VOICE_NAMES else 0
        return await crear_video_profesional(p, cs, indice)

    btn.click(
        fn=wrapper,
        inputs=[prompt, custom_script, voz],
        outputs=output,
    )

if __name__ == "__main__":
    app.launch(server_name="0.0.0.0", server_port=7860)