# src/evaluation.py
import pandas as pd
import numpy as np
from sacrebleu.metrics import BLEU, CHRF
from rouge_score import rouge_scorer
import Levenshtein
from collections import defaultdict
from transformers.models.whisper.english_normalizer import BasicTextNormalizer
from typing import Dict, List, Tuple, Optional
from scipy import stats
from scipy.stats import bootstrap
import warnings

from config import (
    ALL_UG40_LANGUAGES,
    GOOGLE_SUPPORTED_LANGUAGES,
    METRICS_CONFIG,
    STATISTICAL_CONFIG,
    EVALUATION_TRACKS,
    MODEL_CATEGORIES,
    SAMPLE_SIZE_RECOMMENDATIONS,  # used by generate_scientific_report; assumed to be defined in config
)
from src.utils import get_all_language_pairs, get_google_comparable_pairs

warnings.filterwarnings("ignore", category=RuntimeWarning)

def calculate_sentence_metrics(reference: str, prediction: str) -> Dict[str, float]:
    """Calculate all metrics for a single sentence pair with robust error handling."""
    # Handle empty or non-string inputs
    if not prediction or not isinstance(prediction, str):
        prediction = ""
    if not reference or not isinstance(reference, str):
        reference = ""

    # Normalize texts
    normalizer = BasicTextNormalizer()
    pred_norm = normalizer(prediction)
    ref_norm = normalizer(reference)

    metrics = {}

    # BLEU score (0-100 scale)
    try:
        bleu = BLEU(effective_order=True)
        metrics["bleu"] = bleu.sentence_score(pred_norm, [ref_norm]).score
    except Exception:
        metrics["bleu"] = 0.0

    # ChrF score (normalized to 0-1)
    try:
        chrf = CHRF()
        metrics["chrf"] = chrf.sentence_score(pred_norm, [ref_norm]).score / 100.0
    except Exception:
        metrics["chrf"] = 0.0

    # Character Error Rate (CER)
    try:
        if len(ref_norm) > 0:
            metrics["cer"] = Levenshtein.distance(ref_norm, pred_norm) / len(ref_norm)
        else:
            metrics["cer"] = 1.0 if len(pred_norm) > 0 else 0.0
    except Exception:
        metrics["cer"] = 1.0

    # Word Error Rate (WER)
    try:
        ref_words = ref_norm.split()
        pred_words = pred_norm.split()
        if len(ref_words) > 0:
            metrics["wer"] = Levenshtein.distance(ref_words, pred_words) / len(ref_words)
        else:
            metrics["wer"] = 1.0 if len(pred_words) > 0 else 0.0
    except Exception:
        metrics["wer"] = 1.0

    # Length ratio
    try:
        if len(ref_norm) > 0:
            metrics["len_ratio"] = len(pred_norm) / len(ref_norm)
        else:
            metrics["len_ratio"] = 1.0 if len(pred_norm) == 0 else float("inf")
    except Exception:
        metrics["len_ratio"] = 1.0

    # ROUGE scores
    try:
        scorer = rouge_scorer.RougeScorer(
            ["rouge1", "rouge2", "rougeL"], use_stemmer=True
        )
        rouge_scores = scorer.score(ref_norm, pred_norm)
        metrics["rouge1"] = rouge_scores["rouge1"].fmeasure
        metrics["rouge2"] = rouge_scores["rouge2"].fmeasure
        metrics["rougeL"] = rouge_scores["rougeL"].fmeasure
    except Exception:
        metrics["rouge1"] = 0.0
        metrics["rouge2"] = 0.0
        metrics["rougeL"] = 0.0

    # Quality score (composite metric): mean of the components on a 0-1 scale
    try:
        quality_components = [
            metrics["bleu"] / 100.0,  # Normalize BLEU to 0-1
            metrics["chrf"],  # Already 0-1
            1.0 - min(metrics["cer"], 1.0),  # Invert error rates
            1.0 - min(metrics["wer"], 1.0),
            metrics["rouge1"],
            metrics["rougeL"],
        ]
        metrics["quality_score"] = float(np.mean(quality_components))
    except Exception:
        metrics["quality_score"] = 0.0

    return metrics
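
# Illustrative usage of calculate_sentence_metrics (the strings below are made-up
# examples; exact scores depend on the installed sacrebleu / rouge_score versions):
#
#   m = calculate_sentence_metrics("the cat sat on the mat", "the cat sat on a mat")
#   m["bleu"]           # sentence BLEU on a 0-100 scale
#   m["chrf"]           # chrF normalized to 0-1
#   m["cer"], m["wer"]  # character / word error rates (lower is better)
#   m["quality_score"]  # mean of the normalized, error-inverted components above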

def calculate_statistical_metrics(values: List[float]) -> Dict[str, float]:
    """Calculate statistical measures including confidence intervals."""
    if values is None or len(values) == 0:
        return {
            "mean": 0.0,
            "std": 0.0,
            "median": 0.0,
            "ci_lower": 0.0,
            "ci_upper": 0.0,
            "n_samples": 0,
        }

    values = np.array(values)
    values = values[~np.isnan(values)]  # Remove NaN values
    if len(values) == 0:
        return {
            "mean": 0.0,
            "std": 0.0,
            "median": 0.0,
            "ci_lower": 0.0,
            "ci_upper": 0.0,
            "n_samples": 0,
        }

    stats_dict = {
        "mean": float(np.mean(values)),
        "std": float(np.std(values, ddof=1)) if len(values) > 1 else 0.0,
        "median": float(np.median(values)),
        "n_samples": len(values),
    }

    # Calculate confidence intervals using bootstrap if there are enough samples
    if len(values) >= STATISTICAL_CONFIG["min_samples_for_ci"]:
        confidence_level = STATISTICAL_CONFIG["confidence_level"]
        try:
            # Bootstrap confidence interval for the mean
            def mean_func(x):
                return np.mean(x)

            res = bootstrap(
                (values,),
                mean_func,
                n_resamples=STATISTICAL_CONFIG["bootstrap_samples"],
                confidence_level=confidence_level,
                random_state=42,
            )
            stats_dict["ci_lower"] = float(res.confidence_interval.low)
            stats_dict["ci_upper"] = float(res.confidence_interval.high)
        except Exception:
            # Fall back to a t-distribution confidence interval
            try:
                alpha = 1 - confidence_level
                t_val = stats.t.ppf(1 - alpha / 2, len(values) - 1)
                margin = t_val * stats_dict["std"] / np.sqrt(len(values))
                stats_dict["ci_lower"] = stats_dict["mean"] - margin
                stats_dict["ci_upper"] = stats_dict["mean"] + margin
            except Exception:
                stats_dict["ci_lower"] = stats_dict["mean"]
                stats_dict["ci_upper"] = stats_dict["mean"]
    else:
        stats_dict["ci_lower"] = stats_dict["mean"]
        stats_dict["ci_upper"] = stats_dict["mean"]

    return stats_dict
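
# Illustrative return shape for calculate_statistical_metrics (numbers are
# placeholders, not real results):
#
#   calculate_statistical_metrics([0.41, 0.44, 0.39, ...])
#   -> {"mean": 0.42, "std": 0.03, "median": 0.42,
#       "ci_lower": 0.40, "ci_upper": 0.44, "n_samples": 30}
#
# The interval is a bootstrap CI over the mean when at least
# STATISTICAL_CONFIG["min_samples_for_ci"] values are available; otherwise both
# bounds collapse to the mean.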

def perform_significance_test(
    values1: List[float], values2: List[float], metric_name: str
) -> Dict[str, float]:
    """Perform a statistical significance test between two groups of scores."""
    if len(values1) < 2 or len(values2) < 2:
        return {"p_value": 1.0, "effect_size": 0.0, "significant": False}

    values1 = np.array(values1)
    values2 = np.array(values2)

    # Remove NaN values
    values1 = values1[~np.isnan(values1)]
    values2 = values2[~np.isnan(values2)]
    if len(values1) < 2 or len(values2) < 2:
        return {"p_value": 1.0, "effect_size": 0.0, "significant": False}

    try:
        # Welch's t-test (does not assume equal variances)
        t_stat, p_value = stats.ttest_ind(values1, values2, equal_var=False)

        # Calculate effect size (Cohen's d) from the pooled standard deviation
        pooled_std = np.sqrt(
            (
                (len(values1) - 1) * np.var(values1, ddof=1)
                + (len(values2) - 1) * np.var(values2, ddof=1)
            )
            / (len(values1) + len(values2) - 2)
        )
        if pooled_std > 0:
            effect_size = abs(np.mean(values1) - np.mean(values2)) / pooled_std
        else:
            effect_size = 0.0

        # Determine significance against the configured threshold
        significance_level = EVALUATION_TRACKS["google_comparable"][
            "significance_level"
        ]
        significant = p_value < significance_level

        return {
            "p_value": float(p_value),
            "effect_size": float(effect_size),
            "significant": significant,
            "t_statistic": float(t_stat),
        }
    except Exception:
        return {"p_value": 1.0, "effect_size": 0.0, "significant": False}

def evaluate_predictions_by_track(
    predictions: pd.DataFrame, test_set: pd.DataFrame, track: str
) -> Dict:
    """Evaluate predictions for a specific track with statistical analysis."""
    print(f"🔍 Evaluating for {track} track...")

    track_config = EVALUATION_TRACKS[track]
    track_languages = track_config["languages"]

    # Filter the test set to language pairs that belong to this track
    track_test_set = test_set[
        (test_set["source_language"].isin(track_languages))
        & (test_set["target_language"].isin(track_languages))
    ].copy()

    # Merge predictions with the test set
    merged = track_test_set.merge(
        predictions, on="sample_id", how="inner", suffixes=("", "_pred")
    )
    if len(merged) == 0:
        return {
            "error": f"No matching samples found for {track} track",
            "evaluated_samples": 0,
            "track": track,
        }

    print(f"📊 Evaluating {len(merged)} samples for {track} track...")

    # Calculate metrics for each sample
    sample_metrics = []
    for _, row in merged.iterrows():
        metrics = calculate_sentence_metrics(row["target_text"], row["prediction"])
        metrics["sample_id"] = row["sample_id"]
        metrics["source_language"] = row["source_language"]
        metrics["target_language"] = row["target_language"]
        sample_metrics.append(metrics)
    sample_df = pd.DataFrame(sample_metrics)

    # Aggregate by language pair with statistical analysis
    pair_metrics = {}
    overall_metrics = defaultdict(list)

    # Calculate metrics for each language pair
    for src_lang in track_languages:
        for tgt_lang in track_languages:
            if src_lang == tgt_lang:
                continue
            pair_data = sample_df[
                (sample_df["source_language"] == src_lang)
                & (sample_df["target_language"] == tgt_lang)
            ]
            if len(pair_data) >= track_config["min_samples_per_pair"]:
                pair_key = f"{src_lang}_to_{tgt_lang}"
                pair_metrics[pair_key] = {}

                # Calculate statistical metrics for each measure
                for metric in (
                    METRICS_CONFIG["primary_metrics"]
                    + METRICS_CONFIG["secondary_metrics"]
                ):
                    if metric in pair_data.columns:
                        values = (
                            pair_data[metric]
                            .replace([np.inf, -np.inf], np.nan)
                            .dropna()
                        )
                        if len(values) > 0:
                            stats_metrics = calculate_statistical_metrics(
                                values.tolist()
                            )
                            pair_metrics[pair_key][metric] = stats_metrics
                            # Add to overall metrics for track-level statistics
                            overall_metrics[metric].append(stats_metrics["mean"])

                pair_metrics[pair_key]["sample_count"] = len(pair_data)
                pair_metrics[pair_key]["languages"] = f"{src_lang}-{tgt_lang}"

    # Calculate track-level aggregated statistics
    track_averages = {}
    track_statistics = {}
    for metric in overall_metrics:
        if overall_metrics[metric]:
            track_stats = calculate_statistical_metrics(overall_metrics[metric])
            track_averages[metric] = track_stats["mean"]
            track_statistics[metric] = track_stats

    # Generate evaluation summary
    summary = {
        "track": track,
        "track_name": track_config["name"],
        "total_samples": len(sample_df),
        "language_pairs_evaluated": len(
            [k for k in pair_metrics if pair_metrics[k].get("sample_count", 0) > 0]
        ),
        "languages_covered": len(
            set(sample_df["source_language"]) | set(sample_df["target_language"])
        ),
        "min_samples_per_pair": track_config["min_samples_per_pair"],
        "statistical_power": track_config["statistical_power"],
        "significance_level": track_config["significance_level"],
    }

    return {
        "sample_metrics": sample_df,
        "pair_metrics": pair_metrics,
        "track_averages": track_averages,
        "track_statistics": track_statistics,
        "summary": summary,
        "evaluated_samples": len(sample_df),
        "track": track,
        "error": None,
    }
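
# Input schema assumed by evaluate_predictions_by_track (taken from the merge and
# column accesses above):
#   predictions: DataFrame with at least ["sample_id", "prediction"]
#   test_set:    DataFrame with at least ["sample_id", "source_language",
#                "target_language", "target_text"]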

def evaluate_predictions_scientific(
    predictions: pd.DataFrame, test_set: pd.DataFrame, model_category: str = "community"
) -> Dict:
    """Comprehensive evaluation across all tracks with scientific rigor."""
    print("🔬 Starting scientific evaluation...")

    # Validate the model category, falling back to the community tier
    if model_category not in MODEL_CATEGORIES:
        model_category = "community"

    evaluation_results = {
        "model_category": model_category,
        "category_info": MODEL_CATEGORIES[model_category],
        "tracks": {},
        "cross_track_analysis": {},
        "scientific_metadata": {
            "evaluation_timestamp": pd.Timestamp.now().isoformat(),
            "total_samples_submitted": len(predictions),
            "total_samples_available": len(test_set),
        },
    }

    # Evaluate each track
    for track_name in EVALUATION_TRACKS.keys():
        track_result = evaluate_predictions_by_track(predictions, test_set, track_name)
        evaluation_results["tracks"][track_name] = track_result

    # Cross-track consistency analysis
    evaluation_results["cross_track_analysis"] = analyze_cross_track_consistency(
        evaluation_results["tracks"]
    )

    return evaluation_results
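
# Illustrative end-to-end call (file names are placeholders; the track key comes
# from EVALUATION_TRACKS):
#
#   preds = pd.read_csv("predictions.csv")   # sample_id, prediction
#   test = pd.read_csv("test_set.csv")       # sample_id, languages, target_text
#   results = evaluate_predictions_scientific(preds, test, model_category="community")
#   results["tracks"]["google_comparable"]["track_averages"].get("quality_score")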

def analyze_cross_track_consistency(track_results: Dict) -> Dict:
    """Analyze consistency of model performance across different tracks."""
    consistency_analysis = {
        "track_correlations": {},
        "performance_stability": {},
        "language_coverage_analysis": {},
    }

    # Extract quality scores from each track for correlation analysis
    track_scores = {}
    for track_name, track_data in track_results.items():
        if (
            track_data.get("track_averages")
            and "quality_score" in track_data["track_averages"]
        ):
            track_scores[track_name] = track_data["track_averages"]["quality_score"]

    # Calculate pairwise comparisons (meaningful correlations would need more data points)
    if len(track_scores) >= 2:
        track_names = list(track_scores.keys())
        for i, track1 in enumerate(track_names):
            for track2 in track_names[i + 1 :]:
                # This would be more meaningful with multiple models
                consistency_analysis["track_correlations"][f"{track1}_vs_{track2}"] = {
                    "score_difference": abs(
                        track_scores[track1] - track_scores[track2]
                    ),
                    "relative_performance": track_scores[track1]
                    / max(track_scores[track2], 0.001),
                }

    # Language coverage analysis
    for track_name, track_data in track_results.items():
        if track_data.get("summary"):
            summary = track_data["summary"]
            consistency_analysis["language_coverage_analysis"][track_name] = {
                "coverage_rate": summary["language_pairs_evaluated"]
                / max(summary.get("total_possible_pairs", 1), 1),
                "samples_per_pair": summary["total_samples"]
                / max(summary["language_pairs_evaluated"], 1),
                "statistical_adequacy": summary["total_samples"]
                >= EVALUATION_TRACKS[track_name]["min_samples_per_pair"]
                * summary["language_pairs_evaluated"],
            }

    return consistency_analysis

def compare_models_statistically(
    model1_results: Dict, model2_results: Dict, track: str = "google_comparable"
) -> Dict:
    """Perform a statistical comparison between two models on a specific track."""
    if track not in model1_results.get("tracks", {}) or track not in model2_results.get(
        "tracks", {}
    ):
        return {"error": f"Track {track} not available for both models"}

    track1_data = model1_results["tracks"][track]
    track2_data = model2_results["tracks"][track]
    if track1_data.get("error") or track2_data.get("error"):
        return {"error": "One or both models have evaluation errors"}

    comparison_results = {
        "track": track,
        "model1_category": model1_results.get("model_category", "unknown"),
        "model2_category": model2_results.get("model_category", "unknown"),
        "metric_comparisons": {},
        "language_pair_comparisons": {},
        "overall_significance": {},
    }

    # Compare each metric
    for metric in (
        METRICS_CONFIG["primary_metrics"] + METRICS_CONFIG["secondary_metrics"]
    ):
        if metric in track1_data.get(
            "track_statistics", {}
        ) and metric in track2_data.get("track_statistics", {}):
            # Sample-level data would allow a proper significance test here;
            # for now the comparison uses the aggregated statistics only.
            stats1 = track1_data["track_statistics"][metric]
            stats2 = track2_data["track_statistics"][metric]

            # Create comparison summary
            comparison_results["metric_comparisons"][metric] = {
                "model1_mean": stats1["mean"],
                "model1_ci": [stats1["ci_lower"], stats1["ci_upper"]],
                "model2_mean": stats2["mean"],
                "model2_ci": [stats2["ci_lower"], stats2["ci_upper"]],
                "difference": stats1["mean"] - stats2["mean"],
                "ci_overlap": not (
                    stats1["ci_upper"] < stats2["ci_lower"]
                    or stats2["ci_upper"] < stats1["ci_lower"]
                ),
            }

    return comparison_results
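
# Note on "ci_overlap": non-overlapping confidence intervals imply a difference
# at the chosen confidence level, but overlapping intervals do not by themselves
# rule one out. When sample-level metrics are available,
# perform_significance_test() is the stricter check.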

def generate_scientific_report(
    results: Dict, model_name: str = "", baseline_results: Dict = None
) -> str:
    """Generate a comprehensive scientific evaluation report in Markdown."""
    if any(
        track_data.get("error") for track_data in results.get("tracks", {}).values()
    ):
        return "❌ **Evaluation Error**: Unable to complete scientific evaluation"

    report = []

    # Header
    report.append(f"# 🔬 Scientific Evaluation Report: {model_name or 'Model'}")
    report.append("")

    # Model categorization
    category_info = results.get("category_info", {})
    report.append(f"**Model Category**: {category_info.get('name', 'Unknown')}")
    report.append(
        f"**Category Description**: {category_info.get('description', 'N/A')}"
    )
    report.append("")

    # Track-by-track analysis
    for track_name, track_data in results.get("tracks", {}).items():
        if track_data.get("error"):
            continue
        track_config = EVALUATION_TRACKS[track_name]
        summary = track_data.get("summary", {})
        track_stats = track_data.get("track_statistics", {})

        report.append(f"## {track_config['name']}")
        report.append(f"*{track_config['description']}*")
        report.append("")

        # Summary statistics
        report.append("### 📊 Summary Statistics")
        report.append(f"- **Samples Evaluated**: {summary.get('total_samples', 0):,}")
        report.append(
            f"- **Language Pairs**: {summary.get('language_pairs_evaluated', 0)}"
        )
        report.append(f"- **Languages Covered**: {summary.get('languages_covered', 0)}")
        report.append(f"- **Statistical Power**: {track_config['statistical_power']}")
        report.append("")

        # Primary metrics with confidence intervals
        report.append("### 🎯 Primary Metrics (95% Confidence Intervals)")
        for metric in METRICS_CONFIG["primary_metrics"]:
            if metric in track_stats:
                metric_stats = track_stats[metric]
                mean_val = metric_stats["mean"]
                ci_lower = metric_stats["ci_lower"]
                ci_upper = metric_stats["ci_upper"]
                report.append(
                    f"- **{metric.upper()}**: {mean_val:.4f} [{ci_lower:.4f}, {ci_upper:.4f}]"
                )
        report.append("")

        # Statistical adequacy assessment
        min_required = track_config["min_samples_per_pair"] * summary.get(
            "language_pairs_evaluated", 0
        )
        adequacy = (
            "✅ Adequate"
            if summary.get("total_samples", 0) >= min_required
            else "⚠️ Limited"
        )
        report.append(f"**Statistical Adequacy**: {adequacy}")
        report.append("")

    # Cross-track analysis
    cross_track = results.get("cross_track_analysis", {})
    if cross_track:
        report.append("## 🔄 Cross-Track Consistency Analysis")
        coverage_analysis = cross_track.get("language_coverage_analysis", {})
        for track_name, coverage_info in coverage_analysis.items():
            adequacy = (
                "✅ Statistically adequate"
                if coverage_info.get("statistical_adequacy")
                else "⚠️ Limited statistical power"
            )
            report.append(f"- **{track_name}**: {adequacy}")
        report.append("")

    # Baseline comparison if available
    if baseline_results:
        report.append("## 📈 Baseline Comparison")
        # Detailed statistical comparisons would go here
        report.append("*Statistical comparison with baseline models*")
        report.append("")

    # Scientific recommendations
    report.append("## 💡 Scientific Recommendations")
    total_samples = sum(
        track_data.get("summary", {}).get("total_samples", 0)
        for track_data in results.get("tracks", {}).values()
        if not track_data.get("error")
    )
    if total_samples < SAMPLE_SIZE_RECOMMENDATIONS["publication_quality"]:
        report.append(
            "- ⚠️ Consider collecting more evaluation samples for publication-quality results"
        )

    google_track = results.get("tracks", {}).get("google_comparable", {})
    if (
        not google_track.get("error")
        and google_track.get("summary", {}).get("total_samples", 0) > 100
    ):
        report.append("- ✅ Sufficient data for comparison with commercial systems")
    report.append("")

    return "\n".join(report)