Spaces:
Running
Running
import logging | |
import os | |
import shutil | |
import time | |
from abc import ABC, abstractmethod | |
from pathlib import Path | |
import numpy as np | |
import open3d as o3d | |
import torch | |
from dust3r.cloud_opt import GlobalAlignerMode, global_aligner | |
from dust3r.image_pairs import make_pairs | |
from dust3r.inference import inference | |
from dust3r.utils.device import to_numpy | |
from dust3r.utils.geometry import inv | |
from mast3r.model import AsymmetricMASt3R | |
from utils.sfm_utils import (compute_co_vis_masks, get_sorted_image_files, | |
load_images, save_extrinsic, save_intrinsics, | |
save_points3D) | |
from .utils import prepare_input, prepare_output, storePly | |
class BaseEstimator(ABC):
    """Common interface shared by every pose estimator in this module.

    Concrete estimators override :meth:`get_poses`; the base class itself
    cannot be instantiated.
    """

    @abstractmethod
    def get_poses(self):
        """Run pose estimation. Must be overridden by subclasses."""
class ColmapEstimator(BaseEstimator):
    """Pose estimator that drives the ``colmap`` command-line tool.

    Runs feature extraction, exhaustive matching, mapping and image
    undistortion on the images found in ``<cfg.pipeline.data_path>/input``.
    """

    def __init__(self, cfg):
        self.cfg = cfg

    @staticmethod
    def _run_colmap(step_name, args):
        """Run ``colmap <args>``; log and exit the process if the step fails."""
        # Local import: keeps this block independent of the file-level imports.
        import subprocess

        try:
            # An argument list (no shell) keeps paths containing spaces or
            # shell metacharacters intact.
            exit_code = subprocess.run(["colmap", *args]).returncode
        except FileNotFoundError:
            logging.error(f"{step_name} failed: 'colmap' executable not found. Exiting.")
            # 127 is the conventional "command not found" status.
            raise SystemExit(127) from None
        if exit_code != 0:
            logging.error(f"{step_name} failed with code {exit_code}. Exiting.")
            # returncode is the real exit status, unlike the encoded wait
            # status returned by os.system on POSIX (256 would exit as 0).
            raise SystemExit(exit_code)

    def get_poses(self, camera_model="OPENCV", use_gpu=True):
        """Run the COLMAP pipeline and lay out its output under ``data_path``.

        Args:
            camera_model: Value passed to ``--ImageReader.camera_model``.
            use_gpu: Whether SIFT extraction and matching are asked to use the GPU.

        Raises:
            SystemExit: If any COLMAP step fails or ``colmap`` is not installed.
        """
        save_path = self.cfg.pipeline.data_path
        distorted_path = os.path.join(save_path, "distorted")
        database_path = os.path.join(distorted_path, "database.db")
        raw_img_path = os.path.join(save_path, "input")
        sparse_path = os.path.join(distorted_path, "sparse")
        # Creates ``distorted`` as well, since it is a parent of ``sparse``.
        os.makedirs(sparse_path, exist_ok=True)

        gpu_flag = str(int(use_gpu))

        self._run_colmap("Feature extraction", [
            "feature_extractor",
            "--database_path", database_path,
            "--image_path", raw_img_path,
            "--ImageReader.single_camera", "1",
            "--ImageReader.camera_model", camera_model,
            "--SiftExtraction.use_gpu", gpu_flag,
        ])

        self._run_colmap("Feature matching", [
            "exhaustive_matcher",
            "--database_path", database_path,
            "--SiftMatching.use_gpu", gpu_flag,
        ])

        self._run_colmap("Mapper", [
            "mapper",
            "--database_path", database_path,
            "--image_path", raw_img_path,
            "--output_path", sparse_path,
            "--Mapper.ba_global_function_tolerance=0.000001",
        ])

        self._run_colmap("Image undistortion", [
            "image_undistorter",
            "--image_path", raw_img_path,
            "--input_path", os.path.join(sparse_path, "0"),
            "--output_path", save_path,
            "--output_type", "COLMAP",
        ])

        # Move everything in <save_path>/sparse into <save_path>/sparse/0.
        curr_path = os.path.join(save_path, "sparse")
        dest_path = os.path.join(curr_path, "0")
        os.makedirs(dest_path, exist_ok=True)
        for file in os.listdir(curr_path):
            if file == "0":
                continue
            shutil.move(os.path.join(curr_path, file), os.path.join(dest_path, file))
class MASt3REstimator(BaseEstimator):
    """Pose estimator backed by a pretrained ``AsymmetricMASt3R`` model.

    The model is loaded in ``__init__`` from ``cfg.pose_estimator.model_path``
    and moved to ``cfg.pose_estimator.device``.
    """

    def __init__(self, cfg):
        self.cfg = cfg
        self.device = cfg.pose_estimator.device
        self.model = AsymmetricMASt3R.from_pretrained(cfg.pose_estimator.model_path).to(self.device)

    def get_poses(self):
        """Reconstruct the scene from ``<data_path>/input`` and save the result.

        Runs pairwise inference and global alignment, ranks views by mean
        confidence, optionally computes co-visibility masks, and writes
        extrinsics, intrinsics and 3D points into ``<data_path>/sparse/0``
        through the ``save_*`` helpers.
        """
        save_path = self.cfg.pipeline.data_path
        co_vis_dsp = self.cfg.pose_estimator.co_vis_dsp
        sparse_path = os.path.join(save_path, "sparse", "0")
        os.makedirs(sparse_path, exist_ok=True)

        image_dir = Path(save_path) / "input"
        image_files, image_suffix = get_sorted_image_files(image_dir)
        n_views = len(image_files)
        images, org_imgs_shape = load_images(image_files, size=512)

        logging.info(">> Making pairs...")
        pairs = make_pairs(images)
        logging.info(">> Inference...")
        output = inference(pairs, self.model, self.device, batch_size=1, verbose=True)
        logging.info('>> Global alignment...')
        scene = global_aligner(output, device=self.device, mode=GlobalAlignerMode.PointCloudOptimizer)

        # The scene exposes poses that are inverted here to get world-to-camera.
        extrinsics_w2c = inv(to_numpy(scene.get_im_poses()))
        intrinsics = to_numpy(scene.get_intrinsics())
        focals = to_numpy(scene.get_focals())
        imgs = np.array(scene.imgs)
        pts3d = to_numpy(scene.get_pts3d())
        pts3d = np.array(pts3d)
        depthmaps = to_numpy(scene.im_depthmaps.detach().cpu().numpy())
        values = [param.detach().cpu().numpy() for param in scene.im_conf]
        confs = np.array(values)

        logging.info('>> Confiden-aware Ranking...')
        # Mean confidence per view, ranked from highest to lowest.
        avg_conf_scores = confs.mean(axis=(1, 2))
        sorted_conf_indices = np.argsort(avg_conf_scores)[::-1]
        sorted_conf_avg_conf_scores = avg_conf_scores[sorted_conf_indices]
        # Use %s placeholders: extra positional args without a placeholder make
        # logging report a formatting error instead of emitting the message.
        logging.info("Sorted indices: %s", sorted_conf_indices)
        logging.info("Sorted average confidence scores: %s", sorted_conf_avg_conf_scores)

        logging.info('>> Calculate the co-visibility mask...')
        depth_thre = self.cfg.pose_estimator.depth_thre
        if depth_thre > 0:
            overlapping_masks = compute_co_vis_masks(sorted_conf_indices, depthmaps, pts3d, intrinsics, extrinsics_w2c, imgs.shape, depth_threshold=depth_thre)
            overlapping_masks = ~overlapping_masks
        else:
            # No threshold means no masks, so mask-based downsampling is disabled.
            co_vis_dsp = False
            overlapping_masks = None

        # Reuse the first view's focal for every view.
        focals = np.repeat(focals[0], n_views)

        logging.info('>> Saving results...')
        save_extrinsic(Path(sparse_path), extrinsics_w2c, image_files, image_suffix)
        save_intrinsics(Path(sparse_path), focals, org_imgs_shape, imgs.shape, save_focals=True)
        pts_num = save_points3D(Path(sparse_path), imgs, pts3d, confs.reshape(pts3d.shape[0], -1), overlapping_masks, use_masks=co_vis_dsp, save_all_pts=True, save_txt_path=save_path, depth_threshold=depth_thre)
        # save_images_and_masks(Path(sparse_path), n_views, imgs, overlapping_masks, image_files, image_suffix)
        logging.info(f'MASt3R Reconstruction is successfully converted to COLMAP files in: {sparse_path}')
        logging.info(f'Number of points: {pts3d.reshape(-1, 3).shape[0]}')
        logging.info(f'Number of points after downsampling: {pts_num}')
class CUT3REstimator(BaseEstimator):
    """Pose estimator backed by the CUT3R ``ARCroco3DStereo`` model.

    The model is loaded inside :meth:`get_poses` from
    ``cfg.pose_estimator.model_path``.
    """

    def __init__(self, cfg):
        self.cfg = cfg
        self.device = cfg.pose_estimator.device

    def get_poses(self):
        """Run CUT3R on ``<data_path>/input`` and write ``points3D.ply``.

        When ``cfg.pipeline.selection`` is truthy, the frames with the highest
        mean confidence in each chunk are selected and their indices stored in
        ``cfg.pipeline.selected_idxs``. The point cloud is built from the
        frames listed in ``cfg.pipeline.selected_idxs``.
        """
        cfg = self.cfg
        if self.device == "cuda" and not torch.cuda.is_available():
            print("cuda not available. switching to cpu.")
            self.device = "cpu"

        from cut3r.dust3r.inference import inference
        from cut3r.dust3r.model import ARCroco3DStereo

        save_path = self.cfg.pipeline.data_path
        img_folder_path = os.path.join(save_path, "input")
        # os.listdir returns entries in arbitrary order; sort so the frame
        # sequence (and therefore the meaning of selected_idxs) is deterministic.
        img_paths = [os.path.join(img_folder_path, img_name) for img_name in sorted(os.listdir(img_folder_path))]
        img_mask = [True] * len(img_paths)

        views, orig_shape = prepare_input(
            img_paths=img_paths,
            img_mask=img_mask,
            size=512,
            revisit=1,
            update=True,
        )

        model = ARCroco3DStereo.from_pretrained(cfg.pose_estimator.model_path).to(self.device)
        model.eval()

        logging.info("Running inference...")
        start_time = time.time()
        outputs, state_args = inference(views, model, self.device)
        total_time = time.time() - start_time
        per_frame_time = total_time / len(views)
        print(
            f"Inference completed in {total_time:.2f} seconds (average {per_frame_time:.2f} s per frame)."
        )

        pts3ds_other, colors, conf, cam_dict = prepare_output(
            outputs, orig_shape, save_path, 1, True
        )
        conf = torch.cat(conf, dim=0)

        if self.cfg.pipeline.selection:
            # One mean confidence score per frame.
            conf_score = conf.mean(dim=(1, 2))
            chunk_num = self.cfg.pipeline.chunk_num
            keep_num_per_chunk = self.cfg.pipeline.keep_num_per_chunk
            conf_scores_tuple = conf_score.chunk(chunk_num)
            selected_idxs = []
            total_conf_len = 0
            for conf_scores_chunk in conf_scores_tuple:
                _, idxs = conf_scores_chunk.sort(descending=True)
                idxs = idxs[:keep_num_per_chunk]
                # Chunk-local indices are shifted back to global frame indices.
                selected_idxs += [(idx + total_conf_len).item() for idx in idxs]
                total_conf_len += len(conf_scores_chunk)
            self.cfg.pipeline.selected_idxs = sorted(selected_idxs)

        # NOTE(review): when selection is disabled this relies on
        # cfg.pipeline.selected_idxs already being set by the caller/config.
        pts3ds_to_save = [pts3ds_other[idx].cpu().numpy() for idx in self.cfg.pipeline.selected_idxs]
        colors_to_save = [colors[idx].cpu().numpy() for idx in self.cfg.pipeline.selected_idxs]
        all_pts3ds = np.stack(pts3ds_to_save).reshape(-1, 3)
        all_colors = np.stack(colors_to_save).reshape(-1, 3)
        storePly(os.path.join(save_path, "points3D.ply"), all_pts3ds, all_colors)
class VGGTEstimator(BaseEstimator):
    """Pose estimator backed by the pretrained ``facebook/VGGT-1B`` model."""

    def __init__(self, cfg):
        self.cfg = cfg
        self.device = cfg.pose_estimator.device

    def get_poses(self):
        """Run VGGT on ``<data_path>/input`` and save a point cloud and cameras.

        Writes ``<data_path>/points3D.ply`` (built from the first and last
        view) and one ``<data_path>/camera/NNNN.npz`` per view containing a
        camera-to-world ``pose`` and rescaled ``intrinsics``.
        """
        from vggt.models.vggt import VGGT
        from vggt.utils.geometry import unproject_depth_map_to_point_map
        from vggt.utils.load_fn import load_and_preprocess_images
        from vggt.utils.pose_enc import pose_encoding_to_extri_intri

        if self.device == "cuda" and not torch.cuda.is_available():
            print("cuda not available. switching to cpu.")
            self.device = "cpu"

        # Only query the GPU capability when actually running on CUDA; the
        # query raises on machines without CUDA (e.g. after the fallback above).
        use_cuda = torch.device(self.device).type == "cuda"
        if use_cuda and torch.cuda.get_device_capability()[0] < 8:
            dtype = torch.float16
        else:
            dtype = torch.bfloat16

        logging.info("Loading vggt...")
        model = VGGT.from_pretrained("facebook/VGGT-1B").to(self.device)

        save_path = self.cfg.pipeline.data_path
        img_folder_path = os.path.join(save_path, "input")
        # os.listdir order is arbitrary; sort so "first"/"last" view and the
        # numbering of the saved cameras are deterministic.
        img_paths = [os.path.join(img_folder_path, img_name) for img_name in sorted(os.listdir(img_folder_path))]
        images = load_and_preprocess_images(img_paths).to(self.device)

        # Autocast is explicitly disabled when not on CUDA.
        with torch.no_grad(), torch.amp.autocast("cuda", dtype=dtype, enabled=use_cuda):
            images = images[None]  # add a leading batch dimension
            aggregated_tokens_list, ps_idx = model.aggregator(images)
            pose_enc = model.camera_head(aggregated_tokens_list)[-1]
            extrinsic, intrinsic = pose_encoding_to_extri_intri(pose_enc, images.shape[-2:])
            depth_map, depth_conf = model.depth_head(aggregated_tokens_list, images, ps_idx)

        point_map = unproject_depth_map_to_point_map(
            depth_map.squeeze(0),
            extrinsic.squeeze(0),
            intrinsic.squeeze(0)
        )

        # Drop only the batch axis: a bare squeeze() would also remove the
        # view axis when there is a single input image.
        extrinsic, intrinsic = extrinsic.squeeze(0), intrinsic.squeeze(0)

        # Pad each 3x4 extrinsic to a homogeneous 4x4 world-to-camera matrix.
        extrinsics_w2c = torch.eye(4)[None].repeat(len(extrinsic), 1, 1)
        extrinsics_w2c[:, :3, :4] = extrinsic.cpu()
        extrinsics_w2c = extrinsics_w2c.cpu().numpy()

        intrinsics = intrinsic.cpu().numpy()
        scaled_y, scaled_x = images.shape[-2:]
        # NOTE(review): rescales intrinsics to a hard-coded 720x480 (width x
        # height) target — confirm this matches the downstream image size.
        intrinsics[:, 0, 0] *= 720 / scaled_x
        intrinsics[:, 1, 1] *= 480 / scaled_y
        intrinsics[:, 0, 2] *= 720 / scaled_x
        intrinsics[:, 1, 2] *= 480 / scaled_y

        # Only the first and last view contribute to the saved point cloud.
        images = torch.stack([images[:, 0], images[:, -1]], dim=1)
        point_map = np.stack([point_map[0], point_map[-1]], axis=0)
        # Move the channel axis last so colours flatten to rows of 3.
        colors = images.permute(0, 1, 3, 4, 2).detach().cpu().numpy()
        colors = colors.reshape(-1, 3)
        point_map = point_map.reshape(-1, 3).astype(np.float32)

        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(point_map)
        pcd.colors = o3d.utility.Vector3dVector(colors)
        o3d.io.write_point_cloud(os.path.join(save_path, "points3D.ply"), pcd)

        camera_dir = os.path.join(save_path, "camera")
        os.makedirs(camera_dir, exist_ok=True)
        for i, (w2c, cam_intrinsic) in enumerate(zip(extrinsics_w2c, intrinsics)):
            # Invert the rigid transform: R^T and -R^T t.
            c2w = np.eye(4)
            c2w[:3, :3] = w2c[:3, :3].T
            c2w[:3, 3] = - w2c[:3, :3].T @ w2c[:3, 3]
            np.savez(
                os.path.join(camera_dir, f"{i+1:04d}.npz"),
                pose=c2w,
                intrinsics=cam_intrinsic
            )
def get_pose_estimator(cfg):
    """Build the pose estimator named by ``cfg.pose_estimator.type``.

    Supported names are ``"colmap"``, ``"mast3r"``, ``"cut3r"`` and ``"vggt"``;
    any other value raises ``KeyError``. The chosen class is instantiated
    with ``cfg``.
    """
    registry = dict(
        colmap=ColmapEstimator,
        mast3r=MASt3REstimator,
        cut3r=CUT3REstimator,
        vggt=VGGTEstimator,
    )
    estimator_cls = registry[cfg.pose_estimator.type]
    return estimator_cls(cfg)