import os import sys import numpy as np import torch from diffusers import (CogVideoXDDIMScheduler, DDIMScheduler, DPMSolverMultistepScheduler, EulerAncestralDiscreteScheduler, EulerDiscreteScheduler, PNDMScheduler) from PIL import Image from transformers import T5EncoderModel current_file_path = os.path.abspath(__file__) project_roots = [os.path.dirname(current_file_path), os.path.dirname(os.path.dirname(current_file_path)), os.path.dirname(os.path.dirname(os.path.dirname(current_file_path)))] for project_root in project_roots: sys.path.insert(0, project_root) if project_root not in sys.path else None from cogvideox.models import (AutoencoderKLCogVideoX, CogVideoXTransformer3DModel, T5EncoderModel, T5Tokenizer) from cogvideox.pipeline import (CogVideoXFunPipeline, CogVideoXFunInpaintPipeline) from cogvideox.utils.fp8_optimization import convert_weight_dtype_wrapper from cogvideox.utils.lora_utils import merge_lora, unmerge_lora from cogvideox.utils.utils import get_image_to_video_latent, save_videos_grid # GPU memory mode, which can be choosen in [model_cpu_offload, model_cpu_offload_and_qfloat8, sequential_cpu_offload]. # model_cpu_offload means that the entire model will be moved to the CPU after use, which can save some GPU memory. # # model_cpu_offload_and_qfloat8 indicates that the entire model will be moved to the CPU after use, # and the transformer model has been quantized to float8, which can save more GPU memory. # # sequential_cpu_offload means that each layer of the model will be moved to the CPU after use, # resulting in slower speeds but saving a large amount of GPU memory. GPU_memory_mode = "model_cpu_offload_and_qfloat8" # model path model_name = "models/Diffusion_Transformer/CogVideoX-Fun-V1.1-2b-InP" # Choose the sampler in "Euler" "Euler A" "DPM++" "PNDM" "DDIM_Cog" and "DDIM_Origin" sampler_name = "DDIM_Origin" # Load pretrained model if need transformer_path = None vae_path = None lora_path = None # Other params sample_size = [384, 672] # V1.0 and V1.1 support up to 49 frames of video generation, # while V1.5 supports up to 85 frames. video_length = 49 fps = 8 # Use torch.float16 if GPU does not support torch.bfloat16 # ome graphics cards, such as v100, 2080ti, do not support torch.bfloat16 weight_dtype = torch.bfloat16 prompt = "A young woman with beautiful and clear eyes and blonde hair standing and white dress in a forest wearing a crown. She seems to be lost in thought, and the camera focuses on her face. The video is of high quality, and the view is very clear. High quality, masterpiece, best quality, highres, ultra-detailed, fantastic." negative_prompt = "The video is not of a high quality, it has a low resolution. Watermark present in each frame. The background is solid. Strange body and strange trajectory. Distortion. " guidance_scale = 6.0 seed = 43 num_inference_steps = 50 lora_weight = 0.55 save_path = "samples/cogvideox-fun-videos-t2v" transformer = CogVideoXTransformer3DModel.from_pretrained( model_name, subfolder="transformer", low_cpu_mem_usage=True, torch_dtype=torch.float8_e4m3fn if GPU_memory_mode == "model_cpu_offload_and_qfloat8" else weight_dtype, ).to(weight_dtype) if transformer_path is not None: print(f"From checkpoint: {transformer_path}") if transformer_path.endswith("safetensors"): from safetensors.torch import load_file, safe_open state_dict = load_file(transformer_path) else: state_dict = torch.load(transformer_path, map_location="cpu") state_dict = state_dict["state_dict"] if "state_dict" in state_dict else state_dict m, u = transformer.load_state_dict(state_dict, strict=False) print(f"missing keys: {len(m)}, unexpected keys: {len(u)}") # Get Vae vae = AutoencoderKLCogVideoX.from_pretrained( model_name, subfolder="vae" ).to(weight_dtype) if vae_path is not None: print(f"From checkpoint: {vae_path}") if vae_path.endswith("safetensors"): from safetensors.torch import load_file, safe_open state_dict = load_file(vae_path) else: state_dict = torch.load(vae_path, map_location="cpu") state_dict = state_dict["state_dict"] if "state_dict" in state_dict else state_dict m, u = vae.load_state_dict(state_dict, strict=False) print(f"missing keys: {len(m)}, unexpected keys: {len(u)}") # Get tokenizer and text_encoder tokenizer = T5Tokenizer.from_pretrained( model_name, subfolder="tokenizer" ) text_encoder = T5EncoderModel.from_pretrained( model_name, subfolder="text_encoder", torch_dtype=weight_dtype ) # Get Scheduler Choosen_Scheduler = scheduler_dict = { "Euler": EulerDiscreteScheduler, "Euler A": EulerAncestralDiscreteScheduler, "DPM++": DPMSolverMultistepScheduler, "PNDM": PNDMScheduler, "DDIM_Cog": CogVideoXDDIMScheduler, "DDIM_Origin": DDIMScheduler, }[sampler_name] scheduler = Choosen_Scheduler.from_pretrained( model_name, subfolder="scheduler" ) if transformer.config.in_channels != vae.config.latent_channels: pipeline = CogVideoXFunInpaintPipeline( vae=vae, tokenizer=tokenizer, text_encoder=text_encoder, transformer=transformer, scheduler=scheduler, ) else: pipeline = CogVideoXFunPipeline( vae=vae, tokenizer=tokenizer, text_encoder=text_encoder, transformer=transformer, scheduler=scheduler, ) if GPU_memory_mode == "sequential_cpu_offload": pipeline.enable_sequential_cpu_offload() elif GPU_memory_mode == "model_cpu_offload_and_qfloat8": convert_weight_dtype_wrapper(transformer, weight_dtype) pipeline.enable_model_cpu_offload() else: pipeline.enable_model_cpu_offload() generator = torch.Generator(device="cuda").manual_seed(seed) if lora_path is not None: pipeline = merge_lora(pipeline, lora_path, lora_weight) with torch.no_grad(): video_length = int((video_length - 1) // vae.config.temporal_compression_ratio * vae.config.temporal_compression_ratio) + 1 if video_length != 1 else 1 latent_frames = (video_length - 1) // vae.config.temporal_compression_ratio + 1 if video_length != 1 and transformer.config.patch_size_t is not None and latent_frames % transformer.config.patch_size_t != 0: additional_frames = transformer.config.patch_size_t - latent_frames % transformer.config.patch_size_t video_length += additional_frames * vae.config.temporal_compression_ratio if transformer.config.in_channels != vae.config.latent_channels: input_video, input_video_mask, _ = get_image_to_video_latent(None, None, video_length=video_length, sample_size=sample_size) sample = pipeline( prompt, num_frames = video_length, negative_prompt = negative_prompt, height = sample_size[0], width = sample_size[1], generator = generator, guidance_scale = guidance_scale, num_inference_steps = num_inference_steps, video = input_video, mask_video = input_video_mask, ).videos else: sample = pipeline( prompt, num_frames = video_length, negative_prompt = negative_prompt, height = sample_size[0], width = sample_size[1], generator = generator, guidance_scale = guidance_scale, num_inference_steps = num_inference_steps, ).videos if lora_path is not None: pipeline = unmerge_lora(pipeline, lora_path, lora_weight) if not os.path.exists(save_path): os.makedirs(save_path, exist_ok=True) index = len([path for path in os.listdir(save_path)]) + 1 prefix = str(index).zfill(8) if video_length == 1: video_path = os.path.join(save_path, prefix + ".png") image = sample[0, :, 0] image = image.transpose(0, 1).transpose(1, 2) image = (image * 255).numpy().astype(np.uint8) image = Image.fromarray(image) image.save(video_path) else: video_path = os.path.join(save_path, prefix + ".mp4") save_videos_grid(sample, video_path, fps=fps)