|
""" |
|
Process and transform GuardBench leaderboard data.
"""

import json
import os
import pandas as pd
from datetime import datetime
from typing import Dict, List, Any, Tuple
import numpy as np

from src.display.utils import CATEGORIES, TEST_TYPES, METRICS


MAX_PUNISHABLE_RUNTIME_MS = 6000.0
MIN_PUNISHABLE_RUNTIME_MS = 200.0
MAX_RUNTIME_PENALTY = 0.75
|
|
|
def calculate_integral_score(row: pd.Series) -> float:
    """
    Calculate the integral score for a model entry row.

    The score is the product of the per-test-type accuracies, scaled by the
    micro-average error ratio and a runtime penalty when those values are
    present. Returns 0.0 if no accuracy values are available.
    """
    integral_score = 1.0
    metric_count = 0

    # Multiply the accuracies of all test types that have a value.
    for test_type in TEST_TYPES:
        metric_col = f"{test_type}_accuracy"
        if metric_col in row and pd.notna(row[metric_col]):
            integral_score *= row[metric_col]
            metric_count += 1

    if metric_count == 0:
        return 0.0

    # Scale down by the micro-average error ratio (stored as a percentage in the row).
    micro_error_col = "micro_avg_error_ratio"
    if micro_error_col in row and pd.notna(row[micro_error_col]):
        micro_error_ratio = row[micro_error_col] / 100.0
        integral_score *= (1.0 - micro_error_ratio)

    avg_runtime_ms = None
    micro_runtime_col = "micro_avg_runtime_ms"
    if micro_runtime_col in row and pd.notna(row[micro_runtime_col]):
        avg_runtime_ms = row[micro_runtime_col]

    if avg_runtime_ms is not None:
        # Clamp the runtime to the punishable range before normalizing.
        runtime = max(
            min(avg_runtime_ms, MAX_PUNISHABLE_RUNTIME_MS),
            MIN_PUNISHABLE_RUNTIME_MS,
        )

        if MAX_PUNISHABLE_RUNTIME_MS > MIN_PUNISHABLE_RUNTIME_MS:
            normalized_time = (runtime - MIN_PUNISHABLE_RUNTIME_MS) / (
                MAX_PUNISHABLE_RUNTIME_MS - MIN_PUNISHABLE_RUNTIME_MS
            )
            # Linear penalty: factor 1.0 at the minimum runtime,
            # MAX_RUNTIME_PENALTY at the maximum runtime.
            time_factor = 1.0 - (1.0 - MAX_RUNTIME_PENALTY) * normalized_time
        else:
            time_factor = 1.0 if runtime <= MIN_PUNISHABLE_RUNTIME_MS else (1.0 - MAX_RUNTIME_PENALTY)

        time_factor = max(MAX_RUNTIME_PENALTY, time_factor)
        integral_score *= time_factor

    return integral_score
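# Illustrative sketch of how the integral score above behaves, assuming for this
# example that TEST_TYPES == ["prompts", "responses"] (the real values come from
# src.display.utils):
#
#   row = pd.Series({
#       "prompts_accuracy": 0.90,
#       "responses_accuracy": 0.80,
#       "micro_avg_error_ratio": 2.0,    # 2% errors -> factor 0.98
#       "micro_avg_runtime_ms": 200.0,   # at the minimum runtime -> factor 1.0
#   })
#   calculate_integral_score(row)  # 0.90 * 0.80 * 0.98 * 1.0 = 0.7056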
|
|
|
|
|
def load_leaderboard_data(file_path: str) -> Dict:
    """
    Load the leaderboard data from a JSON file.
    """
    if not os.path.exists(file_path):
        # Derive the leaderboard version from a file name suffix such as "_v1.json".
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        return {"entries": [], "last_updated": datetime.now().isoformat(), "version": version}

    with open(file_path, 'r') as f:
        data = json.load(f)

    if "version" not in data:
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        data["version"] = version

    return data
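# Examples of the version detection above (illustrative file names):
#
#   "data/leaderboard_v1.json" -> "v1"
#   "data/leaderboard.json"    -> "v0" (no "_v" in the name)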
|
|
|
|
|
def save_leaderboard_data(data: Dict, file_path: str) -> None:
    """
    Save the leaderboard data to a JSON file.
    """
    # Create the parent directory only if the path actually has one.
    dir_name = os.path.dirname(file_path)
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)

    data["last_updated"] = datetime.now().isoformat()

    if "version" not in data:
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        data["version"] = version

    with open(file_path, 'w') as f:
        json.dump(data, f, indent=2)
|
|
|
|
|
def process_submission(submission_data: List[Dict]) -> List[Dict]:
    """
    Process submission data and convert it to leaderboard entries.
    """
    entries = []

    for item in submission_data:
        entry = {
            "model_name": item.get("model_name", "Unknown Model"),
            "per_category_metrics": {},
            "avg_metrics": {},
            "submission_date": datetime.now().isoformat(),
            "version": item.get("version", "v0")
        }

        for key in ["model_type", "base_model", "revision", "precision", "weight_type"]:
            if key in item:
                entry[key] = item[key]

        if "per_category_metrics" in item:
            entry["per_category_metrics"] = item["per_category_metrics"]

        if "avg_metrics" in item:
            entry["avg_metrics"] = item["avg_metrics"]

        entries.append(entry)

    return entries
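# Illustrative shape of a submission item consumed above (only the keys read by
# process_submission are shown; the values are placeholders):
#
#   {
#       "model_name": "org/guard-model",
#       "model_type": "open_source",
#       "version": "v1",
#       "per_category_metrics": {},
#       "avg_metrics": {}
#   }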
|
|
|
|
|
def leaderboard_to_dataframe(leaderboard_data: Dict) -> pd.DataFrame:
    """
    Convert leaderboard data to a pandas DataFrame for display.
    """
    rows = []

    for entry in leaderboard_data.get("entries", []):
        model_name = entry.get("model_name", "Unknown Model")

        row = {
            "model_name": model_name,
            "model_type": entry.get("model_type", "Unknown"),
            "mode": entry.get("mode", "Strict"),
            "submission_date": entry.get("submission_date", ""),
            "version": entry.get("version", "v0"),
            "guard_model_type": entry.get("guard_model_type", "llm_regexp").lower()
        }

        for key in ["base_model", "revision", "precision", "weight_type"]:
            if key in entry:
                row[key] = entry[key]

        # Copy any pre-computed per-test-type or summary columns stored directly
        # on the entry.
        for key, value in entry.items():
            if any(test_type in key for test_type in TEST_TYPES) or \
               key in ["average_f1", "average_recall", "average_precision",
                       "macro_accuracy", "macro_recall", "total_evals_count"]:
                row[key] = value

        avg_metrics = entry.get("avg_metrics", {})
        if avg_metrics:
            for test_type in TEST_TYPES:
                if test_type in avg_metrics:
                    metrics = avg_metrics[test_type]
                    for metric in METRICS:
                        if metric in metrics:
                            col_name = f"{test_type}_{metric}"
                            row[col_name] = metrics[metric]

                            # Expose f1_binary under the shorter "_f1" alias as well.
                            if metric == "f1_binary":
                                row[f"{test_type}_f1"] = metrics[metric]

        # Compute macro accuracy as the mean of the per-test-type accuracies
        # when it is not already present.
        if "macro_accuracy" not in row:
            accuracy_values = []
            for test_type in TEST_TYPES:
                accuracy_val = None
                if test_type in avg_metrics and "accuracy" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["accuracy"]):
                    accuracy_val = avg_metrics[test_type]["accuracy"]
                elif f"{test_type}_accuracy" in row and pd.notna(row[f"{test_type}_accuracy"]):
                    accuracy_val = row[f"{test_type}_accuracy"]

                if accuracy_val is not None:
                    accuracy_values.append(accuracy_val)

            if accuracy_values:
                row["macro_accuracy"] = sum(accuracy_values) / len(accuracy_values)

        if "macro_recall" not in row:
            recall_values = []
            for test_type in TEST_TYPES:
                if test_type in avg_metrics and "recall_binary" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["recall_binary"]):
                    recall_values.append(avg_metrics[test_type]["recall_binary"])
            if recall_values:
                row["macro_recall"] = sum(recall_values) / len(recall_values)

        if "total_evals_count" not in row:
            total_samples = 0
            found_samples = False
            for test_type in TEST_TYPES:
                if test_type in avg_metrics and "sample_count" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["sample_count"]):
                    total_samples += avg_metrics[test_type]["sample_count"]
                    found_samples = True
            if found_samples:
                row["total_evals_count"] = total_samples

        row["micro_avg_error_ratio"] = entry.get("micro_avg_error_ratio", pd.NA)
        row["micro_avg_runtime_ms"] = entry.get("micro_avg_runtime_ms", pd.NA)

        # Scale the error ratio to a percentage for display;
        # calculate_integral_score divides it by 100 accordingly.
        if pd.notna(row["micro_avg_error_ratio"]):
            row["micro_avg_error_ratio"] *= 100

        rows.append(row)

    df = pd.DataFrame(rows)

    # Make sure every expected metric column exists, even if no entry filled it.
    for test_type in TEST_TYPES:
        for metric in METRICS:
            col_name = f"{test_type}_{metric}"
            if col_name not in df.columns:
                df[col_name] = pd.NA

            if metric == "f1_binary" and f"{test_type}_f1" not in df.columns:
                df[f"{test_type}_f1"] = df[col_name]

    if not df.empty:
        df["integral_score"] = df.apply(calculate_integral_score, axis=1)
        df = df.sort_values(by="integral_score", ascending=False, na_position='last')
    else:
        df["integral_score"] = pd.NA

    summary_cols = ["macro_accuracy", "macro_recall", "micro_avg_error_ratio", "micro_avg_runtime_ms", "total_evals_count"]
    for col in summary_cols:
        if col not in df.columns:
            df[col] = pd.NA

    # Drop legacy average columns that have been superseded by the macro metrics.
    old_avg_cols = ["average_f1", "average_recall", "average_precision"]
    for col in old_avg_cols:
        if col in df.columns:
            df = df.drop(columns=[col])

    return df
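# Typical usage of the conversion above (the file path is illustrative):
#
#   data = load_leaderboard_data("data/leaderboard_v1.json")
#   df = leaderboard_to_dataframe(data)
#   df[["model_name", "integral_score", "macro_accuracy"]].head()
#
# The resulting frame is sorted by "integral_score" in descending order and
# contains one "{test_type}_{metric}" column for every combination of
# TEST_TYPES and METRICS, plus the summary columns added above.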
|
|
|
|
|
def add_entries_to_leaderboard(leaderboard_data: Dict, new_entries: List[Dict]) -> Dict:
    """
    Add new entries to the leaderboard, replacing any existing entry with the
    same model name and version.
    """
    existing_entries = {
        (entry["model_name"], entry.get("version", "v0")): i
        for i, entry in enumerate(leaderboard_data.get("entries", []))
    }

    for new_entry in new_entries:
        model_name = new_entry.get("model_name")
        version = new_entry.get("version", "v0")

        if (model_name, version) in existing_entries:
            leaderboard_data["entries"][existing_entries[(model_name, version)]] = new_entry
        else:
            if "entries" not in leaderboard_data:
                leaderboard_data["entries"] = []
            leaderboard_data["entries"].append(new_entry)

    leaderboard_data["last_updated"] = datetime.now().isoformat()

    return leaderboard_data
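# Entries are keyed by (model_name, version): re-submitting the same model under
# the same version replaces the previous entry, while a new version is appended
# alongside the old one. For example (illustrative model name):
#
#   add_entries_to_leaderboard(data, [{"model_name": "org/guard-model", "version": "v1"}])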
|
|
|
|
|
def process_jsonl_submission(file_path: str) -> Tuple[List[Dict], str]:
    """
    Process a JSONL submission file and extract entries.
    """
    entries = []
    try:
        with open(file_path, 'r') as f:
            for line in f:
                # Skip blank lines so stray empty lines do not break parsing.
                if not line.strip():
                    continue
                try:
                    entry = json.loads(line)
                    entries.append(entry)
                except json.JSONDecodeError as e:
                    return [], f"Invalid JSON in submission file: {e}"

        if not entries:
            return [], "Submission file is empty"

        return entries, "Successfully processed submission"
    except Exception as e:
        return [], f"Error processing submission file: {e}"
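# A submission file is expected to hold one JSON object per line, for example
# (illustrative model names and keys):
#
#   {"model_name": "org/guard-model", "version": "v1", "avg_metrics": {}}
#   {"model_name": "org/other-model", "version": "v1", "avg_metrics": {}}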
|
|