# src/evaluation.py
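"""Evaluation utilities for the translation leaderboard.

Provides sentence-level metrics (BLEU, ChrF, CER, WER, length ratio, ROUGE and a
composite quality score), aggregation by language pair, comparison against a
baseline such as Google Translate, and human-readable report generation.
"""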
import pandas as pd
import numpy as np
from sacrebleu.metrics import BLEU, CHRF
from rouge_score import rouge_scorer
import Levenshtein
from collections import defaultdict
from transformers.models.whisper.english_normalizer import BasicTextNormalizer
from typing import Dict, List, Optional, Tuple
from config import ALL_UG40_LANGUAGES, GOOGLE_SUPPORTED_LANGUAGES, METRICS_CONFIG
from src.utils import get_all_language_pairs, get_google_comparable_pairs
def calculate_sentence_metrics(reference: str, prediction: str) -> Dict[str, float]:
"""Calculate all metrics for a single sentence pair - Fixed to match reference implementation."""
# Handle empty predictions
if not prediction or not isinstance(prediction, str):
prediction = ""
if not reference or not isinstance(reference, str):
reference = ""
# Normalize texts
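    # Whisper's BasicTextNormalizer lowercases the text, drops bracketed spans,
    # and strips punctuation/symbols, so the metrics below compare content
    # rather than casing or formatting.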
normalizer = BasicTextNormalizer()
pred_norm = normalizer(prediction)
ref_norm = normalizer(reference)
metrics = {}
# BLEU score (keep as 0-100 scale initially)
try:
bleu = BLEU(effective_order=True)
metrics['bleu'] = bleu.sentence_score(pred_norm, [ref_norm]).score
    except Exception:
metrics['bleu'] = 0.0
# ChrF score (normalize to 0-1)
try:
chrf = CHRF()
metrics['chrf'] = chrf.sentence_score(pred_norm, [ref_norm]).score / 100.0
    except Exception:
metrics['chrf'] = 0.0
# Character Error Rate (CER)
try:
if len(ref_norm) > 0:
metrics['cer'] = Levenshtein.distance(ref_norm, pred_norm) / len(ref_norm)
else:
metrics['cer'] = 1.0 if len(pred_norm) > 0 else 0.0
    except Exception:
metrics['cer'] = 1.0
# Word Error Rate (WER)
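    # Token lists are passed to Levenshtein.distance, giving a word-level edit
    # distance; this assumes a Levenshtein build that accepts arbitrary
    # sequences (the rapidfuzz-maintained `Levenshtein` package does).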
try:
ref_words = ref_norm.split()
pred_words = pred_norm.split()
if len(ref_words) > 0:
metrics['wer'] = Levenshtein.distance(ref_words, pred_words) / len(ref_words)
else:
metrics['wer'] = 1.0 if len(pred_words) > 0 else 0.0
    except Exception:
metrics['wer'] = 1.0
# Length ratio
try:
if len(ref_norm) > 0:
metrics['len_ratio'] = len(pred_norm) / len(ref_norm)
else:
metrics['len_ratio'] = 1.0 if len(pred_norm) == 0 else float('inf')
    except Exception:
metrics['len_ratio'] = 1.0
# ROUGE scores
try:
scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
rouge_scores = scorer.score(ref_norm, pred_norm)
metrics['rouge1'] = rouge_scores['rouge1'].fmeasure
metrics['rouge2'] = rouge_scores['rouge2'].fmeasure
metrics['rougeL'] = rouge_scores['rougeL'].fmeasure
    except Exception:
metrics['rouge1'] = 0.0
metrics['rouge2'] = 0.0
metrics['rougeL'] = 0.0
    # Quality score: mean of the normalized, higher-is-better components below
try:
quality_components = [
metrics['bleu'] / 100.0, # Normalize BLEU to 0-1
metrics['chrf'], # Already 0-1
1.0 - min(metrics['cer'], 1.0), # Invert error rates
1.0 - min(metrics['wer'], 1.0),
metrics['rouge1'],
metrics['rougeL']
]
metrics['quality_score'] = np.mean(quality_components)
except Exception as e:
# Fallback without ROUGE
print(f"Error calculating quality score: {e}")
try:
fallback_components = [
metrics['bleu'] / 100.0,
metrics['chrf'],
1.0 - min(metrics['cer'], 1.0),
1.0 - min(metrics['wer'], 1.0)
]
metrics['quality_score'] = np.mean(fallback_components)
        except Exception:
metrics['quality_score'] = 0.0
return metrics
def evaluate_predictions(predictions: pd.DataFrame, test_set: pd.DataFrame) -> Dict:
"""Evaluate predictions against test set targets."""
print("Starting evaluation...")
# Merge predictions with test set (which contains targets)
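    # Inner join on sample_id: predictions without a matching test-set row
    # (and vice versa) are dropped before scoring.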
merged = test_set.merge(
predictions,
on='sample_id',
how='inner',
suffixes=('', '_pred')
)
if len(merged) == 0:
return {
'error': 'No matching samples found between predictions and test set',
'evaluated_samples': 0
}
print(f"Evaluating {len(merged)} samples...")
# Calculate metrics for each sample
sample_metrics = []
for idx, row in merged.iterrows():
metrics = calculate_sentence_metrics(row['target_text'], row['prediction'])
metrics['sample_id'] = row['sample_id']
metrics['source_language'] = row['source_language']
metrics['target_language'] = row['target_language']
metrics['google_comparable'] = row.get('google_comparable', False)
sample_metrics.append(metrics)
sample_df = pd.DataFrame(sample_metrics)
    # Aggregate metrics by language pair
pair_metrics = {}
overall_metrics = defaultdict(list)
google_comparable_metrics = defaultdict(list)
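    # Overall averages are macro-averaged across language pairs (a mean of
    # per-pair means), so every covered pair carries equal weight regardless
    # of how many samples it contains.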
# Calculate metrics for each language pair
for src_lang in ALL_UG40_LANGUAGES:
for tgt_lang in ALL_UG40_LANGUAGES:
if src_lang != tgt_lang:
pair_data = sample_df[
(sample_df['source_language'] == src_lang) &
(sample_df['target_language'] == tgt_lang)
]
if len(pair_data) > 0:
pair_key = f"{src_lang}_to_{tgt_lang}"
pair_metrics[pair_key] = {}
# Calculate averages for this pair
for metric in METRICS_CONFIG['primary_metrics'] + METRICS_CONFIG['secondary_metrics']:
if metric in pair_data.columns:
# Filter out invalid values
valid_values = pair_data[metric].replace([np.inf, -np.inf], np.nan).dropna()
if len(valid_values) > 0:
avg_value = float(valid_values.mean())
pair_metrics[pair_key][metric] = avg_value
# Add to overall averages
overall_metrics[metric].append(avg_value)
# Add to Google comparable if applicable
if (src_lang in GOOGLE_SUPPORTED_LANGUAGES and
tgt_lang in GOOGLE_SUPPORTED_LANGUAGES):
google_comparable_metrics[metric].append(avg_value)
pair_metrics[pair_key]['sample_count'] = len(pair_data)
# Calculate overall averages
averages = {}
for metric in overall_metrics:
if overall_metrics[metric]:
averages[metric] = float(np.mean(overall_metrics[metric]))
else:
averages[metric] = 0.0
# Calculate Google comparable averages
google_averages = {}
for metric in google_comparable_metrics:
if google_comparable_metrics[metric]:
google_averages[metric] = float(np.mean(google_comparable_metrics[metric]))
else:
google_averages[metric] = 0.0
# Generate evaluation summary
summary = {
'total_samples': len(sample_df),
'language_pairs_covered': len([k for k in pair_metrics if pair_metrics[k].get('sample_count', 0) > 0]),
'google_comparable_pairs': len([k for k in pair_metrics
if '_to_' in k and
k.split('_to_')[0] in GOOGLE_SUPPORTED_LANGUAGES and
k.split('_to_')[1] in GOOGLE_SUPPORTED_LANGUAGES and
pair_metrics[k].get('sample_count', 0) > 0]),
'primary_metrics': {metric: averages.get(metric, 0.0)
for metric in METRICS_CONFIG['primary_metrics']},
'secondary_metrics': {metric: averages.get(metric, 0.0)
for metric in METRICS_CONFIG['secondary_metrics']}
}
return {
'sample_metrics': sample_df,
'pair_metrics': pair_metrics,
'averages': averages,
'google_comparable_averages': google_averages,
'summary': summary,
'evaluated_samples': len(sample_df),
'error': None
}
def compare_with_baseline(results: Dict, baseline_results: Optional[Dict] = None) -> Dict:
"""Compare results with baseline (e.g., Google Translate)."""
if baseline_results is None:
return {
'comparison_available': False,
'message': 'No baseline available for comparison'
}
comparison = {
'comparison_available': True,
'overall_comparison': {},
'pair_comparisons': {},
'better_pairs': [],
'worse_pairs': []
}
# Compare overall metrics
for metric in METRICS_CONFIG['primary_metrics']:
if metric in results['averages'] and metric in baseline_results['averages']:
user_score = results['averages'][metric]
baseline_score = baseline_results['averages'][metric]
# For error metrics (cer, wer), lower is better
if metric in ['cer', 'wer']:
improvement = baseline_score - user_score # Positive = improvement
else:
improvement = user_score - baseline_score # Positive = improvement
comparison['overall_comparison'][metric] = {
'user_score': user_score,
'baseline_score': baseline_score,
'improvement': improvement,
'improvement_percent': (improvement / max(baseline_score, 0.001)) * 100
}
# Compare by language pairs (only Google comparable ones)
google_pairs = [k for k in results['pair_metrics']
if '_to_' in k and
k.split('_to_')[0] in GOOGLE_SUPPORTED_LANGUAGES and
k.split('_to_')[1] in GOOGLE_SUPPORTED_LANGUAGES]
for pair in google_pairs:
if pair in baseline_results['pair_metrics']:
pair_comparison = {}
for metric in METRICS_CONFIG['primary_metrics']:
if (metric in results['pair_metrics'][pair] and
metric in baseline_results['pair_metrics'][pair]):
user_score = results['pair_metrics'][pair][metric]
baseline_score = baseline_results['pair_metrics'][pair][metric]
if metric in ['cer', 'wer']:
improvement = baseline_score - user_score
else:
improvement = user_score - baseline_score
pair_comparison[metric] = {
'user_score': user_score,
'baseline_score': baseline_score,
'improvement': improvement
}
comparison['pair_comparisons'][pair] = pair_comparison
# Determine if this pair is better or worse overall
quality_improvement = pair_comparison.get('quality_score', {}).get('improvement', 0)
if quality_improvement > 0.01: # Threshold for significance
comparison['better_pairs'].append(pair)
elif quality_improvement < -0.01:
comparison['worse_pairs'].append(pair)
return comparison
def generate_evaluation_report(results: Dict, model_name: str = "", comparison: Optional[Dict] = None) -> str:
"""Generate human-readable evaluation report."""
if results.get('error'):
return f"❌ **Evaluation Error**: {results['error']}"
report = []
# Header
report.append(f"## Evaluation Report: {model_name or 'Submission'}")
report.append("")
# Summary
summary = results['summary']
report.append("### πŸ“Š Summary")
report.append(f"- **Total Samples Evaluated**: {summary['total_samples']:,}")
report.append(f"- **Language Pairs Covered**: {summary['language_pairs_covered']}")
report.append(f"- **Google Comparable Pairs**: {summary['google_comparable_pairs']}")
report.append("")
# Primary metrics
report.append("### 🎯 Primary Metrics")
for metric, value in summary['primary_metrics'].items():
formatted_value = f"{value:.4f}" if metric != 'bleu' else f"{value:.2f}"
report.append(f"- **{metric.upper()}**: {formatted_value}")
# Quality ranking (if comparison available)
if comparison and comparison.get('comparison_available'):
quality_comp = comparison['overall_comparison'].get('quality_score', {})
if quality_comp:
improvement = quality_comp.get('improvement', 0)
if improvement > 0.01:
report.append(f" - 🟒 **{improvement:.3f}** better than baseline")
elif improvement < -0.01:
report.append(f" - πŸ”΄ **{abs(improvement):.3f}** worse than baseline")
else:
report.append(f" - 🟑 Similar to baseline")
report.append("")
# Secondary metrics
report.append("### πŸ“ˆ Secondary Metrics")
for metric, value in summary['secondary_metrics'].items():
formatted_value = f"{value:.4f}"
report.append(f"- **{metric.upper()}**: {formatted_value}")
report.append("")
# Language pair performance (top and bottom 5)
pair_metrics = results['pair_metrics']
if pair_metrics:
# Sort pairs by quality score
sorted_pairs = sorted(
[(k, v.get('quality_score', 0)) for k, v in pair_metrics.items() if v.get('sample_count', 0) > 0],
key=lambda x: x[1],
reverse=True
)
if sorted_pairs:
report.append("### πŸ† Best Performing Language Pairs")
for pair, score in sorted_pairs[:5]:
                src, tgt = pair.split('_to_')
                report.append(f"- **{src} → {tgt}**: {score:.3f}")
if len(sorted_pairs) > 5:
report.append("")
report.append("### πŸ“‰ Challenging Language Pairs")
for pair, score in sorted_pairs[-3:]:
                    src, tgt = pair.split('_to_')
                    report.append(f"- **{src} → {tgt}**: {score:.3f}")
# Comparison with baseline
if comparison and comparison.get('comparison_available'):
report.append("")
report.append("### πŸ” Comparison with Baseline")
better_count = len(comparison.get('better_pairs', []))
worse_count = len(comparison.get('worse_pairs', []))
total_comparable = len(comparison.get('pair_comparisons', {}))
if total_comparable > 0:
report.append(f"- **Better than baseline**: {better_count}/{total_comparable} pairs")
report.append(f"- **Worse than baseline**: {worse_count}/{total_comparable} pairs")
if comparison['better_pairs']:
report.append(" - Strong pairs: " + ", ".join(comparison['better_pairs'][:3]))
if comparison['worse_pairs']:
report.append(" - Weak pairs: " + ", ".join(comparison['worse_pairs'][:3]))
return "\n".join(report)
def create_sample_analysis(results: Dict, n_samples: int = 10) -> pd.DataFrame:
"""Create sample analysis showing best and worst translations."""
if 'sample_metrics' not in results:
return pd.DataFrame()
sample_df = results['sample_metrics']
# Get best and worst samples by quality score
best_samples = sample_df.nlargest(n_samples // 2, 'quality_score')
worst_samples = sample_df.nsmallest(n_samples // 2, 'quality_score')
analysis_samples = pd.concat([best_samples, worst_samples])
# Add category
analysis_samples['category'] = ['Best'] * len(best_samples) + ['Worst'] * len(worst_samples)
return analysis_samples[['sample_id', 'source_language', 'target_language',
'quality_score', 'bleu', 'chrf', 'category']]
def get_google_translate_baseline() -> Dict:
"""Get Google Translate baseline results (if available)."""
try:
# This would load pre-computed Google Translate results
# For now, return empty dict - implement when Google Translate baseline is available
return {}
    except Exception:
return {}
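
# ---------------------------------------------------------------------------
# Illustrative usage (a minimal sketch, not part of the leaderboard pipeline):
# shows the DataFrame columns evaluate_predictions() expects. The sample rows
# and language codes below are placeholders; real submissions should use codes
# from ALL_UG40_LANGUAGES.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    demo_test_set = pd.DataFrame({
        'sample_id': [1, 2],
        'source_language': ['eng', 'lug'],
        'target_language': ['lug', 'eng'],
        'target_text': ['webale nnyo', 'thank you very much'],
    })
    demo_predictions = pd.DataFrame({
        'sample_id': [1, 2],
        'prediction': ['webale nyo', 'thank you so much'],
    })
    demo_results = evaluate_predictions(demo_predictions, demo_test_set)
    print(generate_evaluation_report(demo_results, model_name="demo-model"))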