# NOTE(review): removed stray build-tool output ("Spaces:" / "Build error")
# that was accidentally pasted at the top of this file.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import glob
import io
import os
from os.path import join, isdir, isfile
import zipfile

import imageio.v3 as imageio
import json
import numpy as np
import pyexr
from tqdm import tqdm

from api_types import CompressedSeedingRequest, SeedingRequest
from encoding import CompressionFormat
def srgb_to_linear(img):
    """Convert sRGB-encoded values in [0, 1] to linear light.

    Applies the standard piecewise sRGB transfer function: values at or
    below the 0.04045 threshold are divided by 12.92, larger values go
    through the gamma-2.4 power curve.
    """
    threshold = 0.04045
    gamma_region = img > threshold
    # Evaluate both branches of the piecewise curve with vectorized
    # NumPy operations instead of a per-element loop.
    linear = np.empty_like(img)
    linear[gamma_region] = np.power((img[gamma_region] + 0.055) / 1.055, 2.4)
    linear[~gamma_region] = img[~gamma_region] / 12.92
    return linear
def load_gen3c_seeding_data(data_directory: str, max_frames: int | None = None,
                            frames_stride: int = 1) -> CompressedSeedingRequest:
    """Load pre-processed Gen3C seeding data, keeping payloads compressed.

    Example directory structure:
    ├── camera.npz
    ├── depth.npz
    ├── mask.npz
    ├── metadata.json
    └── rgb.mp4

    We keep the data compressed as much as possible so that it can
    be uploaded faster to the inference server.

    Args:
        data_directory: Directory containing the files listed above.
        max_frames: Currently ignored for this format — all frames are
            forwarded as-is, since subsampling would require re-encoding
            the compressed payloads. TODO(review): honor it.
        frames_stride: Currently ignored (see `max_frames`).

    Returns:
        A CompressedSeedingRequest whose image / depth / mask payloads are
        raw compressed bytes (MP4 for images, NPZ for depths and masks).
    """
    bar = tqdm(range(6), desc="Seeding data loading")

    # [n_frames, height, width], float16
    depths = np.load(join(data_directory, "depth.npz"))['depth']
    assert depths.ndim == 3, depths.shape
    n_img = depths.shape[0]
    # Per-frame (width, height); all frames share the depth map's resolution.
    resolutions = np.tile([depths.shape[2], depths.shape[1]], reps=(n_img, 1))
    assert resolutions.shape == (n_img, 2)
    # Re-compress the depth stack so it transfers as a single NPZ blob.
    with io.BytesIO() as f:
        np.savez_compressed(f, depths)
        depths_compressed = f.getvalue()
    bar.update(1)

    # Intrinsics: [n_frames, 3, 3], float32
    # Organized as:
    #   [[fx,  0, cx],
    #    [ 0, fy, cy],
    #    [ 0,  0,  1]]
    camera_data = np.load(join(data_directory, "camera.npz"))
    intrinsics = camera_data['intrinsics']
    # Absolute focal lengths
    focal_lengths = np.stack([intrinsics[:, 0, 0], intrinsics[:, 1, 1]], axis=1)
    assert focal_lengths.shape == (n_img, 2)
    # Relative principal points (normalized by the image resolution)
    principal_points = (intrinsics[:, :2, 2] / resolutions).astype(np.float32)
    assert principal_points.shape == (n_img, 2)
    bar.update(1)

    # [n_frames, height, width], bool
    masks = np.load(join(data_directory, "mask.npz"))['mask']
    with io.BytesIO() as f:
        np.savez_compressed(f, masks)
        masks_compressed = f.getvalue()
    bar.update(1)

    # TODO: set the frontend's FPS slider based on `metadata["fps"]`
    # metadata = json.load(open(join(data_directory, "metadata.json")))
    bar.update(1)

    # Fix: use a context manager so the file handle is not leaked.
    with open(join(data_directory, "rgb.mp4"), "rb") as f:
        images_compressed = f.read()
    bar.update(1)

    # [n_frames, 4, 4], float32 world-to-camera matrices; invert to get
    # camera-to-world and drop the constant [0, 0, 0, 1] bottom row.
    w2c = camera_data['w2c']
    cameras_to_world = np.linalg.inv(w2c)[:, :3, :]
    assert cameras_to_world.shape == (n_img, 3, 4)
    bar.update(1)

    return CompressedSeedingRequest(
        request_id="__seeding_from_files",
        images=None,   # Will be auto-filled with placeholders
        depths=None,   # Will be auto-filled with placeholders
        masks=None,    # Will be auto-filled with placeholders
        cameras_to_world=cameras_to_world,
        focal_lengths=focal_lengths,
        principal_points=principal_points,
        resolutions=resolutions,
        images_compressed=[images_compressed],
        images_format=CompressionFormat.MP4,
        depths_compressed=[depths_compressed],
        depths_format=CompressionFormat.NPZ,
        masks_compressed=[masks_compressed],
        masks_format=CompressionFormat.NPZ,
    )
def load_v2v_seeding_data(data_directory: str, max_frames: int | None = None,
                          frames_stride: int = 1) -> SeedingRequest:
    """Load seeding data (frames, depths, camera poses) for a V2V request.

    The seeding data would typically come from the client.
    For convenience during debugging, we allow loading it here.

    `data_directory` may be:
    - a directory holding `rgb.mp4` + `metadata.json`: delegated to
      `load_gen3c_seeding_data`;
    - a directory in the Gen3C / INGP pre-processed format
      (`rgb/*.jpg`, `depth/*.npz`, `cameras.json`);
    - a single image file: depth is left to be auto-estimated and the
      camera pose defaults to identity.

    Args:
        data_directory: Directory or single-image path (see above).
        max_frames: If set and smaller than the frame count, truncate to
            this many frames (after applying `frames_stride`).
        frames_stride: Temporal subsampling stride; only applied when
            `max_frames` triggers truncation (pre-existing behavior).
    """
    if isdir(data_directory):
        # --- Load seeding data from a directory.
        if isfile(join(data_directory, "rgb.mp4")) and isfile(join(data_directory, "metadata.json")):
            return load_gen3c_seeding_data(data_directory, max_frames=max_frames,
                                           frames_stride=frames_stride)

        # Gen3C / INGP pre-processed format.
        # We assume depths, camera poses, etc are included.
        # Load the seeding frames (named 00000.jpg, 00001.jpg, ...).
        n_img = len([img for img in sorted(os.listdir(join(data_directory, 'rgb')))
                     if img.endswith('.jpg')])
        images = []
        depths = []
        for i_frame in range(n_img):
            # Load image data, normalized to [0, 1] float32
            image = imageio.imread(join(data_directory, 'rgb', f'{i_frame:05d}.jpg'))
            images.append(image.astype(np.float32) / 255.0)
            # Load depth data
            depths.append(np.load(join(data_directory, 'depth', f'{i_frame:05d}.npz'))['depth'])

        # Load camera trajectory
        with open(join(data_directory, 'cameras.json'), 'r') as f:
            cameras = json.load(f)
        cameras_to_world = np.asarray(cameras)[:n_img]

        if (max_frames is not None) and (max_frames < len(images)):
            images = images[::frames_stride][:max_frames]
            depths = depths[::frames_stride][:max_frames]
            cameras_to_world = cameras_to_world[::frames_stride][:max_frames]
        # Bug fix: keep the frame count in sync after subsampling above,
        # otherwise `resolutions`, `focal_lengths` and `principal_points`
        # below are built with the wrong batch size.
        n_img = len(images)
    else:
        # --- Load a single image.
        # We will have to assume camera poses, etc and let depth be auto-estimated.
        n_img = 1
        image = imageio.imread(data_directory)
        images = [image.astype(np.float32) / 255.0]
        depths = None
        cameras_to_world = np.eye(4)[None, :3, :]

    # Shape: [batch, height, width, 3]
    images = np.stack(images, axis=0)
    if depths is not None:
        # Shape: [batch, height, width]
        depths = np.stack(depths, axis=0)

    # Note: intrinsics below are assumed based on how this data was generated.
    resolutions = np.tile([images.shape[2], images.shape[1]], reps=(n_img, 1))
    fov_y_rad = np.pi * (50.625 / 180.0)
    # Focal length (in pixels) from the assumed vertical field of view.
    f = 0.5 / (np.tan(fov_y_rad / 2.0)) * resolutions[:, 1]
    focal_lengths = np.stack([f, f], axis=-1)
    # Principal point at the image center, in relative coordinates.
    principal_points = np.full((n_img, 2), 0.5)

    return SeedingRequest(
        request_id="__seeding_from_files",
        images=images,
        depths=depths,
        cameras_to_world=cameras_to_world,
        focal_lengths=focal_lengths,
        principal_points=principal_points,
        resolutions=resolutions,
    )
def ensure_alpha_channel(image: np.ndarray):
    """Return `image` as float32 with 4 channels, appending an opaque
    (all-ones) alpha channel when only RGB is provided.

    The alpha channel may be omitted by the sender for faster transfers.
    """
    n_channels = image.shape[-1]
    assert n_channels in (3, 4)
    if n_channels == 3:
        opaque_alpha = np.ones((*image.shape[:2], 1))
        image = np.concatenate([image, opaque_alpha], axis=-1)
        image = image.astype(np.float32)
    return image
def apply_to_pytree(pytree, cb):
    """Recursively map `cb` over the leaves of a nested structure.

    Tuples, lists and dicts are rebuilt with the same container type,
    `None` is passed through untouched, and every other value is treated
    as a leaf and replaced by `cb(value)`.
    """
    if pytree is None:
        return None
    if isinstance(pytree, (tuple, list)):
        container_type = type(pytree)
        return container_type(apply_to_pytree(child, cb) for child in pytree)
    if isinstance(pytree, dict):
        return {key: apply_to_pytree(value, cb) for key, value in pytree.items()}
    return cb(pytree)
def move_to_device(pytree, device):
    """Move every tensor-like leaf of `pytree` to `device`.

    Torch tensors are moved with `.to(device)`, NumPy arrays are first
    converted to torch tensors; all other leaves are returned unchanged.
    """
    import torch

    def _to_device(leaf):
        if torch.is_tensor(leaf):
            return leaf.to(device)
        if isinstance(leaf, np.ndarray):
            return torch.from_numpy(leaf).to(device)
        # Assume anything else is not something we need to move.
        return leaf

    return apply_to_pytree(pytree, _to_device)
def clone_tensors(pytree):
    """Deep-copy every tensor / array leaf of `pytree`.

    Torch tensors are cloned and NumPy arrays copied; all other leaves
    are returned as-is (assumed not to need copying).
    """
    import torch

    def _copy_leaf(leaf):
        if torch.is_tensor(leaf):
            return leaf.clone()
        if isinstance(leaf, np.ndarray):
            return leaf.copy()
        # Assume anything else is not something we need to copy.
        return leaf

    return apply_to_pytree(pytree, _copy_leaf)