# src/evaluation.py
import pandas as pd
import numpy as np
from sacrebleu.metrics import BLEU, CHRF
from rouge_score import rouge_scorer
import Levenshtein
from collections import defaultdict
from transformers.models.whisper.english_normalizer import BasicTextNormalizer
from typing import Dict, List, Tuple, Optional
from scipy import stats
from scipy.stats import bootstrap
import warnings

from config import (
    ALL_UG40_LANGUAGES,
    GOOGLE_SUPPORTED_LANGUAGES,
    METRICS_CONFIG,
    STATISTICAL_CONFIG,
    EVALUATION_TRACKS,
    MODEL_CATEGORIES,
    SAMPLE_SIZE_RECOMMENDATIONS,
)
from src.utils import get_all_language_pairs, get_google_comparable_pairs

warnings.filterwarnings("ignore", category=RuntimeWarning)


def calculate_sentence_metrics(reference: str, prediction: str) -> Dict[str, float]:
    """Calculate all metrics for a single sentence pair with robust error handling."""
    # Handle empty or non-string inputs
    if not prediction or not isinstance(prediction, str):
        prediction = ""
    if not reference or not isinstance(reference, str):
        reference = ""

    # Normalize texts
    normalizer = BasicTextNormalizer()
    pred_norm = normalizer(prediction)
    ref_norm = normalizer(reference)

    metrics = {}

    # BLEU score (0-100 scale)
    try:
        bleu = BLEU(effective_order=True)
        metrics["bleu"] = bleu.sentence_score(pred_norm, [ref_norm]).score
    except Exception:
        metrics["bleu"] = 0.0

    # ChrF score (normalized to 0-1)
    try:
        chrf = CHRF()
        metrics["chrf"] = chrf.sentence_score(pred_norm, [ref_norm]).score / 100.0
    except Exception:
        metrics["chrf"] = 0.0

    # Character Error Rate (CER)
    try:
        if len(ref_norm) > 0:
            metrics["cer"] = Levenshtein.distance(ref_norm, pred_norm) / len(ref_norm)
        else:
            metrics["cer"] = 1.0 if len(pred_norm) > 0 else 0.0
    except Exception:
        metrics["cer"] = 1.0

    # Word Error Rate (WER)
    try:
        ref_words = ref_norm.split()
        pred_words = pred_norm.split()
        if len(ref_words) > 0:
            metrics["wer"] = Levenshtein.distance(ref_words, pred_words) / len(
                ref_words
            )
        else:
            metrics["wer"] = 1.0 if len(pred_words) > 0 else 0.0
    except Exception:
        metrics["wer"] = 1.0

    # Length ratio
    try:
        if len(ref_norm) > 0:
            metrics["len_ratio"] = len(pred_norm) / len(ref_norm)
        else:
            metrics["len_ratio"] = 1.0 if len(pred_norm) == 0 else float("inf")
    except Exception:
        metrics["len_ratio"] = 1.0

    # ROUGE scores
    try:
        scorer = rouge_scorer.RougeScorer(
            ["rouge1", "rouge2", "rougeL"], use_stemmer=True
        )
        rouge_scores = scorer.score(ref_norm, pred_norm)
        metrics["rouge1"] = rouge_scores["rouge1"].fmeasure
        metrics["rouge2"] = rouge_scores["rouge2"].fmeasure
        metrics["rougeL"] = rouge_scores["rougeL"].fmeasure
    except Exception:
        metrics["rouge1"] = 0.0
        metrics["rouge2"] = 0.0
        metrics["rougeL"] = 0.0

    # Quality score (composite metric)
    try:
        quality_components = [
            metrics["bleu"] / 100.0,  # Normalize BLEU to 0-1
            metrics["chrf"],  # Already 0-1
            1.0 - min(metrics["cer"], 1.0),  # Invert error rates
            1.0 - min(metrics["wer"], 1.0),
            metrics["rouge1"],
            metrics["rougeL"],
        ]
        metrics["quality_score"] = np.mean(quality_components)
    except Exception:
        metrics["quality_score"] = 0.0

    return metrics
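

# Illustrative usage (toy strings; nothing here executes on import): a call such as
#   calculate_sentence_metrics("the cat sat on the mat", "the cat sat on a mat")
# returns a flat dict keyed by "bleu", "chrf", "cer", "wer", "len_ratio", "rouge1",
# "rouge2", "rougeL" and the composite "quality_score", so per-sample results can be
# collected directly into a pandas DataFrame (as evaluate_predictions_by_track does).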


def calculate_statistical_metrics(values: List[float]) -> Dict[str, float]:
    """Calculate statistical measures including confidence intervals."""
    if not values or len(values) == 0:
        return {
            "mean": 0.0,
            "std": 0.0,
            "median": 0.0,
            "ci_lower": 0.0,
            "ci_upper": 0.0,
            "n_samples": 0,
        }

    values = np.array(values)
    values = values[~np.isnan(values)]  # Remove NaN values

    if len(values) == 0:
        return {
            "mean": 0.0,
            "std": 0.0,
            "median": 0.0,
            "ci_lower": 0.0,
            "ci_upper": 0.0,
            "n_samples": 0,
        }

    stats_dict = {
        "mean": float(np.mean(values)),
        "std": float(np.std(values, ddof=1)) if len(values) > 1 else 0.0,
        "median": float(np.median(values)),
        "n_samples": len(values),
    }

    # Calculate confidence intervals using bootstrap if there are enough samples
    if len(values) >= STATISTICAL_CONFIG["min_samples_for_ci"]:
        try:
            confidence_level = STATISTICAL_CONFIG["confidence_level"]

            # Bootstrap confidence interval
            def mean_func(x):
                return np.mean(x)

            res = bootstrap(
                (values,),
                mean_func,
                n_resamples=STATISTICAL_CONFIG["bootstrap_samples"],
                confidence_level=confidence_level,
                random_state=42,
            )
            stats_dict["ci_lower"] = float(res.confidence_interval.low)
            stats_dict["ci_upper"] = float(res.confidence_interval.high)
        except Exception:
            # Fall back to a t-distribution confidence interval
            try:
                alpha = 1 - confidence_level
                t_val = stats.t.ppf(1 - alpha / 2, len(values) - 1)
                margin = t_val * stats_dict["std"] / np.sqrt(len(values))
                stats_dict["ci_lower"] = stats_dict["mean"] - margin
                stats_dict["ci_upper"] = stats_dict["mean"] + margin
            except Exception:
                stats_dict["ci_lower"] = stats_dict["mean"]
                stats_dict["ci_upper"] = stats_dict["mean"]
    else:
        stats_dict["ci_lower"] = stats_dict["mean"]
        stats_dict["ci_upper"] = stats_dict["mean"]

    return stats_dict


def perform_significance_test(
    values1: List[float], values2: List[float], metric_name: str
) -> Dict[str, float]:
    """Perform a statistical significance test between two groups."""
    if len(values1) < 2 or len(values2) < 2:
        return {"p_value": 1.0, "effect_size": 0.0, "significant": False}

    values1 = np.array(values1)
    values2 = np.array(values2)

    # Remove NaN values
    values1 = values1[~np.isnan(values1)]
    values2 = values2[~np.isnan(values2)]

    if len(values1) < 2 or len(values2) < 2:
        return {"p_value": 1.0, "effect_size": 0.0, "significant": False}

    try:
        # Welch's t-test (no equal-variance assumption)
        t_stat, p_value = stats.ttest_ind(values1, values2, equal_var=False)

        # Calculate effect size (Cohen's d)
        pooled_std = np.sqrt(
            (
                (len(values1) - 1) * np.var(values1, ddof=1)
                + (len(values2) - 1) * np.var(values2, ddof=1)
            )
            / (len(values1) + len(values2) - 2)
        )
        if pooled_std > 0:
            effect_size = abs(np.mean(values1) - np.mean(values2)) / pooled_std
        else:
            effect_size = 0.0

        # Determine significance
        significance_level = EVALUATION_TRACKS["google_comparable"][
            "significance_level"
        ]
        significant = p_value < significance_level

        return {
            "p_value": float(p_value),
            "effect_size": float(effect_size),
            "significant": bool(significant),
            "t_statistic": float(t_stat),
        }
    except Exception:
        return {"p_value": 1.0, "effect_size": 0.0, "significant": False}


def evaluate_predictions_by_track(
    predictions: pd.DataFrame, test_set: pd.DataFrame, track: str
) -> Dict:
    """Evaluate predictions for a specific track with statistical analysis."""
    print(f"🔄 Evaluating for {track} track...")

    track_config = EVALUATION_TRACKS[track]
    track_languages = track_config["languages"]

    # Filter the test set to language pairs covered by this track
    track_test_set = test_set[
        (test_set["source_language"].isin(track_languages))
        & (test_set["target_language"].isin(track_languages))
    ].copy()

    # Merge predictions with the test set
    merged = track_test_set.merge(
        predictions, on="sample_id", how="inner", suffixes=("", "_pred")
    )

    if len(merged) == 0:
        return {
            "error": f"No matching samples found for {track} track",
            "evaluated_samples": 0,
            "track": track,
        }

    print(f"📊 Evaluating {len(merged)} samples for {track} track...")

    # Calculate metrics for each sample
    sample_metrics = []
    for idx, row in merged.iterrows():
        metrics = calculate_sentence_metrics(row["target_text"], row["prediction"])
        metrics["sample_id"] = row["sample_id"]
        metrics["source_language"] = row["source_language"]
        metrics["target_language"] = row["target_language"]
        sample_metrics.append(metrics)

    sample_df = pd.DataFrame(sample_metrics)

    # Aggregate by language pair with statistical analysis
    pair_metrics = {}
    overall_metrics = defaultdict(list)

    # Calculate metrics for each language pair
    for src_lang in track_languages:
        for tgt_lang in track_languages:
            if src_lang == tgt_lang:
                continue

            pair_data = sample_df[
                (sample_df["source_language"] == src_lang)
                & (sample_df["target_language"] == tgt_lang)
            ]

            if len(pair_data) >= track_config["min_samples_per_pair"]:
                pair_key = f"{src_lang}_to_{tgt_lang}"
                pair_metrics[pair_key] = {}

                # Calculate statistical metrics for each measure
                for metric in (
                    METRICS_CONFIG["primary_metrics"]
                    + METRICS_CONFIG["secondary_metrics"]
                ):
                    if metric in pair_data.columns:
                        values = (
                            pair_data[metric]
                            .replace([np.inf, -np.inf], np.nan)
                            .dropna()
                        )
                        if len(values) > 0:
                            stats_metrics = calculate_statistical_metrics(
                                values.tolist()
                            )
                            pair_metrics[pair_key][metric] = stats_metrics

                            # Add to overall metrics for track-level statistics
                            overall_metrics[metric].append(stats_metrics["mean"])

                pair_metrics[pair_key]["sample_count"] = len(pair_data)
                pair_metrics[pair_key]["languages"] = f"{src_lang}-{tgt_lang}"

    # Calculate track-level aggregated statistics
    track_averages = {}
    track_statistics = {}

    for metric in overall_metrics:
        if overall_metrics[metric]:
            track_stats = calculate_statistical_metrics(overall_metrics[metric])
            track_averages[metric] = track_stats["mean"]
            track_statistics[metric] = track_stats

    # Generate evaluation summary
    summary = {
        "track": track,
        "track_name": track_config["name"],
        "total_samples": len(sample_df),
        "language_pairs_evaluated": len(
            [k for k in pair_metrics if pair_metrics[k].get("sample_count", 0) > 0]
        ),
        "total_possible_pairs": len(track_languages) * (len(track_languages) - 1),
        "languages_covered": len(
            set(sample_df["source_language"]) | set(sample_df["target_language"])
        ),
        "min_samples_per_pair": track_config["min_samples_per_pair"],
        "statistical_power": track_config["statistical_power"],
        "significance_level": track_config["significance_level"],
    }

    return {
        "sample_metrics": sample_df,
        "pair_metrics": pair_metrics,
        "track_averages": track_averages,
        "track_statistics": track_statistics,
        "summary": summary,
        "evaluated_samples": len(sample_df),
        "track": track,
        "error": None,
    }


def evaluate_predictions_scientific(
    predictions: pd.DataFrame, test_set: pd.DataFrame, model_category: str = "community"
) -> Dict:
    """Comprehensive evaluation across all tracks with scientific rigor."""
    print("🔬 Starting scientific evaluation...")

    # Validate model category
    if model_category not in MODEL_CATEGORIES:
        model_category = "community"

    evaluation_results = {
        "model_category": model_category,
        "category_info": MODEL_CATEGORIES[model_category],
        "tracks": {},
        "cross_track_analysis": {},
        "scientific_metadata": {
            "evaluation_timestamp": pd.Timestamp.now().isoformat(),
            "total_samples_submitted": len(predictions),
            "total_samples_available": len(test_set),
        },
    }

    # Evaluate each track
    for track_name in EVALUATION_TRACKS.keys():
        track_result = evaluate_predictions_by_track(predictions, test_set, track_name)
        evaluation_results["tracks"][track_name] = track_result

    # Cross-track consistency analysis
    evaluation_results["cross_track_analysis"] = analyze_cross_track_consistency(
        evaluation_results["tracks"]
    )

    return evaluation_results


def analyze_cross_track_consistency(track_results: Dict) -> Dict:
    """Analyze consistency of model performance across different tracks."""
    consistency_analysis = {
        "track_correlations": {},
        "performance_stability": {},
        "language_coverage_analysis": {},
    }
    # Extract quality scores from each track for correlation analysis
    track_scores = {}
    for track_name, track_data in track_results.items():
        if (
            track_data.get("track_averages")
            and "quality_score" in track_data["track_averages"]
        ):
            track_scores[track_name] = track_data["track_averages"]["quality_score"]

    # Calculate pairwise score gaps (a true correlation would need scores from
    # multiple models, not a single submission)
    if len(track_scores) >= 2:
        track_names = list(track_scores.keys())
        for i, track1 in enumerate(track_names):
            for track2 in track_names[i + 1 :]:
                consistency_analysis["track_correlations"][f"{track1}_vs_{track2}"] = {
                    "score_difference": abs(
                        track_scores[track1] - track_scores[track2]
                    ),
                    "relative_performance": track_scores[track1]
                    / max(track_scores[track2], 0.001),
                }

    # Language coverage analysis
    for track_name, track_data in track_results.items():
        if track_data.get("summary"):
            summary = track_data["summary"]
            consistency_analysis["language_coverage_analysis"][track_name] = {
                "coverage_rate": summary["language_pairs_evaluated"]
                / max(summary.get("total_possible_pairs", 1), 1),
                "samples_per_pair": summary["total_samples"]
                / max(summary["language_pairs_evaluated"], 1),
                "statistical_adequacy": summary["total_samples"]
                >= EVALUATION_TRACKS[track_name]["min_samples_per_pair"]
                * summary["language_pairs_evaluated"],
            }

    return consistency_analysis


def compare_models_statistically(
    model1_results: Dict, model2_results: Dict, track: str = "google_comparable"
) -> Dict:
    """Perform statistical comparison between two models on a specific track."""
    if track not in model1_results.get("tracks", {}) or track not in model2_results.get(
        "tracks", {}
    ):
        return {"error": f"Track {track} not available for both models"}

    track1_data = model1_results["tracks"][track]
    track2_data = model2_results["tracks"][track]

    if track1_data.get("error") or track2_data.get("error"):
        return {"error": "One or both models have evaluation errors"}

    comparison_results = {
        "track": track,
        "model1_category": model1_results.get("model_category", "unknown"),
        "model2_category": model2_results.get("model_category", "unknown"),
        "metric_comparisons": {},
        "language_pair_comparisons": {},
        "overall_significance": {},
    }

    # Compare each metric
    for metric in (
        METRICS_CONFIG["primary_metrics"] + METRICS_CONFIG["secondary_metrics"]
    ):
        if metric in track1_data.get(
            "track_statistics", {}
        ) and metric in track2_data.get("track_statistics", {}):
            # Sample-level data is not retained here, so the comparison uses the
            # aggregated statistics (means and confidence intervals) per metric.
            stats1 = track1_data["track_statistics"][metric]
            stats2 = track2_data["track_statistics"][metric]

            # Create comparison summary
            comparison_results["metric_comparisons"][metric] = {
                "model1_mean": stats1["mean"],
                "model1_ci": [stats1["ci_lower"], stats1["ci_upper"]],
                "model2_mean": stats2["mean"],
                "model2_ci": [stats2["ci_lower"], stats2["ci_upper"]],
                "difference": stats1["mean"] - stats2["mean"],
                "ci_overlap": not (
                    stats1["ci_upper"] < stats2["ci_lower"]
                    or stats2["ci_upper"] < stats1["ci_lower"]
                ),
            }

    return comparison_results


def generate_scientific_report(
    results: Dict, model_name: str = "", baseline_results: Optional[Dict] = None
) -> str:
    """Generate a comprehensive scientific evaluation report."""
    if any(
        track_data.get("error") for track_data in results.get("tracks", {}).values()
    ):
        return "❌ **Evaluation Error**: Unable to complete scientific evaluation"

    report = []

    # Header
    report.append(f"# 🔬 Scientific Evaluation Report: {model_name or 'Model'}")
report.append("") # Model categorization category_info = results.get("category_info", {}) report.append(f"**Model Category**: {category_info.get('name', 'Unknown')}") report.append( f"**Category Description**: {category_info.get('description', 'N/A')}" ) report.append("") # Track-by-track analysis for track_name, track_data in results.get("tracks", {}).items(): if track_data.get("error"): continue track_config = EVALUATION_TRACKS[track_name] summary = track_data.get("summary", {}) track_stats = track_data.get("track_statistics", {}) report.append(f"## {track_config['name']}") report.append(f"*{track_config['description']}*") report.append("") # Summary statistics report.append("### 📊 Summary Statistics") report.append(f"- **Samples Evaluated**: {summary.get('total_samples', 0):,}") report.append( f"- **Language Pairs**: {summary.get('language_pairs_evaluated', 0)}" ) report.append(f"- **Languages Covered**: {summary.get('languages_covered', 0)}") report.append(f"- **Statistical Power**: {track_config['statistical_power']}") report.append("") # Primary metrics with confidence intervals report.append("### 🎯 Primary Metrics (95% Confidence Intervals)") for metric in METRICS_CONFIG["primary_metrics"]: if metric in track_stats: stats = track_stats[metric] mean_val = stats["mean"] ci_lower = stats["ci_lower"] ci_upper = stats["ci_upper"] report.append( f"- **{metric.upper()}**: {mean_val:.4f} [{ci_lower:.4f}, {ci_upper:.4f}]" ) report.append("") # Statistical adequacy assessment min_required = track_config["min_samples_per_pair"] * summary.get( "language_pairs_evaluated", 0 ) adequacy = ( "✅ Adequate" if summary.get("total_samples", 0) >= min_required else "⚠️ Limited" ) report.append(f"**Statistical Adequacy**: {adequacy}") report.append("") # Cross-track analysis cross_track = results.get("cross_track_analysis", {}) if cross_track: report.append("## 🔄 Cross-Track Consistency Analysis") coverage_analysis = cross_track.get("language_coverage_analysis", {}) for track_name, coverage_info in coverage_analysis.items(): adequacy = ( "✅ Statistically adequate" if coverage_info.get("statistical_adequacy") else "⚠️ Limited statistical power" ) report.append(f"- **{track_name}**: {adequacy}") report.append("") # Baseline comparison if available if baseline_results: report.append("## 📈 Baseline Comparison") # This would include detailed statistical comparisons report.append("*Statistical comparison with baseline models*") report.append("") # Scientific recommendations report.append("## 💡 Scientific Recommendations") total_samples = sum( track_data.get("summary", {}).get("total_samples", 0) for track_data in results.get("tracks", {}).values() if not track_data.get("error") ) if total_samples < SAMPLE_SIZE_RECOMMENDATIONS["publication_quality"]: report.append( "- ⚠️ Consider collecting more evaluation samples for publication-quality results" ) google_track = results.get("tracks", {}).get("google_comparable", {}) if ( not google_track.get("error") and google_track.get("summary", {}).get("total_samples", 0) > 100 ): report.append("- ✅ Sufficient data for comparison with commercial systems") report.append("") return "\n".join(report)