eawolf2357-git / videoxl /eval /model_vqa_mmmu.py
seawolf2357's picture
Upload folder using huggingface_hub
321d89c verified
import ast
import json
import os
import random
import re
from argparse import ArgumentParser

import numpy as np
import torch
import yaml
from datasets import load_dataset, concatenate_datasets
from tqdm import tqdm

from bunny.constants import IMAGE_TOKEN_INDEX, DEFAULT_IMAGE_TOKEN
from bunny.conversation import conv_templates
from bunny.model.builder import load_pretrained_model
from bunny.util.mm_utils import get_model_name_from_path, tokenizer_image_token
# Short subject code -> full subject name. main() passes each full name to
# load_dataset() as the subset to load, in the order listed here.
CAT_SHORT2LONG = {
    "acc": "Accounting",
    "agri": "Agriculture",
    "arch": "Architecture_and_Engineering",
    "art": "Art",
    "art_theory": "Art_Theory",
    "bas_med": "Basic_Medical_Science",
    "bio": "Biology",
    "chem": "Chemistry",
    "cli_med": "Clinical_Medicine",
    "cs": "Computer_Science",
    "design": "Design",
    "diag_med": "Diagnostics_and_Laboratory_Medicine",
    "econ": "Economics",
    "elec": "Electronics",
    "ep": "Energy_and_Power",
    "fin": "Finance",
    "geo": "Geography",
    "his": "History",
    "liter": "Literature",
    "manage": "Manage",
    "mark": "Marketing",
    "mate": "Materials",
    "math": "Math",
    "mech": "Mechanical_Engineering",
    "music": "Music",
    "phar": "Pharmacy",
    "phys": "Physics",
    "psy": "Psychology",
    "pub_health": "Public_Health",
    "socio": "Sociology",
}
# ----------- Process Multi-choice -------------
def parse_multi_choice_response(response, all_choices, index2ans):
    """
    Extract the chosen option letter from a free-form model response.

    Three matching passes are tried, and the first one that yields any
    candidate is used:
      1. bracketed letters, e.g. "(A)";
      2. bare letters surrounded by spaces, e.g. " A ";
      3. the option texts from ``index2ans`` (case-insensitive), only when
         the response has more than five whitespace-separated tokens.
    With several candidates, the one whose last occurrence lies furthest to
    the right wins. With none, a letter is drawn with ``random.choice``.

    :param response: Raw text generated by the model.
    :param all_choices: Option letters, e.g. ['A', 'B', 'C', 'D'].
    :param index2ans: Mapping from option letter to option text.
    :return: The predicted option letter.
    """
    # Trim each punctuation mark in turn from both ends of the response.
    for punct in (',', '.', '!', '?', ';', ':', "'"):
        response = response.strip(punct)
    padded = " " + response + " "  # padding lets " A " match at either end

    haystack = padded
    candidates = [letter for letter in all_choices if f'({letter})' in padded]
    if candidates:
        needles = [f'({letter})' for letter in candidates]
    else:
        candidates = [letter for letter in all_choices if f' {letter} ' in padded]
        needles = [f' {letter} ' for letter in candidates]

    # Last resort: look for the option texts themselves in longer responses.
    if not candidates and len(padded.split()) > 5:
        haystack = padded.lower()
        candidates = [letter for letter, text in index2ans.items() if text.lower() in haystack]
        needles = [index2ans[letter].lower() for letter in candidates]

    if not candidates:
        # Nothing recognisable in the response: fall back to a random guess.
        return random.choice(all_choices)
    if len(candidates) == 1:
        return candidates[0]

    # Several candidates: keep the one mentioned last in the response.
    last_positions = [haystack.rfind(needle) for needle in needles]
    return candidates[np.argmax(last_positions)]
def call_bunny_engine_df(args, sample, model, tokenizer=None, processor=None):
    """
    Produce the model's textual answer for one prepared sample.

    The sample's 'final_input_prompt' is prefixed with the image token, wrapped
    in the conversation template named by ``args.conv_mode`` and tokenized.
    When the sample has an image, the model generates up to 128 new tokens with
    sampling disabled and only the newly generated tokens are decoded.
    When the sample's image is None (the original author notes these are
    multi-image questions), no generation happens: multiple-choice questions
    get a random option letter and other questions get a fixed marker string.

    :param args: Parsed CLI arguments; ``conv_mode`` is read here.
    :param sample: Sample dict with 'final_input_prompt', 'image',
        'question_type' and, for multiple-choice, 'all_choices'.
    :param model: Model exposing ``generate`` and ``dtype``.
    :param tokenizer: Tokenizer used for encoding and ``batch_decode``.
    :param processor: Unused; kept for a uniform engine signature.
    :return: The response string.
    """
    user_turn = DEFAULT_IMAGE_TOKEN + '\n' + sample['final_input_prompt']

    conversation = conv_templates[args.conv_mode].copy()
    conversation.append_message(conversation.roles[0], user_turn)
    conversation.append_message(conversation.roles[1], None)
    full_prompt = conversation.get_prompt()

    token_tensor = tokenizer_image_token(full_prompt, tokenizer, IMAGE_TOKEN_INDEX, return_tensors='pt')
    input_ids = token_tensor.unsqueeze(0).cuda()

    image = sample['image']
    if image is None:
        # No single image to feed the model: answer without generating.
        if sample['question_type'] == 'multiple-choice':
            return random.choice(sample['all_choices'])
        return 'INVALID GENERATION FOR MULTIPLE IMAGE INPUTS'

    output_ids = model.generate(
        input_ids,
        images=image.unsqueeze(0).to(dtype=model.dtype, device='cuda', non_blocking=True),
        do_sample=False,
        temperature=0,
        top_p=None,
        max_new_tokens=128,
        use_cache=True,
    )
    # Skip the echoed prompt tokens; decode only what was generated after them.
    prompt_len = input_ids.shape[1]
    decoded = tokenizer.batch_decode(output_ids[:, prompt_len:], skip_special_tokens=True)
    return decoded[0]
def load_yaml(file_path):
    """
    Read a YAML file and return its parsed content.

    :param file_path: Path of the YAML file to read.
    :return: The object produced by ``yaml.safe_load``.
    :raises yaml.YAMLError: If the file cannot be parsed. The error is printed
        and then re-raised; previously execution fell through to the return
        statement and failed with an unrelated UnboundLocalError.
    """
    with open(file_path, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)
            raise
def parse_img_path(text):
    """
    Collect the paths quoted inside every ``<img='...'>`` tag of *text*.

    :param text: String to scan.
    :return: List of the captured paths in order of appearance (possibly empty).
    """
    return [tag.group(1) for tag in re.finditer("<img='(.*?)'>", text)]
def process_single_sample(data):
    """
    Reduce a raw dataset record to the fields used by the evaluation.

    Image paths referenced by ``<img='...'>`` tags in the options are counted.
    If more than one is found the sample's 'image' is set to None (such
    samples are later answered without running the model); otherwise the
    record's 'image_1' is used.

    :param data: Raw record with 'id', 'question', 'options', 'answer',
        'question_type' and 'image_1'.
    :return: Dict with 'id', 'question', 'options', 'answer', 'image' and
        'question_type'.
    """
    # NOTE(review): construct_prompt parses 'options' from a string; if it is
    # also a string here, this iterates single characters - confirm intent.
    option_image_paths = [
        path
        for option in data['options']
        for path in parse_img_path(option)
    ]
    has_multiple_option_images = len(option_image_paths) > 1

    return {
        'id': data['id'],
        'question': data['question'],
        'options': data['options'],
        'answer': data['answer'],
        'image': None if has_multiple_option_images else data['image_1'],
        'question_type': data['question_type'],
    }
# DATA PROCESSING
def construct_prompt(sample, config):
    """
    Build the text prompt and answer bookkeeping for one sample.

    For multiple-choice questions the options are lettered from 'A' and
    rendered as "(A) text" lines, which are inserted with the question into
    ``config['multi_choice_example_format']``. Other questions use
    ``config['short_ans_example_format']`` with the question only. If
    ``config['task_instructions']`` is non-empty it is stripped and prepended,
    separated by a blank line.

    :param sample: Dict with 'question', 'options' (a string holding a Python
        list literal, or an already parsed sequence), 'answer' and
        'question_type'.
    :param config: Dict with 'multi_choice_example_format',
        'short_ans_example_format' and 'task_instructions'.
    :return: Dict containing 'empty_prompt', 'final_input_prompt' and
        'gt_content', plus 'index2ans', 'correct_choice' and 'all_choices' for
        multiple-choice questions, updated with every key of ``sample``.
        For multiple-choice questions 'gt_content' is None when the answer is
        not one of the generated option letters (e.g. a placeholder answer).
    :raises ValueError, SyntaxError: If 'options' is a string that is not a
        valid Python literal.
    """
    question = sample['question']
    raw_options = sample['options']
    # SECURITY: the options text comes from the dataset. It used to be parsed
    # with eval(), which executes arbitrary expressions; literal_eval accepts
    # Python literals only.
    if isinstance(raw_options, str):
        options = ast.literal_eval(raw_options)
    else:
        options = list(raw_options)

    res_dict = {}
    if sample['question_type'] == 'multiple-choice':
        all_choices = [chr(ord('A') + offset) for offset in range(len(options))]
        index2ans = dict(zip(all_choices, options))
        example = ''.join(f"({letter}) {option}\n" for letter, option in zip(all_choices, options))
        empty_prompt = config['multi_choice_example_format'].format(question, example)

        res_dict['index2ans'] = index2ans
        res_dict['correct_choice'] = sample['answer']
        res_dict['all_choices'] = all_choices
        # Look the letter up instead of indexing by ord(): an answer outside
        # the option letters (such as '?') used to wrap to a negative index and
        # silently pick an unrelated option.
        gt_content = index2ans.get(str(sample['answer']).upper())
    else:
        empty_prompt = config['short_ans_example_format'].format(question)
        gt_content = sample['answer']

    res_dict['empty_prompt'] = empty_prompt
    instructions = config['task_instructions']
    if instructions:
        res_dict['final_input_prompt'] = instructions.strip() + '\n\n' + empty_prompt
    else:
        res_dict['final_input_prompt'] = empty_prompt
    res_dict['gt_content'] = gt_content

    res_dict.update(sample)
    return res_dict
def run_model(args, samples, model, call_model_engine_fn=None, tokenizer=None, processor=None):
    """
    Run the model on every sample and collect one prediction per sample id.

    With ``args.small_gpu_usage`` set, each sample's image is moved to the GPU
    just before inference and back to the CPU right after. Multiple-choice
    responses are reduced to an option letter with
    ``parse_multi_choice_response``; other responses are stored unchanged.

    :param args: Parsed CLI arguments; ``small_gpu_usage`` is read here and the
        object is forwarded to the engine function.
    :param samples: Iterable of prepared sample dicts.
    :param model: Model forwarded to the engine function.
    :param call_model_engine_fn: Callable
        ``(args, sample, model, tokenizer, processor) -> str``.
    :param tokenizer: Forwarded to the engine function.
    :param processor: Forwarded to the engine function.
    :return: Dict mapping sample['id'] to the predicted answer.
    """
    out_samples = {}
    with torch.no_grad():
        for sample in tqdm(samples):
            # Samples without a usable image carry image=None; calling
            # .cuda()/.cpu() on them used to raise AttributeError.
            move_image = args.small_gpu_usage and sample['image'] is not None
            if move_image:
                sample['image'] = sample['image'].cuda()

            response = call_model_engine_fn(args, sample, model, tokenizer, processor)

            if move_image:
                sample['image'] = sample['image'].cpu()

            if sample['question_type'] == 'multiple-choice':
                pred_ans = parse_multi_choice_response(response, sample['all_choices'], sample['index2ans'])
            else:  # open question
                pred_ans = response
            out_samples[sample['id']] = pred_ans
    return out_samples
def set_seed(seed_value):
    """
    Seed Python's ``random``, NumPy and PyTorch (CPU, plus CUDA when it is
    available) with the same value, and configure cuDNN with
    ``deterministic=True`` and ``benchmark=False``.
    :param seed_value: An integer value to be used as the seed.
    """
    random.seed(seed_value)
    np.random.seed(seed_value)

    torch.manual_seed(seed_value)
    if torch.cuda.is_available():
        # Current device first, then every device for multi-GPU setups.
        for seed_cuda in (torch.cuda.manual_seed, torch.cuda.manual_seed_all):
            seed_cuda(seed_value)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
def main():
    """
    Command-line entry point: evaluate a Bunny model on the MMMU dataset.

    Steps: parse arguments, seed the RNGs, load the YAML config (collapsing
    single-element lists to their value, except under 'eval_params'), load and
    concatenate every subject listed in CAT_SHORT2LONG, load the model,
    build prompts and preprocess images, run inference, and write the
    ``{sample id: prediction}`` mapping as JSON to ``--output-path``.

    :raises ValueError: If a config key other than 'eval_params' holds a list
        whose length is not exactly one.
    """
    parser = ArgumentParser()
    parser.add_argument('--model-path', type=str, default=None)
    parser.add_argument('--model-base', type=str, default=None)
    parser.add_argument("--model-type", type=str, default=None)
    parser.add_argument("--conv-mode", type=str, default=None)
    parser.add_argument('--data-path', type=str, default=None)
    parser.add_argument('--config-path', type=str, default=None)
    parser.add_argument('--output-path', type=str, default=None)
    parser.add_argument('--split', type=str, default='validation')
    parser.add_argument('--seed', type=int, default=42)
    parser.add_argument("--small-gpu-usage", action="store_true")
    args = parser.parse_args()

    device = torch.device("cuda") if torch.cuda.is_available() else "cpu"
    set_seed(args.seed)

    print('bunny_initializing...')
    processor = None
    call_model_engine = call_bunny_engine_df

    # load config and process to one value
    args.config = load_yaml(args.config_path)
    for key, value in list(args.config.items()):
        if key != 'eval_params' and isinstance(value, list):
            # Explicit raise instead of assert so the check survives `python -O`.
            if len(value) != 1:
                raise ValueError('key {} has more than one value'.format(key))
            args.config[key] = value[0]

    # load every subject and merge them into a single dataset
    sub_dataset_list = [
        load_dataset(args.data_path, subject, split=args.split)
        for subject in CAT_SHORT2LONG.values()
    ]
    dataset = concatenate_datasets(sub_dataset_list)

    # load model
    model_path = os.path.expanduser(args.model_path)
    model_name = get_model_name_from_path(model_path)
    tokenizer, model, vis_processors, context_len = load_pretrained_model(model_path, args.model_base, model_name,
                                                                          args.model_type)

    samples = []
    print('Processing MMMU dataset...')
    for sample in tqdm(dataset):
        sample = process_single_sample(sample)
        sample = construct_prompt(sample, args.config)
        # Explicit None check: process_single_sample marks samples without a
        # usable single image with image=None.
        if sample['image'] is not None:
            pixel_values = vis_processors.preprocess(sample['image'].convert('RGB'), return_tensors='pt')['pixel_values'][0]
            # With --small-gpu-usage the tensor stays on the CPU; run_model
            # moves it to the GPU only while the sample is being processed.
            sample['image'] = pixel_values if args.small_gpu_usage else pixel_values.to(device)
        samples.append(sample)

    print('Start to evaluate...')
    # run ex
    out_samples = run_model(args, samples, model, call_model_engine, tokenizer, processor)

    # A bare filename has an empty dirname, and os.makedirs('') raises.
    output_dir = os.path.dirname(args.output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(args.output_path, 'w') as f:
        json.dump(out_samples, f, indent=4)


if __name__ == '__main__':
    main()