Spaces:
Sleeping
Sleeping
# src/leaderboard.py | |
import pandas as pd | |
from datasets import Dataset, load_dataset | |
import json | |
import datetime | |
from typing import Dict, List, Optional, Tuple | |
import os | |
import numpy as np | |
from config import ( | |
LEADERBOARD_DATASET, | |
HF_TOKEN, | |
EVALUATION_TRACKS, | |
MODEL_CATEGORIES, | |
STATISTICAL_CONFIG, | |
METRICS_CONFIG, | |
SAMPLE_SIZE_RECOMMENDATIONS, | |
) | |
from src.utils import create_submission_id, sanitize_model_name | |
def initialize_scientific_leaderboard() -> pd.DataFrame:
    """Return an empty leaderboard frame carrying the full column schema.

    The schema consists of submission details, then per-track columns for the
    three tracks (quality, BLEU, ChrF, confidence-interval bounds, sample and
    pair counts, adequacy flag, detailed JSON payload), then the cross-track
    analysis payload and leaderboard-level metadata.

    Returns:
        A DataFrame with zero rows and the schema columns in a fixed order.
    """
    track_ids = ("google_comparable", "ug40_complete", "language_pair_matrix")

    def per_track(suffix: str) -> List[str]:
        # One column per track for the given suffix, in track order.
        return [f"{track_id}_{suffix}" for track_id in track_ids]

    # Basic information
    schema = [
        "submission_id",
        "model_name",
        "author",
        "submission_date",
        "model_category",
        "description",
    ]

    # Track-specific quality / BLEU / ChrF scores (grouped by metric)
    for metric_suffix in ("quality", "bleu", "chrf"):
        schema.extend(per_track(metric_suffix))

    # Statistical metadata: lower/upper bounds are interleaved per track
    for track_id in track_ids:
        schema.extend([f"{track_id}_ci_lower", f"{track_id}_ci_upper"])

    # Coverage information, then statistical adequacy flags
    for coverage_suffix in ("samples", "pairs", "adequate"):
        schema.extend(per_track(coverage_suffix))

    # Detailed results (JSON strings)
    schema.extend(f"detailed_{track_id}" for track_id in track_ids)
    schema.append("cross_track_analysis")

    # Metadata
    schema.extend(["evaluation_date", "leaderboard_version", "scientific_adequacy_score"])

    return pd.DataFrame({column: [] for column in schema})
def load_scientific_leaderboard() -> pd.DataFrame:
    """Fetch the scientific leaderboard from the Hub and normalise its schema.

    The ``train`` split of ``LEADERBOARD_DATASET + "-scientific"`` is loaded and
    converted to pandas. Any schema column missing from the loaded data is
    added with a type-appropriate default, adequacy flags are coerced to
    ``bool`` and score/count columns are coerced to numbers (unparseable
    values become 0).

    Returns:
        The normalised leaderboard, or an empty leaderboard frame if anything
        in the loading/normalisation path raises.
    """
    # NOTE(review): no token is passed to load_dataset here although the save
    # path uses HF_TOKEN - confirm the dataset is publicly readable.
    # NOTE(review): a failed load is indistinguishable from an empty board for
    # callers; verify that callers which save afterwards cannot overwrite data.

    def default_for(column: str):
        # Default value for a schema column that the stored dataset lacks.
        if any(tag in column for tag in ("quality", "bleu", "chrf", "ci_")):
            return 0.0
        if "samples" in column or "pairs" in column:
            return 0
        if "adequate" in column:
            return False
        if column == "scientific_adequacy_score":
            return 0.0
        if column == "leaderboard_version":
            return 2  # Scientific version
        return ""

    try:
        print("📥 Loading scientific leaderboard...")
        hub_dataset = load_dataset(LEADERBOARD_DATASET + "-scientific", split="train")
        df = hub_dataset.to_pandas()

        # Back-fill any schema column the stored dataset does not have yet.
        for column in list(initialize_scientific_leaderboard().columns):
            if column not in df.columns:
                df[column] = default_for(column)

        # Adequacy flags: missing values count as "not adequate".
        flag_columns = [column for column in df.columns if "adequate" in column]
        for column in flag_columns:
            df[column] = df[column].fillna(False).astype(bool)

        # Scores, interval bounds and counts: force numeric, invalid -> 0.
        numeric_tags = ["quality", "bleu", "chrf", "ci_", "samples", "pairs", "adequacy"]
        score_columns = [
            column
            for column in df.columns
            if "adequate" not in column and any(tag in column for tag in numeric_tags)
        ]
        for column in score_columns:
            df[column] = pd.to_numeric(df[column], errors='coerce').fillna(0.0)

        print(f"✅ Loaded scientific leaderboard with {len(df)} entries")
        return df
    except Exception as e:
        # Best-effort fallback: report the problem and start from an empty board.
        print(f"⚠️ Could not load scientific leaderboard: {e}")
        print("🔄 Initializing empty scientific leaderboard...")
        return initialize_scientific_leaderboard()
def save_scientific_leaderboard(df: pd.DataFrame) -> bool:
    """Normalise column types and push the leaderboard to the Hub.

    A copy of ``df`` is cleaned before upload: every column whose name contains
    ``"adequate"`` is coerced to ``bool`` (missing values become ``False``) and
    every score/count column is coerced to a number (unparseable values become
    0). The input frame itself is not modified.

    Args:
        df: Leaderboard to store.

    Returns:
        ``True`` when the push succeeded, ``False`` if any step raised (the
        error is printed rather than propagated).
    """
    try:
        # Clean data before saving
        df_clean = df.copy()

        numeric_markers = ("quality", "bleu", "chrf", "ci_", "samples", "pairs", "adequacy")
        for col in list(df_clean.columns):
            if "adequate" in col:
                # Flag columns are matched on their own: "adequate" is not a
                # substring match for the "adequacy" numeric marker, so they
                # must not depend on the numeric filter to be normalised.
                df_clean[col] = df_clean[col].fillna(False).astype(bool)
            elif any(marker in col for marker in numeric_markers):
                df_clean[col] = pd.to_numeric(df_clean[col], errors="coerce").fillna(0.0)

        # Convert to dataset
        dataset = Dataset.from_pandas(df_clean)

        # Push to hub
        dataset.push_to_hub(
            LEADERBOARD_DATASET + "-scientific",
            token=HF_TOKEN,
            commit_message=f"Update scientific leaderboard - {datetime.datetime.now().isoformat()[:19]}",
        )
        print("✅ Scientific leaderboard saved successfully!")
        return True
    except Exception as e:
        # Callers rely on the boolean result; report and signal failure.
        print(f"❌ Error saving scientific leaderboard: {e}")
        return False
def add_model_to_scientific_leaderboard(
    model_name: str,
    author: str,
    evaluation_results: Dict,
    model_category: str = "community",
    description: str = "",
) -> pd.DataFrame:
    """Insert (or replace) a model's results in the scientific leaderboard.

    The current leaderboard is loaded, any previous row for the same model is
    dropped, a new row is assembled from ``evaluation_results`` and the updated
    leaderboard is pushed back to the Hub.

    Args:
        model_name: Submitted model name; stored after ``sanitize_model_name``.
        author: Author name, truncated to 100 characters; empty -> "Anonymous".
        evaluation_results: Dict read for the keys ``"tracks"`` and
            ``"cross_track_analysis"``.
        model_category: Falls back to ``"community"`` when it is not a member
            of ``MODEL_CATEGORIES``.
        description: Free text, truncated to 500 characters.

    Returns:
        The updated leaderboard frame (returned regardless of whether the
        save succeeded).
    """
    # Load current leaderboard
    df = load_scientific_leaderboard()

    # Rows are stored under the sanitised name, so match on it as well as on
    # the raw name; matching the raw name alone leaves a stale duplicate
    # whenever sanitising alters the submitted name.
    stored_name = sanitize_model_name(model_name)
    existing_mask = df["model_name"].isin([model_name, stored_name])
    if existing_mask.any():
        df = df[~existing_mask]

    # Extract track results
    tracks = evaluation_results.get("tracks", {})
    cross_track = evaluation_results.get("cross_track_analysis", {})

    # Calculate scientific adequacy score
    adequacy_score = calculate_scientific_adequacy_score(evaluation_results)

    # Prepare new entry
    new_entry = {
        "submission_id": create_submission_id(),
        "model_name": stored_name,
        "author": author[:100] if author else "Anonymous",
        "submission_date": datetime.datetime.now().isoformat(),
        "model_category": model_category if model_category in MODEL_CATEGORIES else "community",
        "description": description[:500] if description else "",
        # Extract track-specific metrics
        **extract_track_metrics(tracks),
        # Statistical metadata
        **extract_statistical_metadata(tracks),
        # Coverage information
        **extract_coverage_information(tracks),
        # Adequacy flags
        **extract_adequacy_flags(tracks),
        # Detailed results (JSON strings)
        **serialize_detailed_results(tracks, cross_track),
        # Metadata
        "evaluation_date": datetime.datetime.now().isoformat(),
        "leaderboard_version": 2,
        "scientific_adequacy_score": adequacy_score,
    }

    # Convert to DataFrame and append
    new_row_df = pd.DataFrame([new_entry])
    updated_df = pd.concat([df, new_row_df], ignore_index=True)

    # Save to hub
    save_scientific_leaderboard(updated_df)
    return updated_df
def extract_track_metrics(tracks: Dict) -> Dict:
    """Collect the headline averages of every configured track.

    Args:
        tracks: Per-track results; each entry is read for its
            ``"track_averages"`` dict.

    Returns:
        A dict with ``<track>_quality``, ``<track>_bleu`` and ``<track>_chrf``
        as floats for each key of ``EVALUATION_TRACKS``; absent values are 0.0.
    """
    # (output column suffix, key inside "track_averages")
    column_sources = (
        ("quality", "quality_score"),
        ("bleu", "bleu"),
        ("chrf", "chrf"),
    )
    collected = {}
    for track_id in EVALUATION_TRACKS:
        averages = tracks.get(track_id, {}).get("track_averages", {})
        for suffix, source_key in column_sources:
            collected[f"{track_id}_{suffix}"] = float(averages.get(source_key, 0.0))
    return collected
def extract_statistical_metadata(tracks: Dict) -> Dict:
    """Collect the quality-score confidence bounds of every configured track.

    Args:
        tracks: Per-track results; each entry is read for
            ``["track_statistics"]["quality_score"]``.

    Returns:
        A dict with ``<track>_ci_lower`` and ``<track>_ci_upper`` as floats for
        each key of ``EVALUATION_TRACKS``; absent bounds are 0.0.
    """
    bounds = {}
    for track_id in EVALUATION_TRACKS:
        statistics = tracks.get(track_id, {}).get("track_statistics", {})
        quality_interval = statistics.get("quality_score", {})
        for bound in ("ci_lower", "ci_upper"):
            bounds[f"{track_id}_{bound}"] = float(quality_interval.get(bound, 0.0))
    return bounds
def extract_coverage_information(tracks: Dict) -> Dict:
    """Collect sample and language-pair counts of every configured track.

    Args:
        tracks: Per-track results; each entry is read for its ``"summary"``
            dict.

    Returns:
        A dict with ``<track>_samples`` (from ``total_samples``) and
        ``<track>_pairs`` (from ``language_pairs_evaluated``) as ints for each
        key of ``EVALUATION_TRACKS``; absent counts are 0.
    """
    # (output column suffix, key inside "summary")
    count_sources = (
        ("samples", "total_samples"),
        ("pairs", "language_pairs_evaluated"),
    )
    counts = {}
    for track_id in EVALUATION_TRACKS:
        track_summary = tracks.get(track_id, {}).get("summary", {})
        for suffix, source_key in count_sources:
            counts[f"{track_id}_{suffix}"] = int(track_summary.get(source_key, 0))
    return counts
def extract_adequacy_flags(tracks: Dict) -> Dict:
    """Derive a per-track flag telling whether enough samples were evaluated.

    A track is adequate when at least one language pair was evaluated and the
    total sample count reaches ``min_samples_per_pair`` times the number of
    evaluated pairs.

    Args:
        tracks: Per-track results; each entry is read for its ``"summary"``
            dict (``total_samples``, ``language_pairs_evaluated``).

    Returns:
        A dict with a boolean ``<track>_adequate`` for each key of
        ``EVALUATION_TRACKS``.
    """
    adequacy = {}
    for track_name in EVALUATION_TRACKS.keys():
        summary = tracks.get(track_name, {}).get("summary", {})
        pairs_evaluated = summary.get("language_pairs_evaluated", 0)
        total_samples = summary.get("total_samples", 0)
        min_required = EVALUATION_TRACKS[track_name]["min_samples_per_pair"] * pairs_evaluated
        # Without the pair check a track with no data at all would satisfy
        # 0 >= 0 and be reported as statistically adequate.
        is_adequate = pairs_evaluated > 0 and total_samples >= min_required
        adequacy[f"{track_name}_adequate"] = bool(is_adequate)
    return adequacy
def _json_default(value):
    """Convert NumPy scalars/arrays, which ``json`` cannot encode natively."""
    if isinstance(value, np.generic):
        return value.item()
    if isinstance(value, np.ndarray):
        return value.tolist()
    # Anything else stays an error so unexpected types are not hidden.
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def serialize_detailed_results(tracks: Dict, cross_track: Dict) -> Dict:
    """Serialise per-track and cross-track results into JSON strings.

    For each key of ``EVALUATION_TRACKS`` the track's result dict is dumped to
    JSON with its ``"sample_metrics"`` entry left out. NumPy scalars and arrays
    inside the results are converted to plain Python values instead of making
    ``json.dumps`` fail.

    Args:
        tracks: Per-track results keyed by track name.
        cross_track: Cross-track analysis results.

    Returns:
        A dict with ``detailed_<track>`` JSON strings plus
        ``"cross_track_analysis"``.

    Raises:
        TypeError: If the results contain a value that is neither JSON-native
            nor a NumPy scalar/array.
    """
    detailed = {}
    for track_name in EVALUATION_TRACKS.keys():
        track_data = tracks.get(track_name, {})
        # Skip large DataFrames
        safe_track_data = {
            key: value for key, value in track_data.items() if key != "sample_metrics"
        }
        detailed[f"detailed_{track_name}"] = json.dumps(safe_track_data, default=_json_default)
    detailed["cross_track_analysis"] = json.dumps(cross_track, default=_json_default)
    return detailed
def calculate_scientific_adequacy_score(evaluation_results: Dict) -> float:
    """Calculate the overall scientific adequacy score in the range 0-1.

    Each key of ``EVALUATION_TRACKS`` contributes the mean of two ratios, both
    capped at 1.0:

    * sample adequacy: ``total_samples`` relative to ``min_samples_per_pair``
      times the number of evaluated pairs;
    * coverage adequacy: evaluated pairs relative to ``n * (n - 1)`` where
      ``n`` is the number of entries in the track's ``"languages"``.

    A track whose result carries a truthy ``"error"`` contributes 0.0.

    Args:
        evaluation_results: Dict read for its ``"tracks"`` entry.

    Returns:
        The mean of the per-track scores, or 0.0 when no tracks are configured.
    """
    tracks = evaluation_results.get("tracks", {})
    adequacy_scores = []
    for track_name in EVALUATION_TRACKS.keys():
        track_config = EVALUATION_TRACKS[track_name]
        track_data = tracks.get(track_name, {})
        if track_data.get("error"):
            adequacy_scores.append(0.0)
            continue

        summary = track_data.get("summary", {})
        pairs_evaluated = summary.get("language_pairs_evaluated", 0)
        total_samples = summary.get("total_samples", 0)

        # Sample size adequacy
        min_required = track_config["min_samples_per_pair"] * pairs_evaluated
        sample_adequacy = min(total_samples / max(min_required, 1), 1.0)

        # Coverage adequacy, capped like sample adequacy so the documented
        # 0-1 range holds even if more pairs are reported than expected.
        language_count = len(track_config["languages"])
        total_possible_pairs = language_count * (language_count - 1)
        coverage_adequacy = min(pairs_evaluated / max(total_possible_pairs, 1), 1.0)

        # Track adequacy
        adequacy_scores.append((sample_adequacy + coverage_adequacy) / 2)

    if not adequacy_scores:
        # np.mean([]) would yield NaN (with a RuntimeWarning).
        return 0.0
    return float(np.mean(adequacy_scores))
def get_track_leaderboard(
    df: pd.DataFrame,
    track: str,
    metric: str = "quality",
    category_filter: str = "all",
    min_adequacy: float = 0.0
) -> pd.DataFrame:
    """Return the ranked rows of one track after applying the filters.

    Args:
        df: Full leaderboard.
        track: Track name used as the column prefix.
        metric: Column suffix to rank by (``<track>_<metric>``).
        category_filter: Keep only this ``model_category``; ``"all"`` disables
            the filter.
        min_adequacy: When positive, keep only rows whose
            ``scientific_adequacy_score`` is at least this value.

    Returns:
        Rows with a positive metric value and a true ``<track>_adequate`` flag,
        sorted by the metric in descending order with a fresh index. ``df`` is
        returned unchanged when it is empty, and a new empty frame when the
        track columns are missing.
    """
    if df.empty:
        return df

    score_col = f"{track}_{metric}"
    flag_col = f"{track}_adequate"

    # Ensure columns exist
    if any(needed not in df.columns for needed in (score_col, flag_col)):
        print(f"Warning: Missing columns for track {track}")
        return pd.DataFrame()

    # Filter by adequacy
    if min_adequacy > 0:
        df = df[df["scientific_adequacy_score"] >= min_adequacy]

    # Filter by category
    if category_filter != "all":
        df = df[df["model_category"] == category_filter]

    # Keep models that took part in this track: a positive score (values that
    # are not numbers compare as False) and an adequacy flag that is set
    # (missing flags count as False).
    has_score = pd.to_numeric(df[score_col], errors='coerce') > 0
    is_adequate = df[flag_col].fillna(False).astype(bool)
    df = df[has_score & is_adequate]
    if df.empty:
        return df

    # Sort by track-specific metric
    ranked = df.sort_values(score_col, ascending=False)
    return ranked.reset_index(drop=True)
def prepare_track_leaderboard_display(df: pd.DataFrame, track: str) -> pd.DataFrame:
    """Prepare the leaderboard columns of one track for display.

    Selects the base and track columns that exist in ``df``, formats scores as
    fixed-precision strings, merges the confidence bounds into one
    ``"[lower, upper]"`` column, shortens the submission date to
    ``YYYY-MM-DD`` and renames the columns to human-readable headers.

    Args:
        df: Leaderboard rows (typically already filtered for the track).
        track: Track name; used as column prefix and looked up in
            ``EVALUATION_TRACKS`` for its display name.

    Returns:
        A new display frame, or ``df`` itself when it is empty.

    Raises:
        KeyError: If ``track`` is not a key of ``EVALUATION_TRACKS``.
    """
    if df.empty:
        return df

    # Select relevant columns for this track
    base_columns = ["model_name", "author", "submission_date", "model_category"]
    track_columns = [
        f"{track}_quality",
        f"{track}_bleu",
        f"{track}_chrf",
        f"{track}_ci_lower",
        f"{track}_ci_upper",
        f"{track}_samples",
        f"{track}_pairs",
        f"{track}_adequate",
    ]

    # Only include columns that exist
    available_columns = [col for col in base_columns + track_columns if col in df.columns]
    display_df = df[available_columns].copy()

    # Format numeric columns
    numeric_format = {
        f"{track}_quality": "{:.4f}",
        f"{track}_bleu": "{:.2f}",
        f"{track}_chrf": "{:.4f}",
        f"{track}_ci_lower": "{:.4f}",
        f"{track}_ci_upper": "{:.4f}",
    }
    for col, fmt in numeric_format.items():
        if col in display_df.columns:
            # Missing values are rendered as zero in the column's own
            # precision (e.g. "0.00" for BLEU) rather than a fixed "0.0000".
            # ``fmt`` is bound as a default so the lambda does not depend on
            # the loop variable.
            display_df[col] = display_df[col].apply(
                lambda x, fmt=fmt: fmt.format(float(x) if pd.notnull(x) else 0.0)
            )

    # Format confidence intervals
    ci_lower_col = f"{track}_ci_lower"
    ci_upper_col = f"{track}_ci_upper"
    if ci_lower_col in display_df.columns and ci_upper_col in display_df.columns:
        display_df[f"{track}_confidence_interval"] = (
            "[" + display_df[ci_lower_col] + ", " + display_df[ci_upper_col] + "]"
        )
        # Remove individual CI columns for cleaner display
        display_df = display_df.drop(columns=[ci_lower_col, ci_upper_col])

    # Format submission date
    if "submission_date" in display_df.columns:
        display_df["submission_date"] = pd.to_datetime(display_df["submission_date"]).dt.strftime("%Y-%m-%d")

    # Rename columns for better display
    track_name = EVALUATION_TRACKS[track]["name"].split()[0]  # First word
    column_renames = {
        "model_name": "Model Name",
        "author": "Author",
        "submission_date": "Submitted",
        "model_category": "Category",
        f"{track}_quality": f"{track_name} Quality",
        f"{track}_bleu": f"{track_name} BLEU",
        f"{track}_chrf": f"{track_name} ChrF",
        f"{track}_confidence_interval": "95% CI",
        f"{track}_samples": "Samples",
        f"{track}_pairs": "Pairs",
        f"{track}_adequate": "Adequate",
    }
    display_df = display_df.rename(columns=column_renames)
    return display_df
def get_scientific_leaderboard_stats(df: pd.DataFrame, track: Optional[str] = None) -> Dict:
    """Get summary statistics for the scientific leaderboard.

    Args:
        df: Full leaderboard.
        track: Currently unused; kept for interface compatibility.

    Returns:
        A dict with ``total_models``, ``models_by_category``,
        ``adequacy_distribution`` (counts per Poor/Fair/Good/Excellent bin),
        ``track_statistics`` (participant count, mean, std and max quality per
        track) and ``best_models_by_track``. Tracks with no participating
        model are omitted from the two per-track dicts.
    """
    if df.empty:
        return {
            "total_models": 0,
            "models_by_category": {},
            "track_statistics": {},
            "adequacy_distribution": {},
            "best_models_by_track": {},
        }

    stats = {
        "total_models": len(df),
        "models_by_category": df["model_category"].value_counts().to_dict(),
        "adequacy_distribution": {},
        "track_statistics": {},
        "best_models_by_track": {},
    }

    # Adequacy distribution. include_lowest makes the first bin closed on the
    # left so a score of exactly 0.0 is counted as "Poor" instead of dropped.
    adequacy_bins = pd.cut(
        df["scientific_adequacy_score"],
        bins=[0, 0.3, 0.6, 0.8, 1.0],
        labels=["Poor", "Fair", "Good", "Excellent"],
        include_lowest=True,
    )
    stats["adequacy_distribution"] = adequacy_bins.value_counts().to_dict()

    # Track-specific statistics
    for track_name in EVALUATION_TRACKS.keys():
        quality_col = f"{track_name}_quality"
        adequate_col = f"{track_name}_adequate"
        if quality_col not in df.columns or adequate_col not in df.columns:
            continue

        # Normalise both columns the same way get_track_leaderboard does so
        # object-typed or NaN-containing columns do not break the mask.
        quality = pd.to_numeric(df[quality_col], errors="coerce")
        adequate = df[adequate_col].fillna(False).astype(bool)
        participating = adequate & (quality > 0)
        scores = quality[participating]
        if len(scores) == 0:
            continue

        stats["track_statistics"][track_name] = {
            "participating_models": len(scores),
            "avg_quality": float(scores.mean()),
            # NaN when only one model participates (sample std of one value).
            "std_quality": float(scores.std()),
            "best_quality": float(scores.max()),
        }

        # Best model for this track
        best_model = df.loc[scores.idxmax()]
        stats["best_models_by_track"][track_name] = {
            "name": best_model["model_name"],
            "category": best_model["model_category"],
            "quality": float(scores.max()),
        }
    return stats
def perform_fair_comparison(
    df: pd.DataFrame,
    model_names: List[str],
    shared_pairs_only: bool = True
) -> Dict:
    """Compare the selected models track by track.

    Args:
        df: Full leaderboard.
        model_names: Names of the models to compare.
        shared_pairs_only: Currently unused; kept for interface compatibility.

    Returns:
        ``{"error": "No models found"}`` when none of the names is present.
        Otherwise a dict with the matched model names, a list of
        recommendations (a warning is added when the models span more than one
        category) and, for every track in which at least two of the models
        have an adequate, positive quality score, their quality scores and
        confidence intervals.
    """
    models = df[df["model_name"].isin(model_names)]
    if len(models) == 0:
        return {"error": "No models found"}

    comparison = {
        "models": list(models["model_name"]),
        "fair_comparison_possible": True,
        "track_comparisons": {},
        "statistical_significance": {},
        "recommendations": [],
    }

    # Check if fair comparison is possible
    categories = models["model_category"].unique()
    if len(categories) > 1:
        comparison["recommendations"].append(
            "⚠️ Comparing models from different categories - interpret results carefully"
        )

    # For each track, compare models
    for track_name in EVALUATION_TRACKS.keys():
        quality_col = f"{track_name}_quality"
        adequate_col = f"{track_name}_adequate"

        # A frame lacking this track's columns simply has nothing to compare;
        # skip it instead of failing with KeyError.
        if quality_col not in models.columns or adequate_col not in models.columns:
            continue

        # Normalise before masking so object-typed/NaN columns behave the
        # same way as in get_track_leaderboard.
        has_score = pd.to_numeric(models[quality_col], errors="coerce") > 0
        is_adequate = models[adequate_col].fillna(False).astype(bool)
        track_models = models[is_adequate & has_score]
        if len(track_models) < 2:
            continue

        confidence_intervals = {}
        # Extract confidence intervals
        for _, model in track_models.iterrows():
            ci_lower = model.get(f"{track_name}_ci_lower", 0)
            ci_upper = model.get(f"{track_name}_ci_upper", 0)
            confidence_intervals[model["model_name"]] = [ci_lower, ci_upper]

        comparison["track_comparisons"][track_name] = {
            "participating_models": len(track_models),
            "quality_scores": dict(zip(track_models["model_name"], track_models[quality_col])),
            "confidence_intervals": confidence_intervals,
        }
    return comparison
def export_scientific_leaderboard(
    df: pd.DataFrame,
    track: str = "all",
    format: str = "csv",
    include_detailed: bool = False
) -> str:
    """Write the leaderboard to a timestamped file and return its name.

    Args:
        df: Leaderboard to export.
        track: A track name exports that track's display table; ``"all"``
            exports the combined leaderboard.
        format: One of ``"csv"``, ``"json"`` or ``"xlsx"``.
        include_detailed: For ``track="all"`` only - export every column
            instead of the essential subset.

    Returns:
        The name of the file that was written (relative path, in the current
        working directory).

    Raises:
        ValueError: If ``format`` is not one of the supported values.
    """
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    if track == "all":
        # Export all tracks
        filename_prefix = f"salt_leaderboard_scientific_{timestamp}"
        if include_detailed:
            export_df = df.copy()
        else:
            # Essential columns: identity, overall adequacy, then the quality
            # score and adequacy flag of each track.
            wanted = [
                "model_name", "author", "submission_date", "model_category",
                "scientific_adequacy_score"
            ]
            for track_id in EVALUATION_TRACKS:
                wanted += [f"{track_id}_quality", f"{track_id}_adequate"]
            present = [column for column in wanted if column in df.columns]
            export_df = df[present].copy()
    else:
        # Export specific track
        export_df = prepare_track_leaderboard_display(df, track)
        filename_prefix = f"salt_leaderboard_{track}_{timestamp}"

    # One writer per supported format; each receives the target file name.
    writers = {
        "csv": lambda path: export_df.to_csv(path, index=False),
        "json": lambda path: export_df.to_json(path, orient="records", indent=2),
        "xlsx": lambda path: export_df.to_excel(path, index=False),
    }
    write = writers.get(format)
    if write is None:
        raise ValueError(f"Unsupported format: {format}")

    filename = f"{filename_prefix}.{format}"
    write(filename)
    return filename