# leaderboard/refresh.py
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional
import pandas as pd
import yaml
# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(module)s - %(message)s"
)
logger = logging.getLogger(__name__)
# --- Path Definitions ---
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SCRIPT_DIR.parent
# --- Default Input/Output Paths ---
DEFAULT_MODELS_FOLDER = PROJECT_ROOT.parent / "llm-leaderboard/models_info"
DEFAULT_RESULTS_FOLDER = PROJECT_ROOT.parent / "llm-leaderboard/results"
OUTPUT_FOLDER = SCRIPT_DIR / "boards_data"
CONFIG_FILE_PATH = SCRIPT_DIR / "leaderboard_config.yaml"
TEMPLATE_FOLDER = SCRIPT_DIR / "template_jsons"
# --- Constants for Subtask Processing ---
NLU_NLG_TASK_KEYS = ["persian_nlu", "persian_nlg"]
ALL_LEADERBOARD_COLUMNS = [
    'Model Name', 'model_url', 'parameters_count', 'source_type', 'Average',
    'Persian IFEval', 'Persian MT-Bench', "PerMMLU",
    "PerCoR", "Persian NLU", "Persian NLG"
]


def load_tasks_from_config(config_path: Path) -> Dict[str, str]:
    """Load the task-key -> display-name mapping from the YAML config, dropping any 'all' entry."""
    if not config_path.exists():
        logger.error(f"Configuration file not found: {config_path}. Cannot load tasks.")
        return {}
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            config_data = yaml.safe_load(f)
        tasks_from_config = config_data.get('task_display_names', {})
        if not isinstance(tasks_from_config, dict):
            logger.error(f"'task_display_names' in {config_path} is not a dictionary.")
            return {}
        processed_tasks = {k: v for k, v in tasks_from_config.items() if str(k).lower() != 'all'}
        if not processed_tasks:
            logger.warning(f"No tasks in {config_path} under 'task_display_names' (excluding 'all').")
        return processed_tasks
    except Exception as e:
        logger.error(f"Error loading config {config_path}: {e}")
        return {}
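
# Illustrative shape of leaderboard_config.yaml (hypothetical values; the real file
# may differ). The keys should match the task keys used in main_scores_map below, and
# an optional 'all' entry is ignored by load_tasks_from_config:
#
#   task_display_names:
#     ifeval: "Persian IFEval"
#     mt_bench: "Persian MT-Bench"
#     MMLU: "PerMMLU"
#     persian_csr: "PerCoR"
#     persian_nlu: "Persian NLU"
#     persian_nlg: "Persian NLG"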


class ModelEvaluationProcessor:
    """Builds per-task tables, NLU/NLG subtask tables, and the aggregate 'all' leaderboard table."""

    def __init__(
        self,
        models_info_path: Path,
        results_base_path: Path,
        output_path: Path,
        template_jsons_path: Path,
    ) -> None:
        self.models_info_path = models_info_path
        self.results_base_path = results_base_path
        self.output_path = output_path
        self.template_folder = template_jsons_path
        self.output_path.mkdir(parents=True, exist_ok=True)
        self.tasks_config = load_tasks_from_config(CONFIG_FILE_PATH)
        if not self.tasks_config:
            logger.error("Tasks config is empty. Processing might be affected.")
        # Maps each task key to the metric used as its headline leaderboard score.
        self.main_scores_map = {
            "ifeval": "strict_instruction_accuracy",
            "mt_bench": "score_mean",
            "MMLU": "acc",
            "persian_csr": "acc",
            "persian_nlg": "nlg_score",
            "persian_nlu": "nlu_score",
        }

    def _load_template(self, task_key: str) -> Dict[str, Any]:
        path = self.template_folder / f"{task_key}.json"
        try:
            return json.loads(path.read_text(encoding="utf-8"))
        except FileNotFoundError:
            logger.warning(f"Template file not found for task_key {task_key} at {path}. Using empty template.")
            return {}
        except Exception as e:
            logger.error(f"Cannot load template for task_key {task_key} from {path}: {e}")
            return {}

    def _deep_override(self, base: Any, override: Any) -> Any:
        """Recursively overlay `override` onto `base`, keeping the template value wherever
        the override is missing, None, or the placeholder -1."""
        if isinstance(base, dict) and isinstance(override, dict):
            merged = {}
            for k, v_base in base.items():
                if k in override and override[k] is not None and override[k] != -1:
                    merged[k] = self._deep_override(v_base, override[k])
                else:
                    merged[k] = v_base
            # Keys present only in the override are intentionally dropped so the output
            # always follows the template's schema.
            return merged
        elif override is not None and override != -1:
            return override
        else:
            return base

    def _load_model_raw_results(self, model_folder_name: str, task_key: str) -> Dict[str, Any]:
        results_filename = f"{model_folder_name}___{task_key}.json"
        results_file_path = self.results_base_path / results_filename
        if results_file_path.exists():
            try:
                with open(results_file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return data if isinstance(data, dict) else {}
            except json.JSONDecodeError as e:
                logger.error(f"JSONDecodeError for model '{model_folder_name}', task_key '{task_key}' from {results_file_path}: {e}")
            except Exception as e:
                logger.error(f"Error loading results for model '{model_folder_name}', task_key '{task_key}' from {results_file_path}: {e}")
        else:
            logger.warning(f"Results file not found for model '{model_folder_name}', task_key '{task_key}' at {results_file_path}")
        return {}
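
    # Raw results are expected as one JSON file per (model, task) pair, named
    # "<model_folder_name>___<task_key>.json" (triple underscore), e.g. a hypothetical
    # results/my-model___ifeval.json.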

    def load_and_fill_task_results(self, model_folder_name: str, task_key: str) -> Dict[str, Any]:
        template = self._load_template(task_key)
        raw_results = self._load_model_raw_results(model_folder_name, task_key)
        return self._deep_override(template, raw_results)
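
    # Illustrative merge behaviour of _deep_override (hypothetical values):
    #   template    = {"acc": -1, "subsets": {"easy": -1, "hard": -1}}
    #   raw results = {"acc": 0.71, "subsets": {"easy": 0.80, "hard": None}}
    #   merged      = {"acc": 0.71, "subsets": {"easy": 0.80, "hard": -1}}
    # Missing, None, or -1 values in the raw results fall back to the template defaults.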

    def clean_previous_subtask_files(self) -> None:
        logger.info("Cleaning previous NLU/NLG subtask JSONL files...")
        for task_key_prefix in NLU_NLG_TASK_KEYS:
            for result_file in self.results_base_path.glob(f"*___{task_key_prefix}.json"):
                try:
                    task_data_content = result_file.read_text(encoding="utf-8")
                    if not task_data_content.strip():
                        logger.debug(f"Skipping empty result file for subtask cleaning: {result_file}")
                        continue
                    task_data = json.loads(task_data_content)
                    main_score_for_this_task_prefix = self.main_scores_map.get(task_key_prefix)
                    for subtask_name in task_data:
                        if subtask_name == main_score_for_this_task_prefix:
                            continue
                        if isinstance(task_data.get(subtask_name), dict):
                            subtask_output_path = self.output_path / f"{subtask_name}.jsonl"
                            if subtask_output_path.exists():
                                subtask_output_path.unlink()
                                logger.info(f"Deleted previous subtask file: {subtask_output_path}")
                except json.JSONDecodeError as e:
                    logger.warning(f"Failed to decode JSON for subtask cleaning from {result_file}: {e}")
                except Exception as e:
                    logger.warning(f"Failed to inspect/delete subtask files based on {result_file}: {e}")

    def _process_subtask_data(self, task_results: Dict[str, Any], base_model_info: Dict[str, Any], parent_task_main_score_key: Optional[str], parent_task_key_for_log: str) -> None:
        parent_task_main_score_value = task_results.get(parent_task_main_score_key) if parent_task_main_score_key else None
        for subtask_name, subtask_scores_dict in task_results.items():
            if subtask_name == parent_task_main_score_key:
                continue
            if not isinstance(subtask_scores_dict, dict):
                logger.debug(f"Skipping entry '{subtask_name}' in '{parent_task_key_for_log}': not a dictionary of subtask scores.")
                continue
            row_data = base_model_info.copy()
            row_data.update(subtask_scores_dict)
            if parent_task_main_score_key:
                row_data[parent_task_main_score_key] = parent_task_main_score_value
            subtask_output_file = f"{subtask_name}.jsonl"
            subtask_output_path = self.output_path / subtask_output_file
            try:
                current_entries = []
                if subtask_output_path.exists():
                    existing_df = pd.read_json(subtask_output_path, lines=True)
                    if not existing_df.empty and 'Model Name' in existing_df.columns:
                        current_entries = existing_df[existing_df['Model Name'] != row_data['Model Name']].to_dict(orient='records')
                current_entries.append(row_data)
                updated_df = pd.DataFrame(current_entries)
                updated_df.to_json(subtask_output_path, orient="records", lines=True, force_ascii=False)
                logger.debug(f"Updated subtask file: {subtask_output_path} for model {base_model_info.get('Model Name')}, parent task {parent_task_key_for_log}")
            except Exception as e:
                logger.error(f"Error updating subtask file {subtask_output_path} for parent {parent_task_key_for_log}: {e}")

    def process_nlu_nlg_subtasks(self, model_details: Dict[str, Any], model_folder_name: str, canonical_model_name: str) -> None:
        common_subtask_model_info = {
            "Model Name": canonical_model_name,
            "model_url": model_details.get('model_url', model_details.get('link', model_details.get('homepage', 'https://google.com'))),
            "parameters_count": str(model_details.get('n_parameters', "N/A")),
            "source_type": "Closed-Source"  # Default, will be refined
        }
        parameters_count_raw = model_details.get('n_parameters', None)
        if parameters_count_raw is not None:
            is_open_source_candidate = False
            if isinstance(parameters_count_raw, (int, float)) and parameters_count_raw > 0:
                is_open_source_candidate = True
            elif isinstance(parameters_count_raw, str) and \
                    str(parameters_count_raw).strip().lower() not in ["", "n/a", "unknown", "private", "confidential", "tbd", "null", "closed"]:
                is_open_source_candidate = True
            common_subtask_model_info["source_type"] = "Open-Source" if is_open_source_candidate else "Closed-Source"
        for task_key_for_subtasks in NLU_NLG_TASK_KEYS:
            if task_key_for_subtasks not in self.tasks_config:
                logger.debug(f"Subtask processing for '{task_key_for_subtasks}' skipped: not in tasks_config.")
                continue
            logger.info(f"Processing subtasks for '{task_key_for_subtasks}' for model '{canonical_model_name}'...")
            parent_task_full_results = self.load_and_fill_task_results(model_folder_name, task_key_for_subtasks)
            main_score_key_for_parent_task = self.main_scores_map.get(task_key_for_subtasks)
            if not main_score_key_for_parent_task:
                logger.warning(f"No main score key in main_scores_map for parent task '{task_key_for_subtasks}'.")
            self._process_subtask_data(
                parent_task_full_results,
                common_subtask_model_info,
                main_score_key_for_parent_task,
                task_key_for_subtasks
            )

    def process_models(self) -> Dict[str, pd.DataFrame]:
        processed_task_data: Dict[str, List[Dict[str, Any]]] = {task_key: [] for task_key in self.tasks_config.keys()}
        all_models_summary_data: List[Dict[str, Any]] = []
        if not self.models_info_path.exists() or not self.models_info_path.is_dir():
            logger.critical(f"Configured MODELS_FOLDER path does not exist or is not a directory: {self.models_info_path}")
            empty_dfs = {key: pd.DataFrame() for key in self.tasks_config.keys()}
            empty_dfs["all"] = pd.DataFrame()
            return empty_dfs
        model_info_files = list(self.models_info_path.glob("*.json"))
        if not model_info_files:
            logger.warning(f"No model info files (*.json) found in {self.models_info_path}. No models will be processed.")
            empty_dfs = {key: pd.DataFrame() for key in self.tasks_config.keys()}
            empty_dfs["all"] = pd.DataFrame()
            return empty_dfs
        for model_info_file in model_info_files:
            model_folder_name = model_info_file.stem
            try:
                with open(model_info_file, 'r', encoding='utf-8') as f:
                    model_details = json.load(f)
                canonical_model_name = model_details.get('name_for_leaderboard',
                                                         model_details.get('model_hf_id',
                                                                           model_details.get('name', model_folder_name)))
                model_url = model_details.get('model_url', model_details.get('link', model_details.get('homepage', 'https://google.com')))
                if not model_url:
                    model_url = 'https://google.com'
                parameters_count_raw = model_details.get('n_parameters', None)
                parameters_count_display = str(parameters_count_raw) if parameters_count_raw is not None else "N/A"
                source_type = "Closed-Source"
                if parameters_count_raw is not None:
                    is_open_source_candidate = False
                    if isinstance(parameters_count_raw, (int, float)) and parameters_count_raw > 0:
                        is_open_source_candidate = True
                    elif isinstance(parameters_count_raw, str) and \
                            str(parameters_count_raw).strip().lower() not in ["", "n/a", "unknown", "private", "confidential", "tbd", "null", "closed"]:
                        is_open_source_candidate = True
                    source_type = "Open-Source" if is_open_source_candidate else "Closed-Source"
            except Exception as e:
                logger.error(f"Error loading/parsing model info from {model_info_file}: {e}. Skipping '{model_folder_name}'.")
                continue
            logger.info(f"Processing model: {canonical_model_name} (source ID: {model_folder_name})")
            current_model_scores_for_summary: Dict[str, Any] = {
                "Model Name": canonical_model_name,
                "model_url": model_url,
                "parameters_count": parameters_count_display,
                "source_type": source_type
            }
            for task_key, task_display_name in self.tasks_config.items():
                task_specific_results = self.load_and_fill_task_results(model_folder_name, task_key)
                main_score_metric_name = self.main_scores_map.get(task_key)
                task_data_entry_for_specific_jsonl: Dict[str, Any] = {
                    "Model Name": canonical_model_name,
                    "model_url": model_url,
                    "parameters_count": parameters_count_display,
                    "source_type": source_type
                }
                if isinstance(task_specific_results, dict) and task_specific_results:
                    for metric, value in task_specific_results.items():
                        task_data_entry_for_specific_jsonl[metric] = value
                    if main_score_metric_name and main_score_metric_name in task_specific_results:
                        score_value = task_specific_results[main_score_metric_name]
                        if task_key == "mt_bench" and score_value is not None:
                            try:
                                score_value = float(score_value) / 10.0
                            except (ValueError, TypeError):
                                logger.warning(f"Could not convert mt_bench score '{score_value}' to float for division for model {canonical_model_name}")
                                score_value = pd.NA
                        current_model_scores_for_summary[task_display_name] = score_value
                    elif main_score_metric_name:
                        logger.warning(f"Main score metric '{main_score_metric_name}' for task '{task_key}' (Display: {task_display_name}) not found for model '{canonical_model_name}'. Will be NA.")
                        current_model_scores_for_summary[task_display_name] = pd.NA
                        task_data_entry_for_specific_jsonl[main_score_metric_name] = pd.NA
                else:
                    logger.warning(f"No valid results data for model '{canonical_model_name}', task_key '{task_key}'. Scores will be NA.")
                    if main_score_metric_name:
                        task_data_entry_for_specific_jsonl[main_score_metric_name] = pd.NA
                    current_model_scores_for_summary[task_display_name] = pd.NA
                processed_task_data[task_key].append(task_data_entry_for_specific_jsonl)
            all_models_summary_data.append(current_model_scores_for_summary)
            self.process_nlu_nlg_subtasks(model_details, model_folder_name, canonical_model_name)
        final_dataframes: Dict[str, pd.DataFrame] = {}
        for task_key, data_list in processed_task_data.items():
            df = pd.DataFrame(data_list) if data_list else pd.DataFrame()
            main_score_col = self.main_scores_map.get(task_key)
            if not df.empty and main_score_col and main_score_col in df.columns:
                try:
                    df[main_score_col] = pd.to_numeric(df[main_score_col], errors='coerce')
                    # Sort by the main score; NaNs go last.
                    df = df.sort_values(by=main_score_col, ascending=False, na_position='last')
                except Exception as e:
                    logger.warning(f"Could not sort dataframe for task {task_key} by score {main_score_col}: {e}")
            final_dataframes[task_key] = df
            if df.empty:
                logger.warning(f"No data processed for task '{task_key}'. Resulting DataFrame is empty.")
        if all_models_summary_data:
            all_df = pd.DataFrame(all_models_summary_data)
            score_cols_for_average = []
            for _, task_display_name_for_avg in self.tasks_config.items():
                if task_display_name_for_avg in all_df.columns:
                    numeric_col = pd.to_numeric(all_df[task_display_name_for_avg], errors='coerce')
                    if numeric_col.notna().any():  # At least one non-NA numeric value
                        all_df[task_display_name_for_avg] = numeric_col
                        score_cols_for_average.append(task_display_name_for_avg)
                    else:  # All values are NA or non-numeric
                        all_df[task_display_name_for_avg] = pd.NA
                        logger.warning(f"Column '{task_display_name_for_avg}' for averaging in 'all' table is not numeric or all NaN. Excluding from average calculation and setting to NA.")
            if score_cols_for_average:
                try:
                    # Mean of the per-task main scores; NaN if any constituent score is NaN.
                    all_df["Average"] = all_df[score_cols_for_average].mean(axis=1, skipna=False)
                    # Round only the non-NaN averages.
                    all_df.loc[all_df["Average"].notna(), "Average"] = all_df.loc[all_df["Average"].notna(), "Average"].round(4)
                except Exception as e:
                    logger.error(f"Error calculating 'Average' for 'all' table: {e}. Average column might be NA or incorrect.")
                    all_df["Average"] = pd.NA  # Fallback to NA
            else:
                logger.warning("No valid numeric score columns found to calculate 'Average' for 'all' table.")
                all_df["Average"] = pd.NA
            # Sort the 'all' table by Average, NaNs last.
            if "Average" in all_df.columns:
                all_df = all_df.sort_values(by="Average", ascending=False, na_position='last')
            existing_cols_in_order = [col for col in ALL_LEADERBOARD_COLUMNS if col in all_df.columns]
            other_cols = [col for col in all_df.columns if col not in existing_cols_in_order]
            all_df = all_df[existing_cols_in_order + other_cols]
            final_dataframes["all"] = all_df
        else:
            final_dataframes["all"] = pd.DataFrame()
            logger.warning("No summary data collected for the 'all' table.")
        return final_dataframes

    def save_dataframe_as_jsonl(self, df: pd.DataFrame, filename_base: str) -> None:
        if df is None or df.empty:
            logger.warning(f"DataFrame for '{filename_base}.jsonl' is empty or None. Skipping save.")
            return
        output_file_path = self.output_path / f"{filename_base}.jsonl"
        try:
            # orient="records" never writes the index, so no index argument is needed
            # (passing index=False here raises a ValueError on some pandas versions).
            df.to_json(output_file_path, orient="records", lines=True, force_ascii=False)
            logger.info(f"Saved data to {output_file_path}")
        except Exception as e:
            logger.error(f"Failed to save DataFrame to {output_file_path}: {e}")

    def run(self) -> None:
        logger.info("Starting data processing pipeline in ModelEvaluationProcessor...")
        self.clean_previous_subtask_files()
        processed_dataframes = self.process_models()
        for task_key_or_name, df in processed_dataframes.items():
            self.save_dataframe_as_jsonl(df, task_key_or_name)
        logger.info("Data processing pipeline completed successfully!")


def main() -> None:
    models_folder_to_use = DEFAULT_MODELS_FOLDER
    results_folder_to_use = DEFAULT_RESULTS_FOLDER
    template_folder_to_use = TEMPLATE_FOLDER
    logger.info(f"Refresh script running from: {SCRIPT_DIR}")
    logger.info(f"CONFIGURED Input 'models_info' Path: {models_folder_to_use}")
    logger.info(f"CONFIGURED Input 'results' Path: {results_folder_to_use}")
    logger.info(f"CONFIGURED Input 'template_jsons' Path: {template_folder_to_use}")
    logger.info(f"Outputting processed data to (inside 'leaderboard' dir): {OUTPUT_FOLDER}")
    logger.info(f"Using configuration file (inside 'leaderboard' dir): {CONFIG_FILE_PATH}")
    if not CONFIG_FILE_PATH.exists():
        logger.critical(f"CRITICAL: Config file not found at {CONFIG_FILE_PATH}. Ensure '{CONFIG_FILE_PATH.name}' exists in '{SCRIPT_DIR}'.")
        return
    if not models_folder_to_use.exists() or not models_folder_to_use.is_dir():
        logger.critical(f"CRITICAL: Input 'models_info' directory not found at {models_folder_to_use} or is not a directory.")
        return
    if not results_folder_to_use.exists() or not results_folder_to_use.is_dir():
        logger.critical(f"CRITICAL: Input 'results' directory not found at {results_folder_to_use} or is not a directory.")
        return
    if not template_folder_to_use.exists() or not template_folder_to_use.is_dir():
        logger.warning(f"WARNING: 'template_jsons' directory not found at {template_folder_to_use}. Template filling might not work as expected.")
    try:
        processor = ModelEvaluationProcessor(
            models_info_path=models_folder_to_use,
            results_base_path=results_folder_to_use,
            output_path=OUTPUT_FOLDER,
            template_jsons_path=template_folder_to_use,
        )
        processor.run()
    except Exception as e:
        logger.error(f"Unhandled exception in main: {e}", exc_info=True)


if __name__ == "__main__":
    main()
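
# Expected layout, inferred from the default paths above: this file lives in
# <project>/leaderboard/refresh.py, and an "llm-leaderboard" checkout (providing
# models_info/ and results/) sits next to the project root. A typical run is simply:
#   python leaderboard/refresh.py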