# src/evaluation.py
import pandas as pd
import numpy as np
from sacrebleu.metrics import BLEU, CHRF
from rouge_score import rouge_scorer
import Levenshtein
from collections import defaultdict
from transformers.models.whisper.english_normalizer import BasicTextNormalizer
from typing import Dict, List, Tuple, Optional
from scipy import stats
from scipy.stats import bootstrap
import warnings
from config import (
ALL_UG40_LANGUAGES,
GOOGLE_SUPPORTED_LANGUAGES,
METRICS_CONFIG,
STATISTICAL_CONFIG,
EVALUATION_TRACKS,
MODEL_CATEGORIES,
SAMPLE_SIZE_RECOMMENDATIONS,
)
from src.utils import get_all_language_pairs, get_google_comparable_pairs
warnings.filterwarnings("ignore", category=RuntimeWarning)
def calculate_sentence_metrics(reference: str, prediction: str) -> Dict[str, float]:
"""Calculate all metrics for a single sentence pair with robust error handling."""
# Handle empty predictions
if not prediction or not isinstance(prediction, str):
prediction = ""
if not reference or not isinstance(reference, str):
reference = ""
# Normalize texts
normalizer = BasicTextNormalizer()
pred_norm = normalizer(prediction)
ref_norm = normalizer(reference)
metrics = {}
# BLEU score (0-100 scale)
try:
bleu = BLEU(effective_order=True)
metrics["bleu"] = bleu.sentence_score(pred_norm, [ref_norm]).score
    except Exception:
metrics["bleu"] = 0.0
# ChrF score (normalize to 0-1)
try:
chrf = CHRF()
metrics["chrf"] = chrf.sentence_score(pred_norm, [ref_norm]).score / 100.0
    except Exception:
metrics["chrf"] = 0.0
# Character Error Rate (CER)
try:
if len(ref_norm) > 0:
metrics["cer"] = Levenshtein.distance(ref_norm, pred_norm) / len(ref_norm)
else:
metrics["cer"] = 1.0 if len(pred_norm) > 0 else 0.0
    except Exception:
metrics["cer"] = 1.0
# Word Error Rate (WER)
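    # Note: passing token lists to Levenshtein.distance is only supported by
    # builds of the Levenshtein package that accept arbitrary sequences; with a
    # strings-only build the call raises and the handler below yields WER = 1.0.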
try:
ref_words = ref_norm.split()
pred_words = pred_norm.split()
if len(ref_words) > 0:
metrics["wer"] = Levenshtein.distance(ref_words, pred_words) / len(ref_words)
else:
metrics["wer"] = 1.0 if len(pred_words) > 0 else 0.0
    except Exception:
metrics["wer"] = 1.0
# Length ratio
try:
if len(ref_norm) > 0:
metrics["len_ratio"] = len(pred_norm) / len(ref_norm)
else:
metrics["len_ratio"] = 1.0 if len(pred_norm) == 0 else float("inf")
    except Exception:
metrics["len_ratio"] = 1.0
# ROUGE scores
try:
scorer = rouge_scorer.RougeScorer(
["rouge1", "rouge2", "rougeL"], use_stemmer=True
)
rouge_scores = scorer.score(ref_norm, pred_norm)
metrics["rouge1"] = rouge_scores["rouge1"].fmeasure
metrics["rouge2"] = rouge_scores["rouge2"].fmeasure
metrics["rougeL"] = rouge_scores["rougeL"].fmeasure
    except Exception:
metrics["rouge1"] = 0.0
metrics["rouge2"] = 0.0
metrics["rougeL"] = 0.0
# Quality score (composite metric)
try:
quality_components = [
metrics["bleu"] / 100.0, # Normalize BLEU to 0-1
metrics["chrf"], # Already 0-1
1.0 - min(metrics["cer"], 1.0), # Invert error rates
1.0 - min(metrics["wer"], 1.0),
metrics["rouge1"],
metrics["rougeL"],
]
metrics["quality_score"] = np.mean(quality_components)
    except Exception:
metrics["quality_score"] = 0.0
return metrics
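# Illustrative use of calculate_sentence_metrics (a sketch; exact scores depend
# on the installed sacrebleu / rouge_score / Levenshtein versions):
#
#   m = calculate_sentence_metrics("the cat sat on the mat", "the cat sat on the mat")
#   # identical strings -> bleu 100.0, cer 0.0, wer 0.0, quality_score 1.0
#
#   m = calculate_sentence_metrics("the cat sat on the mat", "")
#   # empty prediction -> BLEU/ChrF/ROUGE of 0.0, CER and WER of 1.0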
def calculate_statistical_metrics(values: List[float]) -> Dict[str, float]:
"""Calculate statistical measures including confidence intervals."""
    if not values:
return {
"mean": 0.0,
"std": 0.0,
"median": 0.0,
"ci_lower": 0.0,
"ci_upper": 0.0,
"n_samples": 0,
}
values = np.array(values)
values = values[~np.isnan(values)] # Remove NaN values
if len(values) == 0:
return {
"mean": 0.0,
"std": 0.0,
"median": 0.0,
"ci_lower": 0.0,
"ci_upper": 0.0,
"n_samples": 0,
}
stats_dict = {
"mean": float(np.mean(values)),
"std": float(np.std(values, ddof=1)) if len(values) > 1 else 0.0,
"median": float(np.median(values)),
"n_samples": len(values),
}
# Calculate confidence intervals using bootstrap if enough samples
    if len(values) >= STATISTICAL_CONFIG["min_samples_for_ci"]:
        confidence_level = STATISTICAL_CONFIG["confidence_level"]
        try:
            # Bootstrap confidence interval
            res = bootstrap(
                (values,),
                np.mean,
n_resamples=STATISTICAL_CONFIG["bootstrap_samples"],
confidence_level=confidence_level,
random_state=42,
)
stats_dict["ci_lower"] = float(res.confidence_interval.low)
stats_dict["ci_upper"] = float(res.confidence_interval.high)
        except Exception:
# Fallback to t-distribution CI
try:
alpha = 1 - confidence_level
t_val = stats.t.ppf(1 - alpha / 2, len(values) - 1)
margin = t_val * stats_dict["std"] / np.sqrt(len(values))
stats_dict["ci_lower"] = stats_dict["mean"] - margin
stats_dict["ci_upper"] = stats_dict["mean"] + margin
            except Exception:
stats_dict["ci_lower"] = stats_dict["mean"]
stats_dict["ci_upper"] = stats_dict["mean"]
else:
stats_dict["ci_lower"] = stats_dict["mean"]
stats_dict["ci_upper"] = stats_dict["mean"]
return stats_dict
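# Illustrative use of calculate_statistical_metrics (a toy list; the bootstrap CI
# is only computed when len(values) >= STATISTICAL_CONFIG["min_samples_for_ci"],
# otherwise ci_lower/ci_upper collapse to the mean):
#
#   calculate_statistical_metrics([0.42, 0.55, 0.61, 0.48, 0.50])
#   # -> {"mean": 0.512, "median": 0.5, "n_samples": 5, "ci_lower": ..., "ci_upper": ..., ...}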
def perform_significance_test(
values1: List[float], values2: List[float], metric_name: str
) -> Dict[str, float]:
"""Perform statistical significance test between two groups."""
if len(values1) < 2 or len(values2) < 2:
return {"p_value": 1.0, "effect_size": 0.0, "significant": False}
values1 = np.array(values1)
values2 = np.array(values2)
# Remove NaN values
values1 = values1[~np.isnan(values1)]
values2 = values2[~np.isnan(values2)]
if len(values1) < 2 or len(values2) < 2:
return {"p_value": 1.0, "effect_size": 0.0, "significant": False}
try:
# Perform t-test
t_stat, p_value = stats.ttest_ind(values1, values2, equal_var=False)
# Calculate effect size (Cohen's d)
pooled_std = np.sqrt(
((len(values1) - 1) * np.var(values1, ddof=1) +
(len(values2) - 1) * np.var(values2, ddof=1)) /
(len(values1) + len(values2) - 2)
)
if pooled_std > 0:
effect_size = abs(np.mean(values1) - np.mean(values2)) / pooled_std
else:
effect_size = 0.0
# Determine significance
significance_level = EVALUATION_TRACKS["google_comparable"]["significance_level"]
significant = p_value < significance_level
return {
"p_value": float(p_value),
"effect_size": float(effect_size),
"significant": significant,
"t_statistic": float(t_stat),
}
    except Exception:
return {"p_value": 1.0, "effect_size": 0.0, "significant": False}
def evaluate_predictions_by_track(
predictions: pd.DataFrame, test_set: pd.DataFrame, track: str
) -> Dict:
"""Evaluate predictions for a specific track with statistical analysis."""
print(f"πŸ”„ Evaluating for {track} track...")
track_config = EVALUATION_TRACKS[track]
track_languages = track_config["languages"]
# Filter test set and predictions to track languages
track_test_set = test_set[
(test_set["source_language"].isin(track_languages)) &
(test_set["target_language"].isin(track_languages))
].copy()
# Merge predictions with test set
merged = track_test_set.merge(
predictions, on="sample_id", how="inner", suffixes=("", "_pred")
)
if len(merged) == 0:
return {
"error": f"No matching samples found for {track} track",
"evaluated_samples": 0,
"track": track,
}
print(f"πŸ“Š Evaluating {len(merged)} samples for {track} track...")
# Calculate metrics for each sample
sample_metrics = []
    for _, row in merged.iterrows():
metrics = calculate_sentence_metrics(row["target_text"], row["prediction"])
metrics["sample_id"] = row["sample_id"]
metrics["source_language"] = row["source_language"]
metrics["target_language"] = row["target_language"]
sample_metrics.append(metrics)
sample_df = pd.DataFrame(sample_metrics)
# Aggregate by language pairs with statistical analysis
pair_metrics = {}
overall_metrics = defaultdict(list)
# Calculate metrics for each language pair
for src_lang in track_languages:
for tgt_lang in track_languages:
if src_lang == tgt_lang:
continue
pair_data = sample_df[
(sample_df["source_language"] == src_lang) &
(sample_df["target_language"] == tgt_lang)
]
if len(pair_data) >= track_config["min_samples_per_pair"]:
pair_key = f"{src_lang}_to_{tgt_lang}"
pair_metrics[pair_key] = {}
# Calculate statistical metrics for each measure
for metric in (
METRICS_CONFIG["primary_metrics"] + METRICS_CONFIG["secondary_metrics"]
):
if metric in pair_data.columns:
values = pair_data[metric].replace([np.inf, -np.inf], np.nan).dropna()
if len(values) > 0:
stats_metrics = calculate_statistical_metrics(values.tolist())
pair_metrics[pair_key][metric] = stats_metrics
# Add to overall metrics for track-level statistics
overall_metrics[metric].append(stats_metrics["mean"])
pair_metrics[pair_key]["sample_count"] = len(pair_data)
pair_metrics[pair_key]["languages"] = f"{src_lang}-{tgt_lang}"
# Calculate track-level aggregated statistics
track_averages = {}
track_statistics = {}
for metric in overall_metrics:
if overall_metrics[metric]:
track_stats = calculate_statistical_metrics(overall_metrics[metric])
track_averages[metric] = track_stats["mean"]
track_statistics[metric] = track_stats
# Generate evaluation summary
summary = {
"track": track,
"track_name": track_config["name"],
"total_samples": len(sample_df),
"language_pairs_evaluated": len([k for k in pair_metrics if pair_metrics[k].get("sample_count", 0) > 0]),
"languages_covered": len(set(sample_df["source_language"]) | set(sample_df["target_language"])),
"min_samples_per_pair": track_config["min_samples_per_pair"],
"statistical_power": track_config["statistical_power"],
"significance_level": track_config["significance_level"],
}
return {
"sample_metrics": sample_df,
"pair_metrics": pair_metrics,
"track_averages": track_averages,
"track_statistics": track_statistics,
"summary": summary,
"evaluated_samples": len(sample_df),
"track": track,
"error": None,
}
def evaluate_predictions_scientific(
predictions: pd.DataFrame, test_set: pd.DataFrame, model_category: str = "community"
) -> Dict:
"""Comprehensive evaluation across all tracks with scientific rigor."""
print("πŸ”¬ Starting scientific evaluation...")
# Validate model category
if model_category not in MODEL_CATEGORIES:
model_category = "community"
evaluation_results = {
"model_category": model_category,
"category_info": MODEL_CATEGORIES[model_category],
"tracks": {},
"cross_track_analysis": {},
"scientific_metadata": {
"evaluation_timestamp": pd.Timestamp.now().isoformat(),
"total_samples_submitted": len(predictions),
"total_samples_available": len(test_set),
},
}
# Evaluate each track
for track_name in EVALUATION_TRACKS.keys():
track_result = evaluate_predictions_by_track(predictions, test_set, track_name)
evaluation_results["tracks"][track_name] = track_result
# Cross-track consistency analysis
evaluation_results["cross_track_analysis"] = analyze_cross_track_consistency(
evaluation_results["tracks"]
)
return evaluation_results
def analyze_cross_track_consistency(track_results: Dict) -> Dict:
"""Analyze consistency of model performance across different tracks."""
consistency_analysis = {
"track_correlations": {},
"performance_stability": {},
"language_coverage_analysis": {},
}
# Extract quality scores from each track for correlation analysis
track_scores = {}
for track_name, track_data in track_results.items():
if track_data.get("track_averages") and "quality_score" in track_data["track_averages"]:
track_scores[track_name] = track_data["track_averages"]["quality_score"]
# Calculate pairwise correlations (would need more data points for meaningful correlation)
if len(track_scores) >= 2:
track_names = list(track_scores.keys())
for i, track1 in enumerate(track_names):
for track2 in track_names[i + 1:]:
# This would be more meaningful with multiple models
consistency_analysis["track_correlations"][f"{track1}_vs_{track2}"] = {
"score_difference": abs(track_scores[track1] - track_scores[track2]),
"relative_performance": track_scores[track1] / max(track_scores[track2], 0.001),
}
# Language coverage analysis
for track_name, track_data in track_results.items():
if track_data.get("summary"):
summary = track_data["summary"]
consistency_analysis["language_coverage_analysis"][track_name] = {
"coverage_rate": summary["language_pairs_evaluated"] / max(summary.get("total_possible_pairs", 1), 1),
"samples_per_pair": summary["total_samples"] / max(summary["language_pairs_evaluated"], 1),
"statistical_adequacy": summary["total_samples"] >= EVALUATION_TRACKS[track_name]["min_samples_per_pair"] * summary["language_pairs_evaluated"],
}
return consistency_analysis
def compare_models_statistically(
model1_results: Dict, model2_results: Dict, track: str = "google_comparable"
) -> Dict:
"""Perform statistical comparison between two models on a specific track."""
if track not in model1_results.get("tracks", {}) or track not in model2_results.get("tracks", {}):
return {"error": f"Track {track} not available for both models"}
track1_data = model1_results["tracks"][track]
track2_data = model2_results["tracks"][track]
if track1_data.get("error") or track2_data.get("error"):
return {"error": "One or both models have evaluation errors"}
comparison_results = {
"track": track,
"model1_category": model1_results.get("model_category", "unknown"),
"model2_category": model2_results.get("model_category", "unknown"),
"metric_comparisons": {},
"language_pair_comparisons": {},
"overall_significance": {},
}
# Compare each metric
for metric in METRICS_CONFIG["primary_metrics"] + METRICS_CONFIG["secondary_metrics"]:
if (metric in track1_data.get("track_statistics", {}) and
metric in track2_data.get("track_statistics", {})):
# Extract sample-level data for this metric from both models
# This would require access to the original sample metrics
# For now, we'll use the aggregated statistics
stats1 = track1_data["track_statistics"][metric]
stats2 = track2_data["track_statistics"][metric]
# Create comparison summary
comparison_results["metric_comparisons"][metric] = {
"model1_mean": stats1["mean"],
"model1_ci": [stats1["ci_lower"], stats1["ci_upper"]],
"model2_mean": stats2["mean"],
"model2_ci": [stats2["ci_lower"], stats2["ci_upper"]],
"difference": stats1["mean"] - stats2["mean"],
"ci_overlap": not (stats1["ci_upper"] < stats2["ci_lower"] or
stats2["ci_upper"] < stats1["ci_lower"]),
}
return comparison_results
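# Illustrative use of compare_models_statistically (both arguments are dictionaries
# returned by evaluate_predictions_scientific for two different submissions):
#
#   comparison = compare_models_statistically(results_a, results_b, track="google_comparable")
#   # comparison["metric_comparisons"]["quality_score"]["ci_overlap"] is False when
#   # the two confidence intervals do not overlap.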
def generate_scientific_report(
results: Dict, model_name: str = "", baseline_results: Dict = None
) -> str:
"""Generate a comprehensive scientific evaluation report."""
if any(track_data.get("error") for track_data in results.get("tracks", {}).values()):
return f"❌ **Evaluation Error**: Unable to complete scientific evaluation"
report = []
# Header
report.append(f"# πŸ”¬ Scientific Evaluation Report: {model_name or 'Model'}")
report.append("")
# Model categorization
category_info = results.get("category_info", {})
report.append(f"**Model Category**: {category_info.get('name', 'Unknown')}")
report.append(f"**Category Description**: {category_info.get('description', 'N/A')}")
report.append("")
# Track-by-track analysis
for track_name, track_data in results.get("tracks", {}).items():
if track_data.get("error"):
continue
track_config = EVALUATION_TRACKS[track_name]
summary = track_data.get("summary", {})
track_stats = track_data.get("track_statistics", {})
report.append(f"## {track_config['name']}")
report.append(f"*{track_config['description']}*")
report.append("")
# Summary statistics
report.append("### πŸ“Š Summary Statistics")
report.append(f"- **Samples Evaluated**: {summary.get('total_samples', 0):,}")
report.append(f"- **Language Pairs**: {summary.get('language_pairs_evaluated', 0)}")
report.append(f"- **Languages Covered**: {summary.get('languages_covered', 0)}")
report.append(f"- **Statistical Power**: {track_config['statistical_power']}")
report.append("")
# Primary metrics with confidence intervals
report.append("### 🎯 Primary Metrics (95% Confidence Intervals)")
for metric in METRICS_CONFIG["primary_metrics"]:
if metric in track_stats:
                metric_stats = track_stats[metric]
                mean_val = metric_stats["mean"]
                ci_lower = metric_stats["ci_lower"]
                ci_upper = metric_stats["ci_upper"]
report.append(f"- **{metric.upper()}**: {mean_val:.4f} [{ci_lower:.4f}, {ci_upper:.4f}]")
report.append("")
# Statistical adequacy assessment
min_required = track_config["min_samples_per_pair"] * summary.get("language_pairs_evaluated", 0)
adequacy = "βœ… Adequate" if summary.get("total_samples", 0) >= min_required else "⚠️ Limited"
report.append(f"**Statistical Adequacy**: {adequacy}")
report.append("")
# Cross-track analysis
cross_track = results.get("cross_track_analysis", {})
if cross_track:
report.append("## πŸ”„ Cross-Track Consistency Analysis")
coverage_analysis = cross_track.get("language_coverage_analysis", {})
for track_name, coverage_info in coverage_analysis.items():
adequacy = "βœ… Statistically adequate" if coverage_info.get("statistical_adequacy") else "⚠️ Limited statistical power"
report.append(f"- **{track_name}**: {adequacy}")
report.append("")
# Baseline comparison if available
if baseline_results:
report.append("## πŸ“ˆ Baseline Comparison")
# This would include detailed statistical comparisons
report.append("*Statistical comparison with baseline models*")
report.append("")
# Scientific recommendations
report.append("## πŸ’‘ Scientific Recommendations")
total_samples = sum(
track_data.get("summary", {}).get("total_samples", 0)
for track_data in results.get("tracks", {}).values()
if not track_data.get("error")
)
if total_samples < SAMPLE_SIZE_RECOMMENDATIONS["publication_quality"]:
report.append("- ⚠️ Consider collecting more evaluation samples for publication-quality results")
google_track = results.get("tracks", {}).get("google_comparable", {})
if not google_track.get("error") and google_track.get("summary", {}).get("total_samples", 0) > 100:
report.append("- βœ… Sufficient data for comparison with commercial systems")
report.append("")
return "\n".join(report)