import os
import json
import argparse

import torch
from torch.utils.data import DataLoader
from tqdm import tqdm

from pointllm.data import ModelNet
from llava.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN
from llava.conversation import conv_templates
from llava.model.builder import load_pretrained_model
from llava.mm_utils import tokenizer_image_token, get_model_name_from_path


class MyClass:
    """Container for the point-cloud encoder / projector settings passed to the model."""

    def __init__(self, arg):
        self.vision_tower = None
        self.pretrain_mm_mlp_adapter = arg.pretrain_mm_mlp_adapter
        self.encoder_type = 'pc_encoder'
        self.std = arg.std
        self.pc_encoder_type = arg.pc_encoder_type
        self.pc_feat_dim = 192
        self.embed_dim = 1024
        self.group_size = 64
        self.num_group = 512
        self.pc_encoder_dim = 512
        self.patch_dropout = 0.0
        self.pc_ckpt_path = arg.pc_ckpt_path
        self.lora_path = arg.lora_path
        self.model_path = arg.model_path
        self.get_pc_tokens_way = arg.get_pc_tokens_way


def init_model(model_arg_):
    # The model name is derived from the fixed finetuned-weight folder name,
    # while the actual weights are loaded from model_arg_.model_path.
    model_path = "llava-vicuna_phi_3_finetune_weight"
    model_name = get_model_name_from_path(model_path)
    model_path = model_arg_.model_path
    tokenizer, model, context_len = load_pretrained_model(model_path, None, model_name)

    if model_arg_.lora_path:
        from peft import PeftModel
        model = PeftModel.from_pretrained(model, model_arg_.lora_path)
        print("load lora weight ok")

    model.get_model().initialize_other_modules(model_arg_)
    print("load encoder, mlp ok")

    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Move the model to the CUDA device; the point-cloud encoder stays in float32.
    model.to(dtype=torch.bfloat16)
    model.get_model().vision_tower.to(dtype=torch.float)
    model.to(device)
    return tokenizer, model


PROMPT_LISTS = [
    "What is this?",
    "This is an object of "
]


def load_dataset(data_path, config_path, split, subset_nums, use_color):
    print(f"Loading {split} split of the ModelNet dataset.")
    dataset = ModelNet(data_path=data_path, config_path=config_path, split=split,
                       subset_nums=subset_nums, use_color=use_color)
    print("Done!")
    return dataset


def get_dataloader(dataset, batch_size, shuffle=False, num_workers=4):
    assert shuffle is False, "The ModelNet sample index is used as the object ID during evaluation, " \
        "so shuffle should be False and the random seed should always be fixed."
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)
    return dataloader


def start_generation(model, tokenizer, dataloader, prompt_index, output_dir, output_file, args):
    # stop_str = conv.sep if conv.sep_style != SeparatorStyle.TWO else conv.sep2
    qs = PROMPT_LISTS[prompt_index]
    results = {"prompt": qs}

    # Build the prompt with the image placeholder token and the phi3 chat template.
    qs = DEFAULT_IMAGE_TOKEN + "\n" + qs
    conv_mode = "phi3_instruct"
    conv = conv_templates[conv_mode].copy()
    conv.append_message(conv.roles[0], qs)
    conv.append_message(conv.roles[1], None)
    qs = conv.get_prompt()

    input_ids = (
        tokenizer_image_token(qs, tokenizer, IMAGE_TOKEN_INDEX, return_tensors="pt")
        .unsqueeze(0)
        .cuda()
    )

    responses = []
    for batch in tqdm(dataloader):
        point_clouds = batch["point_clouds"].cuda()  # * tensor of shape (B, N, C)
        labels = batch["labels"]
        label_names = batch["label_names"]
        indice = batch["indice"]

        # Repeat the same prompt for every point cloud in the batch.
        texts = input_ids.repeat(point_clouds.size()[0], 1)
        images_tensor = point_clouds.to(dtype=torch.bfloat16)

        temperature = args.temperature
        top_p = args.top_p
        max_new_tokens = args.max_new_tokens
        min_new_tokens = args.min_new_tokens
        num_beams = args.num_beams

        with torch.inference_mode():
            output_ids = model.generate(
                texts,
                images=images_tensor,
                do_sample=temperature > 0 and num_beams == 1,
                temperature=temperature,
                top_p=top_p,
                num_beams=num_beams,
                max_new_tokens=max_new_tokens,
                min_new_tokens=min_new_tokens,
                use_cache=True,
            )

        answers = tokenizer.batch_decode(output_ids, skip_special_tokens=True)
        outputs = []
        for answer in answers:
            answer = answer.strip()
            answer = answer.replace("<|end|>", "").strip()
            outputs.append(answer)

        # * saving results
        for index, output, label, label_name in zip(indice, outputs, labels, label_names):
            responses.append({
                "object_id": index.item(),
                "ground_truth": label.item(),
                "model_output": output,
                "label_name": label_name
            })

    results["results"] = responses

    os.makedirs(output_dir, exist_ok=True)
    # * save the results to a JSON file
    with open(os.path.join(output_dir, output_file), 'w') as fp:
        json.dump(results, fp, indent=2)

    # * print info
    print(f"Saved results to {os.path.join(output_dir, output_file)}")

    return results


def main(args):
    # * output directory
    args.output_dir = os.path.join(args.out_path, "evaluation")
    # * output file
    args.output_file = f"ModelNet_classification_prompt{args.prompt_index}.json"
    args.output_file_path = os.path.join(args.output_dir, args.output_file)

    # * run inference first, then evaluate
    if not os.path.exists(args.output_file_path):
        # * need to generate results first
        dataset = load_dataset(data_path=args.data_path, config_path=None, split=args.split,
                               subset_nums=args.subset_nums, use_color=args.use_color)  # * default config
        dataloader = get_dataloader(dataset, args.batch_size, args.shuffle, args.num_workers)

        model_arg = MyClass(args)
        tokenizer, model = init_model(model_arg)
        model.eval()

        print(f'[INFO] Start generating results for {args.output_file}.')
        results = start_generation(model, tokenizer, dataloader, args.prompt_index,
                                   args.output_dir, args.output_file, args)

        # * release the model and free CUDA memory
        del model
        torch.cuda.empty_cache()
    else:
        # * the results already exist, load them directly
        print(f'[INFO] {args.output_file_path} already exists, directly loading...')
        with open(args.output_file_path, 'r') as fp:
            results = json.load(fp)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--out_path", type=str, default="./output_json")
    parser.add_argument("--pretrain_mm_mlp_adapter", type=str, required=True)
    parser.add_argument("--lora_path", type=str, default=None)
    parser.add_argument("--model_path", type=str, default='./lava-vicuna_2024_4_Phi-3-mini-4k-instruct')
    parser.add_argument("--std", type=float, default=0.0)
    parser.add_argument("--pc_ckpt_path", type=str, required=True,
                        default="./pretrained_weight/Uni3D_PC_encoder/modelzoo/uni3d-small/model.pt")
    parser.add_argument("--pc_encoder_type", type=str, required=True, default='small')
    parser.add_argument("--get_pc_tokens_way", type=str, required=True)

    # * dataset
    parser.add_argument("--data_path", type=str, default="./dataset/modelnet40_data",
                        help="path to the ModelNet40 data.")
    parser.add_argument("--split", type=str, default="test", help="train or test.")
    parser.add_argument("--use_color", action="store_true", default=True)

    # * data loader: batch_size, shuffle, num_workers
    parser.add_argument("--batch_size", type=int, default=10)
    parser.add_argument("--shuffle", action="store_true", default=False)  # must stay False, see get_dataloader
    parser.add_argument("--num_workers", type=int, default=20)
    parser.add_argument("--subset_nums", type=int, default=-1)  # * only use "subset_nums" samples, mainly for debugging

    # * evaluation setting
    parser.add_argument("--prompt_index", type=int, required=True, help="0 or 1")

    # * generation settings (newly added)
    parser.add_argument("--max_new_tokens", type=int, default=110, help="max number of generated tokens")
    parser.add_argument("--min_new_tokens", type=int, default=0, help="min number of generated tokens")
    parser.add_argument("--num_beams", type=int, default=1)
    parser.add_argument("--temperature", type=float, default=0.1)
    parser.add_argument("--top_k", type=int, default=1)  # parsed but not passed to generate()
    parser.add_argument("--top_p", type=float, default=0.7)

    args = parser.parse_args()
    main(args)
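
# Example invocation (illustrative sketch only): the script filename and the
# checkpoint/adapter paths below are placeholders for your own files, and the
# accepted values of --get_pc_tokens_way depend on the point-cloud encoder wrapper.
#
#   python eval_modelnet_cls.py \
#       --pretrain_mm_mlp_adapter ./checkpoints/mm_projector.bin \
#       --pc_ckpt_path ./pretrained_weight/Uni3D_PC_encoder/modelzoo/uni3d-small/model.pt \
#       --pc_encoder_type small \
#       --get_pc_tokens_way <tokens_way> \
#       --prompt_index 0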