Spaces:
Sleeping
Sleeping
import os | |
import random | |
import requests | |
import gradio as gr | |
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, CompositeVideoClip | |
from moviepy.audio.fx.all import audio_loop | |
import edge_tts | |
import asyncio | |
from datetime import datetime | |
from pathlib import Path | |
from transformers import pipeline | |
# Pexels API key from environment variable | |
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY") | |
# Ensure asyncio works with Gradio | |
def run_async(coro): | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
result = loop.run_until_complete(coro) | |
loop.close() | |
return result | |
# Load lightweight text generation model | |
try: | |
generator = pipeline("text-generation", model="distilbert/distilgpt2", device="cpu") | |
except Exception as e: | |
print(f"Error loading model: {e}") | |
generator = None | |
# Fetch videos from Pexels | |
def fetch_pexels_videos(query, num_videos=5): | |
headers = {"Authorization": PEXELS_API_KEY} | |
url = f"https://api.pexels.com/videos/search?query={query}&per_page={num_videos}" | |
response = requests.get(url, headers=headers) | |
if response.status_code == 200: | |
return [video["video_files"][0]["link"] for video in response.json()["videos"]] | |
return [] | |
# Generate script using local model or custom text | |
def generate_script(prompt, custom_text=None): | |
if custom_text: | |
return custom_text.strip() | |
if not prompt: | |
return "Error: Debes proporcionar un prompt o un guion personalizado." | |
# Generate script with local model | |
if generator: | |
input_text = f"Generate a script for a video about '{prompt}'. Create a numbered list with short descriptions (max 20 words per item) for a top 10 related to the topic." | |
try: | |
result = generator(input_text, max_length=300, num_return_sequences=1, do_sample=True, truncation=True)[0]['generated_text'] | |
return result.strip() | |
except Exception as e: | |
print(f"Error generating script: {e}") | |
# Fallback mock response | |
if "recetas" in prompt.lower(): | |
return """ | |
1. Tacos al pastor: Jugosa carne marinada con piña. | |
2. Lasagna: Capas de pasta, carne y queso fundido. | |
3. Sushi: Arroz y pescado fresco en rollos delicados. | |
4. Pizza casera: Masa crujiente con tus ingredientes favoritos. | |
5. Paella: Arroz con mariscos y azafrán. | |
6. Ceviche: Pescado fresco marinado en limón. | |
7. Ramen: Caldo rico con fideos y cerdo. | |
8. Tiramisú: Postre cremoso con café y mascarpone. | |
9. Enchiladas: Tortillas rellenas con salsa picante. | |
10. Curry: Especias intensas con carne o vegetales. | |
""" | |
return f"Top 10 sobre {prompt}: No se pudo generar un guion específico." | |
# Generate voice using Edge TTS | |
async def generate_voice(text, output_file="output.mp3"): | |
communicate = edge_tts.Communicate(text, voice="es-MX-DaliaNeural") | |
await communicate.save(output_file) | |
return output_file | |
# Download and trim video | |
def download_and_trim_video(url, duration, output_path): | |
response = requests.get(url, stream=True) | |
with open("temp_video.mp4", "wb") as f: | |
for chunk in response.iter_content(chunk_size=1024): | |
f.write(chunk) | |
clip = VideoFileClip("temp_video.mp4").subclip(0, min(duration, VideoFileClip("temp_video.mp4").duration)) | |
clip.write_videofile(output_path, codec="libx264", audio_codec="aac") | |
clip.close() | |
os.remove("temp_video.mp4") | |
return output_path | |
# Main video creation function | |
def create_video(prompt, custom_text, music_file): | |
output_dir = "output_videos" | |
os.makedirs(output_dir, exist_ok=True) | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
output_video = f"{output_dir}/video_{timestamp}.mp4" | |
# Generate or use provided script | |
script = generate_script(prompt, custom_text) | |
if "Error" in script: | |
return script | |
# Generate voice | |
voice_file = "temp_audio.mp3" | |
run_async(generate_voice(script, voice_file)) | |
audio = AudioFileClip(voice_file) | |
video_duration = audio.duration | |
# Fetch Pexels videos | |
query = prompt.split()[0] if prompt else "generic" | |
video_urls = fetch_pexels_videos(query, num_videos=5) | |
if not video_urls: | |
return "Error: No se encontraron videos en Pexels." | |
# Download and trim videos | |
clips = [] | |
for i, url in enumerate(video_urls): | |
clip_path = f"temp_clip_{i}.mp4" | |
download_and_trim_video(url, video_duration / len(video_urls), clip_path) | |
clips.append(VideoFileClip(clip_path)) | |
# Concatenate video clips | |
final_clip = concatenate_videoclips(clips, method="compose").set_duration(video_duration) | |
# Add looped music | |
if music_file: | |
music = AudioFileClip(music_file.name) | |
music = audio_loop(music, duration=video_duration) | |
final_clip = final_clip.set_audio(music.set_duration(video_duration)) | |
else: | |
final_clip = final_clip.set_audio(audio) | |
# Write final video | |
final_clip.write_videofile(output_video, codec="libx264", audio_codec="aac", fps=24) | |
# Clean up | |
for clip in clips: | |
clip.close() | |
audio.close() | |
if music_file: | |
music.close() | |
final_clip.close() | |
os.remove(voice_file) | |
for i in range(len(video_urls)): | |
os.remove(f"temp_clip_{i}.mp4") | |
return output_video | |
# Gradio interface | |
with gr.Blocks() as demo: | |
gr.Markdown("# Generador de Videos") | |
gr.Markdown("Ingresa un prompt (ej., 'Top 10 recetas más ricas') o un guion personalizado. Sube música opcional (MP3).") | |
prompt = gr.Textbox(label="Prompt", placeholder="Ejemplo: Top 10 recetas más ricas") | |
custom_text = gr.Textbox(label="Guion Personalizado", placeholder="Ingresa tu propio guion aquí (opcional)") | |
music_file = gr.File(label="Subir Música (MP3, opcional)", file_types=[".mp3"]) | |
submit = gr.Button("Generar Video") | |
output = gr.Video(label="Video Generado") | |
submit.click(fn=create_video, inputs=[prompt, custom_text, music_file], outputs=output) | |
demo.launch(share=True) |