# Hugging Face Space (status at capture time: Running on Zero)
import gradio as gr | |
import torch | |
import numpy as np | |
import tempfile | |
import os | |
import spaces | |
from diffusers import LTXLatentUpsamplePipeline | |
from pipeline_ltx_condition_control import LTXConditionPipeline | |
from diffusers.utils import export_to_video, load_video | |
from torchvision import transforms | |
import random | |
from controlnet_aux import CannyDetector | |
# from image_gen_aux import DepthPreprocessor | |
# import mediapipe as mp | |
from PIL import Image | |
import cv2 | |
# Both pipelines are loaded in bfloat16; run on CUDA when it is available.
dtype = torch.bfloat16
device = "cuda" if torch.cuda.is_available() else "cpu"

# Base generation pipeline (distilled 0.9.7 checkpoint). The non-distilled
# "dev" checkpoint is kept here for reference:
# pipeline = LTXConditionPipeline.from_pretrained("Lightricks/LTX-Video-0.9.7-dev", torch_dtype=dtype)
pipeline = LTXConditionPipeline.from_pretrained(
    "Lightricks/LTX-Video-0.9.7-distilled", torch_dtype=dtype
)

# The latent upsampler reuses the base pipeline's VAE instead of loading its own.
pipe_upsample = LTXLatentUpsamplePipeline.from_pretrained(
    "Lightricks/ltxv-spatial-upscaler-0.9.7", vae=pipeline.vae, torch_dtype=dtype
)

pipeline.to(device)
pipe_upsample.to(device)
pipeline.vae.enable_tiling()
# Registry of the available control LoRAs, keyed by control type.
# Each entry gives the Hub repo, the weight file inside it, and the adapter
# name under which the LoRA is registered on the pipeline.
CONTROL_LORAS = {
    "canny": dict(
        repo="Lightricks/LTX-Video-ICLoRA-canny-13b-0.9.7",
        weight_name="ltxv-097-ic-lora-canny-control-diffusers.safetensors",
        adapter_name="canny_lora",
    ),
    "depth": dict(
        repo="Lightricks/LTX-Video-ICLoRA-depth-13b-0.9.7",
        weight_name="ltxv-097-ic-lora-depth-control-diffusers.safetensors",
        adapter_name="depth_lora",
    ),
    "pose": dict(
        repo="Lightricks/LTX-Video-ICLoRA-pose-13b-0.9.7",
        weight_name="ltxv-097-ic-lora-pose-control-diffusers.safetensors",
        adapter_name="pose_lora",
    ),
}
# Load the canny control LoRA at startup and make it the active adapter.
_startup_lora = CONTROL_LORAS["canny"]
pipeline.load_lora_weights(
    _startup_lora["repo"],
    weight_name=_startup_lora["weight_name"],
    adapter_name=_startup_lora["adapter_name"],
)
pipeline.set_adapters([_startup_lora["adapter_name"]], adapter_weights=[1.0])

# MediaPipe pose estimation setup is currently disabled (kept for reference):
# mp_drawing = mp.solutions.drawing_utils
# mp_drawing_styles = mp.solutions.drawing_styles
# mp_pose = mp.solutions.pose

# Shared edge detector used to build the canny control video.
canny_processor = CannyDetector()
def read_video(video) -> torch.Tensor:
    """
    Stack already-decoded video frames into one tensor of shape [F, C, H, W].

    Despite the name, nothing is read from disk here: `video` is an iterable
    of frames (the callers in this file pass the list returned by
    `load_video`). Each frame is converted with torchvision's `ToTensor` and
    the results are stacked along a new leading frame axis.
    """
    frame_to_tensor = transforms.ToTensor()
    frames = []
    for image in video:
        frames.append(frame_to_tensor(image))
    return torch.stack(frames)
def round_to_nearest_resolution_acceptable_by_vae(height, width, vae_temporal_compression_ratio):
    """
    Snap `height` and `width` down to multiples of the given ratio.

    Each dimension is reduced by its remainder modulo
    `vae_temporal_compression_ratio`, so values are always rounded *down*
    (never up) and values that are already divisible come back unchanged.

    Returns:
        A `(height, width)` tuple of the adjusted dimensions.
    """
    ratio = vae_temporal_compression_ratio
    return tuple(dim - (dim % ratio) for dim in (height, width))
def load_control_lora(control_type, current_lora_state):
    """
    Make the LoRA for `control_type` the active adapter on the shared pipeline.

    Args:
        control_type: Key into `CONTROL_LORAS`.
        current_lora_state: Control type whose LoRA is currently loaded, or
            `None` when nothing is loaded.

    Returns:
        The control type that is loaded after the call.

    Raises:
        ValueError: If `control_type` is not a key of `CONTROL_LORAS`.
        Exception: Any error raised while loading the new LoRA is logged and
            re-raised unchanged.
    """
    if control_type not in CONTROL_LORAS:
        raise ValueError(f"Unknown control type: {control_type}")

    # Requested adapter is already active: nothing to do.
    if current_lora_state == control_type:
        print(f"{control_type} LoRA already loaded")
        return current_lora_state

    # Drop whatever was loaded before. This is best effort: a failure here is
    # only logged, and loading the new LoRA is still attempted.
    if current_lora_state is not None:
        try:
            pipeline.unload_lora_weights()
        except Exception as e:
            print(f"Warning: Could not unload previous LoRA: {e}")
        else:
            print(f"Unloaded previous LoRA: {current_lora_state}")

    entry = CONTROL_LORAS[control_type]
    try:
        pipeline.load_lora_weights(
            entry["repo"],
            weight_name=entry["weight_name"],
            adapter_name=entry["adapter_name"],
        )
        pipeline.set_adapters([entry["adapter_name"]], adapter_weights=[1.0])
        print(f"Loaded {control_type} LoRA successfully")
        return control_type
    except Exception as e:
        print(f"Error loading {control_type} LoRA: {e}")
        raise
def process_video_for_canny(video, width, height):
    """
    Run canny edge detection on every frame of a video.

    Args:
        video: Non-empty sequence of PIL images (the callers in this file
            pass the list returned by `load_video`).
        width: Requested output width in pixels.
        height: Requested output height in pixels.

    Returns:
        A list with one edge-map frame per input frame, as produced by
        `canny_processor`.

    Raises:
        ValueError: If `video` contains no frames.
    """
    if len(video) == 0:
        raise ValueError("Cannot run canny detection on an empty video.")

    print("Processing video for canny control...")

    # PIL exposes `size` as a (width, height) attribute, not a method; calling
    # it raised "TypeError: 'tuple' object is not callable".
    # NOTE(review): controlnet_aux's CannyDetector is assumed to take integer
    # resolutions that it applies to the shorter image side, so the short side
    # is passed rather than a tuple. The output is therefore not forced to be
    # exactly width x height - confirm against the installed version.
    # TODO: change resolution logic
    detect_resolution = int(min(video[0].size))
    image_resolution = int(min(width, height))

    return [
        canny_processor(
            frame,
            low_threshold=50,
            high_threshold=200,
            detect_resolution=detect_resolution,
            image_resolution=image_resolution,
        )
        for frame in video
    ]
def process_video_for_pose(video):
    """
    Render MediaPipe pose landmarks for every frame of a video.

    Each output frame is a black image with the same shape as the input frame;
    landmarks and their connections are drawn on it when a pose is detected,
    otherwise the frame stays black.

    NOTE: this function uses the module-level names `mp_pose`, `mp_drawing`
    and `mp_drawing_styles`, whose definitions are currently commented out in
    this file. Until they are restored, calling it raises `NameError`.

    Args:
        video: Iterable of PIL images.

    Returns:
        A list of PIL images, one per input frame.
    """
    print("Processing video for pose control...")
    pose_video = []

    with mp_pose.Pose(
        static_image_mode=True,
        model_complexity=1,
        enable_segmentation=False,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
    ) as pose:
        for frame in video:
            # NOTE(review): frames are assumed to be RGB PIL images. MediaPipe's
            # `process` expects RGB input, so the array is passed as-is; the
            # earlier RGB->BGR conversion would have swapped the channels.
            frame_np = np.array(frame)
            results = pose.process(frame_np)

            # Landmarks are drawn on a black canvas of the same shape.
            pose_frame = np.zeros_like(frame_np)
            if results.pose_landmarks:
                mp_drawing.draw_landmarks(
                    pose_frame,
                    results.pose_landmarks,
                    mp_pose.POSE_CONNECTIONS,
                    landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style(),
                )

            pose_video.append(Image.fromarray(pose_frame))

    return pose_video
def process_input_video(reference_video, width, height):
    """
    Build the canny-edge preview for an uploaded reference video.

    The reference video is loaded, run through `process_video_for_canny`, and
    exported at 24 fps to a temporary .mp4 file that is not deleted
    automatically.

    Returns:
        Path of the preview file, or `None` when no video was supplied or any
        step failed (the error is printed, not raised).
    """
    if reference_video is None:
        return None

    try:
        frames = load_video(reference_video)
        canny_frames = process_video_for_canny(frames, width, height)

        # Only the generated file name is needed; the export writes to it.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
            preview_path = tmp_file.name
        export_to_video(canny_frames, preview_path, fps=24)
    except Exception as e:
        print(f"Error processing input video: {e}")
        return None

    return preview_path
def process_video_for_control(reference_video, control_type, width, height):
    """
    Load the reference video and turn it into control frames for `control_type`.

    Used as the on-demand fallback when no pre-processed control video is
    available. Unrecognised control types return the loaded frames unchanged.
    """
    frames = load_video(reference_video)

    if control_type == "canny":
        # Canny is normally pre-processed on upload; this path is the fallback.
        return process_video_for_canny(frames, width, height)
    if control_type == "depth":
        # NOTE(review): `process_video_for_depth` is not defined in the visible
        # part of this file - confirm it exists before enabling depth control.
        return process_video_for_depth(frames)
    if control_type == "pose":
        return process_video_for_pose(frames)
    return frames
# NOTE(review): `spaces` is imported at the top of this file but never used.
# If this app is deployed on ZeroGPU hardware, confirm whether this handler
# should be decorated with `@spaces.GPU`.
def generate_video(
    reference_video,
    control_video,  # Pre-processed control video (may be None)
    prompt,
    control_type,
    duration=3.0,
    negative_prompt="worst quality, inconsistent motion, blurry, jittery, distorted",
    height=768,
    width=1152,
    num_inference_steps=7,
    guidance_scale=1.0,
    seed=0,
    randomize_seed=False,
    progress=gr.Progress()
):
    """
    Generate a video guided by a control video, using a three-stage pass.

    Stages: (1) generate latents at 2/3 of the requested resolution,
    conditioned on the control frames; (2) upscale the latents with
    `pipe_upsample`; (3) denoise the upscaled latents, decode to frames, and
    resize the frames to the requested `width` x `height`.

    Args:
        reference_video: Path of the uploaded reference video.
        control_video: Path of an already pre-processed control video. When
            `None`, control frames are computed from `reference_video`.
        prompt: Text prompt describing the desired video.
        control_type: Control type used for the on-demand fallback.
        duration: Target duration in seconds, at 24 fps.
        negative_prompt: Text describing what to avoid.
        height: Output height in pixels.
        width: Output width in pixels.
        num_inference_steps: Denoising steps for the low-resolution pass.
        guidance_scale: Guidance scale for both pipeline passes.
        seed: Random seed, used unless `randomize_seed` is set.
        randomize_seed: When true, a fresh random seed replaces `seed`.
        progress: Gradio progress tracker.

    Returns:
        `(output_path, seed)`. `output_path` is `None` when input validation
        fails (a warning is shown) or when generation raises (the error is
        printed).
    """
    # Validation failures are surfaced as UI warnings. The second return value
    # is bound to the numeric seed output, so it must stay a number rather
    # than carry the message text.
    if reference_video is None:
        gr.Warning("Please upload a reference video.")
        return None, seed
    if not prompt or not prompt.strip():
        gr.Warning("Please enter a prompt.")
        return None, seed

    try:
        # Models are already loaded at startup.

        # Handle seed
        if randomize_seed:
            seed = random.randint(0, 2**32 - 1)
        seed = int(seed)

        # Frame resizing at the end needs integer pixel sizes.
        height = int(height)
        width = int(width)

        # Calculate number of frames from duration (24 fps)
        fps = 24
        num_frames = int(duration * fps) + 1  # +1 for proper frame count

        # Ensure num_frames is valid for the model (multiple of temporal compression + 1)
        temporal_compression = pipeline.vae_temporal_compression_ratio
        num_frames = ((num_frames - 1) // temporal_compression) * temporal_compression + 1

        progress(0.1, desc="Preparing processed video...")

        # Use the pre-processed control video when available, otherwise
        # process the reference video on demand.
        if control_video is not None:
            processed_video = load_video(control_video)
        else:
            processed_video = process_video_for_control(reference_video, control_type, width, height)

        # Convert to tensor
        processed_video = read_video(processed_video)

        progress(0.2, desc="Preparing generation parameters...")

        # Height and width are spatial dimensions, so they are rounded to the
        # VAE's spatial compression ratio.
        # NOTE(review): assumes the custom pipeline exposes
        # `vae_spatial_compression_ratio` like the diffusers LTX pipelines; the
        # previous (temporal) ratio is kept as a fallback if it does not.
        spatial_compression = getattr(
            pipeline, "vae_spatial_compression_ratio", pipeline.vae_temporal_compression_ratio
        )

        # Calculate downscaled dimensions
        downscale_factor = 2 / 3
        downscaled_height = int(height * downscale_factor)
        downscaled_width = int(width * downscale_factor)
        downscaled_height, downscaled_width = round_to_nearest_resolution_acceptable_by_vae(
            downscaled_height, downscaled_width, spatial_compression
        )

        progress(0.3, desc="Generating video at lower resolution...")

        # 1. Generate video at smaller resolution
        latents = pipeline(
            reference_video=processed_video,  # Use processed video
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=downscaled_width,
            height=downscaled_height,
            num_frames=num_frames,
            num_inference_steps=num_inference_steps,
            decode_timestep=0.05,
            decode_noise_scale=0.025,
            guidance_scale=guidance_scale,
            generator=torch.Generator().manual_seed(seed),
            output_type="latent",
        ).frames

        progress(0.6, desc="Upscaling video...")

        # 2. Upscale generated video
        upscaled_height, upscaled_width = downscaled_height * 2, downscaled_width * 2
        upscaled_latents = pipe_upsample(
            latents=latents,
            output_type="latent"
        ).frames

        progress(0.8, desc="Final denoising and processing...")

        # 3. Denoise the upscaled video
        video_output = pipeline(
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=upscaled_width,
            height=upscaled_height,
            num_frames=num_frames,
            denoise_strength=0.4,
            num_inference_steps=10,
            latents=upscaled_latents,
            decode_timestep=0.05,
            guidance_scale=guidance_scale,
            decode_noise_scale=0.025,
            image_cond_noise_scale=0.025,
            # Follow the module-level device instead of hard-coding "cuda".
            generator=torch.Generator(device=device).manual_seed(seed),
            output_type="pil",
        ).frames[0]

        progress(0.9, desc="Finalizing output...")

        # 4. Downscale to expected resolution
        video_output = [frame.resize((width, height)) for frame in video_output]

        # Export to temporary file (only the generated name is needed)
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
            output_path = tmp_file.name
        export_to_video(video_output, output_path, fps=fps)

        progress(1.0, desc="Complete!")
        return output_path, seed

    except Exception as e:
        print(e)
        return None, seed
# Create Gradio interface
# NOTE(review): the nesting of the layout below was reconstructed from the
# order of the components; confirm it matches the intended layout.
with gr.Blocks() as demo:
    gr.Markdown(
        """
        # LTX Video Control Canny
        """
    )

    # State variables
    #current_lora_state = gr.State(value=None)

    with gr.Row():
        with gr.Column(scale=1):
            reference_video = gr.Video(
                label="Reference Video",
                height=300
            )

            prompt = gr.Textbox(
                label="Prompt",
                placeholder="Describe the video you want to generate...",
                lines=3,
                value="The Joker in his iconic purple suit and green hair, dancing alone in a dimly lit, run-down room. His movements are erratic and unpredictable, shifting between graceful and chaotic as he loses himself in the moment. The camera captures his theatrical gestures, his dance reflecting his unhinged personality. Moody lighting with shadows dancing across the walls, creating an atmosphere of beautiful madness."
            )

            # Control Type Selection (hidden: only canny is exposed in the UI)
            control_type = gr.Radio(
                label="Control Type",
                choices=["canny", "depth", "pose"],
                value="canny",
                visible=False,
                info="Choose the type of control guidance for video generation"
            )

            duration = gr.Slider(
                label="Duration (seconds)",
                minimum=1.0,
                maximum=10.0,
                step=0.5,
                value=2.5
            )

            negative_prompt = gr.Textbox(
                label="Negative Prompt",
                placeholder="What you don't want in the video...",
                lines=2,
                value="worst quality, inconsistent motion, blurry, jittery, distorted"
            )

            # Advanced Settings
            with gr.Accordion("Advanced Settings", open=False):
                with gr.Row():
                    height = gr.Slider(
                        label="Height",
                        minimum=256,
                        maximum=1024,
                        step=32,
                        value=768
                    )
                    width = gr.Slider(
                        label="Width",
                        minimum=256,
                        maximum=1536,
                        step=32,
                        value=1152
                    )

                # The minimum must not exceed the default (and example) value
                # of 7; it was previously 10, which put the default out of range.
                num_inference_steps = gr.Slider(
                    label="Inference Steps",
                    minimum=1,
                    maximum=50,
                    step=1,
                    value=7
                )

                with gr.Row():
                    guidance_scale = gr.Slider(
                        label="Guidance Scale",
                        minimum=1.0,
                        maximum=15.0,
                        step=0.1,
                        value=1.0
                    )

                with gr.Row():
                    randomize_seed = gr.Checkbox(
                        label="Randomize Seed",
                        value=False
                    )
                    seed = gr.Number(
                        label="Seed",
                        value=0,
                        precision=0
                    )

            generate_btn = gr.Button(
                "Generate",
            )

        with gr.Column(scale=1):
            output_video = gr.Video(
                label="Generated Video",
                height=400
            )
            control_video = gr.Video(
                label="Processed Control Video (Canny Edges)",
                height=400,
                visible=True
            )

    # Each example row has one value per `generate_video` argument, in order,
    # so `inputs` must list the same 12 components, including `control_video`
    # (the `None` in second position).
    gr.Examples(
        examples=[
            ["video_assets/vid_1.mp4", None, "A sleek cybernetic wolf sprinting through a neon-lit futuristic cityscape, its metallic form gleaming with electric blue circuits. The wolf's powerful stride carries it down rain-slicked streets between towering skyscrapers, while holographic advertisements cast colorful reflections on its chrome surface. Sparks of digital energy trail behind the creature as it moves with fluid mechanical precision through the urban maze, creating streaks of light in the misty night air.", "canny", 3, "worst quality, inconsistent motion, blurry, jittery, distorted", 768, 1152, 7, 1, 0, True],
            ["video_assets/vid_2.mp4", None, "A translucent ghost floating in a moonlit cemetery, raising a glowing spectral lantern that casts eerie light through the darkness. The ethereal figure's wispy form shimmers as it lifts the phantom light above its head, illuminating weathered tombstones and gnarled trees. Pale mist swirls around the ghost as the lantern pulses with otherworldly energy, creating haunting shadows that dance across the graveyard in the dead of night.", "canny", 2.5, "worst quality, inconsistent motion, blurry, jittery, distorted", 768, 1152, 7, 1, 0, True],
            ["video_assets/vid_3.mp4", None, "A sleek android assassin poised in a combat stance atop a futuristic skyscraper, arms positioned for perfect balance. The chrome-plated figure gleams under neon city lights as holographic data streams flow around its metallic form. Rain droplets bead on its polished surface while the sprawling cyberpunk metropolis stretches endlessly below. Electric circuits pulse beneath the android's transparent panels as it maintains its precise, calculated pose against the backdrop of flying vehicles and towering digital billboards.", "canny", 3, "worst quality, inconsistent motion, blurry, jittery, distorted", 768, 1152, 7, 1, 0, True],
            ["video_assets/vid_4.mp4", None, "Luminescent video game characters with glowing outlines and neon-bright details wandering through a digital landscape. Their bodies emit soft, colorful light that pulses gently as they move, creating trails of radiance behind them. The characters have a futuristic, stylized appearance with smooth surfaces that reflect their inner glow. They navigate naturally through their environment, their movements fluid and purposeful, while their bioluminescent features cast dynamic shadows and illuminate the surrounding area. The scene has a cyberpunk aesthetic with the characters' radiant presence serving as the primary light source in an otherwise darkened digital world.", "canny", 2.5, "worst quality, inconsistent motion, blurry, jittery, distorted", 768, 1152, 7, 1, 0, True],
        ],
        inputs=[
            reference_video,
            control_video,
            prompt,
            control_type,
            duration,
            negative_prompt,
            height,
            width,
            num_inference_steps,
            guidance_scale,
            seed,
            randomize_seed,
        ],
        outputs=[output_video, seed],
        fn=generate_video,
        cache_examples="lazy"
    )

    # Event handlers

    # Auto-process video when uploaded
    reference_video.upload(
        fn=process_input_video,
        inputs=[reference_video, width, height],
        outputs=[control_video],
        show_progress=True
    )

    generate_btn.click(
        fn=generate_video,
        inputs=[
            reference_video,
            control_video,  # Use pre-processed video
            prompt,
            control_type,
            duration,
            negative_prompt,
            height,
            width,
            num_inference_steps,
            guidance_scale,
            seed,
            randomize_seed
        ],
        outputs=[output_video, seed],
        show_progress=True
    )

if __name__ == "__main__":
    demo.launch()