leaderboard / src /leaderboard.py
akera's picture
Update src/leaderboard.py
6ddad96 verified
# src/leaderboard.py
import pandas as pd
from datasets import Dataset, load_dataset
import json
import datetime
from typing import Dict, List, Optional, Tuple
import os
import numpy as np
from config import (
LEADERBOARD_DATASET,
HF_TOKEN,
EVALUATION_TRACKS,
MODEL_CATEGORIES,
METRICS_CONFIG,
)
from src.utils import create_submission_id, sanitize_model_name
def initialize_leaderboard() -> pd.DataFrame:
    """Return a zero-row DataFrame carrying the full leaderboard schema.

    The column order below is the canonical storage order; other functions
    use this frame's columns as the list of required fields.

    Returns:
        An empty DataFrame with one column per leaderboard field.
    """
    schema = (
        # Submission identity
        "submission_id",
        "model_name",
        "author",
        "submission_date",
        "model_category",
        "description",
        # Per-track quality scores
        "google_comparable_quality",
        "ug40_complete_quality",
        # Per-track BLEU scores
        "google_comparable_bleu",
        "ug40_complete_bleu",
        # Per-track ChrF scores
        "google_comparable_chrf",
        "ug40_complete_chrf",
        # Confidence interval bounds
        "google_comparable_ci_lower",
        "google_comparable_ci_upper",
        "ug40_complete_ci_lower",
        "ug40_complete_ci_upper",
        # Coverage counts
        "google_comparable_samples",
        "ug40_complete_samples",
        "google_comparable_pairs",
        "ug40_complete_pairs",
        # Detailed per-track results, stored as JSON strings
        "detailed_google_comparable",
        "detailed_ug40_complete",
        # Metadata
        "evaluation_date",
    )
    # Every field starts out as an empty list so the frame has zero rows.
    return pd.DataFrame({field: [] for field in schema})
def load_leaderboard() -> pd.DataFrame:
    """Load the current leaderboard from the HuggingFace dataset.

    The remote table is normalised after loading:

    * columns from the schema of ``initialize_leaderboard()`` that are absent
      are added with a default (``0.0`` for score/count columns, ``""``
      otherwise);
    * every column whose name contains a score/count marker is coerced to
      float, with unparsable or missing values replaced by ``0.0``;
    * text columns have missing values replaced by ``""`` and are cast to
      ``str``.

    Returns:
        The normalised leaderboard. If anything fails (including the remote
        load itself), an empty frame from ``initialize_leaderboard()`` is
        returned instead of raising.
    """
    # Substrings that identify numeric (score or count) columns.
    numeric_markers = ("quality", "bleu", "chrf", "ci_", "samples", "pairs")

    def is_numeric_column(name: str) -> bool:
        return any(marker in name for marker in numeric_markers)

    try:
        print("📥 Loading leaderboard...")
        dataset = load_dataset(LEADERBOARD_DATASET, split="train", token=HF_TOKEN)
        df = dataset.to_pandas()

        # Backfill schema columns that older revisions of the dataset lack.
        # Numeric columns are all cast to float below, so 0.0 is the only
        # numeric default needed.
        for col in initialize_leaderboard().columns:
            if col not in df.columns:
                df[col] = 0.0 if is_numeric_column(col) else ""

        # Coerce every score/count column to float; bad values become 0.0.
        for col in [name for name in df.columns if is_numeric_column(name)]:
            try:
                df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0.0).astype(float)
            except Exception as e:
                # Deliberately broad: a single malformed column must not
                # discard the whole leaderboard via the outer handler.
                print(f"Warning: Could not convert column {col} to numeric: {e}")
                df[col] = 0.0

        # Normalise text columns. submission_id and the detailed_* JSON
        # columns are included so that rows with missing values end up with
        # the same "" default used when the column is backfilled above.
        string_columns = [
            "submission_id",
            "model_name",
            "author",
            "model_category",
            "description",
            "submission_date",
            "evaluation_date",
        ]
        string_columns += [name for name in df.columns if name.startswith("detailed_")]
        for col in string_columns:
            if col in df.columns:
                df[col] = df[col].fillna("").astype(str)

        print(f"✅ Loaded leaderboard with {len(df)} entries")
        return df
    except Exception as e:
        # NOTE(review): this fallback also triggers on transient hub/network
        # errors. add_model_to_leaderboard() saves whatever this returns plus
        # the new row, so a failed load followed by a successful save would
        # replace the remote table with a single entry - confirm acceptable.
        print(f"⚠️ Could not load leaderboard: {e}")
        print("🔄 Initializing empty leaderboard...")
        return initialize_leaderboard()
def save_leaderboard(df: pd.DataFrame) -> bool:
    """Push the leaderboard to the HuggingFace dataset.

    A copy of ``df`` is cleaned before upload: every column whose name
    contains a score/count marker is coerced to numeric, with unparsable or
    missing values replaced by ``0.0``. The caller's frame is not modified.

    Args:
        df: Leaderboard table to persist.

    Returns:
        ``True`` if the push succeeded, ``False`` if any step raised (the
        error is printed, not propagated).
    """
    numeric_markers = ("quality", "bleu", "chrf", "ci_", "samples", "pairs")
    try:
        df_clean = df.copy()

        for col in df_clean.columns:
            if any(marker in col for marker in numeric_markers):
                df_clean[col] = pd.to_numeric(df_clean[col], errors="coerce").fillna(0.0)

        # preserve_index=False keeps a non-default pandas index (e.g. left
        # over from row filtering) from being stored as an extra
        # "__index_level_0__" column in the dataset.
        dataset = Dataset.from_pandas(df_clean, preserve_index=False)

        dataset.push_to_hub(
            LEADERBOARD_DATASET,
            token=HF_TOKEN,
            # [:19] trims the ISO timestamp to whole seconds.
            commit_message=f"Update leaderboard - {datetime.datetime.now().isoformat()[:19]}",
        )
        print("✅ Leaderboard saved successfully!")
        return True
    except Exception as e:
        print(f"❌ Error saving leaderboard: {e}")
        return False
def add_model_to_leaderboard(
    model_name: str,
    author: str,
    evaluation_results: Dict,
    model_category: str = "community",
    description: str = "",
) -> pd.DataFrame:
    """Add (or replace) a model's results on the leaderboard and save it.

    Args:
        model_name: Name of the submitted model; stored after passing through
            ``sanitize_model_name``.
        author: Submitter name; truncated to 100 characters, ``"Anonymous"``
            when empty.
        evaluation_results: Evaluation output; per-track results are read from
            its ``"tracks"`` key (missing key is treated as no tracks).
        model_category: Category label; falls back to ``"community"`` when it
            is not in ``MODEL_CATEGORIES``.
        description: Free-text description; truncated to 500 characters.

    Returns:
        The updated leaderboard, including the new row. The frame is returned
        even when saving to the hub fails (a warning is printed in that case).
    """
    df = load_leaderboard()

    # Rows are stored under the sanitized name, so duplicates must be matched
    # against it. The raw name is matched as well to keep removing whatever
    # the previous raw-name comparison removed.
    stored_name = sanitize_model_name(model_name)
    existing_mask = (df["model_name"] == model_name) | (df["model_name"] == stored_name)
    if existing_mask.any():
        df = df[~existing_mask]

    tracks = evaluation_results.get("tracks", {})

    # One timestamp for both date fields so they cannot disagree.
    timestamp = datetime.datetime.now().isoformat()

    new_entry = {
        "submission_id": create_submission_id(),
        "model_name": stored_name,
        "author": author[:100] if author else "Anonymous",
        "submission_date": timestamp,
        "model_category": model_category if model_category in MODEL_CATEGORIES else "community",
        "description": description[:500] if description else "",
        # Track-specific metrics
        **extract_track_metrics(tracks),
        # Confidence intervals
        **extract_confidence_intervals(tracks),
        # Coverage information
        **extract_coverage_information(tracks),
        # Detailed results (JSON strings)
        **serialize_detailed_results(tracks),
        # Metadata
        "evaluation_date": timestamp,
    }

    # ignore_index=True renumbers rows after the duplicate removal above.
    new_row_df = pd.DataFrame([new_entry])
    updated_df = pd.concat([df, new_row_df], ignore_index=True)

    # save_leaderboard reports failure through its return value only; surface
    # it so a failed push does not go unnoticed.
    if not save_leaderboard(updated_df):
        print(f"Warning: leaderboard entry for {stored_name} was not saved to the hub")

    return updated_df
def extract_track_metrics(tracks: Dict) -> Dict:
    """Collect the headline scores of every configured track.

    Args:
        tracks: Mapping of track name to that track's result dict; scores are
            read from each track's ``"track_averages"`` entry.

    Returns:
        A flat dict with ``<track>_quality``, ``<track>_bleu`` and
        ``<track>_chrf`` floats for each key of ``EVALUATION_TRACKS``;
        anything missing defaults to ``0.0``.
    """
    # (column suffix, key inside track_averages), in output order.
    score_fields = (
        ("quality", "quality_score"),
        ("bleu", "bleu"),
        ("chrf", "chrf"),
    )
    collected = {}
    for name in EVALUATION_TRACKS.keys():
        averages = tracks.get(name, {}).get("track_averages", {})
        for suffix, source_key in score_fields:
            collected[f"{name}_{suffix}"] = float(averages.get(source_key, 0.0))
    return collected
def extract_confidence_intervals(tracks: Dict) -> Dict:
    """Collect the quality-score confidence bounds of every configured track.

    Args:
        tracks: Mapping of track name to that track's result dict; bounds are
            read from ``track_confidence -> quality_score``.

    Returns:
        A flat dict with ``<track>_ci_lower`` and ``<track>_ci_upper`` floats
        for each key of ``EVALUATION_TRACKS``; missing values default to
        ``0.0``.
    """
    bounds = {}
    for name in EVALUATION_TRACKS.keys():
        confidence = tracks.get(name, {}).get("track_confidence", {})
        quality_stats = confidence.get("quality_score", {})
        for bound in ("ci_lower", "ci_upper"):
            bounds[f"{name}_{bound}"] = float(quality_stats.get(bound, 0.0))
    return bounds
def extract_coverage_information(tracks: Dict) -> Dict:
    """Collect sample and language-pair counts of every configured track.

    Args:
        tracks: Mapping of track name to that track's result dict; counts are
            read from each track's ``"summary"`` entry.

    Returns:
        A flat dict with ``<track>_samples`` and ``<track>_pairs`` ints for
        each key of ``EVALUATION_TRACKS``; missing values default to ``0``.
    """
    counts = {}
    for name in EVALUATION_TRACKS.keys():
        track_summary = tracks.get(name, {}).get("summary", {})
        counts.update(
            {
                f"{name}_samples": int(track_summary.get("total_samples", 0)),
                f"{name}_pairs": int(track_summary.get("language_pairs_evaluated", 0)),
            }
        )
    return counts
def serialize_detailed_results(tracks: Dict) -> Dict:
    """Serialise a trimmed copy of each track's results to JSON.

    Only the ``pair_metrics``, ``track_averages``, ``track_confidence`` and
    ``summary`` sections of a track are kept; a missing section is stored as
    an empty object.

    Args:
        tracks: Mapping of track name to that track's result dict.

    Returns:
        A dict mapping ``detailed_<track>`` to a JSON string for each key of
        ``EVALUATION_TRACKS``.
    """
    kept_sections = ("pair_metrics", "track_averages", "track_confidence", "summary")
    serialized = {}
    for name in EVALUATION_TRACKS.keys():
        track_result = tracks.get(name, {})
        trimmed = {section: track_result.get(section, {}) for section in kept_sections}
        serialized[f"detailed_{name}"] = json.dumps(trimmed)
    return serialized
def get_track_leaderboard(
    df: pd.DataFrame,
    track: str,
    metric: str = "quality",
    category_filter: str = "all"
) -> pd.DataFrame:
    """Build the ranked leaderboard for one track.

    Works on a copy of ``df``: optionally keeps a single model category,
    coerces the track's score/count columns to numeric (bad values become
    ``0.0``), keeps only rows whose ``<track>_<metric>`` value is positive,
    and sorts them by that column in descending order with a fresh index.
    Progress is logged with ``print`` at each step.

    Args:
        df: Full leaderboard table.
        track: Track name used as the column prefix.
        metric: Column suffix to filter and rank by.
        category_filter: Value of ``model_category`` to keep, or ``"all"``.

    Returns:
        The ranked rows. ``df`` itself is returned when it is empty; an empty
        DataFrame is returned when the ranking column is missing or any step
        raises.
    """
    print(f"Getting track leaderboard for {track}, input df has {len(df)} rows")

    if df.empty:
        print("Input DataFrame is empty")
        return df

    rank_col = f"{track}_{metric}"
    if rank_col not in df.columns:
        print(f"Warning: Missing column {rank_col} for track {track}")
        print(f"Available columns: {list(df.columns)}")
        return pd.DataFrame()

    try:
        # Operate on a copy so the caller's frame stays untouched.
        ranked = df.copy()
        print(f"Created copy with {len(ranked)} rows")

        if category_filter != "all":
            rows_before = len(ranked)
            ranked = ranked[ranked["model_category"] == category_filter]
            print(f"After category filter '{category_filter}': {len(ranked)} rows (was {rows_before})")

        # Score/count columns of this track that are actually present.
        suffixes = ("quality", "bleu", "chrf", "ci_lower", "ci_upper", "samples", "pairs")
        candidates = [f"{track}_{suffix}" for suffix in suffixes]
        present = [name for name in candidates if name in ranked.columns]
        print(f"Converting numeric columns: {present}")

        for name in present:
            try:
                print(f"Column {name} dtype: {ranked[name].dtype}, sample values: {ranked[name].head(3).tolist()}")
                ranked[name] = pd.to_numeric(ranked[name], errors='coerce').fillna(0.0)
                print(f"Column {name} converted successfully")
            except Exception as e:
                print(f"Error converting column {name}: {e}")
                ranked[name] = 0.0

        # A non-positive score is treated as "no result for this track".
        rows_before = len(ranked)
        ranked = ranked[ranked[rank_col] > 0]
        print(f"After quality filter (>{rank_col} > 0): {len(ranked)} rows (was {rows_before})")

        if ranked.empty:
            print("No models found with quality > 0 for this track")
            return ranked

        print(f"Sorting by {rank_col}")
        ranked = ranked.sort_values(rank_col, ascending=False)
        ranked = ranked.reset_index(drop=True)
        print(f"Sorted successfully, final result has {len(ranked)} rows")
        return ranked
    except Exception as e:
        print(f"Error in get_track_leaderboard: {e}")
        import traceback
        traceback.print_exc()
        return pd.DataFrame()
def prepare_leaderboard_display(df: pd.DataFrame, track: str) -> pd.DataFrame:
    """Prepare a track-specific leaderboard for display.

    Selects the identity columns and the given track's score/count columns
    (only those present in ``df``), formats the scores as fixed-precision
    strings, merges the two confidence bounds into one ``"[lower, upper]"``
    column, shortens the submission date to ``YYYY-MM-DD`` and renames the
    columns to human-readable headers.

    Args:
        df: Leaderboard rows to display.
        track: Track name used as the column prefix; must be a key of
            ``EVALUATION_TRACKS`` (its ``"name"`` supplies the header prefix).

    Returns:
        A new, display-ready DataFrame. ``df`` itself is returned unchanged
        when it is empty.

    Raises:
        KeyError: If ``track`` is not a key of ``EVALUATION_TRACKS``.
    """
    if df.empty:
        return df

    base_columns = ["model_name", "author", "submission_date", "model_category"]
    track_columns = [
        f"{track}_quality",
        f"{track}_bleu",
        f"{track}_chrf",
        f"{track}_ci_lower",
        f"{track}_ci_upper",
        f"{track}_samples",
        f"{track}_pairs",
    ]
    # Only include columns that exist.
    available_columns = [col for col in base_columns + track_columns if col in df.columns]
    display_df = df[available_columns].copy()

    def safe_format(value, precision=4):
        """Format a number with fixed precision; fall back to zero."""
        zero = f"{0:.{precision}f}"
        try:
            if value is None or pd.isna(value):
                return zero
            return f"{float(value):.{precision}f}"
        except (ValueError, TypeError):
            return zero

    def format_date(value):
        """Render one date value as YYYY-MM-DD, or "" if it cannot be parsed."""
        stamp = pd.to_datetime(value, errors="coerce")
        return "" if pd.isna(stamp) else stamp.strftime("%Y-%m-%d")

    # Decimal places per score column suffix.
    precisions = {
        "quality": 4,
        "bleu": 2,
        "chrf": 4,
        "ci_lower": 4,
        "ci_upper": 4,
    }
    for suffix, precision in precisions.items():
        col = f"{track}_{suffix}"
        if col in display_df.columns:
            # Bind precision as a default to avoid late-binding in the lambda.
            display_df[col] = display_df[col].apply(lambda x, p=precision: safe_format(x, p))

    # Merge the two (already string-formatted) bounds into one interval column.
    lower_col, upper_col = f"{track}_ci_lower", f"{track}_ci_upper"
    if lower_col in display_df.columns and upper_col in display_df.columns:
        display_df[f"{track}_confidence_interval"] = (
            "[" + display_df[lower_col] + ", " + display_df[upper_col] + "]"
        )
        display_df = display_df.drop(columns=[lower_col, upper_col])

    # Parse dates one value at a time: a single unparsable entry, or a mix of
    # ISO timestamps with and without fractional seconds, must not make the
    # whole column (and thus the whole display) fail.
    if "submission_date" in display_df.columns:
        display_df["submission_date"] = display_df["submission_date"].apply(format_date)

    # Header prefix is the first word of the track's configured name.
    track_name = EVALUATION_TRACKS[track]["name"].split()[0]
    column_renames = {
        "model_name": "Model Name",
        "author": "Author",
        "submission_date": "Submitted",
        "model_category": "Category",
        f"{track}_quality": f"{track_name} Quality",
        f"{track}_bleu": f"{track_name} BLEU",
        f"{track}_chrf": f"{track_name} ChrF",
        f"{track}_confidence_interval": "95% CI",
        f"{track}_samples": "Samples",
        f"{track}_pairs": "Pairs",
    }
    display_df = display_df.rename(columns=column_renames)
    return display_df