Spaces:
Running
Running
import os | |
import re | |
import random | |
import time | |
import logging | |
from typing import Optional, List | |
from datetime import datetime | |
from pathlib import Path | |
# Initial configuration for HF Spaces; applied before the heavy
# third-party libraries below are imported.
os.environ.update(
    {
        "TOKENIZERS_PARALLELISM": "false",
        "GRADIO_ANALYTICS_ENABLED": "false",
        "HF_HUB_DISABLE_PROGRESS_BARS": "1",
    }
)

# Logging configuration: INFO level, timestamped records.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
try: | |
import requests | |
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeAudioClip | |
from moviepy.audio.fx.all import audio_loop | |
import edge_tts | |
import gradio as gr | |
import numpy as np | |
from transformers import pipeline | |
import backoff | |
except ImportError as e: | |
logger.error(f"Error importing dependencies: {e}") | |
raise | |
# Tunable constants
# Kept low to avoid rate limiting.
MAX_VIDEOS = 3
# Length of each video segment, in seconds.
VIDEO_SEGMENT_DURATION = 5
# Maximum number of retries for downloads.
MAX_RETRIES = 3
# Timeout for HTTP requests.
REQUEST_TIMEOUT = 15

# Model configuration
MODEL_NAME = "facebook/mbart-large-50"
PEXELS_API_KEY = os.environ.get("PEXELS_API_KEY", "")
def safe_download(url: str, timeout: int = REQUEST_TIMEOUT) -> Optional[str]:
    """Download ``url`` into a uniquely named ``temp_*.mp4`` file.

    The request is attempted up to ``MAX_RETRIES`` times (at least once).
    HTTP 429 responses are retried after the delay announced in the
    ``Retry-After`` header (5 seconds when the header is missing or is not
    an integer). Connection errors, timeouts and truncated transfers are
    retried after a short linear backoff. Any other failure aborts at once.

    Args:
        url: Address of the file to download.
        timeout: Timeout, in seconds, passed to ``requests.get``.

    Returns:
        Path of the downloaded file (created in the current working
        directory), or ``None`` when the download failed. A partially
        written file is removed before ``None`` is returned.
    """
    import tempfile  # local import: only this function needs it

    transient_errors = (
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
        requests.exceptions.ChunkedEncodingError,
    )
    attempts = max(1, MAX_RETRIES)

    for attempt in range(1, attempts + 1):
        filename = None
        delay = None  # seconds to wait before retrying; None means "give up"
        try:
            # The context manager releases the streamed connection.
            with requests.get(url, stream=True, timeout=timeout) as response:
                response.raise_for_status()
                # mkstemp creates the file atomically, so two downloads can
                # never end up writing to the same name.
                fd, filename = tempfile.mkstemp(
                    prefix="temp_", suffix=".mp4", dir="."
                )
                with os.fdopen(fd, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            return filename
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 429:
                # Retry-After may also be an HTTP date; fall back to the
                # default delay instead of failing on int().
                try:
                    delay = max(0, int(e.response.headers.get("Retry-After", 5)))
                except (TypeError, ValueError):
                    delay = 5
                logger.warning("Rate limited (HTTP 429).")
            else:
                logger.error(f"Download failed: {str(e)}")
        except transient_errors as e:
            delay = attempt  # linear backoff: 1s, 2s, ...
            logger.warning(f"Transient download error: {str(e)}")
        except Exception as e:
            logger.error(f"Unexpected download error: {str(e)}")

        # Only reached on failure: drop whatever was partially written.
        if filename is not None:
            try:
                os.remove(filename)
            except OSError as cleanup_error:
                logger.warning(
                    f"Could not remove partial download {filename}: {cleanup_error}"
                )

        if delay is None:
            return None
        if attempt < attempts:
            logger.warning(
                f"Waiting {delay} seconds before retry {attempt + 1} of {attempts}..."
            )
            time.sleep(delay)

    logger.error(f"Download failed after {attempts} attempts")
    return None
def download_video_segment(url: str, duration: float, output_path: str) -> bool:
    """Download a video and write its opening segment to ``output_path``.

    Args:
        url: Address of the source video.
        duration: Desired segment length in seconds; the segment is cut
            0.1 s before the end of the source when the source is shorter.
        output_path: Destination of the re-encoded segment.

    Returns:
        ``True`` when the segment was written, ``False`` when the download
        failed, the source is shorter than one second, or processing raised.
        The downloaded source file is always removed afterwards.
    """
    # Encoder settings tuned for HF Spaces.
    encode_options = dict(
        codec="libx264",
        audio_codec="aac",
        threads=2,
        preset='ultrafast',
        verbose=False,
        ffmpeg_params=[
            '-max_muxing_queue_size', '1024',
            '-movflags', '+faststart',
        ],
    )
    downloaded = None
    try:
        downloaded = safe_download(url)
        if not downloaded:
            return False

        with VideoFileClip(downloaded) as source:
            if source.duration < 1:
                logger.error("Video demasiado corto")
                return False

            cut_at = min(duration, source.duration - 0.1)
            segment = source.subclip(0, cut_at)
            segment.write_videofile(output_path, **encode_options)
            return True
    except Exception as err:
        logger.error(f"Video processing error: {str(err)}")
        return False
    finally:
        # Remove the raw download whether or not processing succeeded.
        if downloaded and os.path.exists(downloaded):
            os.remove(downloaded)
def fetch_pexels_videos(query: str) -> List[str]:
    """Search Pexels for videos matching ``query``.

    For each of the first ``MAX_VIDEOS`` results, the widest file that is at
    least 720 px wide and has a download link is selected.

    Args:
        query: Free-text search term. It is sent as a request parameter, so
            spaces and characters such as ``&`` or ``#`` are URL-encoded.

    Returns:
        Download links of the selected files; an empty list when the API key
        is not configured, nothing qualifies, or the request fails (the
        error is logged).
    """
    if not PEXELS_API_KEY:
        logger.error("PEXELS_API_KEY no configurada")
        return []

    try:
        response = requests.get(
            "https://api.pexels.com/videos/search",
            headers={"Authorization": PEXELS_API_KEY},
            # Let requests build the query string so the term is encoded.
            params={"query": query, "per_page": MAX_VIDEOS},
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()

        videos = []
        for video in (response.json().get("videos") or [])[:MAX_VIDEOS]:
            # "width" may be missing or null for some files; treat it as 0 so
            # one odd entry cannot abort the whole search.
            candidates = [
                vf
                for vf in (video.get("video_files") or [])
                if (vf.get("width") or 0) >= 720 and vf.get("link")  # minimum quality
            ]
            if candidates:
                best_file = max(candidates, key=lambda vf: vf["width"])
                videos.append(best_file["link"])
        return videos
    except Exception as e:
        logger.error(f"Error fetching Pexels videos: {str(e)}")
        return []
# Lazily created text-generation pipeline, shared by every call.
_SCRIPT_GENERATOR = None


def _get_script_generator():
    """Build the text-generation pipeline on first use and reuse it afterwards."""
    global _SCRIPT_GENERATOR
    if _SCRIPT_GENERATOR is None:
        _SCRIPT_GENERATOR = pipeline("text-generation", model=MODEL_NAME)
    return _SCRIPT_GENERATOR


def generate_script(prompt: str) -> str:
    """Generate a short Spanish script about ``prompt`` with a local model.

    Args:
        prompt: Topic of the script.

    Returns:
        The generated text without the instruction that was fed to the
        model. When generation raises or yields only whitespace, a fixed
        three-point fallback script mentioning ``prompt`` is returned.
    """
    fallback = f"1. Punto uno sobre {prompt}\n2. Punto dos\n3. Punto tres"
    try:
        # NOTE(review): MODEL_NAME points at an mBART checkpoint, which is an
        # encoder-decoder model; confirm it gives usable output with the
        # "text-generation" task.
        generator = _get_script_generator()
        outputs = generator(
            f"Genera un guion breve sobre {prompt} en español con {MAX_VIDEOS} puntos:",
            max_length=200,
            num_return_sequences=1,
            # Without this the instruction itself is echoed at the start of
            # the output and would end up in the narration.
            return_full_text=False,
        )
        script = outputs[0]['generated_text'].strip()
    except Exception as e:
        logger.error(f"Error generating script: {str(e)}")
        return fallback

    if not script:
        # An empty script cannot be narrated; use the fallback instead.
        logger.warning("Script generation returned no text; using fallback")
        return fallback
    return script
async def generate_voice(text: str, output_file: str = "voice.mp3") -> bool:
    """Synthesize ``text`` with edge-tts and save the audio to ``output_file``.

    Args:
        text: Text to narrate.
        output_file: Path the audio is saved to.

    Returns:
        ``True`` when the audio was saved, ``False`` when synthesis raised
        (the error is logged).
    """
    try:
        narrator = edge_tts.Communicate(text, voice="es-MX-DaliaNeural")
        await narrator.save(output_file)
    except Exception as err:
        logger.error(f"Voice generation failed: {str(err)}")
        return False
    else:
        return True
def run_async(coro):
    """Run a coroutine to completion from synchronous code.

    ``asyncio.run`` creates a fresh event loop, runs ``coro`` on it, shuts
    down pending async generators, closes the loop and clears the thread's
    current-loop reference, so no closed loop is left installed for later
    callers.

    Args:
        coro: Coroutine object to execute.

    Returns:
        Whatever ``coro`` returns.

    Raises:
        RuntimeError: If an event loop is already running in this thread.
        Exception: Anything raised by ``coro`` propagates unchanged.
    """
    import asyncio

    return asyncio.run(coro)
def create_video(prompt: str) -> Optional[str]:
    """Create a narrated video about ``prompt``.

    Steps: generate a script, search Pexels for stock videos, synthesize the
    narration, cut one segment per video, concatenate the segments and attach
    the narration as the audio track.

    Args:
        prompt: Topic of the video. A blank prompt is rejected.

    Returns:
        Path of the rendered file inside the ``output`` directory, or
        ``None`` when the prompt is blank or any step fails (failures are
        logged).
    """
    import shutil
    import tempfile

    # Nothing to search for or narrate without a topic.
    if not prompt or not prompt.strip():
        return None

    # Everything the cleanup code touches is bound before the try block so
    # that early returns and exceptions cannot trigger an UnboundLocalError
    # inside ``finally``.
    clips = []
    audio_clip = None
    final_video = None
    work_dir = None
    try:
        # 1. Generate content
        script = generate_script(prompt)
        logger.info(f"Script generado: {script[:100]}...")

        # 2. Search for videos
        video_urls = fetch_pexels_videos(prompt)
        if not video_urls:
            logger.error("No se encontraron videos")
            return None

        # Per-request scratch directory: concurrent requests no longer share
        # voice.mp3 / segment_N.mp4.
        work_dir = tempfile.mkdtemp(prefix="videogen_")

        # 3. Generate narration
        voice_file = os.path.join(work_dir, "voice.mp3")
        if not run_async(generate_voice(script, voice_file)):
            logger.error("No se pudo generar voz")
            return None

        # 4. Process videos
        output_dir = "output"
        os.makedirs(output_dir, exist_ok=True)
        # Microseconds keep two renders started in the same second apart.
        output_path = os.path.join(
            output_dir,
            f"video_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}.mp4",
        )

        for i, url in enumerate(video_urls):
            clip_path = os.path.join(work_dir, f"segment_{i}.mp4")
            if download_video_segment(url, VIDEO_SEGMENT_DURATION, clip_path):
                clips.append(VideoFileClip(clip_path))

        if not clips:
            logger.error("No se pudieron procesar los videos")
            return None

        # 5. Assemble the final video
        final_video = concatenate_videoclips(clips, method="compose")
        audio_clip = AudioFileClip(voice_file)
        final_video = final_video.set_audio(audio_clip)
        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            threads=2,
            preset='ultrafast',
            verbose=False
        )
        return output_path
    except Exception as e:
        logger.error(f"Error creating video: {str(e)}")
        return None
    finally:
        # Cleanup: release every reader before deleting the files they use.
        for resource in (final_video, audio_clip, *clips):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                logger.warning(f"Error closing clip: {str(e)}")
        if work_dir is not None:
            shutil.rmtree(work_dir, ignore_errors=True)
# Gradio interface: a topic text box and a button on the left, the rendered
# video on the right.
with gr.Blocks(title="Generador de Videos HF", theme=gr.themes.Soft()) as app:
    gr.Markdown("# 🎥 Generador Automático de Videos")
    with gr.Row():
        with gr.Column():
            prompt_input = gr.Textbox(
                label="Tema del video",
                placeholder="Ej: Paisajes naturales de Chile",
                max_lines=2
            )
            generate_btn = gr.Button("Generar Video", variant="primary")
        with gr.Column():
            output_video = gr.Video(label="Resultado", interactive=False)

    # create_video returns a file path, or None when generation fails.
    generate_btn.click(
        fn=create_video,
        inputs=prompt_input,
        outputs=output_video
    )

# Entry point for Hugging Face Spaces
if __name__ == "__main__":
    app.launch(server_name="0.0.0.0", server_port=7860)