"""
Process and transform GuardBench leaderboard data.
"""

import json
import os
import pandas as pd
from datetime import datetime
from typing import Dict, List, Any, Tuple
import numpy as np

from src.display.utils import CATEGORIES, TEST_TYPES, METRICS

# Constants for Integral Score calculation (mirrors guardbench library)
MAX_PUNISHABLE_RUNTIME_MS = 6000.0
MIN_PUNISHABLE_RUNTIME_MS = 200.0
MAX_RUNTIME_PENALTY = 0.75  # Corresponds to 1.0 - MIN_TIME_FACTOR, library used 0.75


def calculate_integral_score(row: pd.Series) -> float:
    """
    Calculate the integral score for a given model entry row.

    The score is the product of:
      * every available ``{test_type}_accuracy`` value in the row,
      * ``1 - micro_avg_error_ratio / 100`` (the error ratio is stored as a
        percentage), when that column is present and not NA,
      * a runtime factor derived from ``micro_avg_runtime_ms``, when that
        column is present and not NA.

    Missing micro values simply skip the corresponding penalty; there is no
    fallback to other columns.

    Args:
        row: One leaderboard row (as produced by ``leaderboard_to_dataframe``).

    Returns:
        The integral score, or 0.0 when the row has no accuracy metric at all.
    """
    integral_score = 1.0
    metric_count = 0

    # Primary metric (using accuracy)
    for test_type in TEST_TYPES:
        metric_col = f"{test_type}_accuracy"
        if metric_col in row and pd.notna(row[metric_col]):
            integral_score *= row[metric_col]
            metric_count += 1

    # Without any accuracy metric the product would stay at 1.0; the reference
    # library returns 0.0 in this case, so do the same before any penalty.
    if metric_count == 0:
        return 0.0

    # Error penalty
    micro_error_col = "micro_avg_error_ratio"
    if micro_error_col in row and pd.notna(row[micro_error_col]):
        # Micro error is stored as %, convert back to ratio
        micro_error_ratio = row[micro_error_col] / 100.0
        integral_score *= (1.0 - micro_error_ratio)

    # Runtime penalty (only applied when the micro average runtime is known)
    micro_runtime_col = "micro_avg_runtime_ms"
    if micro_runtime_col in row and pd.notna(row[micro_runtime_col]):
        # Clamp the runtime into the punishable window.
        runtime = max(
            min(row[micro_runtime_col], MAX_PUNISHABLE_RUNTIME_MS),
            MIN_PUNISHABLE_RUNTIME_MS,
        )

        if MAX_PUNISHABLE_RUNTIME_MS > MIN_PUNISHABLE_RUNTIME_MS:
            normalized_time = (runtime - MIN_PUNISHABLE_RUNTIME_MS) / (
                MAX_PUNISHABLE_RUNTIME_MS - MIN_PUNISHABLE_RUNTIME_MS
            )
            # Reference library formula 1: linear from 1.0 (fastest) down to
            # MAX_RUNTIME_PENALTY (slowest).
            time_factor = 1.0 - (1.0 - MAX_RUNTIME_PENALTY) * normalized_time
        else:
            # Degenerate window (max == min); kept to match the reference library.
            time_factor = (
                1.0 if runtime <= MIN_PUNISHABLE_RUNTIME_MS else (1.0 - MAX_RUNTIME_PENALTY)
            )

        # Reference library formula 2: enforce the minimum factor.
        time_factor = max(MAX_RUNTIME_PENALTY, time_factor)
        integral_score *= time_factor

    # Rooting is not done in the reference library's summary table calculation
    return integral_score


def _version_from_path(file_path: str) -> str:
    """
    Derive the leaderboard version tag from a file path.

    When the path contains ``"_v"``, the version is the text after the last
    underscore up to the first following dot (e.g. ``leaderboard_v1.json``
    gives ``"v1"``); otherwise it is ``"v0"``.
    """
    if "_v" in file_path:
        return file_path.split("_")[-1].split(".")[0]
    return "v0"


def load_leaderboard_data(file_path: str) -> Dict:
    """
    Load the leaderboard data from a JSON file.

    Args:
        file_path: Path of the leaderboard JSON file.

    Returns:
        The parsed leaderboard dict. When the file does not exist, an empty
        leaderboard (no entries, current timestamp) is returned instead. In
        both cases a ``"version"`` key is guaranteed, derived from the file
        name when the data does not carry one.
    """
    if not os.path.exists(file_path):
        return {
            "entries": [],
            "last_updated": datetime.now().isoformat(),
            "version": _version_from_path(file_path),
        }

    with open(file_path, 'r', encoding="utf-8") as f:
        data = json.load(f)

    # Ensure version field exists
    if "version" not in data:
        data["version"] = _version_from_path(file_path)

    return data


def save_leaderboard_data(data: Dict, file_path: str) -> None:
    """
    Save the leaderboard data to a JSON file.

    Updates ``data["last_updated"]`` in place and fills in ``data["version"]``
    from the file name when it is missing.

    Args:
        data: Leaderboard dict to persist (mutated as described above).
        file_path: Destination path; its parent directory is created if needed.
    """
    # Ensure the directory exists. A bare file name has an empty dirname, and
    # os.makedirs("") raises, so only create a directory when there is one.
    directory = os.path.dirname(file_path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # Update the last_updated timestamp
    data["last_updated"] = datetime.now().isoformat()

    # Ensure version is set
    if "version" not in data:
        data["version"] = _version_from_path(file_path)

    with open(file_path, 'w', encoding="utf-8") as f:
        json.dump(data, f, indent=2)


def process_submission(submission_data: List[Dict]) -> List[Dict]:
    """
    Process submission data and convert it to leaderboard entries.

    Args:
        submission_data: Raw submission items (dicts).

    Returns:
        One leaderboard entry per item, stamped with the current time as
        ``submission_date`` and carrying over the known metadata and metric
        fields that the item provides.
    """
    entries = []

    for item in submission_data:
        # Create a new entry for the leaderboard
        entry = {
            "model_name": item.get("model_name", "Unknown Model"),
            "per_category_metrics": {},
            "avg_metrics": {},
            "submission_date": datetime.now().isoformat(),
            "version": item.get("version", "v0"),
        }

        # Copy model metadata
        for key in ["model_type", "base_model", "revision", "precision", "weight_type"]:
            if key in item:
                entry[key] = item[key]

        # Copy per-category and average metrics when provided
        for key in ["per_category_metrics", "avg_metrics"]:
            if key in item:
                entry[key] = item[key]

        entries.append(entry)

    return entries


def _entry_to_row(entry: Dict) -> Dict[str, Any]:
    """Flatten one leaderboard entry into a single display row (column -> value)."""
    row = {
        "model_name": entry.get("model_name", "Unknown Model"),
        "model_type": entry.get("model_type", "Unknown"),
        "mode": entry.get("mode", "Strict"),
        "submission_date": entry.get("submission_date", ""),
        "version": entry.get("version", "v0"),
        "guard_model_type": entry.get("guard_model_type", "llm_regexp").lower(),
    }

    # Add additional metadata fields if present
    for key in ["base_model", "revision", "precision", "weight_type"]:
        if key in entry:
            row[key] = entry[key]

    # CASE 1: Metrics are flat in the root
    flat_summary_keys = [
        "average_f1", "average_recall", "average_precision",
        "macro_accuracy", "macro_recall", "total_evals_count",
    ]
    for key, value in entry.items():
        if any(test_type in key for test_type in TEST_TYPES) or key in flat_summary_keys:
            row[key] = value

    # CASE 2: Metrics are in avg_metrics structure.
    # A null "avg_metrics" is treated like a missing one so the membership
    # tests below cannot fail on None.
    avg_metrics = entry.get("avg_metrics") or {}
    for test_type in TEST_TYPES:
        if test_type not in avg_metrics:
            continue
        metrics = avg_metrics[test_type]
        for metric in METRICS:
            if metric in metrics:
                row[f"{test_type}_{metric}"] = metrics[metric]
                # Also add non-binary version for F1 scores
                if metric == "f1_binary":
                    row[f"{test_type}_f1"] = metrics[metric]

    # Calculate averages if not present
    # Use accuracy for macro_accuracy
    if "macro_accuracy" not in row:
        accuracy_values = []
        for test_type in TEST_TYPES:
            flat_col = f"{test_type}_accuracy"
            # Check avg_metrics structure first, flat structure as fallback
            if (
                test_type in avg_metrics
                and "accuracy" in avg_metrics[test_type]
                and pd.notna(avg_metrics[test_type]["accuracy"])
            ):
                accuracy_values.append(avg_metrics[test_type]["accuracy"])
            elif flat_col in row and pd.notna(row[flat_col]):
                accuracy_values.append(row[flat_col])
        if accuracy_values:
            row["macro_accuracy"] = sum(accuracy_values) / len(accuracy_values)

    # Use recall_binary for macro_recall
    if "macro_recall" not in row:
        recall_values = [
            avg_metrics[test_type]["recall_binary"]
            for test_type in TEST_TYPES
            if test_type in avg_metrics
            and "recall_binary" in avg_metrics[test_type]
            and pd.notna(avg_metrics[test_type]["recall_binary"])
        ]
        if recall_values:
            row["macro_recall"] = sum(recall_values) / len(recall_values)

    if "total_evals_count" not in row:
        sample_counts = [
            avg_metrics[test_type]["sample_count"]
            for test_type in TEST_TYPES
            if test_type in avg_metrics
            and "sample_count" in avg_metrics[test_type]
            and pd.notna(avg_metrics[test_type]["sample_count"])
        ]
        if sample_counts:
            row["total_evals_count"] = sum(sample_counts)

    # Extract micro averages directly from entry if they exist (like in guardbench library)
    row["micro_avg_error_ratio"] = entry.get("micro_avg_error_ratio", pd.NA)
    row["micro_avg_runtime_ms"] = entry.get("micro_avg_runtime_ms", pd.NA)

    # Convert error ratio to percentage for consistency with display name
    if pd.notna(row["micro_avg_error_ratio"]):
        row["micro_avg_error_ratio"] *= 100

    return row


def leaderboard_to_dataframe(leaderboard_data: Dict) -> pd.DataFrame:
    """
    Convert leaderboard data to a pandas DataFrame for display.

    Args:
        leaderboard_data: Leaderboard dict; its ``"entries"`` list (if any) is
            flattened into one row per entry.

    Returns:
        A DataFrame with one column per ``{test_type}_{metric}`` combination,
        the summary columns, and an ``integral_score`` column. Non-empty
        frames are sorted by ``integral_score`` in descending order.
    """
    rows = [_entry_to_row(entry) for entry in leaderboard_data.get("entries", [])]

    # Create DataFrame (sorted by integral score further below)
    df = pd.DataFrame(rows)

    # Ensure all expected columns exist
    for test_type in TEST_TYPES:
        for metric in METRICS:
            col_name = f"{test_type}_{metric}"
            if col_name not in df.columns:
                df[col_name] = pd.NA  # Use pd.NA for missing numeric data

            # Add non-binary F1 if it is missing; the binary column is
            # guaranteed to exist at this point (possibly all NA).
            if metric == "f1_binary" and f"{test_type}_f1" not in df.columns:
                df[f"{test_type}_f1"] = df[col_name]

    # Calculate Integral Score
    if not df.empty:
        df["integral_score"] = df.apply(calculate_integral_score, axis=1)
        # Sort by Integral Score instead of average_f1
        df = df.sort_values(by="integral_score", ascending=False, na_position='last')
    else:
        # Add the column even if empty
        df["integral_score"] = pd.NA

    # Ensure summary columns exist
    summary_cols = [
        "macro_accuracy", "macro_recall", "micro_avg_error_ratio",
        "micro_avg_runtime_ms", "total_evals_count",
    ]
    for col in summary_cols:
        if col not in df.columns:
            df[col] = pd.NA

    # Remove old average columns if they somehow snuck in
    old_avg_cols = ["average_f1", "average_recall", "average_precision"]
    for col in old_avg_cols:
        if col in df.columns:
            df = df.drop(columns=[col])

    return df


def add_entries_to_leaderboard(leaderboard_data: Dict, new_entries: List[Dict]) -> Dict:
    """
    Add new entries to the leaderboard, replacing any with the same model name
    and version.

    Args:
        leaderboard_data: Leaderboard dict; mutated in place.
        new_entries: Entries to insert or use as replacements.

    Returns:
        The same ``leaderboard_data`` object, with ``last_updated`` refreshed.
    """
    # Create a mapping of existing entries by model name and version
    existing_entries = {
        (entry["model_name"], entry.get("version", "v0")): i
        for i, entry in enumerate(leaderboard_data.get("entries", []))
    }

    # Process each new entry
    for new_entry in new_entries:
        entry_key = (new_entry.get("model_name"), new_entry.get("version", "v0"))

        if entry_key in existing_entries:
            # Replace existing entry
            leaderboard_data["entries"][existing_entries[entry_key]] = new_entry
        else:
            # Add new entry and remember its position, so that a later entry
            # in the same batch with the same key replaces it instead of
            # being appended as a duplicate.
            entries = leaderboard_data.setdefault("entries", [])
            entries.append(new_entry)
            existing_entries[entry_key] = len(entries) - 1

    # Update the last_updated timestamp
    leaderboard_data["last_updated"] = datetime.now().isoformat()

    return leaderboard_data


def process_jsonl_submission(file_path: str) -> Tuple[List[Dict], str]:
    """
    Process a JSONL submission file and extract entries.

    Args:
        file_path: Path of the JSONL file (one JSON document per line).

    Returns:
        ``(entries, message)``. On any failure (unreadable file, invalid JSON
        on a line, no entries) ``entries`` is an empty list and ``message``
        describes the problem; no exception is raised.
    """
    entries = []
    try:
        with open(file_path, 'r', encoding="utf-8") as f:
            for line in f:
                # Blank lines carry no entry; skip them rather than rejecting
                # the whole submission.
                if not line.strip():
                    continue
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError as e:
                    return [], f"Invalid JSON in submission file: {e}"

        if not entries:
            return [], "Submission file is empty"

        return entries, "Successfully processed submission"
    except Exception as e:
        return [], f"Error processing submission file: {e}"