import argparse
from collections import defaultdict
from pathlib import Path
from zipfile import ZipFile
from io import TextIOWrapper
import json
import logging

import numpy as np

from benchmark.utils import load_poses, subsample_poses, load_K, precision_recall
from benchmark.metrics import MetricManager, Inputs
import benchmark.config as config
from config.default import cfg


def plot_perfect_curve(P):
    """Upper-bound ('perfect') precision curve: for each ratio r of frames kept,
    the precision cannot exceed min(1, P / r)."""
    total_bins = 1000
    prec_values = []
    ratio_values = []
    for i in range(total_bins):
        ratio_tmp = i / total_bins
        # guard against division by zero at ratio 0 (the bound tends to 1 there)
        value = min(1, P / ratio_tmp) if ratio_tmp > 0 else 1.0
        prec_values.append(value)
        ratio_values.append(ratio_tmp)
    return prec_values, ratio_values
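
# --- Illustrative sketch (added for illustration, not part of the evaluation flow) ---
# A minimal example of how the upper-bound curve from plot_perfect_curve() could be
# drawn next to the measured curve returned by aggregate_results() below. It assumes
# matplotlib is available; nothing in this script calls this function.
def plot_curves_sketch(curves_data, prec_pose):
    import matplotlib.pyplot as plt

    # perfect-confidence upper bound for an overall precision of prec_pose
    perfect_prec, perfect_ratio = plot_perfect_curve(prec_pose)
    plt.plot(perfect_ratio, perfect_prec, '--', label='perfect confidence')

    # measured curve of the submission (pose error criterion)
    plt.plot(curves_data['pose_recall_values'], curves_data['pose_prec_values'],
             label='submission')

    plt.xlabel('recall')
    plt.ylabel('precision')
    plt.legend()
    plt.show()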
def compute_scene_metrics(dataset_path: Path, submission_zip: ZipFile, scene: str):
    metric_manager = MetricManager()

    # load ground-truth intrinsics and poses
    try:
        K, W, H = load_K(dataset_path / scene / 'intrinsics.txt')
        with (dataset_path / scene / 'poses.txt').open('r', encoding='utf-8') as gt_poses_file:
            gt_poses = load_poses(gt_poses_file, load_confidence=False)
    except FileNotFoundError as e:
        logging.error(f'Could not find ground-truth dataset files: {e}')
        raise
    else:
        logging.info(f'Loaded ground-truth intrinsics and poses for scene {scene}')

    # try to load estimated poses from the submission
    try:
        with submission_zip.open(f'pose_{scene}.txt') as estimated_poses_file:
            estimated_poses_file_wrapper = TextIOWrapper(
                estimated_poses_file, encoding='utf-8')
            estimated_poses = load_poses(
                estimated_poses_file_wrapper, load_confidence=True)
    except KeyError:
        logging.warning(f'Submission does not have estimates for scene {scene}.')
        return dict(), len(gt_poses)
    except UnicodeDecodeError:
        logging.error('Unsupported file encoding: please use UTF-8')
        raise
    else:
        logging.info(f'Loaded estimated poses for scene {scene}')

    # The val/test set is subsampled by a factor of 5
    gt_poses = subsample_poses(gt_poses, subsample=5)

    # failures counts frames for which the user/method provided no estimate at all;
    # this is different from an estimate submitted with low confidence!
    failures = 0

    # results maps metric name -> list of values (one per frame),
    # e.g. results['trans_err'] = [1.2, 0.3, 0.5, ...]
    results = defaultdict(list)

    # compute metrics per frame
    for frame_num, (q_gt, t_gt, _) in gt_poses.items():
        if frame_num not in estimated_poses:
            failures += 1
            continue

        q_est, t_est, confidence = estimated_poses[frame_num]
        inputs = Inputs(q_gt=q_gt, t_gt=t_gt, q_est=q_est, t_est=t_est,
                        confidence=confidence, K=K[frame_num], W=W, H=H)
        metric_manager(inputs, results)

    return results, failures


def aggregate_results(all_results, all_failures):
    # aggregate metrics across scenes
    median_metrics = defaultdict(list)
    all_metrics = defaultdict(list)
    for scene_results in all_results.values():
        for metric, values in scene_results.items():
            median_metrics[metric].append(np.median(values))
            all_metrics[metric].extend(values)

    all_metrics = {k: np.array(v) for k, v in all_metrics.items()}
    assert all(v.ndim == 1 for v in all_metrics.values()), 'invalid metrics shape'

    # compute average median metrics
    avg_median_metrics = {metric: np.mean(values)
                          for metric, values in median_metrics.items()}

    # compute precision/AUC for pose error and reprojection error
    accepted_poses = (all_metrics['trans_err'] < config.t_threshold) & \
        (all_metrics['rot_err'] < config.R_threshold)
    accepted_vcre = all_metrics['reproj_err'] < config.vcre_threshold
    total_samples = len(next(iter(all_metrics.values()))) + all_failures

    prec_pose = np.sum(accepted_poses) / total_samples
    prec_vcre = np.sum(accepted_vcre) / total_samples

    # compute AUC for pose and VCRE
    pose_prec_values, pose_recall_values, auc_pose = precision_recall(
        inliers=all_metrics['confidence'], tp=accepted_poses, failures=all_failures)
    vcre_prec_values, vcre_recall_values, auc_vcre = precision_recall(
        inliers=all_metrics['confidence'], tp=accepted_vcre, failures=all_failures)

    curves_data = {}
    curves_data['vcre_prec_values'], curves_data['vcre_recall_values'] = vcre_prec_values, vcre_recall_values
    curves_data['pose_prec_values'], curves_data['pose_recall_values'] = pose_prec_values, pose_recall_values

    # output metrics
    output_metrics = dict()
    output_metrics['Average Median Translation Error'] = avg_median_metrics['trans_err']
    output_metrics['Average Median Rotation Error'] = avg_median_metrics['rot_err']
    output_metrics['Average Median Reprojection Error'] = avg_median_metrics['reproj_err']
    output_metrics[f'Precision @ Pose Error < ({config.t_threshold*100}cm, {config.R_threshold}deg)'] = prec_pose
    output_metrics[f'AUC @ Pose Error < ({config.t_threshold*100}cm, {config.R_threshold}deg)'] = auc_pose
    output_metrics[f'Precision @ VCRE < {config.vcre_threshold}px'] = prec_vcre
    output_metrics[f'AUC @ VCRE < {config.vcre_threshold}px'] = auc_vcre
    output_metrics['Estimates for % of frames'] = len(all_metrics['trans_err']) / total_samples
    return output_metrics, curves_data


def count_unexpected_scenes(scenes: tuple, submission_zip: ZipFile):
    # strip the 'pose_' prefix and '.txt' suffix to recover scene names
    submission_scenes = [fname[5:-4]
                         for fname in submission_zip.namelist() if fname.startswith("pose_")]
    return len(set(submission_scenes) - set(scenes))
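
# --- Illustrative sketch (added for clarity, never called) ---
# Shape of the inputs that aggregate_results() expects, inferred from the metric
# keys used above. The scene name and all numbers below are made up.
def _example_aggregate_call():
    dummy_results = {
        'example_scene': {
            'trans_err': [0.12, 0.80],    # metres
            'rot_err': [1.5, 9.3],        # degrees
            'reproj_err': [35.0, 160.0],  # pixels (VCRE)
            'confidence': [0.9, 0.2],     # per-frame confidence of the estimate
        },
    }
    dummy_failures = 1                    # frames with no submitted estimate
    return aggregate_results(dummy_results, dummy_failures)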
def main(args):
    dataset_path = args.dataset_path / args.split
    scenes = tuple(f.name for f in dataset_path.iterdir() if f.is_dir())

    try:
        submission_zip = ZipFile(args.submission_path, 'r')
    except FileNotFoundError:
        logging.error(f'Could not find ZIP file in path {args.submission_path}')
        return

    all_results = dict()
    all_failures = 0
    for scene in scenes:
        metrics, failures = compute_scene_metrics(
            dataset_path, submission_zip, scene)
        all_results[scene] = metrics
        all_failures += failures

    if all_failures > 0:
        logging.warning(
            f'Submission is missing pose estimates for {all_failures} frames')

    unexpected_scene_count = count_unexpected_scenes(scenes, submission_zip)
    if unexpected_scene_count > 0:
        logging.warning(
            f'Submission contains estimates for {unexpected_scene_count} scenes outside the {args.split} set')

    if all(len(metrics) == 0 for metrics in all_results.values()):
        logging.error('Submission does not have any valid pose estimates')
        return

    output_metrics, curves_data = aggregate_results(all_results, all_failures)
    output_json = json.dumps(output_metrics, indent=2)
    print(output_json)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        'eval', description='Evaluate submissions for the MapFree dataset benchmark')
    parser.add_argument('--submission_path', type=Path, default='',
                        help='Path to the submission ZIP file')
    parser.add_argument('--split', choices=('val', 'test'), default='test',
                        help='Dataset split to use for evaluation. Default: test')
    parser.add_argument('--log', choices=('warning', 'info', 'error'), default='warning',
                        help='Logging level. Default: warning')
    parser.add_argument('--dataset_path', type=Path, default=None,
                        help='Path to the dataset folder')
    args = parser.parse_args()

    if args.dataset_path is None:
        cfg.merge_from_file('config/datasets/mapfree.yaml')
        args.dataset_path = Path(cfg.DATASET.DATA_ROOT)

    logging.basicConfig(level=args.log.upper())

    try:
        main(args)
    except Exception:
        logging.error("Unexpected behaviour. Exiting.")
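
# Hedged usage note: assuming this file is run as a script from the repository root
# (its exact path/module name may differ in your checkout), an evaluation call
# looks like:
#
#   python benchmark/mapfree.py \
#       --submission_path submission.zip \
#       --dataset_path /path/to/mapfree \
#       --split val --log info
#
# --dataset_path is optional: when omitted, the path is read from
# config/datasets/mapfree.yaml via cfg.DATASET.DATA_ROOT, as above.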