# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ CUDA_VISIBLE_DEVICES=1 python3 -m cosmos_predict1.diffusion.inference.world_interpolator \ --checkpoint_dir checkpoints \ --diffusion_transformer_dir Cosmos-Predict1-7B-WorldInterpolator \ --input_image_or_video_path assets/diffusion/interpolation_example.mp4 \ --num_input_frames 1 \ --offload_prompt_upsampler \ --video_save_name diffusion-world-interpolator-7b \ --num_video_frames 10 \ --num_frame_pairs 2 """ import argparse import os import torch from cosmos_predict1.diffusion.inference.inference_utils import add_common_arguments, check_input_frames, validate_args from cosmos_predict1.diffusion.inference.world_generation_pipeline import DiffusionWorldInterpolatorGenerationPipeline from cosmos_predict1.utils import log, misc from cosmos_predict1.utils.io import read_prompts_from_file, save_video # from cosmos_predict1.utils.visualize.video import save_img_or_video torch.enable_grad(False) def parse_arguments() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Video to world generation demo script") # Add common arguments add_common_arguments(parser) # Add video2world specific arguments parser.add_argument( "--diffusion_transformer_dir", type=str, default="Cosmos-Predict1-7B-WorldInterpolator", help="DiT model weights directory name relative to checkpoint_dir", choices=[ "Cosmos-Predict1-7B-WorldInterpolator", "Cosmos-Predict1-7B-WorldInterpolator_post-trained", ], ) parser.add_argument( "--prompt_upsampler_dir", type=str, default="Pixtral-12B", help="Prompt upsampler weights directory relative to checkpoint_dir", ) parser.add_argument( "--input_image_or_video_path", type=str, help="Input video/image path for generating a single video", ) parser.add_argument( "--num_input_frames", type=int, default=2, help="The minimum number of input frames for world_interpolator predictions.", ) # parser.add_argument("--num_video_frames", type=int, default=118, help="numer of video frames to sample") parser.add_argument("--pixel_chunk_duration", type=int, default=121, help="pixel chunk duration") parser.add_argument( "--frame_stride", type=int, default=1, help="Specifies the gap between frames used for interpolation. A step_size of 1 means consecutive frame " "pairs are treated as inputs (e.g., (x0, x1), (x1, x2)), while a step_size of 2 pairs frames with one " "frame in between (e.g., (x0, x2), (x2, x4) are treated as input at a time). Increasing this value " "results in interpolation over a larger temporal range. Default is 1.", ) parser.add_argument( "--frame_index_start", type=int, default=0, help="Specifies the gap between frames used for interpolation. A step_size of 1 means consecutive frame " "pairs are treated as inputs (e.g., (x0, x1), (x1, x2)), while a step_size of 2 pairs frames with one " "frame in between (e.g., (x0, x2), (x2, x4) are treated as input at a time). Increasing this value " "results in interpolation over a larger temporal range. Default is 1.", ) parser.add_argument( "--num_frame_pairs", type=int, default=None, help="Limits the number of unique frame pairs processed for interpolation. By default (None), the interpolator " "runs on all possible pairs extracted from the input video with the given step_size. If set to 1, only the first " "frame pair is processed (e.g., (x0, x1) for step_size=1, (x0, x2) for step_size=2). Higher values allow processing more " "pairs up to the maximum possible with the given step_size.", ) return parser.parse_args() def demo(args): """Run world-interpolator generation demo. This function handles the main video-to-world generation pipeline, including: - Setting up the random seed for reproducibility - Initializing the generation pipeline with the provided configuration - Processing single or multiple prompts/images/videos from input - Generating videos from prompts and images/videos - Saving the generated videos and corresponding prompts to disk Args: cfg (argparse.Namespace): Configuration namespace containing: - Model configuration (checkpoint paths, model settings) - Generation parameters (guidance, steps, dimensions) - Input/output settings (prompts/images/videos, save paths) - Performance options (model offloading settings) The function will save: - Generated MP4 video files - Text files containing the processed prompts If guardrails block the generation, a critical log message is displayed and the function continues to the next prompt if available. """ # import ipdb; ipdb.set_trace() misc.set_random_seed(args.seed) inference_type = "world_interpolator" validate_args(args, inference_type) if args.num_gpus > 1: from megatron.core import parallel_state from cosmos_predict1.utils import distributed distributed.init() parallel_state.initialize_model_parallel(context_parallel_size=args.num_gpus) process_group = parallel_state.get_context_parallel_group() # Initialize video_interpolator generation model pipeline pipeline = DiffusionWorldInterpolatorGenerationPipeline( inference_type=inference_type, checkpoint_dir=args.checkpoint_dir, checkpoint_name=args.diffusion_transformer_dir, prompt_upsampler_dir=args.prompt_upsampler_dir, enable_prompt_upsampler=not args.disable_prompt_upsampler, offload_network=args.offload_diffusion_transformer, offload_tokenizer=args.offload_tokenizer, offload_text_encoder_model=args.offload_text_encoder_model, offload_prompt_upsampler=args.offload_prompt_upsampler, offload_guardrail_models=args.offload_guardrail_models, disable_guardrail=args.disable_guardrail, num_steps=args.num_steps, height=args.height, width=args.width, fps=args.fps, num_video_frames=args.num_video_frames, num_input_frames=args.num_input_frames, num_frame_pairs=args.num_frame_pairs, frame_stride=args.frame_stride, ) if args.num_gpus > 1: pipeline.model.net.enable_context_parallel(process_group) # Handle multiple prompts if prompt file is provided if args.batch_input_path: log.info(f"Reading batch inputs from path: {args.batch_input_path}") prompts = read_prompts_from_file(args.batch_input_path) else: # Single prompt case prompts = [{"prompt": args.prompt, "visual_input": args.input_image_or_video_path}] os.makedirs(args.video_save_folder, exist_ok=True) for i, input_dict in enumerate(prompts): current_prompt = input_dict.get("prompt", None) if current_prompt is None and args.disable_prompt_upsampler: log.critical("Prompt is missing, skipping world generation.") continue current_image_or_video_path = input_dict.get("visual_input", None) if current_image_or_video_path is None: log.critical("Visual input is missing, skipping world generation.") continue # Check input frames if not check_input_frames(current_image_or_video_path, args.num_input_frames): continue # Generate video generated_output = pipeline.generate( prompt=current_prompt, image_or_video_path=current_image_or_video_path, negative_prompt=args.negative_prompt, ) if generated_output is None: log.critical("Guardrail blocked video2world generation.") continue video, prompt = generated_output # Save video video_save_path = os.path.join(args.video_save_folder, args.video_save_name + ".mp4") prompt_save_path = os.path.join(args.video_save_folder, args.video_save_name + ".txt") save_video( video=video, fps=args.fps, H=args.height, W=args.width, video_save_quality=5, video_save_path=video_save_path, ) with open(prompt_save_path, "w") as f: f.write(prompt) log.info(f"Saved video to {video_save_path}") log.info(f"Saved prompt to {prompt_save_path}") # clean up properly if args.num_gpus > 1: parallel_state.destroy_model_parallel() import torch.distributed as dist dist.destroy_process_group() if __name__ == "__main__": args = parse_arguments() demo(args)