import hashlib
import hmac
import io
import json
import os
import sys
import time
from datetime import datetime

import pandas as pd
import requests
from huggingface_hub import HfApi, snapshot_download
from tqdm.auto import tqdm

# Make the project root importable when this script is run directly.
current_script_path = os.path.abspath(__file__)
src_directory = os.path.join(os.path.dirname(current_script_path), '..', '..')
sys.path.append(src_directory)

from src.envs import (EVAL_REQUESTS_PATH, DYNAMIC_INFO_REPO, DYNAMIC_INFO_PATH,
                      EVAL_RESULTS_PATH, QUEUE_REPO, RESULTS_REPO)

# FlagEval batch status codes -> leaderboard queue statuses.
status_mapping = {
    'P': 'PENDING',
    'R': 'RUNNING',
    'S': 'FINISHED',
    'F': 'FAILED',
    'C': 'CANCELLED',
}

# dataset -> (metric key in the FlagEval response, column name in the results file)
dataset_metric_mapping = {
    'ChartQA': ('accuracy', 'acc'),
    'CMMMU': ('accuracy', 'acc'),
    'CMMU': ('accuracy', 'acc'),
    'MMMU': ('accuracy', 'acc'),
    'MMMU_Pro_standard': ('accuracy', 'acc'),
    'MMMU_Pro_vision': ('accuracy', 'acc'),
    'OCRBench': ('accuracy', 'acc'),
    'MathVision': ('accuracy', 'acc'),
    'CII-Bench': ('accuracy', 'acc'),
    'Blink': ('accuracy', 'acc'),
}

# Consecutive-failure counter per model, used to decide when to stop re-queueing.
failed_mapping = {}

# Production environment:
# base_url = 'https://flageval.baai.ac.cn/api/hf'
# secret = b'M2L84t36MdzwS1Lb'

# Test environment:
base_url = 'http://120.92.17.239:8080/api/hf'
secret = b'Dn29TMCxzvKBGMS8'

MAX_GPU_USAGE = 20
MAX_A800_USAGE = 1
LC_A800_QUEUE_ID = "877467e6-808b-487e-8a06-af8e96c83fa6"
A800_QUEUE_ID = "f016ff98-6ec8-4b1e-aed2-9a93753119b2"
A100_QUEUE_ID = "7f8cb309-295f-4f56-8159-f43f60f03f9c"


def get_gpu_number(params=0):
    """Map a model's parameter count (in billions) to a GPU count and queue.

    Models up to ~35B run on A100s; larger models go to the LC A800 queue.
    """
    if params == 0:
        return 0, A100_QUEUE_ID
    if params < 9:
        return 1, A100_QUEUE_ID
    elif params < 15:
        return 2, A100_QUEUE_ID
    elif params < 35:
        return 4, A100_QUEUE_ID
    elif params < 70:
        return 3, LC_A800_QUEUE_ID
    elif params < 100:
        return 5, LC_A800_QUEUE_ID
    elif params < 140:
        return 6, LC_A800_QUEUE_ID
    else:
        return 8, LC_A800_QUEUE_ID


def generate_signature(secret, url, body):
    """Sign a request as HMAC-SHA256 over '<timestamp><url><body>'."""
    timestamp = str(int(time.time()))
    to_sign = f'{timestamp}{url}{body}'
    h = hmac.new(secret, to_sign.encode('utf-8'), digestmod=hashlib.sha256)
    sign = h.hexdigest()
    return sign, timestamp


def submit_evaluation(base_url, secret, model_id, require_gpus=None, priority=None,
                      gpus_queue_id=None, hf_user_id=None):
    """Submit a model to the FlagEval multimodal batch API and return its ids."""
    url = f'{base_url}/mm/batches'
    data = {'modelId': model_id}
    if require_gpus is not None:
        data['requireGpus'] = require_gpus
    if priority is not None:
        data['priority'] = priority
    if gpus_queue_id is not None:
        data['gpus_queue_id'] = gpus_queue_id
    if hf_user_id is not None:
        data['hfUserId'] = hf_user_id
    raw_body = json.dumps(data)
    sign, timestamp = generate_signature(secret, url, raw_body)
    headers = {
        'Content-Type': 'application/json',
        'X-Flageval-Sign': sign,
        'X-Flageval-Timestamp': timestamp,
    }
    response = requests.post(url, data=raw_body, headers=headers)
    print("submit_evaluation response", response)
    response_data = response.json()
    evaluation_info = {
        'evaluationId': response_data.get('evaluationId'),
        'eval_id': response_data.get('id'),
    }
    return evaluation_info


def poll_evaluation_progress(base_url, secret, batch_id):
    """Fetch the current status and per-dataset details of a running batch."""
    url = f'{base_url}/mm/batches/{int(batch_id)}'
    sign, timestamp = generate_signature(secret, url, '')
    headers = {
        'X-Flageval-Sign': sign,
        'X-Flageval-Timestamp': timestamp,
    }
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()  # raises HTTPError for non-2xx responses
        response_data = response.json()
        evaluation_progress = {
            'evaluationId': response_data.get('evaluationId'),
            'eval_id': response_data.get('batchId'),
            'status': response_data.get('status'),
            'details': response_data.get('details', []),
        }
        return evaluation_progress
    except requests.exceptions.RequestException as e:
        print(f"Request error: {e}")
    except ValueError:
        print(f"Failed to parse JSON response: {response}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    return {'status': 'REQUEST_FAILED'}
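
# Illustrative only, never called: a minimal sketch of how a server sharing
# `secret` could verify the X-Flageval-Sign header produced above.  The
# server-side check is an assumption; only the client-side signing scheme is
# taken from this script, and the URL/body values here are made up.
def _verify_signature_example():
    url, body = 'http://example.invalid/api', '{}'  # hypothetical request
    sign, timestamp = generate_signature(secret, url, body)
    expected = hmac.new(secret, f'{timestamp}{url}{body}'.encode('utf-8'),
                        digestmod=hashlib.sha256).hexdigest()
    assert hmac.compare_digest(sign, expected)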

def update_gpu_usage(change):
    global current_gpu_usage
    current_gpu_usage += change


def get_evaluation_queue_df(save_path: str, cols: list) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Read all request JSON files under save_path and split them into
    finished / running / pending DataFrames."""
    all_evals = []
    for root, dirs, files in os.walk(save_path):
        for file in files:
            if file.endswith(".json"):
                file_path = os.path.join(root, file)
                with open(file_path) as fp:
                    data = json.load(fp)
                # Ensure every expected column exists; missing ones default to
                # None (failed_status defaults to 0).
                for col in cols:
                    if col not in data:
                        data[col] = 0 if col == "failed_status" else None
                all_evals.append(data)

    # Stable sorts: the pending list ends up ordered by failed_status, then by
    # params, then by submission time.
    all_evals = sorted(all_evals, key=lambda x: x['submitted_time'])
    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    pending_list = sorted(pending_list, key=lambda x: x['params'])
    pending_list = sorted(pending_list, key=lambda x: x['failed_status'])
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals
                     if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]

    df_pending = pd.DataFrame(pending_list) if pending_list else pd.DataFrame(columns=cols)
    df_running = pd.DataFrame(running_list) if running_list else pd.DataFrame(columns=cols)
    df_finished = pd.DataFrame(finished_list) if finished_list else pd.DataFrame(columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]


def update_evaluation_queue(model_name, nstatus, eval_id=None, flageval_id=None):
    """Rewrite the request file(s) for model_name with a new status and push
    them back to the queue repo.  A status like 'PENDING_2' carries a failure
    count after the underscore."""
    print("update_evaluation_queue", model_name, nstatus, eval_id)
    fail_status = -1
    if len(nstatus.split("_")) == 2:
        status, fail_status = nstatus.split("_")[0], int(nstatus.split("_")[1])
    else:
        status = nstatus
    user_name, model_path = model_name.split("/") if "/" in model_name else ("", model_name)
    out_dir = f"{EVAL_REQUESTS_PATH}/{user_name}"
    json_files = [f for f in os.listdir(out_dir)
                  if f.startswith(model_path + '_') and f.endswith(".json")]
    if not json_files:
        print(f"No JSON file found for model {model_name}")
        return
    for json_file in json_files:
        json_path = os.path.join(out_dir, json_file)
        with open(json_path, "r") as f:
            eval_entry = json.load(f)
        print("before update_evaluation_queue", eval_entry['status'], eval_entry['failed_status'])
        eval_entry['status'] = status
        if fail_status >= 0:
            eval_entry['failed_status'] = fail_status
        if eval_id is not None:
            eval_entry['eval_id'] = eval_id
        if flageval_id is not None:
            eval_entry['flageval_id'] = flageval_id
        print("after update_evaluation_queue status change", eval_entry['status'], eval_entry['failed_status'])
        with open(json_path, "w") as f:
            json.dump(eval_entry, f, indent=4)
        api.upload_file(
            path_or_fileobj=json_path,
            path_in_repo=json_path.split(f"{EVAL_REQUESTS_PATH}/")[1],
            repo_id=QUEUE_REPO,
            repo_type="dataset",
            commit_message=f"Update {model_name} status to {status}",
        )
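
# The request files walked above are assumed to look roughly like this
# (illustrative values; the field names are the ones this script reads/writes):
#
# {
#     "model": "org/model-name",
#     "status": "PENDING",
#     "params": 7,
#     "submitted_time": "2024-01-01T00:00:00Z",
#     "eval_id": null,
#     "flageval_id": null,
#     "failed_status": 0
# }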

def save_and_upload_results(model_name, details):
    """Convert FlagEval per-dataset details into the leaderboard results format
    and upload the file to the results repo."""
    converted_details = {
        "config_general": {
            "model_name": model_name,
            "model_dtype": "float16",
            "model_size": 0,
        },
        "results": {},
        "versions": {},
        "config_tasks": {},
        "summary_tasks": {},
        "summary_general": {},
    }
    for detail in details:
        dataset = detail['dataset']
        status = detail['status']
        if status == 'S' and dataset in dataset_metric_mapping:
            acc_key = dataset_metric_mapping[dataset][0]
            acc = detail['accuracy'] if acc_key == 'accuracy' else detail['rawDetails'][acc_key]
            converted_details['results'][dataset] = {
                dataset_metric_mapping[dataset][1]: acc,
                "acc_stderr": 0,
            }
            # Copy the raw per-metric details as well.
            for metric, value in detail['rawDetails'].items():
                converted_details['results'][dataset][metric] = value

    out_dir = f"{EVAL_RESULTS_PATH}/{model_name}"
    os.makedirs(out_dir, exist_ok=True)
    result_path = os.path.join(out_dir, f"results_{datetime.now().strftime('%Y-%m-%dT%H-%M-%S.%f')}.json")
    with open(result_path, "w") as f:
        json.dump(converted_details, f, indent=4)
    api.upload_file(
        path_or_fileobj=result_path,
        path_in_repo=result_path.split(f"{EVAL_RESULTS_PATH}/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"Add results for {model_name}",
    )
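
# For a model that finished MMMU with accuracy 0.512, the uploaded results file
# would look roughly like this (illustrative values; any extra keys present in
# rawDetails are copied into the per-dataset dict as well):
#
# {
#     "config_general": {"model_name": "org/model-name", "model_dtype": "float16", "model_size": 0},
#     "results": {"MMMU": {"acc": 0.512, "acc_stderr": 0}},
#     "versions": {}, "config_tasks": {}, "summary_tasks": {}, "summary_general": {}
# }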

class SilentTqdm(tqdm):
    """A tqdm subclass that suppresses all progress output, for use with
    snapshot_download."""

    def __init__(self, *args, **kwargs):
        kwargs['bar_format'] = ''
        kwargs['leave'] = False
        super().__init__(*args, **kwargs, file=io.StringIO())

    def update(self, n=1):
        pass

    def close(self):
        pass


def snapshot_download_with_retry(max_retries, wait_time, *args, **kwargs):
    """Call snapshot_download, retrying up to max_retries times with a fixed
    wait between attempts."""
    for i in range(max_retries):
        try:
            return snapshot_download(*args, **kwargs)
        except Exception as e:
            if i < max_retries - 1:  # i is zero-indexed
                print(f"Error occurred: {e}. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print("Max retries reached. Raising exception.")
                raise


api = HfApi()
print(EVAL_REQUESTS_PATH)
print(DYNAMIC_INFO_PATH)
print(EVAL_RESULTS_PATH)

prev_running_models = ''

while True:
    # Refresh local copies of the queue, dynamic-info, and results repos.
    snapshot_download_with_retry(5, 10, repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH,
                                 repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)
    snapshot_download_with_retry(5, 10, repo_id=DYNAMIC_INFO_REPO, local_dir=DYNAMIC_INFO_PATH,
                                 repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)
    snapshot_download_with_retry(5, 10, repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH,
                                 repo_type="dataset", tqdm_class=SilentTqdm, etag_timeout=30)

    (
        finished_eval_queue_df,
        running_eval_queue_df,
        pending_eval_queue_df,
    ) = get_evaluation_queue_df(EVAL_REQUESTS_PATH,
                                ['model', 'status', 'params', 'eval_id', 'failed_status'])

    # Debug: dump the pending list.
    for pend in [row for _, row in pending_eval_queue_df.iterrows()]:
        print("pending", pend)

    # Recompute current GPU usage from the running queue.
    current_gpu_usage = 0
    current_A800gpu_usage = 0
    for _, row in running_eval_queue_df.iterrows():
        print(get_gpu_number(row['params']), row['params'])
        gpus_num, gpus_queue_id = get_gpu_number(row['params'])
        current_gpu_usage += gpus_num
        if gpus_queue_id == LC_A800_QUEUE_ID:
            current_A800gpu_usage += 1

    running_models = ", ".join([row["model"] for _, row in running_eval_queue_df.iterrows()])
    if running_models != prev_running_models:
        print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | GPU usage: {current_gpu_usage}/{MAX_GPU_USAGE} | Running models: {running_models}')
        prev_running_models = running_models
    print("current A800 GPU usage", current_A800gpu_usage)

    # Submit pending evaluations while GPU capacity allows.
    if not pending_eval_queue_df.empty:
        for _, row in pending_eval_queue_df.iterrows():
            required_gpus, gpus_queue_id = get_gpu_number(row['params'])
            if gpus_queue_id == LC_A800_QUEUE_ID and current_A800gpu_usage >= MAX_A800_USAGE:
                print(f"A800 queue full ({current_A800gpu_usage}/{MAX_A800_USAGE}), skipping {row['model']}")
                continue
            if "princeton-nlp/Llama-3-8B-ProLong-512k" in row['model']:
                required_gpus += 1
            if current_gpu_usage + required_gpus <= MAX_GPU_USAGE:
                # Guard against duplicate submissions (no eval ids are passed
                # here, since none have been assigned to this entry yet).
                if row['model'] in [r["model"] for _, r in running_eval_queue_df.iterrows()]:
                    print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Evaluation {row["model"]} is already running')
                    update_evaluation_queue(row['model'], 'CANCELLED')
                    continue
                # Submit the evaluation.
                try:
                    evaluation_info = submit_evaluation(base_url, secret, row['model'],
                                                        require_gpus=required_gpus,
                                                        priority='high',
                                                        gpus_queue_id=gpus_queue_id)
                    update_evaluation_queue(row['model'], 'RUNNING',
                                            evaluation_info['eval_id'],
                                            evaluation_info['evaluationId'])
                    update_gpu_usage(required_gpus)
                    print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Submitted evaluation {row["model"]} with {required_gpus} GPUs, submit info: {evaluation_info}')
                except Exception as e:
                    print(e)
                    continue

    # Poll the status of running evaluations.
    for _, row in running_eval_queue_df.iterrows():
        progress = poll_evaluation_progress(base_url, secret, row['eval_id'])
        if progress['status'] in ('S', 'F', 'C', 'DI'):
            new_status = status_mapping.get(progress['status'], 'FINISHED')
            update_evaluation_queue(row['model'], new_status)
            gpus_num, gpus_queue_id = get_gpu_number(row['params'])
            update_gpu_usage(-gpus_num)
            if gpus_queue_id == LC_A800_QUEUE_ID:
                current_A800gpu_usage -= 1
            print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | Evaluation {row["model"]} finished with status {progress["status"]}')
            if new_status == 'FAILED':
                # Re-queue failed models; after 5 consecutive failures, bump
                # the entry's failed_status so it sinks in the pending order.
                print("failed_mapping0", failed_mapping)
                failed_mapping[row['model']] = failed_mapping.get(row['model'], 0) + 1
                print("failed_mapping add", failed_mapping, row['failed_status'])
                if failed_mapping[row['model']] == 5:
                    del failed_mapping[row['model']]
                    update_evaluation_queue(row['model'], 'PENDING_' + str(int(row['failed_status'] + 1)))
                else:
                    update_evaluation_queue(row['model'], 'PENDING')
                print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |--------------- RePending {row["model"]} ------------ ')
            elif new_status == 'FINISHED':
                print(progress)
                save_and_upload_results(row['model'], progress['details'])

    time.sleep(300)  # queue polling interval
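
# Deployment sketch (assumption, not part of the original script): the loop
# above runs forever at module scope.  One way to let the scheduler exit
# cleanly on Ctrl-C would be to factor one iteration into a function:
#
#   def run_one_pass():        # hypothetical refactor of one loop iteration
#       ...
#
#   if __name__ == '__main__':
#       try:
#           while True:
#               run_one_pass()
#               time.sleep(300)
#       except KeyboardInterrupt:
#           pass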