Spaces:
Running
Running
import os | |
import asyncio | |
import logging | |
import tempfile | |
import requests | |
from datetime import datetime | |
import edge_tts | |
import gradio as gr | |
import torch | |
from transformers import GPT2Tokenizer, GPT2LMHeadModel | |
from keybert import KeyBERT | |
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip | |
import subprocess | |
import re | |
import math | |
from pydub import AudioSegment | |
from pexelsapi.pexels import Pexels | |
# Configuraci贸n inicial | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
PEXELS_API_KEY = os.environ.get("PEXELS_API_KEY") | |
# Inicializaci贸n de modelos | |
MODEL_NAME = "gpt2" | |
try: | |
tokenizer = GPT2Tokenizer.from_pretrained(MODEL_NAME) | |
model = GPT2LMHeadModel.from_pretrained(MODEL_NAME).eval() | |
if tokenizer.pad_token is None: | |
tokenizer.pad_token = tokenizer.eos_token | |
except Exception as e: | |
logger.error(f"Error al cargar modelo GPT-2: {e}") | |
tokenizer = model = None | |
try: | |
kw_model = KeyBERT('multi-qa-MiniLM-L6-cos-v1') | |
except Exception as e: | |
logger.error(f"Error al cargar KeyBERT: {e}") | |
kw_model = None | |
# Funciones principales | |
def generate_script(prompt, max_length=250): | |
if not tokenizer or not model: | |
return "Error: Modelo no disponible" | |
try: | |
inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512) | |
outputs = model.generate(**inputs, max_length=max_length) | |
return tokenizer.decode(outputs[0], skip_special_tokens=True) | |
except Exception as e: | |
logger.error(f"Error generando gui贸n: {e}") | |
return "Error al generar gui贸n" | |
async def text_to_speech(text, output_path, voice="es-ES-ElviraNeural"): | |
try: | |
communicate = edge_tts.Communicate(text, voice) | |
await communicate.save(output_path) | |
return True | |
except Exception as e: | |
logger.error(f"Error en TTS: {e}") | |
return False | |
def search_pexels_videos(query_list, num_videos=3): | |
if not PEXELS_API_KEY: | |
raise ValueError("API key no configurada") | |
pexel = Pexels(PEXELS_API_KEY) | |
video_urls = [] | |
for query in query_list: | |
try: | |
results = pexel.search_videos(query=query, per_page=num_videos) | |
for video in results.get('videos', []): | |
best = max(video['video_files'], key=lambda x: x['width']*x['height']) | |
video_urls.append(best['link']) | |
except Exception as e: | |
logger.error(f"Error buscando videos: {e}") | |
return video_urls | |
def download_video(url, temp_dir): | |
try: | |
response = requests.get(url, stream=True, timeout=30) | |
filename = f"video_{datetime.now().strftime('%Y%m%d%H%M%S')}.mp4" | |
filepath = os.path.join(temp_dir, filename) | |
with open(filepath, 'wb') as f: | |
for chunk in response.iter_content(chunk_size=8192): | |
f.write(chunk) | |
return filepath | |
except Exception as e: | |
logger.error(f"Error descargando video: {e}") | |
return None | |
def create_video(audio_path, video_paths, output_path): | |
try: | |
clips = [VideoFileClip(path) for path in video_paths] | |
final_clip = concatenate_videoclips(clips) | |
audio = AudioFileClip(audio_path) | |
if final_clip.duration < audio.duration: | |
final_clip = final_clip.loop(duration=audio.duration) | |
final_clip = final_clip.set_audio(audio) | |
final_clip.write_videofile( | |
output_path, | |
codec="libx264", | |
audio_codec="aac", | |
fps=24 | |
) | |
return True | |
except Exception as e: | |
logger.error(f"Error creando video: {e}") | |
return False | |
# Interfaz de Gradio | |
with gr.Blocks() as app: | |
with gr.Row(): | |
with gr.Column(): | |
prompt_type = gr.Radio(["Generar Guion", "Usar Guion Existente"], label="Tipo de Entrada") | |
text_input = gr.Textbox(label="Texto", lines=5) | |
generate_btn = gr.Button("Generar Video") | |
with gr.Column(): | |
video_output = gr.Video() | |
status_output = gr.Textbox(label="Estado") | |
async def generate_video(prompt_type, text): | |
temp_dir = tempfile.mkdtemp() | |
try: | |
if prompt_type == "Generar Guion": | |
script = generate_script(text) | |
else: | |
script = text | |
tts_path = os.path.join(temp_dir, "audio.mp3") | |
if not await text_to_speech(script, tts_path): | |
return None, "Error generando voz" | |
keywords = extract_keywords(script) if kw_model else [text.split()[0]] | |
video_urls = search_pexels_videos(keywords) | |
video_paths = [] | |
for url in video_urls: | |
path = download_video(url, temp_dir) | |
if path: | |
video_paths.append(path) | |
if not video_paths: | |
return None, "Error descargando videos" | |
output_path = os.path.join(temp_dir, "final.mp4") | |
if create_video(tts_path, video_paths, output_path): | |
return output_path, "Video generado" | |
else: | |
return None, "Error creando video" | |
except Exception as e: | |
logger.error(f"Error: {str(e)}") | |
return None, f"Error: {str(e)}" | |
finally: | |
pass # Hugging Face limpia autom谩ticamente | |
if __name__ == "__main__": | |
app.launch(server_name="0.0.0.0", server_port=7860) |