# src/evaluation.py

from collections import defaultdict
from typing import Dict, Optional

import Levenshtein
import numpy as np
import pandas as pd
from rouge_score import rouge_scorer
from sacrebleu.metrics import BLEU, CHRF
from transformers.models.whisper.english_normalizer import BasicTextNormalizer

from config import ALL_UG40_LANGUAGES, GOOGLE_SUPPORTED_LANGUAGES, METRICS_CONFIG
from src.utils import get_all_language_pairs, get_google_comparable_pairs


def calculate_sentence_metrics(reference: str, prediction: str) -> Dict[str, float]:
    """Calculate all metrics for a single sentence pair (matches the reference implementation)."""
    # Guard against empty or non-string inputs
    if not prediction or not isinstance(prediction, str):
        prediction = ""
    if not reference or not isinstance(reference, str):
        reference = ""

    # Normalize texts
    normalizer = BasicTextNormalizer()
    pred_norm = normalizer(prediction)
    ref_norm = normalizer(reference)

    metrics = {}

    # BLEU score (kept on the 0-100 scale)
    try:
        bleu = BLEU(effective_order=True)
        metrics['bleu'] = bleu.sentence_score(pred_norm, [ref_norm]).score
    except Exception:
        metrics['bleu'] = 0.0

    # ChrF score (normalized to 0-1)
    try:
        chrf = CHRF()
        metrics['chrf'] = chrf.sentence_score(pred_norm, [ref_norm]).score / 100.0
    except Exception:
        metrics['chrf'] = 0.0

    # Character Error Rate (CER)
    try:
        if len(ref_norm) > 0:
            metrics['cer'] = Levenshtein.distance(ref_norm, pred_norm) / len(ref_norm)
        else:
            metrics['cer'] = 1.0 if len(pred_norm) > 0 else 0.0
    except Exception:
        metrics['cer'] = 1.0

    # Word Error Rate (WER); relies on a Levenshtein version that accepts
    # sequences of words (lists), not only strings
    try:
        ref_words = ref_norm.split()
        pred_words = pred_norm.split()
        if len(ref_words) > 0:
            metrics['wer'] = Levenshtein.distance(ref_words, pred_words) / len(ref_words)
        else:
            metrics['wer'] = 1.0 if len(pred_words) > 0 else 0.0
    except Exception:
        metrics['wer'] = 1.0

    # Length ratio
    try:
        if len(ref_norm) > 0:
            metrics['len_ratio'] = len(pred_norm) / len(ref_norm)
        else:
            metrics['len_ratio'] = 1.0 if len(pred_norm) == 0 else float('inf')
    except Exception:
        metrics['len_ratio'] = 1.0

    # ROUGE scores
    try:
        scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
        rouge_scores = scorer.score(ref_norm, pred_norm)
        metrics['rouge1'] = rouge_scores['rouge1'].fmeasure
        metrics['rouge2'] = rouge_scores['rouge2'].fmeasure
        metrics['rougeL'] = rouge_scores['rougeL'].fmeasure
    except Exception:
        metrics['rouge1'] = 0.0
        metrics['rouge2'] = 0.0
        metrics['rougeL'] = 0.0

    # Quality score: composite metric averaging the normalized components
    try:
        quality_components = [
            metrics['bleu'] / 100.0,         # Normalize BLEU to 0-1
            metrics['chrf'],                 # Already 0-1
            1.0 - min(metrics['cer'], 1.0),  # Invert error rates
            1.0 - min(metrics['wer'], 1.0),
            metrics['rouge1'],
            metrics['rougeL'],
        ]
        metrics['quality_score'] = float(np.mean(quality_components))
    except Exception as e:
        # Fallback without the ROUGE components
        print(f"Error calculating quality score: {e}")
        try:
            fallback_components = [
                metrics['bleu'] / 100.0,
                metrics['chrf'],
                1.0 - min(metrics['cer'], 1.0),
                1.0 - min(metrics['wer'], 1.0),
            ]
            metrics['quality_score'] = float(np.mean(fallback_components))
        except Exception:
            metrics['quality_score'] = 0.0

    return metrics
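
# Illustrative call (a sketch, assuming sacrebleu, rouge_score and Levenshtein are
# installed): an exact match such as
#     calculate_sentence_metrics("the cat sat", "the cat sat")
# yields 'bleu' of ~100 (0-100 scale), 'chrf'/'rouge*'/'quality_score' of ~1.0
# (0-1 scale), and 'cer'/'wer' of 0.0 (error rates, lower is better).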


def evaluate_predictions(predictions: pd.DataFrame, test_set: pd.DataFrame) -> Dict:
    """Evaluate predictions against test set targets."""
    print("Starting evaluation...")

    # Merge predictions with the test set (which contains the targets)
    merged = test_set.merge(
        predictions,
        on='sample_id',
        how='inner',
        suffixes=('', '_pred')
    )

    if len(merged) == 0:
        return {
            'error': 'No matching samples found between predictions and test set',
            'evaluated_samples': 0
        }

    print(f"Evaluating {len(merged)} samples...")

    # Calculate metrics for each sample
    sample_metrics = []
    for _, row in merged.iterrows():
        metrics = calculate_sentence_metrics(row['target_text'], row['prediction'])
        metrics['sample_id'] = row['sample_id']
        metrics['source_language'] = row['source_language']
        metrics['target_language'] = row['target_language']
        metrics['google_comparable'] = row.get('google_comparable', False)
        sample_metrics.append(metrics)

    sample_df = pd.DataFrame(sample_metrics)

    # Aggregate by language pair
    pair_metrics = {}
    overall_metrics = defaultdict(list)
    google_comparable_metrics = defaultdict(list)

    # Calculate metrics for each language pair
    for src_lang in ALL_UG40_LANGUAGES:
        for tgt_lang in ALL_UG40_LANGUAGES:
            if src_lang != tgt_lang:
                pair_data = sample_df[
                    (sample_df['source_language'] == src_lang) &
                    (sample_df['target_language'] == tgt_lang)
                ]
                if len(pair_data) > 0:
                    pair_key = f"{src_lang}_to_{tgt_lang}"
                    pair_metrics[pair_key] = {}

                    # Calculate averages for this pair
                    for metric in METRICS_CONFIG['primary_metrics'] + METRICS_CONFIG['secondary_metrics']:
                        if metric in pair_data.columns:
                            # Filter out invalid values
                            valid_values = pair_data[metric].replace([np.inf, -np.inf], np.nan).dropna()
                            if len(valid_values) > 0:
                                avg_value = float(valid_values.mean())
                                pair_metrics[pair_key][metric] = avg_value

                                # Add to overall averages
                                overall_metrics[metric].append(avg_value)

                                # Add to the Google-comparable bucket if applicable
                                if (src_lang in GOOGLE_SUPPORTED_LANGUAGES and
                                        tgt_lang in GOOGLE_SUPPORTED_LANGUAGES):
                                    google_comparable_metrics[metric].append(avg_value)

                    pair_metrics[pair_key]['sample_count'] = len(pair_data)

    # Calculate overall averages (macro-averaged over language pairs)
    averages = {}
    for metric in overall_metrics:
        if overall_metrics[metric]:
            averages[metric] = float(np.mean(overall_metrics[metric]))
        else:
            averages[metric] = 0.0

    # Calculate Google-comparable averages
    google_averages = {}
    for metric in google_comparable_metrics:
        if google_comparable_metrics[metric]:
            google_averages[metric] = float(np.mean(google_comparable_metrics[metric]))
        else:
            google_averages[metric] = 0.0

    # Generate evaluation summary
    summary = {
        'total_samples': len(sample_df),
        'language_pairs_covered': len([k for k in pair_metrics
                                       if pair_metrics[k].get('sample_count', 0) > 0]),
        'google_comparable_pairs': len([k for k in pair_metrics
                                        if '_to_' in k and
                                        k.split('_to_')[0] in GOOGLE_SUPPORTED_LANGUAGES and
                                        k.split('_to_')[1] in GOOGLE_SUPPORTED_LANGUAGES and
                                        pair_metrics[k].get('sample_count', 0) > 0]),
        'primary_metrics': {metric: averages.get(metric, 0.0)
                            for metric in METRICS_CONFIG['primary_metrics']},
        'secondary_metrics': {metric: averages.get(metric, 0.0)
                              for metric in METRICS_CONFIG['secondary_metrics']}
    }

    return {
        'sample_metrics': sample_df,
        'pair_metrics': pair_metrics,
        'averages': averages,
        'google_comparable_averages': google_averages,
        'summary': summary,
        'evaluated_samples': len(sample_df),
        'error': None
    }
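
# Illustrative inputs (a sketch; the column names are the ones used in the merge
# and row lookups above, while the variable names here are hypothetical):
#   test_set_df:    columns 'sample_id', 'source_language', 'target_language',
#                   'target_text' and optionally 'google_comparable'
#   predictions_df: columns 'sample_id' and 'prediction'
#   results = evaluate_predictions(predictions_df, test_set_df)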


def compare_with_baseline(results: Dict, baseline_results: Optional[Dict] = None) -> Dict:
    """Compare results with a baseline (e.g., Google Translate)."""
    if baseline_results is None:
        return {
            'comparison_available': False,
            'message': 'No baseline available for comparison'
        }

    comparison = {
        'comparison_available': True,
        'overall_comparison': {},
        'pair_comparisons': {},
        'better_pairs': [],
        'worse_pairs': []
    }

    # Compare overall metrics
    for metric in METRICS_CONFIG['primary_metrics']:
        if metric in results['averages'] and metric in baseline_results['averages']:
            user_score = results['averages'][metric]
            baseline_score = baseline_results['averages'][metric]

            # For error metrics (cer, wer), lower is better
            if metric in ['cer', 'wer']:
                improvement = baseline_score - user_score  # Positive = improvement
            else:
                improvement = user_score - baseline_score  # Positive = improvement

            comparison['overall_comparison'][metric] = {
                'user_score': user_score,
                'baseline_score': baseline_score,
                'improvement': improvement,
                'improvement_percent': (improvement / max(baseline_score, 0.001)) * 100
            }

    # Compare by language pair (only the Google-comparable ones)
    google_pairs = [k for k in results['pair_metrics']
                    if '_to_' in k and
                    k.split('_to_')[0] in GOOGLE_SUPPORTED_LANGUAGES and
                    k.split('_to_')[1] in GOOGLE_SUPPORTED_LANGUAGES]

    for pair in google_pairs:
        if pair in baseline_results['pair_metrics']:
            pair_comparison = {}
            for metric in METRICS_CONFIG['primary_metrics']:
                if (metric in results['pair_metrics'][pair] and
                        metric in baseline_results['pair_metrics'][pair]):
                    user_score = results['pair_metrics'][pair][metric]
                    baseline_score = baseline_results['pair_metrics'][pair][metric]

                    if metric in ['cer', 'wer']:
                        improvement = baseline_score - user_score
                    else:
                        improvement = user_score - baseline_score

                    pair_comparison[metric] = {
                        'user_score': user_score,
                        'baseline_score': baseline_score,
                        'improvement': improvement
                    }

            comparison['pair_comparisons'][pair] = pair_comparison

            # Decide whether this pair is better or worse overall
            quality_improvement = pair_comparison.get('quality_score', {}).get('improvement', 0)
            if quality_improvement > 0.01:  # Threshold for significance
                comparison['better_pairs'].append(pair)
            elif quality_improvement < -0.01:
                comparison['worse_pairs'].append(pair)

    return comparison
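
# Illustrative shape of the returned comparison dict (keys mirror the code above;
# the metric, pair key and numbers are hypothetical placeholders):
#   {
#       'comparison_available': True,
#       'overall_comparison': {'bleu': {'user_score': 28.4, 'baseline_score': 25.1,
#                                       'improvement': 3.3, 'improvement_percent': 13.1}},
#       'pair_comparisons': {'xx_to_yy': {...}},
#       'better_pairs': ['xx_to_yy'],
#       'worse_pairs': []
#   }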


def generate_evaluation_report(results: Dict, model_name: str = "",
                               comparison: Optional[Dict] = None) -> str:
    """Generate a human-readable evaluation report."""
    if results.get('error'):
        return f"❌ **Evaluation Error**: {results['error']}"

    report = []

    # Header
    report.append(f"## Evaluation Report: {model_name or 'Submission'}")
    report.append("")

    # Summary
    summary = results['summary']
    report.append("### 📊 Summary")
    report.append(f"- **Total Samples Evaluated**: {summary['total_samples']:,}")
    report.append(f"- **Language Pairs Covered**: {summary['language_pairs_covered']}")
    report.append(f"- **Google Comparable Pairs**: {summary['google_comparable_pairs']}")
    report.append("")

    # Primary metrics
    report.append("### 🎯 Primary Metrics")
    for metric, value in summary['primary_metrics'].items():
        formatted_value = f"{value:.4f}" if metric != 'bleu' else f"{value:.2f}"
        report.append(f"- **{metric.upper()}**: {formatted_value}")

    # Quality ranking (if a comparison is available)
    if comparison and comparison.get('comparison_available'):
        quality_comp = comparison['overall_comparison'].get('quality_score', {})
        if quality_comp:
            improvement = quality_comp.get('improvement', 0)
            if improvement > 0.01:
                report.append(f"  - 🟢 **{improvement:.3f}** better than baseline")
            elif improvement < -0.01:
                report.append(f"  - 🔴 **{abs(improvement):.3f}** worse than baseline")
            else:
                report.append("  - 🟡 Similar to baseline")
    report.append("")

    # Secondary metrics
    report.append("### 📈 Secondary Metrics")
    for metric, value in summary['secondary_metrics'].items():
        formatted_value = f"{value:.4f}"
        report.append(f"- **{metric.upper()}**: {formatted_value}")
    report.append("")

    # Language pair performance (top 5 and bottom 3 pairs)
    pair_metrics = results['pair_metrics']
    if pair_metrics:
        # Sort pairs by quality score
        sorted_pairs = sorted(
            [(k, v.get('quality_score', 0)) for k, v in pair_metrics.items()
             if v.get('sample_count', 0) > 0],
            key=lambda x: x[1],
            reverse=True
        )

        if sorted_pairs:
            report.append("### 🏆 Best Performing Language Pairs")
            for pair, score in sorted_pairs[:5]:
                src, tgt = pair.split('_to_')
                report.append(f"- **{src} → {tgt}**: {score:.3f}")

            if len(sorted_pairs) > 5:
                report.append("")
                report.append("### 📉 Challenging Language Pairs")
                for pair, score in sorted_pairs[-3:]:
                    src, tgt = pair.split('_to_')
                    report.append(f"- **{src} → {tgt}**: {score:.3f}")

    # Comparison with baseline
    if comparison and comparison.get('comparison_available'):
        report.append("")
        report.append("### 🔄 Comparison with Baseline")

        better_count = len(comparison.get('better_pairs', []))
        worse_count = len(comparison.get('worse_pairs', []))
        total_comparable = len(comparison.get('pair_comparisons', {}))

        if total_comparable > 0:
            report.append(f"- **Better than baseline**: {better_count}/{total_comparable} pairs")
            report.append(f"- **Worse than baseline**: {worse_count}/{total_comparable} pairs")

            if comparison['better_pairs']:
                report.append("  - Strong pairs: " + ", ".join(comparison['better_pairs'][:3]))
            if comparison['worse_pairs']:
                report.append("  - Weak pairs: " + ", ".join(comparison['worse_pairs'][:3]))

    return "\n".join(report)


def create_sample_analysis(results: Dict, n_samples: int = 10) -> pd.DataFrame:
    """Create a sample analysis showing the best and worst translations."""
    if 'sample_metrics' not in results:
        return pd.DataFrame()

    sample_df = results['sample_metrics']

    # Get the best and worst samples by quality score
    best_samples = sample_df.nlargest(n_samples // 2, 'quality_score')
    worst_samples = sample_df.nsmallest(n_samples // 2, 'quality_score')

    analysis_samples = pd.concat([best_samples, worst_samples])

    # Label each row with its category
    analysis_samples['category'] = ['Best'] * len(best_samples) + ['Worst'] * len(worst_samples)

    return analysis_samples[['sample_id', 'source_language', 'target_language',
                             'quality_score', 'bleu', 'chrf', 'category']]


def get_google_translate_baseline() -> Dict:
    """Get Google Translate baseline results (if available)."""
    try:
        # This would load pre-computed Google Translate results.
        # For now, return an empty dict; implement once the Google Translate
        # baseline is available.
        return {}
    except Exception:
        return {}