import sys sys.path.append(".") import numpy as np import torch from PIL import Image import tqdm import cv2 import argparse from RDD.RDD_helper import RDD_helper from RDD.RDD import build import matplotlib.pyplot as plt import matplotlib import os from benchmarks.utils import pose_auc, angle_error_vec, angle_error_mat, symmetric_epipolar_distance, compute_symmetrical_epipolar_errors, compute_pose_error, compute_relative_pose, estimate_pose, dynamic_alpha def make_matching_figure( img0, img1, mkpts0, mkpts1, color, kpts0=None, kpts1=None, text=[], dpi=75, path=None): # draw image pair assert mkpts0.shape[0] == mkpts1.shape[0], f'mkpts0: {mkpts0.shape[0]} v.s. mkpts1: {mkpts1.shape[0]}' fig, axes = plt.subplots(1, 2, figsize=(10, 6), dpi=dpi) axes[0].imshow(img0, cmap='gray') axes[1].imshow(img1, cmap='gray') for i in range(2): # clear all frames axes[i].get_yaxis().set_ticks([]) axes[i].get_xaxis().set_ticks([]) for spine in axes[i].spines.values(): spine.set_visible(False) plt.tight_layout(pad=1) if kpts0 is not None: assert kpts1 is not None axes[0].scatter(kpts0[:, 0], kpts0[:, 1], c='w', s=2) axes[1].scatter(kpts1[:, 0], kpts1[:, 1], c='w', s=2) # draw matches if mkpts0.shape[0] != 0 and mkpts1.shape[0] != 0: fig.canvas.draw() transFigure = fig.transFigure.inverted() fkpts0 = transFigure.transform(axes[0].transData.transform(mkpts0)) fkpts1 = transFigure.transform(axes[1].transData.transform(mkpts1)) fig.lines = [matplotlib.lines.Line2D((fkpts0[i, 0], fkpts1[i, 0]), (fkpts0[i, 1], fkpts1[i, 1]), transform=fig.transFigure, c=color[i], linewidth=1) for i in range(len(mkpts0))] axes[0].scatter(mkpts0[:, 0], mkpts0[:, 1], c=color, s=4) axes[1].scatter(mkpts1[:, 0], mkpts1[:, 1], c=color, s=4) # put txts txt_color = 'k' if img0[:100, :200].mean() > 200 else 'w' fig.text( 0.01, 0.99, '\n'.join(text), transform=fig.axes[0].transAxes, fontsize=15, va='top', ha='left', color=txt_color) # save or return figure if path: plt.savefig(str(path), bbox_inches='tight', pad_inches=0) plt.close() else: return fig def error_colormap(err, thr, alpha=1.0): assert alpha <= 1.0 and alpha > 0, f"Invaid alpha value: {alpha}" x = 1 - np.clip(err / (thr * 2), 0, 1) return np.clip( np.stack([2-x*2, x*2, np.zeros_like(x), np.ones_like(x)*alpha], -1), 0, 1) def _make_evaluation_figure(img0, img1, kpts0, kpts1, epi_errs, e_t, e_R, alpha='dynamic', path=None): conf_thr = 1e-4 img0 = np.array(img0) img1 = np.array(img1) kpts0 = kpts0 kpts1 = kpts1 epi_errs = epi_errs.cpu().numpy() correct_mask = epi_errs < conf_thr precision = np.mean(correct_mask) if len(correct_mask) > 0 else 0 n_correct = np.sum(correct_mask) # recall might be larger than 1, since the calculation of conf_matrix_gt # uses groundtruth depths and camera poses, but epipolar distance is used here. # matching info if alpha == 'dynamic': alpha = dynamic_alpha(len(correct_mask)) color = error_colormap(epi_errs, conf_thr, alpha=alpha) text = [ f'#Matches {len(kpts0)}', f'Precision({conf_thr:.2e}) ({100 * precision:.1f}%): {n_correct}/{len(kpts0)}', f'e_t: {e_t:.2f} | e_R: {e_R:.2f}', ] # make the figure figure = make_matching_figure(img0, img1, kpts0, kpts1, color, text=text, path=path) return figure class MegaDepthPoseMNNBenchmark: def __init__(self, data_root="./megadepth_test_1500", scene_names = None) -> None: if scene_names is None: self.scene_names = [ "0015_0.1_0.3.npz", "0015_0.3_0.5.npz", "0022_0.1_0.3.npz", "0022_0.3_0.5.npz", "0022_0.5_0.7.npz", ] else: self.scene_names = scene_names self.scenes = [ np.load(f"{data_root}/{scene}", allow_pickle=True) for scene in self.scene_names ] self.data_root = data_root def benchmark(self, model_helper, model_name = None, scale_intrinsics = False, calibrated = True, plot_every_iter=1, plot=False, method='sparse'): with torch.no_grad(): data_root = self.data_root tot_e_t, tot_e_R, tot_e_pose = [], [], [] thresholds = [5, 10, 20] for scene_ind in range(len(self.scenes)): import os scene_name = os.path.splitext(self.scene_names[scene_ind])[0] print(f"Processing {scene_name}") scene = self.scenes[scene_ind] pairs = scene["pair_infos"] intrinsics = scene["intrinsics"] poses = scene["poses"] im_paths = scene["image_paths"] pair_inds = range(len(pairs)) for pairind in tqdm.tqdm(pair_inds): idx0, idx1 = pairs[pairind][0] K0 = intrinsics[idx0].copy() T0 = poses[idx0].copy() R0, t0 = T0[:3, :3], T0[:3, 3] K1 = intrinsics[idx1].copy() T1 = poses[idx1].copy() R1, t1 = T1[:3, :3], T1[:3, 3] R, t = compute_relative_pose(R0, t0, R1, t1) T0_to_1 = np.concatenate((R,t[:,None]), axis=-1) im_A_path = f"{data_root}/{im_paths[idx0]}" im_B_path = f"{data_root}/{im_paths[idx1]}" im_A = cv2.imread(im_A_path) im_B = cv2.imread(im_B_path) if method == 'dense': kpts0, kpts1, conf = model_helper.match_dense(im_A, im_B, thr=0.01, resize=1600) elif method == 'lightglue': kpts0, kpts1, conf = model_helper.match_lg(im_A, im_B, thr=0.01, resize=1600) elif method == 'sparse': kpts0, kpts1, conf = model_helper.match(im_A, im_B, thr=0.01, resize=1600) else: kpts0, kpts1, conf = model_helper.match_3rd_party(im_A, im_B, thr=0.01, resize=1600, model=method) im_A = Image.open(im_A_path) w0, h0 = im_A.size im_B = Image.open(im_B_path) w1, h1 = im_B.size if scale_intrinsics: scale0 = 840 / max(w0, h0) scale1 = 840 / max(w1, h1) w0, h0 = scale0 * w0, scale0 * h0 w1, h1 = scale1 * w1, scale1 * h1 K0, K1 = K0.copy(), K1.copy() K0[:2] = K0[:2] * scale0 K1[:2] = K1[:2] * scale1 threshold = 0.5 if calibrated: norm_threshold = threshold / (np.mean(np.abs(K0[:2, :2])) + np.mean(np.abs(K1[:2, :2]))) ret = estimate_pose( kpts0, kpts1, K0, K1, norm_threshold, conf=0.99999, ) if ret is not None: R_est, t_est, mask = ret T0_to_1_est = np.concatenate((R_est, t_est), axis=-1) # T0_to_1 = np.concatenate((R, t[:,None]), axis=-1) e_t, e_R = compute_pose_error(T0_to_1_est, R, t) epi_errs = compute_symmetrical_epipolar_errors(T0_to_1, kpts0, kpts1, K0, K1) if scene_ind % plot_every_iter == 0 and plot: if not os.path.exists(f'outputs/mega_1500/{model_name}_{method}'): os.mkdir(f'outputs/mega_1500/{model_name}_{method}') name = f'outputs/mega_1500/{model_name}_{method}/{scene_name}_{pairind}.png' _make_evaluation_figure(im_A, im_B, kpts0, kpts1, epi_errs, e_t, e_R, path=name) e_pose = max(e_t, e_R) tot_e_t.append(e_t) tot_e_R.append(e_R) tot_e_pose.append(e_pose) tot_e_pose = np.array(tot_e_pose) auc = pose_auc(tot_e_pose, thresholds) acc_5 = (tot_e_pose < 5).mean() acc_10 = (tot_e_pose < 10).mean() acc_15 = (tot_e_pose < 15).mean() acc_20 = (tot_e_pose < 20).mean() map_5 = acc_5 map_10 = np.mean([acc_5, acc_10]) map_20 = np.mean([acc_5, acc_10, acc_15, acc_20]) print(f"{model_name} auc: {auc}") return { "auc_5": auc[0], "auc_10": auc[1], "auc_20": auc[2], "map_5": map_5, "map_10": map_10, "map_20": map_20, } def parse_arguments(): parser = argparse.ArgumentParser(description="Testing script.") parser.add_argument("--data_root", type=str, default="./data/megadepth_test_1500", help="Path to the MegaDepth dataset.") parser.add_argument("--weights", type=str, default="./weights/RDD-v2.pth", help="Path to the model checkpoint.") parser.add_argument("--plot", action="store_true", help="Whether to plot the results.") parser.add_argument("--method", type=str, default="sparse", help="Method for matching.") return parser.parse_args() if __name__ == "__main__": args = parse_arguments() if not os.path.exists('outputs'): os.mkdir('outputs') if not os.path.exists(f'outputs/mega_1500'): os.mkdir(f'outputs/mega_1500') model = build(weights=args.weights) benchmark = MegaDepthPoseMNNBenchmark(data_root=args.data_root) model.eval() model_helper = RDD_helper(model) with torch.no_grad(): method = args.method out = benchmark.benchmark(model_helper, model_name='RDD', plot_every_iter=1, plot=args.plot, method=method) with open(f'outputs/mega_1500/RDD_{method}.txt', 'w') as f: f.write(str(out))