# Hugging Face Spaces status banner captured with this file (page residue, not code): "Runtime error"
from flask import Flask, request, jsonify, send_file | |
import os | |
import base64 | |
import json | |
import uuid | |
import tempfile | |
import logging | |
from pathlib import Path | |
from typing import List, Dict, Any, Optional | |
import cv2 | |
import numpy as np | |
from PIL import Image | |
import torch | |
from transformers import pipeline | |
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, ImageClip | |
import requests | |
from io import BytesIO | |
import threading | |
import time | |
# Logging: INFO and above goes to the root handler; this module logs under its own name.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The Flask application object that serves the HTTP API.
app = Flask(__name__)
class HuggingFaceVideoGenerator:
    """Build slideshow videos from images, optionally narrated with TTS audio.

    Images can come from URLs, base64 strings, or text prompts. Job state is
    kept in the in-memory ``jobs`` dict, keyed by job id, and finished videos
    are written under ``generated_videos/``.
    """

    def __init__(self, huggingface_token: Optional[str] = None):
        """
        Initialize the Hugging Face Video Generator

        Args:
            huggingface_token: Optional Hugging Face API token. When given it
                is also exported as the HUGGINGFACE_HUB_TOKEN environment
                variable.
        """
        self.hf_token = huggingface_token
        self.jobs = {}  # job_id -> dict with status / progress / result fields
        if huggingface_token:
            os.environ["HUGGINGFACE_HUB_TOKEN"] = huggingface_token
        # Initialize Hugging Face pipelines
        self._init_pipelines()
        # Create output directory
        self.output_dir = Path("generated_videos")
        self.output_dir.mkdir(exist_ok=True)

    def _init_pipelines(self):
        """Initialize Hugging Face pipelines.

        Each pipeline is loaded independently and best-effort: on any failure
        the corresponding attribute is set to ``None`` and a warning is
        logged, so the rest of the service keeps working.
        """
        device = 0 if torch.cuda.is_available() else -1

        try:
            # Text-to-Speech pipeline
            self.tts_pipeline = pipeline(
                "text-to-speech",
                model="microsoft/speecht5_tts",
                device=device,
            )
            logger.info("TTS pipeline initialized")
        except Exception as e:
            logger.warning(f"Could not initialize TTS pipeline: {e}")
            self.tts_pipeline = None

        try:
            # Text-to-Image pipeline (for generating images from text)
            self.text_to_image = pipeline(
                "text-to-image",
                model="runwayml/stable-diffusion-v1-5",
                device=device,
            )
            logger.info("Text-to-Image pipeline initialized")
        except Exception as e:
            logger.warning(f"Could not initialize Text-to-Image pipeline: {e}")
            self.text_to_image = None

    @staticmethod
    def _to_bgr(image) -> np.ndarray:
        """Convert a PIL image to an OpenCV-style BGR array, forcing RGB mode first."""
        if image.mode != 'RGB':
            image = image.convert('RGB')
        return cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

    @staticmethod
    def _remove_file(path: str) -> None:
        """Delete ``path``; a failure is logged rather than raised (cleanup is best-effort)."""
        try:
            os.unlink(path)
        except OSError as e:
            logger.warning(f"Could not remove temporary file {path}: {e}")

    def download_image_from_url(self, url: str) -> np.ndarray:
        """Download and process image from URL.

        Args:
            url: Location of the image; fetched with a 10 second timeout.

        Returns:
            The image as a BGR array.

        Raises:
            Exception: Any download or decoding error is logged and re-raised.
        """
        # NOTE(review): the URL comes from the request payload and is fetched
        # as-is; consider restricting allowed hosts if this service can reach
        # internal networks.
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return self._to_bgr(Image.open(BytesIO(response.content)))
        except Exception as e:
            logger.error(f"Error downloading image from {url}: {e}")
            raise

    def decode_base64_image(self, base64_string: str) -> np.ndarray:
        """Decode base64 image string.

        Args:
            base64_string: Raw base64 data, or a data URL whose payload
                follows the first comma.

        Returns:
            The image as a BGR array.

        Raises:
            Exception: Any decoding error is logged and re-raised.
        """
        try:
            # Remove data URL prefix if present
            if ',' in base64_string:
                base64_string = base64_string.split(',')[1]
            image_data = base64.b64decode(base64_string)
            return self._to_bgr(Image.open(BytesIO(image_data)))
        except Exception as e:
            logger.error(f"Error decoding base64 image: {e}")
            raise

    def generate_image_from_text(self, prompt: str) -> np.ndarray:
        """Generate image from text prompt using Hugging Face.

        Args:
            prompt: Text description passed to the text-to-image pipeline.

        Returns:
            The generated image as a BGR array.

        Raises:
            ValueError: If the text-to-image pipeline is not available.
            Exception: Any generation error is logged and re-raised.
        """
        if not self.text_to_image:
            raise ValueError("Text-to-Image pipeline not available")
        try:
            logger.info(f"Generating image from prompt: {prompt}")
            result = self.text_to_image(prompt)
            # The result either exposes an ``images`` list or is used directly.
            pil_image = result.images[0] if hasattr(result, 'images') else result
            return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)
        except Exception as e:
            logger.error(f"Error generating image from text: {e}")
            raise

    def process_images_data(self, images_data: List[Dict]) -> List[np.ndarray]:
        """Process various image data formats.

        Each entry is handled by the first matching key: ``url``, ``base64``,
        or ``text_prompt`` (the latter only when the text-to-image pipeline is
        available). Entries that fail or match nothing are logged and skipped,
        so the result may be shorter than the input.

        Args:
            images_data: List of dicts describing the images.

        Returns:
            The successfully loaded images as BGR arrays, in input order.
        """
        processed_images = []
        for img_data in images_data:
            try:
                if 'url' in img_data:
                    image = self.download_image_from_url(img_data['url'])
                elif 'base64' in img_data:
                    image = self.decode_base64_image(img_data['base64'])
                elif 'text_prompt' in img_data and self.text_to_image:
                    image = self.generate_image_from_text(img_data['text_prompt'])
                else:
                    logger.warning(f"Unsupported image data format: {img_data.keys()}")
                    continue
            except Exception as e:
                # Deliberately best-effort: one bad entry must not sink the job.
                logger.error(f"Error processing image data: {e}")
                continue
            processed_images.append(image)
        return processed_images

    def create_video_from_images(
        self,
        images: List[np.ndarray],
        output_path: str,
        fps: int = 30,
        duration_per_image: float = 2.0,
        transition_duration: float = 0.5,
        resolution: tuple = (1920, 1080),
        transition_type: str = "fade"
    ) -> str:
        """Create video from processed images.

        Args:
            images: BGR arrays, shown in order.
            output_path: Where the encoded video is written.
            fps: Frames per second of the output.
            duration_per_image: Seconds each image is shown.
            transition_duration: Seconds of fade and of overlap between
                consecutive clips when ``transition_type`` is "fade".
            resolution: ``(width, height)`` every image is resized to; values
                are coerced to int.
            transition_type: "fade" enables fade in/out and overlapping clips;
                any other value concatenates the clips with no fades.

        Returns:
            ``output_path``.

        Raises:
            ValueError: If ``images`` is empty or ``resolution`` does not hold
                exactly two values.
        """
        if not images:
            raise ValueError("No images provided")
        logger.info(f"Creating video from {len(images)} images")

        # cv2.resize needs integer sizes; JSON payloads may deliver floats.
        width, height = (int(value) for value in resolution)
        use_fade = transition_type == "fade"
        last_index = len(images) - 1

        clips = []
        final_video = None
        try:
            for i, img in enumerate(images):
                img_resized = cv2.resize(img, (width, height))
                # Convert BGR to RGB for moviepy
                img_rgb = cv2.cvtColor(img_resized, cv2.COLOR_BGR2RGB)
                # ImageClip takes the RGB array directly, so no temp file is needed.
                clip = ImageClip(img_rgb, duration=duration_per_image)
                if use_fade:
                    if i > 0:
                        clip = clip.fadein(transition_duration)
                    if i < last_index:
                        clip = clip.fadeout(transition_duration)
                clips.append(clip)

            if use_fade and len(clips) > 1:
                # Imported here because the module-level moviepy import does
                # not bring CompositeVideoClip into scope.
                from moviepy.editor import CompositeVideoClip

                # Overlap clips for smooth transitions
                final_clips = [clips[0]]
                for clip in clips[1:]:
                    start = final_clips[-1].end - transition_duration
                    final_clips.append(clip.set_start(start))
                final_video = CompositeVideoClip(final_clips)
            else:
                final_video = concatenate_videoclips(clips)

            final_video.write_videofile(
                output_path,
                fps=fps,
                codec='libx264',
                audio_codec='aac' if getattr(final_video, 'audio', None) else None
            )
        finally:
            # Release clip resources even when encoding fails.
            if final_video is not None:
                final_video.close()
            for clip in clips:
                clip.close()

        logger.info(f"Video created: {output_path}")
        return output_path

    def generate_tts_audio(self, text: str) -> str:
        """Generate TTS audio.

        Args:
            text: Text passed to the TTS pipeline.

        Returns:
            Path of a temporary ``.wav`` file; the caller must delete it.

        Raises:
            ValueError: If the TTS pipeline is not available.
            Exception: Any synthesis or write error is logged and re-raised.
        """
        if not self.tts_pipeline:
            raise ValueError("TTS pipeline not available")
        logger.info("Generating TTS audio")
        audio_path = None
        try:
            audio_data = self.tts_pipeline(text)
            import soundfile as sf
            # Close the handle before writing so soundfile can open the path itself.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as f:
                audio_path = f.name
            sf.write(audio_path, audio_data["audio"], audio_data["sampling_rate"])
            return audio_path
        except Exception as e:
            logger.error(f"Error generating TTS: {e}")
            if audio_path is not None:
                self._remove_file(audio_path)
            raise

    def add_audio_to_video(
        self,
        video_path: str,
        audio_path: str,
        output_path: str,
        audio_volume: float = 1.0
    ) -> str:
        """Add audio to video.

        Audio longer than the video is trimmed; shorter audio is looped to
        fill the video's duration.

        Args:
            video_path: Source video file.
            audio_path: Audio file to attach.
            output_path: Where the combined video is written.
            audio_volume: Volume multiplier; 1.0 leaves the audio unchanged.

        Returns:
            ``output_path``.

        Raises:
            ValueError: If the audio clip reports no duration.
            Exception: Any other error is logged and re-raised.
        """
        logger.info("Adding audio to video")
        video = audio_source = final_video = None
        try:
            video = VideoFileClip(video_path)
            audio_source = AudioFileClip(audio_path)
            audio = audio_source
            # Adjust volume
            if audio_volume != 1.0:
                audio = audio.volumex(audio_volume)
            if not audio.duration:
                raise ValueError("Audio clip has no duration")
            # Match durations
            if audio.duration > video.duration:
                audio = audio.subclip(0, video.duration)
            elif audio.duration < video.duration:
                # moviepy.editor attaches audio_loop to audio clips; they have no `loop`.
                audio = audio.audio_loop(duration=video.duration)
            # Combine
            final_video = video.set_audio(audio)
            final_video.write_videofile(output_path, codec='libx264', audio_codec='aac')
            return output_path
        except Exception as e:
            logger.error(f"Error adding audio to video: {e}")
            raise
        finally:
            # Close readers on both the success and the failure path.
            for clip in (video, audio_source, final_video):
                if clip is not None:
                    clip.close()

    def process_video_request(self, job_id: str, request_data: Dict[str, Any]):
        """Process video generation request in background.

        Updates ``self.jobs[job_id]`` as it goes: ``status`` moves through
        "processing" to "completed" or "failed", ``progress`` is a rough
        percentage, and on success ``output_file`` and ``download_url`` are
        set. Errors are recorded on the job instead of being raised.

        Args:
            job_id: Key of the job entry to update.
            request_data: Payload with ``images`` and optional
                ``video_params`` / ``audio_params`` dicts.
        """
        # setdefault keeps the failure handler from raising on an unknown job id.
        job = self.jobs.setdefault(job_id, {})
        try:
            job['status'] = 'processing'
            job['progress'] = 0
            # Extract parameters; `or` also covers explicit JSON nulls
            images_data = request_data.get('images') or []
            video_params = request_data.get('video_params') or {}
            audio_params = request_data.get('audio_params') or {}

            # Process images
            job['progress'] = 20
            images = self.process_images_data(images_data)
            if not images:
                raise ValueError("No valid images processed")

            # Create video
            job['progress'] = 50
            video_output = self.output_dir / f"{job_id}_video.mp4"
            self.create_video_from_images(
                images=images,
                output_path=str(video_output),
                fps=video_params.get('fps', 30),
                duration_per_image=video_params.get('duration_per_image', 2.0),
                transition_duration=video_params.get('transition_duration', 0.5),
                resolution=tuple(video_params.get('resolution', [1920, 1080])),
                transition_type=video_params.get('transition_type', 'fade')
            )

            # Add audio if requested
            final_output = video_output
            if audio_params.get('text') and self.tts_pipeline:
                job['progress'] = 80
                audio_path = self.generate_tts_audio(audio_params['text'])
                final_output = self.output_dir / f"{job_id}_final.mp4"
                try:
                    self.add_audio_to_video(
                        video_path=str(video_output),
                        audio_path=audio_path,
                        output_path=str(final_output),
                        audio_volume=audio_params.get('volume', 1.0)
                    )
                finally:
                    # The narration file is temporary whether or not muxing worked.
                    self._remove_file(audio_path)
                # The silent intermediate is only dropped once the final file exists.
                self._remove_file(str(video_output))

            # Update job status
            job['status'] = 'completed'
            job['progress'] = 100
            job['output_file'] = str(final_output)
            job['download_url'] = f"/download/{job_id}"
            logger.info(f"Job {job_id} completed successfully")
        except Exception as e:
            logger.error(f"Job {job_id} failed: {e}")
            job['status'] = 'failed'
            job['error'] = str(e)
# Module-level generator shared by every request handler; the API token is
# read from the HUGGINGFACE_TOKEN environment variable (None when unset).
_hf_token = os.getenv('HUGGINGFACE_TOKEN')
generator = HuggingFaceVideoGenerator(huggingface_token=_hf_token)
@app.route('/generate_video', methods=['POST'])
def generate_video():
    """Main endpoint to receive data from n8n and generate video.

    Expects a JSON object with a non-empty ``images`` list. Registers a job,
    starts processing on a daemon thread and returns immediately.

    Returns:
        JSON with ``job_id``, ``status``, ``status_url`` and ``message`` on
        success; a JSON ``error`` with status 400 for a bad payload or 500
        for an unexpected failure.
    """
    try:
        # silent=True turns a missing or malformed JSON body into None, so it
        # is reported as a 400 below instead of surfacing as a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': 'No JSON data provided'}), 400
        # Validate required fields
        if not isinstance(data, dict) or not data.get('images'):
            return jsonify({'error': 'No images data provided'}), 400
        if not isinstance(data['images'], list):
            return jsonify({'error': 'images must be a list'}), 400

        # Generate unique job ID
        job_id = str(uuid.uuid4())
        # Initialize job
        generator.jobs[job_id] = {
            'status': 'queued',
            'progress': 0,
            'created_at': time.time()
        }
        # Start processing in background; daemon so it never blocks shutdown
        thread = threading.Thread(
            target=generator.process_video_request,
            args=(job_id, data),
            daemon=True,
        )
        thread.start()
        return jsonify({
            'job_id': job_id,
            'status': 'queued',
            'status_url': f"/status/{job_id}",
            'message': 'Video generation started'
        })
    except Exception as e:
        logger.error(f"Error in generate_video: {e}")
        return jsonify({'error': str(e)}), 500
@app.route('/status/<job_id>', methods=['GET'])
def get_job_status(job_id):
    """Get job status and progress.

    Args:
        job_id: Identifier returned when the job was created.

    Returns:
        JSON with ``job_id``, ``status`` and ``progress``, plus
        ``download_url`` for completed jobs or ``error`` for failed ones;
        a 404 JSON error when the job is unknown.
    """
    job = generator.jobs.get(job_id)
    if job is None:
        return jsonify({'error': 'Job not found'}), 404

    status = job.get('status')
    response = {
        'job_id': job_id,
        'status': status,
        'progress': job.get('progress')
    }
    if status == 'completed':
        response['download_url'] = job.get('download_url')
    elif status == 'failed':
        response['error'] = job.get('error')
    return jsonify(response)
@app.route('/download/<job_id>', methods=['GET'])
def download_video(job_id):
    """Download generated video.

    Args:
        job_id: Identifier returned when the job was created.

    Returns:
        The video as an attachment named ``generated_video_<job_id>.mp4``;
        a JSON error with 404 when the job or its file is missing, or 400
        when the job has not completed.
    """
    job = generator.jobs.get(job_id)
    if job is None:
        return jsonify({'error': 'Job not found'}), 404
    if job.get('status') != 'completed':
        return jsonify({'error': 'Job not completed'}), 400

    output_file = job.get('output_file')
    if not output_file or not os.path.exists(output_file):
        return jsonify({'error': 'Output file not found'}), 404

    # Send an absolute path so the file that was just checked is the file
    # that is served, regardless of how send_file resolves relative paths.
    return send_file(
        os.path.abspath(output_file),
        as_attachment=True,
        download_name=f"generated_video_{job_id}.mp4"
    )
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint.

    Returns:
        JSON with a fixed ``status`` of "healthy" and flags telling whether
        the TTS and text-to-image pipelines were loaded.
    """
    return jsonify({
        'status': 'healthy',
        'tts_available': generator.tts_pipeline is not None,
        'text_to_image_available': generator.text_to_image is not None
    })
# NOTE(review): served at the site root by assumption; the file does not
# state the intended path for this documentation endpoint.
@app.route('/', methods=['GET'])
def index():
    """API documentation.

    Returns:
        JSON listing the available endpoints and an example request payload
        for ``POST /generate_video``.
    """
    endpoints = {
        'POST /generate_video': 'Generate video from images and audio',
        'GET /status/<job_id>': 'Get job status',
        'GET /download/<job_id>': 'Download generated video',
        'GET /health': 'Health check'
    }
    example_request = {
        # One entry per supported image source.
        'images': [
            {'url': 'https://example.com/image1.jpg'},
            {'base64': 'data:image/jpeg;base64,/9j/4AAQ...'},
            {'text_prompt': 'A beautiful sunset over mountains'}
        ],
        'video_params': {
            'fps': 30,
            'duration_per_image': 3.0,
            'transition_duration': 0.5,
            'resolution': [1920, 1080],
            'transition_type': 'fade'
        },
        'audio_params': {
            'text': 'Welcome to our video presentation',
            'volume': 1.0
        }
    }
    return jsonify({
        'message': 'Hugging Face Video Generator API',
        'endpoints': endpoints,
        'example_request': example_request
    })
if __name__ == '__main__':
    # Script entry point: listen on every interface. PORT (default 5000) and
    # DEBUG (enabled only when it equals "true", case-insensitively) are read
    # from the environment.
    listen_port = int(os.getenv('PORT', 5000))
    debug_enabled = os.getenv('DEBUG', 'false').lower() == 'true'
    app.run(host='0.0.0.0', port=listen_port, debug=debug_enabled)