# app.py import os import sys import time import gradio as gr import spaces from huggingface_hub import snapshot_download from huggingface_hub.utils import GatedRepoError, RepositoryNotFoundError, RevisionNotFoundError from pathlib import Path import tempfile from pydub import AudioSegment import cv2 import numpy as np from scipy import interpolate # Add the src directory to the system path to allow for local imports sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), 'src'))) from models.inference.moda_test import LiveVASAPipeline, emo_map, set_seed # --- Configuration --- # Set seed for reproducibility set_seed(42) # Paths and constants for the Gradio demo DEFAULT_CFG_PATH = "configs/audio2motion/inference/inference.yaml" DEFAULT_MOTION_MEAN_STD_PATH = "src/datasets/mean.pt" DEFAULT_SILENT_AUDIO_PATH = "src/examples/silent-audio.wav" OUTPUT_DIR = "gradio_output" WEIGHTS_DIR = "pretrain_weights" REPO_ID = "lixinyizju/moda" # --- Download Pre-trained Weights from Hugging Face Hub --- def download_weights(): """ Downloads pre-trained weights from Hugging Face Hub if they don't exist locally. """ # A simple check for a key file to see if the download is likely complete motion_model_file = os.path.join(WEIGHTS_DIR, "moda", "net-200.pth") if not os.path.exists(motion_model_file): print(f"Weights not found locally. Downloading from Hugging Face Hub repo '{REPO_ID}'...") print(f"This may take a while depending on your internet connection.") try: snapshot_download( repo_id=REPO_ID, local_dir=WEIGHTS_DIR, local_dir_use_symlinks=False, # Use False to copy files directly; safer for Windows resume_download=True, ) print("Weights downloaded successfully.") except GatedRepoError: raise gr.Error(f"Access to the repository '{REPO_ID}' is gated. Please visit https://huggingface.co/{REPO_ID} to request access.") except (RepositoryNotFoundError, RevisionNotFoundError): raise gr.Error(f"The repository '{REPO_ID}' was not found. Please check the repository ID.") except Exception as e: print(f"An error occurred during download: {e}") raise gr.Error(f"Failed to download models. Please check your internet connection and try again. Error: {e}") else: print(f"Found existing weights at '{WEIGHTS_DIR}'. Skipping download.") # --- Audio Conversion Function --- def ensure_wav_format(audio_path): """ Ensures the audio file is in WAV format. If not, converts it to WAV. Returns the path to the WAV file (either original or converted). """ if audio_path is None: return None audio_path = Path(audio_path) # Check if already WAV if audio_path.suffix.lower() == '.wav': print(f"Audio is already in WAV format: {audio_path}") return str(audio_path) # Convert to WAV print(f"Converting audio from {audio_path.suffix} to WAV format...") try: # Load the audio file audio = AudioSegment.from_file(audio_path) # Create a temporary WAV file with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_file: wav_path = tmp_file.name # Export as WAV with higher sampling rate for better quality audio.export( wav_path, format='wav', parameters=["-ar", "24000", "-ac", "1"] # 24kHz, mono for better lip-sync ) print(f"Audio converted successfully to: {wav_path}") return wav_path except Exception as e: print(f"Error converting audio: {e}") raise gr.Error(f"Failed to convert audio file to WAV format. Error: {e}") # --- Frame Interpolation Function --- def interpolate_frames(video_path, target_fps=30): """ Interpolates frames in a video to achieve smoother motion. Args: video_path: Path to the input video target_fps: Target frames per second Returns: Path to the interpolated video """ try: video_path = str(video_path) cap = cv2.VideoCapture(video_path) # Get original video properties original_fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) print(f"Original FPS: {original_fps}, Target FPS: {target_fps}") # If target FPS is not higher, return original if original_fps >= target_fps: cap.release() print("Target FPS is not higher than original. Skipping interpolation.") return video_path # Read all frames frames = [] while True: ret, frame = cap.read() if not ret: break frames.append(frame) cap.release() if len(frames) < 2: print("Not enough frames for interpolation.") return video_path # Calculate interpolation factor interpolation_factor = int(target_fps / original_fps) interpolated_frames = [] print(f"Interpolating with factor: {interpolation_factor}") # Perform frame interpolation for i in range(len(frames) - 1): interpolated_frames.append(frames[i]) # Generate intermediate frames for j in range(1, interpolation_factor): alpha = j / interpolation_factor # Use weighted average for simple interpolation interpolated_frame = cv2.addWeighted( frames[i], 1 - alpha, frames[i + 1], alpha, 0 ) interpolated_frames.append(interpolated_frame) # Add the last frame interpolated_frames.append(frames[-1]) # Save the interpolated video output_path = video_path.replace('.mp4', '_interpolated.mp4') fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(output_path, fourcc, target_fps, (width, height)) for frame in interpolated_frames: out.write(frame) out.release() print(f"Interpolated video saved to: {output_path}") return output_path except Exception as e: print(f"Error during frame interpolation: {e}") return video_path # Return original if interpolation fails # --- Initialization --- # Create output directory if it doesn't exist os.makedirs(OUTPUT_DIR, exist_ok=True) # Download weights before initializing the pipeline download_weights() # Instantiate the pipeline once to avoid reloading models on every request print("Initializing MoDA pipeline...") try: pipeline = LiveVASAPipeline( cfg_path=DEFAULT_CFG_PATH, motion_mean_std_path=DEFAULT_MOTION_MEAN_STD_PATH ) print("MoDA pipeline initialized successfully.") except Exception as e: print(f"Error initializing pipeline: {e}") pipeline = None # Invert the emo_map for easy lookup from the dropdown value emo_name_to_id = {v: k for k, v in emo_map.items()} # --- Core Generation Function --- @spaces.GPU(duration=180) # Increased duration for smoothing and interpolation def generate_motion(source_image_path, driving_audio_path, emotion_name, cfg_scale, smooth_enabled, target_fps, progress=gr.Progress(track_tqdm=True)): """ The main function that takes Gradio inputs and generates the talking head video. Args: source_image_path: Path to the source image driving_audio_path: Path to the driving audio emotion_name: Selected emotion cfg_scale: CFG scale for generation smooth_enabled: Whether to enable smoothing target_fps: Target frames per second for interpolation """ if pipeline is None: raise gr.Error("Pipeline failed to initialize. Check the console logs for details.") if source_image_path is None: raise gr.Error("Please upload a source image.") if driving_audio_path is None: raise gr.Error("Please upload a driving audio file.") start_time = time.time() # Ensure audio is in WAV format with optimal sampling rate wav_audio_path = ensure_wav_format(driving_audio_path) temp_wav_created = wav_audio_path != driving_audio_path # Create a unique subdirectory for this run timestamp = time.strftime("%Y%m%d-%H%M%S") run_output_dir = os.path.join(OUTPUT_DIR, timestamp) os.makedirs(run_output_dir, exist_ok=True) # Get emotion ID from its name emotion_id = emo_name_to_id.get(emotion_name, 8) # Default to 'None' (ID 8) if not found print(f"Starting generation with the following parameters:") print(f" Source Image: {source_image_path}") print(f" Driving Audio (original): {driving_audio_path}") print(f" Driving Audio (WAV): {wav_audio_path}") print(f" Emotion: {emotion_name} (ID: {emotion_id})") print(f" CFG Scale: {cfg_scale}") print(f" Smoothing: {smooth_enabled}") print(f" Target FPS: {target_fps}") try: # Temporarily disable smoothing if it causes CUDA tensor issues # Check if smooth causes issues and handle gracefully try: # Try with smoothing first result_video_path = pipeline.driven_sample( image_path=source_image_path, audio_path=wav_audio_path, cfg_scale=float(cfg_scale), emo=emotion_id, save_dir=".", smooth=smooth_enabled, # Use the checkbox value silent_audio_path=DEFAULT_SILENT_AUDIO_PATH, ) except TypeError as tensor_error: if "can't convert cuda" in str(tensor_error) and smooth_enabled: print("Warning: Smoothing caused CUDA tensor error. Retrying without smoothing...") # Retry without smoothing result_video_path = pipeline.driven_sample( image_path=source_image_path, audio_path=wav_audio_path, cfg_scale=float(cfg_scale), emo=emotion_id, save_dir=".", smooth=False, # Disable smoothing as fallback silent_audio_path=DEFAULT_SILENT_AUDIO_PATH, ) print("Generated video without smoothing due to technical limitations.") else: raise tensor_error # Apply frame interpolation if requested if target_fps > 24: # Assuming default is around 24 FPS print(f"Applying frame interpolation to achieve {target_fps} FPS...") result_video_path = interpolate_frames(result_video_path, target_fps=target_fps) except Exception as e: print(f"An error occurred during video generation: {e}") import traceback traceback.print_exc() raise gr.Error(f"An unexpected error occurred: {str(e)}. Please check the console for details.") finally: # Clean up temporary WAV file if created if temp_wav_created and os.path.exists(wav_audio_path): try: os.remove(wav_audio_path) print(f"Cleaned up temporary WAV file: {wav_audio_path}") except Exception as e: print(f"Warning: Could not delete temporary file {wav_audio_path}: {e}") end_time = time.time() processing_time = end_time - start_time result_video_path = Path(result_video_path) final_path = result_video_path.with_name(f"final_{result_video_path.stem}{result_video_path.suffix}") print(f"Video generated successfully at: {final_path}") print(f"Processing time: {processing_time:.2f} seconds.") return final_path # --- Gradio UI Definition --- with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {max-width: 960px !important; margin: 0 auto !important}") as demo: gr.HTML( """
Tips for best results:
• Use high-quality front-facing images
• Clear audio without background noise
• Enable smoothing for natural motion
• Adjust CFG scale if motion seems stiff