import re
import random
import numpy as np
import os
import json
import yaml
import torch

from tqdm import tqdm
from datasets import load_dataset, concatenate_datasets
from argparse import ArgumentParser

from bunny.model.builder import load_pretrained_model
from bunny.util.mm_utils import get_model_name_from_path, tokenizer_image_token
from bunny.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN
from bunny.conversation import conv_templates

# Mapping from MMMU subject abbreviations to the full subset names.
CAT_SHORT2LONG = {
    'acc': 'Accounting',
    'agri': 'Agriculture',
    'arch': 'Architecture_and_Engineering',
    'art': 'Art',
    'art_theory': 'Art_Theory',
    'bas_med': 'Basic_Medical_Science',
    'bio': 'Biology',
    'chem': 'Chemistry',
    'cli_med': 'Clinical_Medicine',
    'cs': 'Computer_Science',
    'design': 'Design',
    'diag_med': 'Diagnostics_and_Laboratory_Medicine',
    'econ': 'Economics',
    'elec': 'Electronics',
    'ep': 'Energy_and_Power',
    'fin': 'Finance',
    'geo': 'Geography',
    'his': 'History',
    'liter': 'Literature',
    'manage': 'Manage',
    'mark': 'Marketing',
    'mate': 'Materials',
    'math': 'Math',
    'mech': 'Mechanical_Engineering',
    'music': 'Music',
    'phar': 'Pharmacy',
    'phys': 'Physics',
    'psy': 'Psychology',
    'pub_health': 'Public_Health',
    'socio': 'Sociology'
}


# ----------- Process Multi-choice -------------
def parse_multi_choice_response(response, all_choices, index2ans):
    """
    Parse the prediction from the generated response.
    Return the predicted index, e.g., A, B, C, D.
    """
    for char in [',', '.', '!', '?', ';', ':', "'"]:
        response = response.strip(char)
    response = " " + response + " "  # add spaces to avoid partial matches

    index_ans = True
    ans_with_brack = False
    candidates = []
    for choice in all_choices:  # e.g., (A) (B) (C) (D)
        if f'({choice})' in response:
            candidates.append(choice)
            ans_with_brack = True

    if len(candidates) == 0:
        for choice in all_choices:  # e.g., A B C D
            if f' {choice} ' in response:
                candidates.append(choice)

    # If neither form matched and the response is longer than 5 tokens,
    # try matching the full answer content instead of the option letter.
    if len(candidates) == 0 and len(response.split()) > 5:
        for index, ans in index2ans.items():
            if ans.lower() in response.lower():
                candidates.append(index)
                index_ans = False  # it's a content answer

    if len(candidates) == 0:  # still no answer; choose one at random
        pred_index = random.choice(all_choices)
    elif len(candidates) > 1:
        start_indexes = []
        if index_ans:
            if ans_with_brack:
                for can in candidates:
                    index = response.rfind(f'({can})')
                    start_indexes.append(index)  # -1 will be ignored anyway
                # start_indexes = [generated_response.index(f'({can})') for can in candidates]
            else:
                for can in candidates:
                    index = response.rfind(f" {can} ")
                    start_indexes.append(index)
        else:
            for can in candidates:
                index = response.lower().rfind(index2ans[can].lower())
                start_indexes.append(index)
        # take the candidate mentioned last
        pred_index = candidates[np.argmax(start_indexes)]
    else:  # only one candidate; use it
        pred_index = candidates[0]

    return pred_index


def call_bunny_engine_df(args, sample, model, tokenizer=None, processor=None):
    """Generate a response for a single MMMU sample with the Bunny model."""

    def deal_with_prompt(input_text):
        qs = input_text
        qs = DEFAULT_IMAGE_TOKEN + '\n' + qs
        return qs

    prompt = sample['final_input_prompt']
    prompt = deal_with_prompt(prompt)

    conv = conv_templates[args.conv_mode].copy()
    conv.append_message(conv.roles[0], prompt)
    conv.append_message(conv.roles[1], None)
    prompt = conv.get_prompt()

    input_ids = tokenizer_image_token(prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt').unsqueeze(0).cuda()

    image = sample['image']
    if image is not None:
        output_ids = model.generate(
            input_ids,
            images=image.unsqueeze(0).to(dtype=model.dtype, device='cuda', non_blocking=True),
            do_sample=False,
            temperature=0,
            top_p=None,
            # num_beams=5,
            max_new_tokens=128,
            use_cache=True)

        input_token_len = input_ids.shape[1]
        # n_diff_input_output = (input_ids != output_ids[:, :input_token_len]).sum().item()
        # if n_diff_input_output > 0:
        #     print(f'[Warning] {n_diff_input_output} output_ids are not the same as the input_ids')
        response = tokenizer.batch_decode(output_ids[:, input_token_len:], skip_special_tokens=True)[0]
    else:  # samples with multiple images are not fed to the model
        if sample['question_type'] == 'multiple-choice':
            all_choices = sample['all_choices']
            response = random.choice(all_choices)
        else:
            response = 'INVALID GENERATION FOR MULTIPLE IMAGE INPUTS'

    return response


def load_yaml(file_path):
    with open(file_path, 'r') as stream:
        try:
            yaml_dict = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)

    return yaml_dict


def parse_img_path(text):
    # Option strings may embed images as <img='path'> tags; extract the quoted paths.
    matches = re.findall("<img='(.*?)'>", text)
    return matches


def process_single_sample(data):
    question = data['question']
    o_imgs_paths = []
    for option in data['options']:
        current_o_imgs_paths = parse_img_path(option)
        for img_path in current_o_imgs_paths:
            o_imgs_paths.append(img_path)

    if len(o_imgs_paths) > 1:  # multiple images in options, used for random selection
        return {'id': data['id'], 'question': question, 'options': data['options'], 'answer': data['answer'],
                'image': None, 'question_type': data['question_type']}
    else:
        return {'id': data['id'], 'question': question, 'options': data['options'], 'answer': data['answer'],
                'image': data['image_1'], 'question_type': data['question_type']}


# DATA PROCESSING
def construct_prompt(sample, config):
    """Build the final input prompt (and choice metadata) for one sample."""
    question = sample['question']
    options = eval(sample['options'])
    example = ""
    if sample['question_type'] == 'multiple-choice':
        start_chr = 'A'
        prediction_range = []
        index2ans = {}
        for option in options:
            prediction_range.append(start_chr)
            example += f"({start_chr}) {option}\n"
            index2ans[start_chr] = option
            start_chr = chr(ord(start_chr) + 1)
        empty_prompt_sample_structure = config['multi_choice_example_format']
        empty_prompt = empty_prompt_sample_structure.format(question, example)
        res_dict = {}
        res_dict['index2ans'] = index2ans
        res_dict['correct_choice'] = sample['answer']
        res_dict['all_choices'] = prediction_range
        res_dict['empty_prompt'] = empty_prompt
        if config['task_instructions']:
            res_dict['final_input_prompt'] = config['task_instructions'].strip() + '\n\n' + empty_prompt
        else:
            res_dict['final_input_prompt'] = empty_prompt

        res_dict['gt_content'] = options[ord(sample['answer'].upper()) - ord('A')]
    else:
        empty_prompt_sample_structure = config['short_ans_example_format']
        empty_prompt = empty_prompt_sample_structure.format(question)
        res_dict = {}
        res_dict['empty_prompt'] = empty_prompt
        if config['task_instructions']:
            res_dict['final_input_prompt'] = config['task_instructions'].strip() + '\n\n' + empty_prompt
        else:
            res_dict['final_input_prompt'] = empty_prompt
        res_dict['gt_content'] = sample['answer']

    res_dict.update(sample)
    return res_dict


def run_model(args, samples, model, call_model_engine_fn=None, tokenizer=None, processor=None):
    """Run the model over all samples and map sample id -> parsed prediction."""
    out_samples = dict()
    with torch.no_grad():
        for sample in tqdm(samples):
            if args.small_gpu_usage:
                sample['image'] = sample['image'].cuda()
            response = call_model_engine_fn(args, sample, model, tokenizer, processor)
            if args.small_gpu_usage:
                sample['image'] = sample['image'].cpu()

            if sample['question_type'] == 'multiple-choice':
                pred_ans = parse_multi_choice_response(response, sample['all_choices'], sample['index2ans'])
            else:  # open question
                pred_ans = response
            out_samples[sample['id']] = pred_ans
    return out_samples


def set_seed(seed_value):
    """
    Set the seed for PyTorch (both CPU and CUDA), Python, and NumPy for reproducible results.

    :param seed_value: An integer value to be used as the seed.
    """
    torch.manual_seed(seed_value)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed_value)
        torch.cuda.manual_seed_all(seed_value)  # for multi-GPU setups
    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def main():
    parser = ArgumentParser()
    parser.add_argument('--model-path', type=str, default=None)
    parser.add_argument('--model-base', type=str, default=None)
    parser.add_argument('--model-type', type=str, default=None)
    parser.add_argument('--conv-mode', type=str, default=None)
    parser.add_argument('--data-path', type=str, default=None)
    parser.add_argument('--config-path', type=str, default=None)
    parser.add_argument('--output-path', type=str, default=None)
    parser.add_argument('--split', type=str, default='validation')
    parser.add_argument('--seed', type=int, default=42)
    parser.add_argument('--small-gpu-usage', action='store_true')

    args = parser.parse_args()
    device = torch.device("cuda") if torch.cuda.is_available() else "cpu"
    set_seed(args.seed)

    print('Initializing Bunny...')
    processor = None
    call_model_engine = call_bunny_engine_df

    # load config and collapse single-element list values to scalars
    args.config = load_yaml(args.config_path)
    for key, value in args.config.items():
        if key != 'eval_params' and type(value) == list:
            assert len(value) == 1, 'key {} has more than one value'.format(key)
            args.config[key] = value[0]

    # load each subject's subset
    sub_dataset_list = []
    for subject in CAT_SHORT2LONG.values():
        sub_dataset = load_dataset(args.data_path, subject, split=args.split)
        sub_dataset_list.append(sub_dataset)

    # merge all subsets into one dataset
    dataset = concatenate_datasets(sub_dataset_list)

    # load model
    model_path = os.path.expanduser(args.model_path)
    model_name = get_model_name_from_path(model_path)
    tokenizer, model, vis_processors, context_len = load_pretrained_model(model_path, args.model_base, model_name,
                                                                          args.model_type)

    samples = []
    print('Processing MMMU dataset...')
    for sample in tqdm(dataset):
        sample = process_single_sample(sample)
        sample = construct_prompt(sample, args.config)
        if sample['image']:
            if args.small_gpu_usage:
                # keep preprocessed images on CPU; run_model moves them to GPU one at a time
                sample['image'] = vis_processors.preprocess(sample['image'].convert('RGB'),
                                                            return_tensors='pt')['pixel_values'][0]
            else:
                sample['image'] = vis_processors.preprocess(sample['image'].convert('RGB'),
                                                            return_tensors='pt')['pixel_values'][0].to(device)
        samples.append(sample)

    print('Starting evaluation...')
    # run evaluation
    out_samples = run_model(args, samples, model, call_model_engine, tokenizer, processor)

    os.makedirs(os.path.dirname(args.output_path), exist_ok=True)

    with open(args.output_path, 'w') as f:
        json.dump(out_samples, f, indent=4)


if __name__ == '__main__':
    main()
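
# ---------------------------------------------------------------------------
# Example usage (a sketch only; the script path, checkpoint paths, and template
# strings below are illustrative placeholders, not values shipped with Bunny):
#
#   python path/to/this_script.py \
#       --model-path <path-to-bunny-checkpoint> \
#       --model-type <model-type> \
#       --conv-mode <conversation-template-name> \
#       --data-path MMMU/MMMU \
#       --config-path <path-to-config.yaml> \
#       --output-path ./answers/mmmu_validation.json \
#       --split validation
#
# The YAML passed via --config-path should define the keys read by
# construct_prompt(); single-element list values are unwrapped in main().
# A minimal, hypothetical example:
#
#   task_instructions:
#   - ""
#   multi_choice_example_format:
#   - "{}\n{}\nAnswer with the option's letter from the given choices directly."
#   short_ans_example_format:
#   - "{}\nAnswer the question using a single word or phrase."
# ---------------------------------------------------------------------------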