# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Adapted from:
https://github.com/bytedance/IRASim/blob/main/dataset/dataset_util.py
"""
import base64
import math
import os
from io import BytesIO
import numpy as np
import torch
import torch.distributed as dist
import torchvision.transforms.functional as F
from PIL import Image


def is_dist_avail_and_initialized():
    """Return True if torch.distributed is both available and initialized."""
    if not dist.is_available():
        return False
    if not dist.is_initialized():
        return False
    return True


def get_rank():
    """Return the rank of the current process, or 0 outside distributed runs."""
    if not is_dist_avail_and_initialized():
        return 0
    return dist.get_rank()
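
# A minimal gating sketch (not from the original repo): these helpers are
# typically used to restrict side effects such as logging or checkpointing
# to a single process in a multi-GPU run.
#
#   if get_rank() == 0:
#       print("only the rank-0 process logs this")
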
def get_1d_sincos_pos_embed_from_grid(embed_dim, pos):
"""
embed_dim: output dimension for each position
pos: a list of positions to be encoded: size (M,)
out: (M, D)
"""
assert embed_dim % 2 == 0
omega = np.arange(embed_dim // 2, dtype=np.float32)
omega /= embed_dim / 2.0
omega = 1.0 / 10000**omega # (D/2,)
pos = pos.reshape(-1) # (M,)
out = np.einsum("m,d->md", pos, omega) # (M, D/2), outer product
emb_sin = np.sin(out) # (M, D/2)
emb_cos = np.cos(out) # (M, D/2)
emb = np.concatenate([emb_sin, emb_cos], axis=1) # (M, D)
return emb
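
# Shape sketch (illustrative values): the output stacks the sin half before
# the cos half along the feature dimension.
#
#   pos = np.arange(4, dtype=np.float32)            # positions 0..3
#   emb = get_1d_sincos_pos_embed_from_grid(8, pos)
#   emb.shape                                       # (4, 8)
#   emb[0]                                          # sin terms 0, cos terms 1 at position 0
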
def get_2d_sincos_pos_embed_from_grid(embed_dim, grid):
    """
    embed_dim: output dimension for each position (must be even)
    grid: stacked meshgrid holding the H and W coordinates
    out: (H*W, D)
    """
    assert embed_dim % 2 == 0
    # use half of dimensions to encode grid_h
    emb_h = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[0])  # (H*W, D/2)
    emb_w = get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[1])  # (H*W, D/2)
    emb = np.concatenate([emb_h, emb_w], axis=1)  # (H*W, D)
    return emb


def get_2d_sincos_pos_embed(embed_dim, grid_size, cls_token=False):
    """
    embed_dim: output dimension for each position (must be divisible by 4)
    grid_size: int, height and width of the square grid
    return:
    pos_embed: [grid_size*grid_size, embed_dim] or [1+grid_size*grid_size, embed_dim] (w/ or w/o cls_token)
    """
    grid_h = np.arange(grid_size, dtype=np.float32)
    grid_w = np.arange(grid_size, dtype=np.float32)
    grid = np.meshgrid(grid_w, grid_h)  # here w goes first
    grid = np.stack(grid, axis=0)
    grid = grid.reshape([2, 1, grid_size, grid_size])
    pos_embed = get_2d_sincos_pos_embed_from_grid(embed_dim, grid)
    if cls_token:
        pos_embed = np.concatenate([np.zeros([1, embed_dim]), pos_embed], axis=0)
    return pos_embed
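
# Shape sketch (illustrative values): a ViT-style table for a 14x14 patch grid
# with a 768-dim model and a cls token.
#
#   pe = get_2d_sincos_pos_embed(768, 14, cls_token=True)
#   pe.shape  # (1 + 14*14, 768) == (197, 768)
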
def b64_2_img(data: str):
    """Decode a base64-encoded image string into an RGB PIL image."""
    image_b64 = base64.b64decode(data)
    img = Image.open(BytesIO(image_b64)).convert("RGB")
    return img
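
# Round-trip sketch (illustrative): encode a PIL image to base64, then decode it back.
#
#   buf = BytesIO()
#   Image.new("RGB", (8, 8)).save(buf, format="PNG")
#   data = base64.b64encode(buf.getvalue()).decode("ascii")
#   img = b64_2_img(data)  # 8x8 RGB PIL image
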
def get_continuous_action(d_acts, c_act_max, c_act_min, n_bins):
    """Map discrete action bin indices back to continuous values by linear de-quantization."""
    c_act_max = c_act_max.to(d_acts.device)
    c_act_min = c_act_min.to(d_acts.device)
    c_acts = d_acts / (n_bins - 1) * (c_act_max - c_act_min) + c_act_min
    return c_acts
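
# De-quantization sketch (illustrative values): with 256 bins spanning [-1, 1],
# bin 0 maps to -1.0, bin 255 to 1.0, and bin 128 to roughly 0.004.
#
#   d = torch.tensor([0, 128, 255])
#   lo, hi = torch.tensor(-1.0), torch.tensor(1.0)
#   get_continuous_action(d, hi, lo, n_bins=256)  # tensor([-1.0000, 0.0039, 1.0000])
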
def alpha2rotm(a):
    """Alpha Euler angle (rotation about the x-axis) to rotation matrix."""
    rotm = np.array([[1, 0, 0], [0, np.cos(a), -np.sin(a)], [0, np.sin(a), np.cos(a)]])
    return rotm


def beta2rotm(b):
    """Beta Euler angle (rotation about the y-axis) to rotation matrix."""
    rotm = np.array([[np.cos(b), 0, np.sin(b)], [0, 1, 0], [-np.sin(b), 0, np.cos(b)]])
    return rotm


def gamma2rotm(c):
    """Gamma Euler angle (rotation about the z-axis) to rotation matrix."""
    rotm = np.array([[np.cos(c), -np.sin(c), 0], [np.sin(c), np.cos(c), 0], [0, 0, 1]])
    return rotm


def euler2rotm(euler_angles):
    """ZYX Euler angles [alpha, beta, gamma] (about x, y, z) to rotation matrix R = Rz @ Ry @ Rx."""
    alpha = euler_angles[0]
    beta = euler_angles[1]
    gamma = euler_angles[2]
    rotm_a = alpha2rotm(alpha)
    rotm_b = beta2rotm(beta)
    rotm_c = gamma2rotm(gamma)
    rotm = rotm_c @ rotm_b @ rotm_a
    return rotm


def isRotm(R):
    """Check whether a matrix is a valid rotation matrix (forked from Andy Zeng)."""
    Rt = np.transpose(R)
    shouldBeIdentity = np.dot(Rt, R)
    I = np.identity(3, dtype=R.dtype)
    n = np.linalg.norm(I - shouldBeIdentity)
    return n < 1e-6


def rotm2euler(R):
    """Rotation matrix (R = Rz @ Ry @ Rx) to Euler angles [x, y, z].

    Forked from: https://learnopencv.com/rotation-matrix-to-euler-angles/
    """
    assert isRotm(R)
sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
singular = sy < 1e-6
if not singular:
x = math.atan2(R[2, 1], R[2, 2])
y = math.atan2(-R[2, 0], sy)
z = math.atan2(R[1, 0], R[0, 0])
else:
x = math.atan2(-R[1, 2], R[1, 1])
y = math.atan2(-R[2, 0], sy)
z = 0
    # Wrap each angle into (-pi, pi]
while x > np.pi:
x -= 2 * np.pi
while x <= -np.pi:
x += 2 * np.pi
while y > np.pi:
y -= 2 * np.pi
while y <= -np.pi:
y += 2 * np.pi
while z > np.pi:
z -= 2 * np.pi
while z <= -np.pi:
z += 2 * np.pi
return np.array([x, y, z])
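
# Consistency sketch (illustrative values): euler2rotm and rotm2euler are
# inverses away from the gimbal-lock singularity (|beta| near pi/2).
#
#   angles = np.array([0.1, -0.4, 1.2])
#   np.allclose(rotm2euler(euler2rotm(angles)), angles)  # True
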
def get_converted_fp32_paths(deepspeed_ckpt_path):
    """Return the path of the consolidated "<ckpt_name>.fp32.pt" file next to a DeepSpeed checkpoint."""
    deepspeed_ckpt_path = deepspeed_ckpt_path.rstrip("/")
    ckpt_dir = os.path.dirname(deepspeed_ckpt_path)
    ckpt_name = os.path.basename(deepspeed_ckpt_path)
    fp32_ckpt_name = f"{ckpt_name}.fp32.pt"
    converted_path = os.path.join(ckpt_dir, fp32_ckpt_name)
    return converted_path
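
# Path mapping sketch (hypothetical paths):
#
#   get_converted_fp32_paths("runs/exp1/checkpoint-1000/")
#   # -> "runs/exp1/checkpoint-1000.fp32.pt"
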
def quat2rotm(quat):
"""Quaternion to rotation matrix.
Args:
quat (4, numpy array): quaternion x, y, z, w
Returns:
rotm (3x3 numpy array): rotation matrix
"""
    x, y, z, w = quat
    s = w * w + x * x + y * y + z * z  # squared norm, so non-unit quaternions are handled
rotm = np.array(
[
[1 - 2 * (y * y + z * z) / s, 2 * (x * y - z * w) / s, 2 * (x * z + y * w) / s],
[2 * (x * y + z * w) / s, 1 - 2 * (x * x + z * z) / s, 2 * (y * z - x * w) / s],
[2 * (x * z - y * w) / s, 2 * (y * z + x * w) / s, 1 - 2 * (x * x + y * y) / s],
]
)
return rotm
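
# Sanity sketch (illustrative): the identity quaternion maps to the identity matrix.
#
#   np.allclose(quat2rotm(np.array([0.0, 0.0, 0.0, 1.0])), np.eye(3))  # True
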
class Resize_Preprocess:
def __init__(self, size):
"""
Initialize the preprocessing class with the target size.
Args:
size (tuple): The target height and width as a tuple (height, width).
"""
self.size = size

    def __call__(self, video_frames):
"""
Apply the transformation to each frame in the video.
Args:
video_frames (torch.Tensor): A tensor representing a batch of video frames.
Returns:
torch.Tensor: The transformed video frames.
"""
# Resize each frame in the video
resized_frames = torch.stack([F.resize(frame, self.size, antialias=True) for frame in video_frames])
return resized_frames
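
# Usage sketch (illustrative shapes): resize every frame of a (T, C, H, W) clip.
#
#   clip = torch.zeros(16, 3, 480, 640, dtype=torch.uint8)
#   resize = Resize_Preprocess((256, 320))
#   resize(clip).shape  # torch.Size([16, 3, 256, 320])
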
class Preprocess:
    def __init__(self, size):
        self.size = size

    def __call__(self, clip):
        clip = Preprocess.resize_scale(clip, self.size[0], self.size[1], interpolation_mode="bilinear")
        return clip

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(size={self.size})"

    @staticmethod
    def resize_scale(clip, target_height, target_width, interpolation_mode):
        # Aspect-preserving "cover" resize: the tighter dimension matches the
        # target exactly and the other one meets or exceeds it.
        target_ratio = target_height / target_width
        H = clip.size(-2)
        W = clip.size(-1)
        clip_ratio = H / W
        if clip_ratio > target_ratio:
            scale_ = target_width / W
        else:
            scale_ = target_height / H
        return torch.nn.functional.interpolate(clip, scale_factor=scale_, mode=interpolation_mode, align_corners=False)
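
# Behavior sketch (illustrative shapes): aspect ratio is preserved, so the
# tighter relative side matches the target and the other side exceeds it.
#
#   clip = torch.zeros(16, 3, 480, 640, dtype=torch.float32)
#   Preprocess((240, 240))(clip).shape  # torch.Size([16, 3, 240, 320])
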
class ToTensorVideo:
    """
    Convert tensor data type from uint8 to float and divide values by 255.0,
    keeping the (T, C, H, W) layout unchanged
    """

    def __init__(self):
        pass

    def __call__(self, clip):
        """
        Args:
            clip (torch.tensor, dtype=torch.uint8): Size is (T, C, H, W)
        Return:
            clip (torch.tensor, dtype=torch.float): Size is (T, C, H, W)
        """
        return to_tensor(clip)

    def __repr__(self) -> str:
        return self.__class__.__name__


def to_tensor(clip):
    """
    Convert tensor data type from uint8 to float and divide values by 255.0,
    keeping the (T, C, H, W) layout unchanged
    Args:
        clip (torch.tensor, dtype=torch.uint8): Size is (T, C, H, W)
    Return:
        clip (torch.tensor, dtype=torch.float): Size is (T, C, H, W)
    """
    _is_tensor_video_clip(clip)
    if not clip.dtype == torch.uint8:
        raise TypeError(f"clip tensor should have data type uint8. Got {clip.dtype}")
    return clip.float() / 255.0
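
# Usage sketch (illustrative): scale a uint8 clip into [0, 1] floats.
#
#   clip = torch.randint(0, 256, (16, 3, 224, 224), dtype=torch.uint8)
#   out = ToTensorVideo()(clip)
#   out.dtype  # torch.float32, values in [0, 1]
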
def _is_tensor_video_clip(clip):
    """Validate that clip is a 4-D torch tensor; raise TypeError/ValueError otherwise."""
    if not torch.is_tensor(clip):
        raise TypeError(f"clip should be Tensor. Got {type(clip)}")
    if not clip.ndimension() == 4:
        raise ValueError(f"clip should be 4D. Got {clip.ndimension()}D")
    return True