Spaces:
Starting
on
T4
Starting
on
T4
import os | |
import random | |
import json | |
from utils.system_utils import searchForMaxIteration | |
from scene.dataset_readers import sceneLoadTypeCallbacks | |
from scene.gaussian_model import GaussianModel | |
from scene.obj_model import ObjModel | |
from scene.cameras import cameraList_from_camInfos | |
import torch | |
import open3d as o3d | |
import numpy as np | |
import shutil | |
def load_cameras(args, data_type, ignore_dynamic=False): | |
train_cameras = {} | |
test_cameras = {} | |
if os.path.exists(os.path.join(args.source_path, "meta_data.json")): | |
print("Found meta_data.json file, assuming HUGSIM format data set!") | |
scene_info = sceneLoadTypeCallbacks['HUGSIM'](args.source_path, data_type, ignore_dynamic) | |
else: | |
assert False, "Could not recognize scene type! "+args.source_path | |
print("Loading Training Cameras") | |
train_cameras = cameraList_from_camInfos(scene_info.train_cameras, args) | |
print("Loading Test Cameras") | |
test_cameras = cameraList_from_camInfos(scene_info.test_cameras, args) | |
return train_cameras, test_cameras, scene_info | |
class Scene: | |
def __init__(self, args, gaussians:GaussianModel, load_iteration=None, shuffle=True, | |
data_type='kitti360', ignore_dynamic=False, planning=None): | |
"""b | |
:param path: Path to colmap scene main folder. | |
""" | |
self.model_path = args.model_path | |
self.loaded_iter = None | |
self.gaussians = gaussians | |
self.data_type = data_type | |
if load_iteration: | |
if load_iteration == -1: | |
self.loaded_iter = searchForMaxIteration(os.path.join(self.model_path, "ckpts")) | |
else: | |
self.loaded_iter = load_iteration | |
print("Loading trained model at iteration {}".format(self.loaded_iter)) | |
self.train_cameras, self.test_cameras, scene_info = load_cameras(args, data_type, ignore_dynamic) | |
self.dynamic_verts = scene_info.verts | |
self.dynamic_gaussians = {} | |
for track_id in scene_info.verts: | |
self.dynamic_gaussians[track_id] = ObjModel(args.model.sh_degree, feat_mutable=False) | |
if planning is not None: | |
for plan_id in planning.keys(): | |
self.dynamic_gaussians[plan_id] = ObjModel(args.model.sh_degree, feat_mutable=False) | |
if not self.loaded_iter: | |
shutil.copyfile(scene_info.ply_path, os.path.join(self.model_path, "input.ply")) | |
shutil.copyfile(os.path.join(args.source_path, 'meta_data.json'), os.path.join(self.model_path, 'meta_data.json')) | |
shutil.copyfile(os.path.join(args.source_path, 'ground_param.pkl'), os.path.join(self.model_path, 'ground_param.pkl')) | |
if shuffle: | |
random.shuffle(scene_info.train_cameras) | |
random.shuffle(scene_info.test_cameras) | |
self.cameras_extent = scene_info.nerf_normalization["radius"] | |
if self.loaded_iter: | |
(model_params, first_iter) = torch.load(os.path.join(self.model_path, "ckpts", f"chkpnt{self.loaded_iter}.pth")) | |
gaussians.restore(model_params, None) | |
for iid, dynamic_gaussian in self.dynamic_gaussians.items(): | |
if planning is None or iid not in planning: | |
(model_params, first_iter) = torch.load(os.path.join(self.model_path, "ckpts", f"dynamic_{iid}_chkpnt{self.loaded_iter}.pth")) | |
dynamic_gaussian.restore(model_params, None) | |
else: | |
(model_params, first_iter) = torch.load(planning[iid]) | |
model_params = list(model_params) | |
model_params.append(None) | |
dynamic_gaussian.restore(model_params, None) | |
# for iid, unicycle_pkg in self.unicycles.items(): | |
# model_params = torch.load(os.path.join(self.model_path, "ckpts", f"unicycle_{iid}_chkpnt{self.loaded_iter}.pth")) | |
# unicycle_pkg['model'].restore(model_params) | |
else: | |
self.gaussians.create_from_pcd(scene_info.point_cloud, self.cameras_extent) | |
for track_id in self.dynamic_gaussians.keys(): | |
vertices = scene_info.verts[track_id] | |
# init from template | |
l, h, w = vertices[:, 0].max() - vertices[:, 0].min(), vertices[:, 1].max() - vertices[:, 1].min(), vertices[:, 2].max() - vertices[:, 2].min() | |
pcd = o3d.io.read_point_cloud(f"utils/vehicle_template/benz_{data_type}.ply") | |
points = np.array(pcd.points) * np.array([l, h, w]) | |
pcd.points = o3d.utility.Vector3dVector(points) | |
pcd.colors = o3d.utility.Vector3dVector(np.ones_like(points) * 0.5) | |
self.dynamic_gaussians[track_id].create_from_pcd(pcd, self.cameras_extent) | |
def save(self, iteration): | |
# self.gaussians.save_ply(os.path.join(point_cloud_path, "point_cloud.ply")) | |
point_cloud_vis_path = os.path.join(self.model_path, "point_cloud_vis/iteration_{}".format(iteration)) | |
self.gaussians.save_vis_ply(os.path.join(point_cloud_vis_path, "point.ply")) | |
for iid, dynamic_gaussian in self.dynamic_gaussians.items(): | |
dynamic_gaussian.save_vis_ply(os.path.join(point_cloud_vis_path, f"dynamic_{iid}.ply")) | |
def getTrainCameras(self): | |
return self.train_cameras | |
def getTestCameras(self): | |
return self.test_cameras |