apsys's picture
mode
935d31d
"""
Process and transform GuardBench leaderboard data.
"""
import json
import os
import pandas as pd
from datetime import datetime
from typing import Dict, List, Any, Tuple
import numpy as np
from src.display.utils import CATEGORIES, TEST_TYPES, METRICS
# Constants for Integral Score calculation (mirrors guardbench library)
# calculate_integral_score clamps the micro-average runtime into
# [MIN_PUNISHABLE_RUNTIME_MS, MAX_PUNISHABLE_RUNTIME_MS] before scaling it.
MAX_PUNISHABLE_RUNTIME_MS = 6000.0
MIN_PUNISHABLE_RUNTIME_MS = 200.0
# NOTE(review): as used in calculate_integral_score, this value is the lower
# bound of the runtime multiplier (the factor goes from 1.0 at the minimum
# runtime down to this value at the maximum runtime), not the amount
# subtracted. Confirm against the reference library before renaming.
MAX_RUNTIME_PENALTY = 0.75 # Corresponds to 1.0 - MIN_TIME_FACTOR, library used 0.75
def calculate_integral_score(row: pd.Series) -> float:
    """
    Compute the integral score of a single leaderboard row.

    The score is the product of every available ``<test_type>_accuracy``
    value, then multiplied by ``1 - error ratio`` and by a runtime factor
    whenever the matching micro-average column holds a value. A missing
    micro average just skips its penalty; no other column is consulted.

    Args:
        row: One row of the leaderboard frame. ``micro_avg_error_ratio`` is
            read as a percentage and ``micro_avg_runtime_ms`` as milliseconds.

    Returns:
        The integral score, or 0.0 when the row has no accuracy value at all.
    """

    def _has_value(column: str) -> bool:
        # A column counts only when it exists and is not NA/NaN.
        return column in row and pd.notna(row[column])

    accuracy_columns = [
        column
        for column in (f"{test_type}_accuracy" for test_type in TEST_TYPES)
        if _has_value(column)
    ]
    # Same convention as the reference library: no primary metric, no score.
    if not accuracy_columns:
        return 0.0

    score = 1.0
    for column in accuracy_columns:
        score *= row[column]

    # Error penalty: the column stores a percentage, so scale back to a ratio.
    if _has_value("micro_avg_error_ratio"):
        error_ratio = row["micro_avg_error_ratio"] / 100.0
        score *= (1.0 - error_ratio)

    # Runtime penalty applies only when a micro-average runtime is present.
    if not _has_value("micro_avg_runtime_ms"):
        return score

    clamped_runtime = max(
        min(row["micro_avg_runtime_ms"], MAX_PUNISHABLE_RUNTIME_MS),
        MIN_PUNISHABLE_RUNTIME_MS,
    )
    if MAX_PUNISHABLE_RUNTIME_MS > MIN_PUNISHABLE_RUNTIME_MS:
        # Linear interpolation between 1.0 (fastest) and MAX_RUNTIME_PENALTY.
        fraction = (clamped_runtime - MIN_PUNISHABLE_RUNTIME_MS) / (
            MAX_PUNISHABLE_RUNTIME_MS - MIN_PUNISHABLE_RUNTIME_MS
        )
        runtime_factor = 1.0 - (1.0 - MAX_RUNTIME_PENALTY) * fraction
    elif clamped_runtime <= MIN_PUNISHABLE_RUNTIME_MS:
        # Degenerate window (max == min): nothing to interpolate over.
        runtime_factor = 1.0
    else:
        runtime_factor = 1.0 - MAX_RUNTIME_PENALTY

    # The factor is never allowed to drop below MAX_RUNTIME_PENALTY.
    runtime_factor = max(MAX_RUNTIME_PENALTY, runtime_factor)

    # No root is taken: the reference summary table uses the raw product.
    return score * runtime_factor
def _infer_version_from_path(file_path: str) -> str:
    """Return the version tag encoded in the file name, or "v0" if none."""
    # Only the file name is inspected: directory components containing "_v"
    # or "." must not leak into the version string.
    file_name = os.path.basename(file_path)
    if "_v" not in file_name:
        return "v0"
    return file_name.split("_")[-1].split(".")[0]


def load_leaderboard_data(file_path: str) -> Dict:
    """
    Load the leaderboard data from a JSON file.

    Args:
        file_path: Path of the leaderboard JSON file. A file name such as
            ``leaderboard_v1.json`` is used to infer the version ("v1").

    Returns:
        The parsed leaderboard data. When the file does not exist, an empty
        leaderboard (no entries, current timestamp, inferred version) is
        returned instead. A missing "version" field is filled in from the
        file name, defaulting to "v0".

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    try:
        # Open directly instead of checking for existence first, so a file
        # removed between the check and the open cannot cause a crash.
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        return {
            "entries": [],
            "last_updated": datetime.now().isoformat(),
            "version": _infer_version_from_path(file_path),
        }

    # Ensure version field exists
    if "version" not in data:
        data["version"] = _infer_version_from_path(file_path)
    return data
def save_leaderboard_data(data: Dict, file_path: str) -> None:
    """
    Save the leaderboard data to a JSON file.

    The ``data`` dict is updated in place before writing: "last_updated" is
    set to the current time and a missing "version" is inferred from the file
    name (e.g. ``leaderboard_v1.json`` -> "v1", otherwise "v0").

    Args:
        data: Leaderboard data to persist; mutated as described above.
        file_path: Destination path. Missing parent directories are created.
    """
    # A bare file name has an empty dirname, and os.makedirs("") raises,
    # so only create the directory when there is one.
    parent_dir = os.path.dirname(file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Update the last_updated timestamp
    data["last_updated"] = datetime.now().isoformat()

    # Ensure version is set; look at the file name only so that directory
    # components containing "_v" or "." cannot corrupt the version.
    if "version" not in data:
        file_name = os.path.basename(file_path)
        if "_v" in file_name:
            data["version"] = file_name.split("_")[-1].split(".")[0]
        else:
            data["version"] = "v0"

    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
def process_submission(submission_data: List[Dict]) -> List[Dict]:
    """
    Turn raw submission items into leaderboard entries.

    Each entry carries the model name, the per-category and average metrics
    (empty dicts when the item has none), a submission timestamp taken at
    processing time, the version, and whichever optional model metadata
    fields the item provides. Any other key of the item is ignored.
    """
    metadata_keys = ("model_type", "base_model", "revision", "precision", "weight_type")

    def _build_entry(item: Dict) -> Dict:
        # Core fields first so every entry has the same leading key order.
        entry = {
            "model_name": item.get("model_name", "Unknown Model"),
            "per_category_metrics": item.get("per_category_metrics", {}),
            "avg_metrics": item.get("avg_metrics", {}),
            "submission_date": datetime.now().isoformat(),
            "version": item.get("version", "v0"),
        }
        # Optional metadata is copied only when the submission supplies it.
        entry.update({key: item[key] for key in metadata_keys if key in item})
        return entry

    return [_build_entry(item) for item in submission_data]
def _leaderboard_entry_to_row(entry: Dict) -> Dict[str, Any]:
    """Flatten one leaderboard entry into a single display row."""
    # A JSON null for guard_model_type must not crash the whole table.
    guard_model_type = entry.get("guard_model_type")
    if guard_model_type is None:
        guard_model_type = "llm_regexp"

    row = {
        "model_name": entry.get("model_name", "Unknown Model"),
        "model_type": entry.get("model_type", "Unknown"),
        "mode": entry.get("mode", "Strict"),
        "submission_date": entry.get("submission_date", ""),
        "version": entry.get("version", "v0"),
        "guard_model_type": str(guard_model_type).lower(),
    }

    # Add additional metadata fields if present
    for key in ["base_model", "revision", "precision", "weight_type"]:
        if key in entry:
            row[key] = entry[key]

    # CASE 1: Metrics are flat in the root. Any key that contains a test
    # type name is copied, plus a fixed set of summary keys.
    flat_summary_keys = {
        "average_f1", "average_recall", "average_precision",
        "macro_accuracy", "macro_recall", "total_evals_count",
    }
    for key, value in entry.items():
        if key in flat_summary_keys or any(test_type in key for test_type in TEST_TYPES):
            row[key] = value

    # CASE 2: Metrics are in avg_metrics structure. These are written after
    # the flat values, so they win when both forms are present.
    avg_metrics = entry.get("avg_metrics") or {}
    for test_type in TEST_TYPES:
        if test_type not in avg_metrics:
            continue
        metrics = avg_metrics[test_type]
        for metric in METRICS:
            if metric in metrics:
                row[f"{test_type}_{metric}"] = metrics[metric]
                # Also add non-binary version for F1 scores
                if metric == "f1_binary":
                    row[f"{test_type}_f1"] = metrics[metric]

    # Derive macro_accuracy as the mean of the available accuracies.
    if "macro_accuracy" not in row:
        accuracy_values = []
        for test_type in TEST_TYPES:
            flat_col = f"{test_type}_accuracy"
            # Prefer the nested structure, fall back to the flat column.
            if (
                test_type in avg_metrics
                and "accuracy" in avg_metrics[test_type]
                and pd.notna(avg_metrics[test_type]["accuracy"])
            ):
                accuracy_values.append(avg_metrics[test_type]["accuracy"])
            elif flat_col in row and pd.notna(row[flat_col]):
                accuracy_values.append(row[flat_col])
        if accuracy_values:
            row["macro_accuracy"] = sum(accuracy_values) / len(accuracy_values)

    # Derive macro_recall as the mean of the available recall_binary values.
    if "macro_recall" not in row:
        recall_values = [
            avg_metrics[test_type]["recall_binary"]
            for test_type in TEST_TYPES
            if test_type in avg_metrics
            and "recall_binary" in avg_metrics[test_type]
            and pd.notna(avg_metrics[test_type]["recall_binary"])
        ]
        if recall_values:
            row["macro_recall"] = sum(recall_values) / len(recall_values)

    # Derive total_evals_count as the sum of the available sample counts.
    if "total_evals_count" not in row:
        sample_counts = [
            avg_metrics[test_type]["sample_count"]
            for test_type in TEST_TYPES
            if test_type in avg_metrics
            and "sample_count" in avg_metrics[test_type]
            and pd.notna(avg_metrics[test_type]["sample_count"])
        ]
        if sample_counts:
            row["total_evals_count"] = sum(sample_counts)

    # Extract micro averages directly from entry if they exist (like in guardbench library)
    row["micro_avg_error_ratio"] = entry.get("micro_avg_error_ratio", pd.NA)
    row["micro_avg_runtime_ms"] = entry.get("micro_avg_runtime_ms", pd.NA)

    # Convert error ratio to percentage for consistency with display name
    if pd.notna(row["micro_avg_error_ratio"]):
        row["micro_avg_error_ratio"] *= 100

    return row


def leaderboard_to_dataframe(leaderboard_data: Dict) -> pd.DataFrame:
    """
    Convert leaderboard data to a pandas DataFrame for display.

    Args:
        leaderboard_data: Leaderboard dict; its "entries" list (if any) is
            turned into one row per entry.

    Returns:
        A DataFrame with one ``<test_type>_<metric>`` column per known test
        type and metric, a ``<test_type>_f1`` alias of ``f1_binary``, the
        summary columns and ``integral_score``. Rows are sorted by
        ``integral_score`` in descending order; the legacy ``average_*``
        columns are removed.
    """
    rows = [
        _leaderboard_entry_to_row(entry)
        for entry in leaderboard_data.get("entries", [])
    ]
    df = pd.DataFrame(rows)

    # Ensure all expected columns exist
    for test_type in TEST_TYPES:
        for metric in METRICS:
            col_name = f"{test_type}_{metric}"
            if col_name not in df.columns:
                df[col_name] = pd.NA  # Use pd.NA for missing numeric data

            # Add non-binary F1 if it is missing; the binary column is
            # guaranteed to exist at this point, so it can always be copied.
            f1_col = f"{test_type}_f1"
            if metric == "f1_binary" and f1_col not in df.columns:
                df[f1_col] = df[col_name]

    # Calculate Integral Score and sort by it (best first)
    if not df.empty:
        df["integral_score"] = df.apply(calculate_integral_score, axis=1)
        df = df.sort_values(by="integral_score", ascending=False, na_position="last")
    else:
        # Add the column even if empty
        df["integral_score"] = pd.NA

    # Ensure summary columns exist
    summary_cols = [
        "macro_accuracy", "macro_recall", "micro_avg_error_ratio",
        "micro_avg_runtime_ms", "total_evals_count",
    ]
    for col in summary_cols:
        if col not in df.columns:
            df[col] = pd.NA

    # Remove old average columns if they somehow snuck in
    df = df.drop(
        columns=["average_f1", "average_recall", "average_precision"],
        errors="ignore",
    )
    return df
def add_entries_to_leaderboard(leaderboard_data: Dict, new_entries: List[Dict]) -> Dict:
    """
    Add new entries to the leaderboard, replacing any existing entry that has
    the same model name and version.

    Args:
        leaderboard_data: Leaderboard dict, modified in place. An "entries"
            list is created when missing.
        new_entries: Entries to merge. A missing "version" counts as "v0".
            When several new entries share a (model name, version) pair, the
            last one wins.

    Returns:
        The same ``leaderboard_data`` object, with "last_updated" refreshed.
    """
    entries = leaderboard_data.setdefault("entries", [])

    # Position of every known (model name, version) pair in the entry list.
    # .get is used for the name so that a malformed stored entry is keyed the
    # same way as an incoming one instead of raising KeyError.
    position_by_key = {
        (entry.get("model_name"), entry.get("version", "v0")): position
        for position, entry in enumerate(entries)
    }

    for new_entry in new_entries:
        key = (new_entry.get("model_name"), new_entry.get("version", "v0"))
        if key in position_by_key:
            # Replace existing entry
            entries[position_by_key[key]] = new_entry
        else:
            # Record the new position so a later duplicate within the same
            # batch replaces this entry instead of being appended again.
            position_by_key[key] = len(entries)
            entries.append(new_entry)

    # Update the last_updated timestamp
    leaderboard_data["last_updated"] = datetime.now().isoformat()
    return leaderboard_data
def process_jsonl_submission(file_path: str) -> Tuple[List[Dict], str]:
    """
    Process a JSONL submission file and extract entries.

    Args:
        file_path: Path to a file holding one JSON document per line. Blank
            lines are ignored.

    Returns:
        ``(entries, message)``. On success ``entries`` holds every parsed
        line in file order. On failure (unreadable file, invalid JSON on any
        line, or no entries at all) ``entries`` is empty and ``message``
        describes the problem. This function reports errors through the
        message instead of raising.
    """
    entries = []
    try:
        # utf-8-sig reads plain UTF-8 and also strips a leading BOM, which
        # would otherwise make the first record fail to parse.
        with open(file_path, "r", encoding="utf-8-sig") as f:
            for line in f:
                # Blank separator or trailing lines are not records.
                if not line.strip():
                    continue
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError as e:
                    return [], f"Invalid JSON in submission file: {e}"
    except Exception as e:
        # Deliberately broad: callers rely on getting a message back rather
        # than an exception for any read or decoding failure.
        return [], f"Error processing submission file: {e}"

    if not entries:
        return [], "Submission file is empty"
    return entries, "Successfully processed submission"