import os
import sys

import numpy as np
import torch
from diffusers import (CogVideoXDDIMScheduler, DDIMScheduler,
                       DPMSolverMultistepScheduler,
                       EulerAncestralDiscreteScheduler,
                       EulerDiscreteScheduler, PNDMScheduler)
from PIL import Image
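# Put the repository root (and up to two parent directories) on sys.path so the
# `cogvideox` package resolves when this script is run from a subdirectory.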
current_file_path = os.path.abspath(__file__)
project_roots = [
    os.path.dirname(current_file_path),
    os.path.dirname(os.path.dirname(current_file_path)),
    os.path.dirname(os.path.dirname(os.path.dirname(current_file_path))),
]
for project_root in project_roots:
    if project_root not in sys.path:
        sys.path.insert(0, project_root)
from cogvideox.models import (AutoencoderKLCogVideoX,
                              CogVideoXTransformer3DModel,
                              T5EncoderModel, T5Tokenizer)
from cogvideox.pipeline import (CogVideoXFunPipeline,
                                CogVideoXFunInpaintPipeline)
from cogvideox.utils.fp8_optimization import convert_weight_dtype_wrapper
from cogvideox.utils.lora_utils import merge_lora, unmerge_lora
from cogvideox.utils.utils import get_image_to_video_latent, save_videos_grid
# GPU memory mode, chosen from [model_cpu_offload, model_cpu_offload_and_qfloat8, sequential_cpu_offload].
# model_cpu_offload moves the entire model to the CPU after use, saving some GPU memory.
#
# model_cpu_offload_and_qfloat8 also moves the entire model to the CPU after use,
# with the transformer additionally quantized to float8, saving more GPU memory.
#
# sequential_cpu_offload moves each layer of the model to the CPU after use,
# which is slower but saves a large amount of GPU memory.
GPU_memory_mode = "model_cpu_offload_and_qfloat8"
# Model path
model_name = "models/Diffusion_Transformer/CogVideoX-Fun-V1.1-2b-InP"

# Choose the sampler from "Euler", "Euler A", "DPM++", "PNDM", "DDIM_Cog", and "DDIM_Origin"
sampler_name = "DDIM_Origin"

# Load pretrained weights from a checkpoint if needed
transformer_path = None
vae_path = None
lora_path = None

# Other params
sample_size = [384, 672]
# V1.0 and V1.1 support up to 49 frames of video generation,
# while V1.5 supports up to 85 frames.
video_length = 49
fps = 8

# Use torch.float16 if the GPU does not support torch.bfloat16.
# Some graphics cards, such as the V100 and 2080 Ti, do not support torch.bfloat16.
weight_dtype = torch.bfloat16
prompt = "A young woman with beautiful, clear eyes and blonde hair, standing in a forest wearing a white dress and a crown. She seems to be lost in thought, and the camera focuses on her face. The video is of high quality, and the view is very clear. High quality, masterpiece, best quality, highres, ultra-detailed, fantastic."
negative_prompt = "The video is not of a high quality, it has a low resolution. Watermark present in each frame. The background is solid. Strange body and strange trajectory. Distortion."
guidance_scale = 6.0
seed = 43
num_inference_steps = 50
lora_weight = 0.55
save_path = "samples/cogvideox-fun-videos-t2v"
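# Load the 3D transformer backbone. Under the qfloat8 memory mode the checkpoint
# is first read as float8_e4m3fn and then cast to weight_dtype here; the float8
# storage used at inference time is re-applied later by convert_weight_dtype_wrapper.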
transformer = CogVideoXTransformer3DModel.from_pretrained(
    model_name,
    subfolder="transformer",
    low_cpu_mem_usage=True,
    torch_dtype=torch.float8_e4m3fn if GPU_memory_mode == "model_cpu_offload_and_qfloat8" else weight_dtype,
).to(weight_dtype)

if transformer_path is not None:
    print(f"From checkpoint: {transformer_path}")
    if transformer_path.endswith("safetensors"):
        from safetensors.torch import load_file
        state_dict = load_file(transformer_path)
    else:
        state_dict = torch.load(transformer_path, map_location="cpu")
        state_dict = state_dict["state_dict"] if "state_dict" in state_dict else state_dict

    m, u = transformer.load_state_dict(state_dict, strict=False)
    print(f"missing keys: {len(m)}, unexpected keys: {len(u)}")
# Get VAE
vae = AutoencoderKLCogVideoX.from_pretrained(
    model_name,
    subfolder="vae"
).to(weight_dtype)

if vae_path is not None:
    print(f"From checkpoint: {vae_path}")
    if vae_path.endswith("safetensors"):
        from safetensors.torch import load_file
        state_dict = load_file(vae_path)
    else:
        state_dict = torch.load(vae_path, map_location="cpu")
        state_dict = state_dict["state_dict"] if "state_dict" in state_dict else state_dict

    m, u = vae.load_state_dict(state_dict, strict=False)
    print(f"missing keys: {len(m)}, unexpected keys: {len(u)}")
# Get tokenizer and text_encoder
tokenizer = T5Tokenizer.from_pretrained(
    model_name, subfolder="tokenizer"
)
text_encoder = T5EncoderModel.from_pretrained(
    model_name, subfolder="text_encoder", torch_dtype=weight_dtype
)
# Get scheduler
Chosen_Scheduler = {
    "Euler": EulerDiscreteScheduler,
    "Euler A": EulerAncestralDiscreteScheduler,
    "DPM++": DPMSolverMultistepScheduler,
    "PNDM": PNDMScheduler,
    "DDIM_Cog": CogVideoXDDIMScheduler,
    "DDIM_Origin": DDIMScheduler,
}[sampler_name]
scheduler = Chosen_Scheduler.from_pretrained(
    model_name,
    subfolder="scheduler"
)
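# An InP transformer takes extra latent channels for the masked-video condition,
# so when in_channels differs from the VAE's latent_channels the inpaint-capable
# pipeline is used; otherwise the plain text-to-video pipeline.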
if transformer.config.in_channels != vae.config.latent_channels:
    pipeline = CogVideoXFunInpaintPipeline(
        vae=vae,
        tokenizer=tokenizer,
        text_encoder=text_encoder,
        transformer=transformer,
        scheduler=scheduler,
    )
else:
    pipeline = CogVideoXFunPipeline(
        vae=vae,
        tokenizer=tokenizer,
        text_encoder=text_encoder,
        transformer=transformer,
        scheduler=scheduler,
    )
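# Apply the GPU memory mode documented above. The qfloat8 path wraps the
# transformer so its weights are stored in float8 and cast back to weight_dtype
# during the forward pass.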
if GPU_memory_mode == "sequential_cpu_offload":
    pipeline.enable_sequential_cpu_offload()
elif GPU_memory_mode == "model_cpu_offload_and_qfloat8":
    convert_weight_dtype_wrapper(transformer, weight_dtype)
    pipeline.enable_model_cpu_offload()
else:
    pipeline.enable_model_cpu_offload()
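# Seed a CUDA-side generator so repeated runs with the same settings reproduce the sample.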
generator = torch.Generator(device="cuda").manual_seed(seed)

if lora_path is not None:
    pipeline = merge_lora(pipeline, lora_path, lora_weight)
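# Align the requested frame count with the VAE's temporal compression (a ratio of
# 4 for CogVideoX): (video_length - 1) must be divisible by the ratio, so the
# default 49 frames encode to (49 - 1) // 4 + 1 = 13 latent frames.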
with torch.no_grad():
    video_length = int((video_length - 1) // vae.config.temporal_compression_ratio * vae.config.temporal_compression_ratio) + 1 if video_length != 1 else 1
    latent_frames = (video_length - 1) // vae.config.temporal_compression_ratio + 1
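    # Some checkpoints (e.g. V1.5) also patchify along time; pad the frame count
    # so the latent frame count divides evenly by patch_size_t.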
    if video_length != 1 and transformer.config.patch_size_t is not None and latent_frames % transformer.config.patch_size_t != 0:
        additional_frames = transformer.config.patch_size_t - latent_frames % transformer.config.patch_size_t
        video_length += additional_frames * vae.config.temporal_compression_ratio
    if transformer.config.in_channels != vae.config.latent_channels:
        input_video, input_video_mask, _ = get_image_to_video_latent(None, None, video_length=video_length, sample_size=sample_size)

        sample = pipeline(
            prompt,
            num_frames=video_length,
            negative_prompt=negative_prompt,
            height=sample_size[0],
            width=sample_size[1],
            generator=generator,
            guidance_scale=guidance_scale,
            num_inference_steps=num_inference_steps,
            video=input_video,
            mask_video=input_video_mask,
        ).videos
    else:
        sample = pipeline(
            prompt,
            num_frames=video_length,
            negative_prompt=negative_prompt,
            height=sample_size[0],
            width=sample_size[1],
            generator=generator,
            guidance_scale=guidance_scale,
            num_inference_steps=num_inference_steps,
        ).videos
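# Undo the LoRA merge so the base pipeline weights are left clean for reuse.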
if lora_path is not None:
    pipeline = unmerge_lora(pipeline, lora_path, lora_weight)

os.makedirs(save_path, exist_ok=True)
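# Auto-increment the output name from the number of files already in save_path.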
index = len(os.listdir(save_path)) + 1
prefix = str(index).zfill(8)
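# `sample` is laid out [batch, channels, frames, height, width] with values in
# [0, 1], hence the transpose to HWC and the * 255 below for the single-frame case.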
if video_length == 1:
    video_path = os.path.join(save_path, prefix + ".png")
    image = sample[0, :, 0]
    image = image.transpose(0, 1).transpose(1, 2)
    image = (image * 255).numpy().astype(np.uint8)
    image = Image.fromarray(image)
    image.save(video_path)
else:
    video_path = os.path.join(save_path, prefix + ".mp4")
    save_videos_grid(sample, video_path, fps=fps)
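# At fps=8, the default 49 frames play back as a clip of roughly six seconds (49 / 8 ≈ 6.1 s).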