leaderboard / src /utils.py
akera's picture
Update src/utils.py
fb1cc27 verified
raw
history blame
23.3 kB
# src/utils.py
import re
import datetime
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Set, Optional, Union
from scipy import stats
from config import (
ALL_UG40_LANGUAGES,
GOOGLE_SUPPORTED_LANGUAGES,
LANGUAGE_NAMES,
EVALUATION_TRACKS,
MODEL_CATEGORIES,
STATISTICAL_CONFIG,
METRICS_CONFIG,
SAMPLE_SIZE_RECOMMENDATIONS,
)
def get_all_language_pairs() -> List[Tuple[str, str]]:
    """Return every ordered (source, target) pair of distinct UG40 languages.

    Pairs follow the iteration order of ``ALL_UG40_LANGUAGES``, with the source
    language varying slowest. A language is never paired with itself.
    """
    return [
        (source, target)
        for source in ALL_UG40_LANGUAGES
        for target in ALL_UG40_LANGUAGES
        if source != target
    ]
def get_google_comparable_pairs() -> List[Tuple[str, str]]:
    """Return the ordered language pairs usable for a Google Translate comparison.

    Both members of each pair come from ``GOOGLE_SUPPORTED_LANGUAGES`` and are
    always different from each other.
    """
    return [
        (source, target)
        for source in GOOGLE_SUPPORTED_LANGUAGES
        for target in GOOGLE_SUPPORTED_LANGUAGES
        if source != target
    ]
def get_track_language_pairs(track: str) -> List[Tuple[str, str]]:
    """Return the ordered language pairs belonging to one evaluation track.

    Args:
        track: Key into ``EVALUATION_TRACKS``.

    Returns:
        All (source, target) pairs of distinct languages listed under the
        track's ``"languages"`` entry, or an empty list for an unknown track.
    """
    if track in EVALUATION_TRACKS:
        languages = EVALUATION_TRACKS[track]["languages"]
        return [
            (source, target)
            for source in languages
            for target in languages
            if source != target
        ]
    return []
def format_language_pair(src: str, tgt: str) -> str:
    """Format a language pair for display as "<source name> <arrow> <target name>".

    Each code is looked up in ``LANGUAGE_NAMES``; a code without an entry is
    shown upper-cased instead.

    Args:
        src: Source language code.
        tgt: Target language code.

    Returns:
        The two display names joined by a right arrow (U+2192) with a space on
        either side.
    """
    src_name = LANGUAGE_NAMES.get(src, src.upper())
    tgt_name = LANGUAGE_NAMES.get(tgt, tgt.upper())
    # Without a separator the two names run together ("EnglishLuganda") and the
    # direction of translation is unreadable. The arrow is written as an escape
    # so the source file stays ASCII-only.
    return f"{src_name} \u2192 {tgt_name}"
def validate_language_code(lang: str) -> bool:
    """Report whether ``lang`` is a member of ``ALL_UG40_LANGUAGES``."""
    is_supported = lang in ALL_UG40_LANGUAGES
    return is_supported
def create_submission_id() -> str:
    """Create a submission ID from the current local time and a random suffix.

    Returns:
        A string of the form ``sub_YYYYMMDD_HHMMSS_NNNN`` where ``NNNN`` is a
        four-digit number in the range 1000..9999.
    """
    # Local import: nothing else in this module needs it.
    import secrets

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    # Draw the suffix from the OS entropy source rather than the global NumPy
    # RNG: if any caller seeds np.random for reproducibility, NumPy-based
    # suffixes repeat and two submissions in the same second would collide.
    random_suffix = 1000 + secrets.randbelow(9000)
    return f"sub_{timestamp}_{random_suffix}"
def sanitize_model_name(name: str) -> str:
    """Sanitize a model name for display and storage.

    Steps, in order: surrounding whitespace is stripped; every character that
    is not a word character, ``-`` or ``.`` becomes ``_``; runs of underscores
    collapse to one; leading/trailing underscores are removed; names shorter
    than three characters get a ``Model_`` prefix; reserved names get a
    ``User_`` prefix; the result is cut to 50 characters.

    Args:
        name: Raw model name supplied by the submitter.

    Returns:
        The sanitized name, or ``"Anonymous_Model"`` when ``name`` is empty,
        not a string, or contains nothing that survives sanitization.
    """
    if not name or not isinstance(name, str):
        return "Anonymous_Model"

    cleaned = re.sub(r"[^\w\-.]", "_", name.strip())
    cleaned = re.sub(r"_+", "_", cleaned).strip("_")

    # Input such as "!!!" or "   " sanitizes to nothing; without this guard the
    # short-name rule below would produce the malformed name "Model_".
    if not cleaned:
        return "Anonymous_Model"

    if len(cleaned) < 3:
        cleaned = f"Model_{cleaned}"

    reserved_names = ["admin", "test", "baseline", "google", "system"]
    if cleaned.lower() in reserved_names:
        cleaned = f"User_{cleaned}"

    return cleaned[:50]  # Limit to 50 characters
def format_metric_value(value: float, metric: str, include_ci: bool = False,
                        ci_lower: float = None, ci_upper: float = None) -> str:
    """Render a metric value as display text, optionally with its CI bounds.

    Args:
        value: Metric value; missing values (``None``/NaN) render as ``"N/A"``.
        metric: Metric name. ``"coverage_rate"`` is shown as a percentage,
            ``"bleu"`` with two decimals, and ``"cer"``/``"wer"`` values above 1
            are displayed as 1.0. Everything else uses the precision from
            ``METRICS_CONFIG["display_precision"]``.
        include_ci: Append `` [lower, upper]`` when both bounds are given.
        ci_lower: Lower confidence bound.
        ci_upper: Upper confidence bound.

    Returns:
        The formatted text, or ``str(value)`` if formatting raises
        ``ValueError``/``TypeError``.
    """
    if pd.isna(value) or value is None:
        return "N/A"

    try:
        digits = METRICS_CONFIG["display_precision"]

        if metric == "coverage_rate":
            text = f"{value:.{digits}%}"
        elif metric in ["bleu"]:
            text = f"{value:.2f}"
        else:
            # Error rates above 1.0 are capped for display only.
            exceeds_cap = metric in ["cer", "wer"] and value > 1
            shown = min(value, 1.0) if exceeds_cap else value
            text = f"{shown:.{digits}f}"

        have_bounds = ci_lower is not None and ci_upper is not None
        if include_ci and have_bounds:
            text += f" [{ci_lower:.{digits}f}, {ci_upper:.{digits}f}]"
        return text
    except (ValueError, TypeError):
        return str(value)
def calculate_effect_size(values1: List[float], values2: List[float]) -> float:
    """Compute the magnitude of Cohen's d between two samples.

    NaN entries are dropped first. The pooled standard deviation uses the
    sample variances (``ddof=1``).

    Returns:
        ``|mean1 - mean2| / pooled_std``; ``0.0`` when either sample has fewer
        than two usable values, when the pooled deviation is zero, or when any
        step raises.
    """
    if len(values1) < 2 or len(values2) < 2:
        return 0.0

    try:
        group_a = np.array(values1)
        group_b = np.array(values2)

        # Keep only the non-NaN observations of each group.
        group_a = group_a[~np.isnan(group_a)]
        group_b = group_b[~np.isnan(group_b)]

        size_a, size_b = len(group_a), len(group_b)
        if size_a < 2 or size_b < 2:
            return 0.0

        weighted_variance = (
            (size_a - 1) * np.var(group_a, ddof=1)
            + (size_b - 1) * np.var(group_b, ddof=1)
        )
        pooled_std = np.sqrt(weighted_variance / (size_a + size_b - 2))
        if pooled_std == 0:
            return 0.0

        # The sign is discarded: callers only get the size of the effect.
        return abs((np.mean(group_a) - np.mean(group_b)) / pooled_std)
    except Exception:
        return 0.0
def interpret_effect_size(effect_size: float) -> str:
    """Map an effect size onto a verbal label.

    The cut-offs are read from ``STATISTICAL_CONFIG["effect_size_thresholds"]``
    (keys ``"small"``, ``"medium"``, ``"large"``). A value below a cut-off gets
    the label of the band beneath it; anything else is ``"large"``.
    """
    thresholds = STATISTICAL_CONFIG["effect_size_thresholds"]
    # (label returned, threshold key the value must stay below)
    bands = (
        ("negligible", "small"),
        ("small", "medium"),
        ("medium", "large"),
    )
    for label, upper_key in bands:
        if effect_size < thresholds[upper_key]:
            return label
    return "large"
def calculate_statistical_power(
    effect_size: float, n1: int, n2: int, alpha: float = 0.05
) -> float:
    """Approximate the power of a two-sided, two-sample t-test.

    The alternative distribution is modelled as a central t distribution
    shifted by the non-centrality parameter, which is an approximation.

    Args:
        effect_size: Standardized effect size.
        n1: Size of the first group.
        n2: Size of the second group.
        alpha: Two-sided significance level.

    Returns:
        Estimated power clamped to [0, 1]; ``0.0`` when either group has fewer
        than two samples or the computation raises.
    """
    if n1 < 2 or n2 < 2:
        return 0.0

    try:
        dof = n1 + n2 - 2
        standard_error = np.sqrt((1 / n1) + (1 / n2))
        critical_t = stats.t.ppf(1 - alpha / 2, dof)
        shift = effect_size / standard_error

        # Probability mass of the shifted distribution in each rejection tail.
        upper_tail = 1 - stats.t.cdf(critical_t, dof, loc=shift)
        lower_tail = stats.t.cdf(-critical_t, dof, loc=shift)
        power = upper_tail + lower_tail

        return min(1.0, max(0.0, power))
    except Exception:
        return 0.0
def get_track_statistics(test_data: pd.DataFrame) -> Dict[str, Dict]:
    """Summarize how well the test data covers each evaluation track.

    For every track in ``EVALUATION_TRACKS`` the rows whose source and target
    languages both belong to the track are counted per ordered language pair.

    Args:
        test_data: Frame with ``source_language`` and ``target_language``
            columns.

    Returns:
        Mapping of track name to a dict with keys ``total_samples``,
        ``language_pairs`` (pairs with at least one row), ``samples_per_pair``
        and ``coverage_matrix`` (both ``"<src>_to_<tgt>" -> count``),
        ``adequacy_assessment`` (``excellent``/``good``/``fair``/
        ``insufficient`` at adequacy rates of 0.8/0.6/0.4), ``adequacy_rate``
        (share of possible pairs meeting the track's ``min_samples_per_pair``)
        and ``min_samples_per_pair``.
    """
    # Local import: only this function needs it.
    from collections import Counter

    track_stats = {}
    for track_name, track_config in EVALUATION_TRACKS.items():
        track_languages = track_config["languages"]

        in_track = (
            test_data["source_language"].isin(track_languages)
            & test_data["target_language"].isin(track_languages)
        )
        track_data = test_data[in_track]

        if track_data.empty:
            # Same key set as the populated result below, so consumers can read
            # "adequacy_rate"/"min_samples_per_pair" without a KeyError.
            track_stats[track_name] = {
                "total_samples": 0,
                "language_pairs": 0,
                "samples_per_pair": {},
                "coverage_matrix": {},
                "adequacy_assessment": "insufficient",
                "adequacy_rate": 0.0,
                "min_samples_per_pair": track_config.get("min_samples_per_pair"),
            }
            continue

        # One pass over the rows instead of one boolean-mask scan per pair.
        observed = Counter(
            zip(track_data["source_language"], track_data["target_language"])
        )
        pair_counts = {
            f"{src}_to_{tgt}": observed[(src, tgt)]
            for src in track_languages
            for tgt in track_languages
            if src != tgt
        }

        min_required = track_config["min_samples_per_pair"]
        adequate_pairs = sum(
            1 for count in pair_counts.values() if count >= min_required
        )
        total_possible_pairs = len(track_languages) * (len(track_languages) - 1)
        adequacy_rate = adequate_pairs / max(total_possible_pairs, 1)

        if adequacy_rate >= 0.8:
            adequacy = "excellent"
        elif adequacy_rate >= 0.6:
            adequacy = "good"
        elif adequacy_rate >= 0.4:
            adequacy = "fair"
        else:
            adequacy = "insufficient"

        track_stats[track_name] = {
            "total_samples": len(track_data),
            "language_pairs": sum(1 for count in pair_counts.values() if count > 0),
            "samples_per_pair": pair_counts,
            # Separate copy so mutating one mapping cannot silently alter the other.
            "coverage_matrix": dict(pair_counts),
            "adequacy_assessment": adequacy,
            "adequacy_rate": adequacy_rate,
            "min_samples_per_pair": min_required,
        }

    return track_stats
def validate_submission_completeness_scientific(
    predictions: pd.DataFrame, test_set: pd.DataFrame, track: str = None
) -> Dict:
    """Check which test samples a submission covers, optionally per track.

    Sample IDs from both frames are compared as strings.

    Args:
        predictions: Submission frame with a ``sample_id`` column.
        test_set: Reference frame with ``sample_id`` (and, when ``track`` is
            used, ``source_language``/``target_language``) columns.
        track: Optional key into ``EVALUATION_TRACKS``; when it names a known
            track the test set is first restricted to that track's languages.

    Returns:
        Dict with ``is_complete``, ``missing_count``, ``extra_count``,
        ``missing_ids`` (at most ten, sorted), ``coverage`` (fraction of
        required IDs provided) and ``track_analysis`` (the result of
        ``analyze_track_coverage`` when ``track`` is given, else ``{}``).
        If either frame is empty, or an error occurs, a result with
        ``is_complete=False`` and ``coverage=0.0`` is returned instead.
    """
    if predictions.empty or test_set.empty:
        return {
            "is_complete": False,
            "missing_count": len(test_set) if not test_set.empty else 0,
            "extra_count": len(predictions) if not predictions.empty else 0,
            "missing_ids": [],
            "coverage": 0.0,
            "track_analysis": {},
        }

    # If track specified, filter to track languages
    if track and track in EVALUATION_TRACKS:
        track_languages = EVALUATION_TRACKS[track]["languages"]
        test_set = test_set[
            (test_set["source_language"].isin(track_languages))
            & (test_set["target_language"].isin(track_languages))
        ]

    try:
        required_ids = set(test_set["sample_id"].astype(str))
        provided_ids = set(predictions["sample_id"].astype(str))

        missing_ids = required_ids - provided_ids
        extra_ids = provided_ids - required_ids
        matching_ids = provided_ids & required_ids

        base_result = {
            "is_complete": len(missing_ids) == 0,
            "missing_count": len(missing_ids),
            "extra_count": len(extra_ids),
            # Sorted so the reported sample is stable between runs; set
            # iteration order is arbitrary.
            "missing_ids": sorted(missing_ids)[:10],
            "coverage": len(matching_ids) / len(required_ids) if required_ids else 0.0,
            # Always present, matching the empty-input and error results.
            "track_analysis": {},
        }

        # Add track-specific analysis if requested
        if track:
            base_result["track_analysis"] = analyze_track_coverage(
                predictions, test_set, track
            )

        return base_result
    except Exception as e:
        # Best-effort validation: report the problem and return a failed result.
        print(f"Error in submission completeness validation: {e}")
        return {
            "is_complete": False,
            "missing_count": 0,
            "extra_count": 0,
            "missing_ids": [],
            "coverage": 0.0,
            "track_analysis": {},
        }
def analyze_track_coverage(
    predictions: pd.DataFrame, test_set: pd.DataFrame, track: str
) -> Dict:
    """Analyze per-language-pair prediction coverage for one evaluation track.

    A test sample counts as covered when ``predictions`` holds at least one
    non-null ``prediction`` for its ``sample_id`` (IDs compared as strings).

    Args:
        predictions: Frame with ``sample_id`` and ``prediction`` columns.
        test_set: Frame with ``sample_id``, ``source_language`` and
            ``target_language`` columns.
        track: Key into ``EVALUATION_TRACKS``.

    Returns:
        ``{"error": ...}`` for an unknown track or a track without test data.
        Otherwise a dict with ``track_name``, ``total_language_pairs`` (pairs
        that have test data), ``adequate_pairs``, ``adequacy_rate``,
        ``pair_analysis`` (``"<src>_to_<tgt>"`` -> ``total``/``covered``/
        ``coverage_rate``/``meets_minimum``) and ``overall_adequate`` (true
        when at least 80% of the pairs meet the track minimum).
    """
    # Local import: only this function needs it.
    from collections import Counter

    if track not in EVALUATION_TRACKS:
        return {"error": f"Unknown track: {track}"}

    track_config = EVALUATION_TRACKS[track]
    track_languages = track_config["languages"]

    # Filter test set to track languages
    track_test_set = test_set[
        (test_set["source_language"].isin(track_languages))
        & (test_set["target_language"].isin(track_languages))
    ]

    if track_test_set.empty:
        return {"error": f"No test data available for {track} track"}

    # Membership test instead of a merge: a merge raises when the two
    # sample_id columns have different dtypes, and duplicate prediction rows
    # would multiply test rows and inflate both totals and coverage.
    answered = predictions.loc[predictions["prediction"].notna(), "sample_id"]
    covered_ids = set(answered.astype(str))
    is_covered = track_test_set["sample_id"].astype(str).isin(covered_ids)

    # Single pass over the rows; counters hold plain Python ints.
    totals = Counter()
    covered_counts = Counter()
    rows = zip(
        track_test_set["source_language"],
        track_test_set["target_language"],
        is_covered,
    )
    for src, tgt, hit in rows:
        totals[(src, tgt)] += 1
        if hit:
            covered_counts[(src, tgt)] += 1

    # Analyze by language pair, in track-language order
    pair_analysis = {}
    for src in track_languages:
        for tgt in track_languages:
            if src == tgt:
                continue
            total = totals[(src, tgt)]
            if total == 0:
                continue
            covered = covered_counts[(src, tgt)]
            pair_analysis[f"{src}_to_{tgt}"] = {
                "total": total,
                "covered": covered,
                "coverage_rate": covered / total,
                "meets_minimum": covered >= track_config["min_samples_per_pair"],
            }

    # Overall track statistics
    total_pairs = len(pair_analysis)
    adequate_pairs = sum(1 for info in pair_analysis.values() if info["meets_minimum"])

    return {
        "track_name": track_config["name"],
        "total_language_pairs": total_pairs,
        "adequate_pairs": adequate_pairs,
        "adequacy_rate": adequate_pairs / max(total_pairs, 1),
        "pair_analysis": pair_analysis,
        "overall_adequate": adequate_pairs >= total_pairs * 0.8,  # 80% of pairs adequate
    }
def calculate_language_pair_coverage_scientific(
    predictions: pd.DataFrame, test_set: pd.DataFrame
) -> Dict:
    """Calculate prediction coverage for every UG40 language pair with test data.

    A test sample counts as predicted when ``predictions`` holds at least one
    non-null ``prediction`` for its ``sample_id`` (IDs compared as strings).

    Args:
        predictions: Frame with ``sample_id`` and ``prediction`` columns.
        test_set: Frame with ``sample_id``, ``source_language`` and
            ``target_language`` columns.

    Returns:
        Mapping ``"<src>_<tgt>"`` -> dict with ``total``, ``predicted``,
        ``coverage``, ``display_name``, ``tracks_included``,
        ``google_comparable`` and ``statistical_adequacy`` (per included track,
        whether the predicted count reaches that track's
        ``min_samples_per_pair``). Pairs without test rows are omitted. An
        empty dict is returned when either frame is empty or an error occurs.
    """
    # Local import: only this function needs it.
    from collections import Counter

    if predictions.empty or test_set.empty:
        return {}

    try:
        # Membership test instead of a merge: robust to sample_id dtype
        # mismatches and to duplicate prediction rows, which would otherwise
        # multiply test rows and distort the counts.
        answered = predictions.loc[predictions["prediction"].notna(), "sample_id"]
        covered_ids = set(answered.astype(str))
        is_covered = test_set["sample_id"].astype(str).isin(covered_ids)

        # One pass over the rows rather than one full-frame scan per pair;
        # the counters hold plain Python ints.
        totals = Counter()
        predicted = Counter()
        rows = zip(test_set["source_language"], test_set["target_language"], is_covered)
        for src, tgt, hit in rows:
            totals[(src, tgt)] += 1
            if hit:
                predicted[(src, tgt)] += 1

        coverage = {}
        for src in ALL_UG40_LANGUAGES:
            for tgt in ALL_UG40_LANGUAGES:
                if src == tgt:
                    continue
                total = totals[(src, tgt)]
                if total == 0:
                    continue
                predicted_count = predicted[(src, tgt)]

                # Determine which tracks include this pair
                tracks_included = [
                    track_name
                    for track_name, track_config in EVALUATION_TRACKS.items()
                    if src in track_config["languages"]
                    and tgt in track_config["languages"]
                ]

                coverage[f"{src}_{tgt}"] = {
                    "total": total,
                    "predicted": predicted_count,
                    "coverage": predicted_count / total,
                    "display_name": format_language_pair(src, tgt),
                    "tracks_included": tracks_included,
                    "google_comparable": (
                        src in GOOGLE_SUPPORTED_LANGUAGES
                        and tgt in GOOGLE_SUPPORTED_LANGUAGES
                    ),
                    "statistical_adequacy": {
                        track: predicted_count
                        >= EVALUATION_TRACKS[track]["min_samples_per_pair"]
                        for track in tracks_included
                    },
                }

        return coverage
    except Exception as e:
        # Best-effort: report the problem and fall back to "no coverage info".
        print(f"Error calculating language pair coverage: {e}")
        return {}
def safe_divide(numerator: float, denominator: float, default: float = 0.0) -> float:
    """Divide ``numerator`` by ``denominator``, falling back to ``default``.

    ``default`` is returned when the denominator is zero, when either operand
    is missing (``None``/NaN), when the quotient is NaN or infinite, or when
    the division raises ``TypeError``, ``ValueError`` or ``ZeroDivisionError``.
    Otherwise the quotient is returned as a ``float``.
    """
    try:
        unusable_input = (
            denominator == 0 or pd.isna(denominator) or pd.isna(numerator)
        )
        if unusable_input:
            return default

        quotient = numerator / denominator
        if not pd.isna(quotient) and np.isfinite(quotient):
            return float(quotient)
        return default
    except (TypeError, ValueError, ZeroDivisionError):
        return default
def clean_text_for_evaluation(text: str) -> str:
    """Normalize text before evaluation.

    Non-string input is converted with ``str()`` (``None`` becomes ``""``) and
    returned without further cleaning. For strings, surrounding whitespace is
    stripped, internal whitespace runs collapse to a single space, and a few
    typographic characters are replaced by their ASCII counterparts.
    """
    if not isinstance(text, str):
        return "" if text is None else str(text)

    collapsed = re.sub(r"\s+", " ", text.strip())

    # Code point -> ASCII replacement, applied in a single pass.
    ascii_substitutes = {
        ord("\u00a0"): " ",  # Non-breaking space
        ord("\u2019"): "'",  # Right single quotation mark
        ord("\u201c"): '"',  # Left double quotation mark
        ord("\u201d"): '"',  # Right double quotation mark
    }
    return collapsed.translate(ascii_substitutes)
def get_model_summary_stats_scientific(model_results: Dict, track: str = None) -> Dict:
    """Extract summary statistics from a model's evaluation results.

    Args:
        model_results: Result dict holding a ``"tracks"`` mapping of track name
            to track data (``track_averages``, ``track_statistics``,
            ``summary`` and optionally ``error``).
        track: When given and present in the results, summarize only that
            track. When omitted, or not present in the results, a summary
            across all tracks is returned instead.

    Returns:
        ``{}`` if ``model_results`` is empty or has no ``"tracks"`` key.
        For a single track: ``{"error": ...}`` if the track recorded an error,
        otherwise a dict with ``track``, ``track_name``, ``quality_score``,
        ``bleu``, ``chrf``, ``total_samples``, ``language_pairs``,
        ``statistical_adequacy`` (at least 100 samples) and, when available,
        ``confidence_interval``.
        For all tracks: ``tracks_evaluated``, ``total_tracks`` and ``by_track``
        (``quality_score``/``samples``/``pairs`` per error-free track).
    """
    if not model_results or "tracks" not in model_results:
        return {}

    tracks = model_results["tracks"]

    # If specific track requested
    if track and track in tracks:
        track_data = tracks[track]
        if track_data.get("error"):
            return {"error": f"No valid data for {track} track"}

        track_averages = track_data.get("track_averages", {})
        track_statistics = track_data.get("track_statistics", {})
        summary = track_data.get("summary", {})
        total_samples = summary.get("total_samples", 0)

        # Named track_summary (not "stats") so the module-level scipy.stats
        # import is not shadowed inside this function.
        track_summary = {
            "track": track,
            # A track can exist in stored results but no longer in the config;
            # fall back to its key instead of raising KeyError.
            "track_name": EVALUATION_TRACKS.get(track, {}).get("name", track),
            "quality_score": track_averages.get("quality_score", 0.0),
            "bleu": track_averages.get("bleu", 0.0),
            "chrf": track_averages.get("chrf", 0.0),
            "total_samples": total_samples,
            "language_pairs": summary.get("language_pairs_evaluated", 0),
            "statistical_adequacy": total_samples >= 100,  # Simple threshold
        }

        # Add confidence intervals if available
        if "quality_score" in track_statistics:
            quality_stats = track_statistics["quality_score"]
            track_summary["confidence_interval"] = [
                quality_stats.get("ci_lower", 0.0),
                quality_stats.get("ci_upper", 0.0),
            ]

        return track_summary

    # Otherwise, return summary across all tracks
    all_tracks_summary = {
        "tracks_evaluated": sum(1 for t in tracks.values() if not t.get("error")),
        "total_tracks": len(EVALUATION_TRACKS),
        "by_track": {},
    }

    for track_name, track_data in tracks.items():
        if track_data.get("error"):
            continue
        track_averages = track_data.get("track_averages", {})
        summary = track_data.get("summary", {})
        all_tracks_summary["by_track"][track_name] = {
            "quality_score": track_averages.get("quality_score", 0.0),
            "samples": summary.get("total_samples", 0),
            "pairs": summary.get("language_pairs_evaluated", 0),
        }

    return all_tracks_summary
def generate_model_identifier_scientific(
    model_name: str, author: str, category: str
) -> str:
    """Build an identifier of the form ``<category>_<name>_<author>_<MMDD_HHMM>``.

    Args:
        model_name: Raw model name; passed through ``sanitize_model_name``.
        author: Author name; characters other than word characters and ``-``
            become ``_`` and the result is cut to 20 characters. A falsy
            author becomes ``"Anonymous"``.
        category: Used (cut to 10 characters) only when it is a member of
            ``MODEL_CATEGORIES``; otherwise ``"community"`` is used.

    Returns:
        The assembled identifier, stamped with the current local month, day,
        hour and minute.
    """
    name_part = sanitize_model_name(model_name)

    if author:
        author_part = re.sub(r"[^\w\-]", "_", author.strip())[:20]
    else:
        author_part = "Anonymous"

    category_part = "community"
    if category in MODEL_CATEGORIES:
        category_part = category[:10]

    stamp = datetime.datetime.now().strftime("%m%d_%H%M")
    return "_".join([category_part, name_part, author_part, stamp])
def validate_dataframe_structure_enhanced(
    df: pd.DataFrame, required_columns: List[str], track: str = None
) -> Tuple[bool, List[str]]:
    """Validate a frame's structure, with an optional track-specific size check.

    Side effect: when ``df`` has a ``sample_id`` column whose dtype is not
    ``object``, that column is converted to strings in place on ``df``.

    Args:
        df: Frame to validate.
        required_columns: Column names that must be present.
        track: Optional key into ``EVALUATION_TRACKS``; for a known track the
            frame must hold at least five times the track's
            ``min_samples_per_pair`` (default 10) rows.

    Returns:
        ``(is_valid, issues)`` where ``issues`` lists every problem found and
        ``is_valid`` is true only when that list is empty.
    """
    if df.empty:
        return False, ["DataFrame is empty"]

    problems = []

    absent = [column for column in required_columns if column not in df.columns]
    if absent:
        problems.append("Missing columns: " + ", ".join(absent))

    if track and track in EVALUATION_TRACKS:
        per_pair_minimum = EVALUATION_TRACKS[track].get("min_samples_per_pair", 10)
        # Rough floor: at least five language pairs' worth of rows.
        rough_minimum = per_pair_minimum * 5
        if len(df) < rough_minimum:
            problems.append(
                f"Insufficient samples for {track} track (minimum ~{rough_minimum})"
            )

    needs_cast = "sample_id" in df.columns and not df["sample_id"].dtype == "object"
    if needs_cast:
        try:
            df["sample_id"] = df["sample_id"].astype(str)
        except Exception:
            problems.append("Cannot convert sample_id to string")

    return not problems, problems
def format_duration(seconds: float) -> str:
    """Render a duration given in seconds as short human-readable text.

    Values below 60 are shown in seconds (``"5.0s"``), values below 3600 in
    minutes (``"1.5m"``), and everything else in hours (``"2.0h"``), always
    with one decimal place.
    """
    if seconds < 60:
        amount, unit = seconds, "s"
    elif seconds < 3600:
        amount, unit = seconds / 60, "m"
    else:
        amount, unit = seconds / 3600, "h"
    return f"{amount:.1f}{unit}"
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Truncate text so the result never exceeds ``max_length`` characters.

    Args:
        text: Text to shorten; non-string values are converted with ``str()``.
        max_length: Maximum length of the returned string. Negative values are
            treated as 0.
        suffix: Marker appended to shortened text; it counts towards
            ``max_length``.

    Returns:
        ``text`` unchanged when it already fits. Otherwise the text is cut and
        ``suffix`` appended; when ``max_length`` is too small to hold the
        suffix, the text is cut to ``max_length`` without a suffix.
    """
    if not isinstance(text, str):
        text = str(text)

    if len(text) <= max_length:
        return text

    limit = max(max_length, 0)
    # With no room for the suffix, `limit - len(suffix)` would be negative and
    # the slice would count from the END of the text, returning a string
    # longer than max_length. Hard-cut instead.
    if limit < len(suffix):
        return text[:limit]

    return text[: limit - len(suffix)] + suffix
def calculate_sample_size_recommendation(
    desired_power: float = 0.8, effect_size: float = 0.5, alpha: float = 0.05
) -> int:
    """Recommend a per-group sample size for a two-sample comparison.

    Uses the normal approximation ``n = 2 * ((z_alpha + z_beta) / d) ** 2``
    with a two-sided ``alpha``.

    Args:
        desired_power: Target statistical power.
        effect_size: Standardized effect size to detect.
        alpha: Two-sided significance level.

    Returns:
        The per-group size rounded up, never less than 10; ``50`` if the
        computation raises (for example when the result is not a finite
        number).
    """
    try:
        z_total = stats.norm.ppf(1 - alpha / 2) + stats.norm.ppf(desired_power)
        per_group = 2 * (z_total / effect_size) ** 2
        rounded_up = int(np.ceil(per_group))
        return max(10, rounded_up)
    except Exception:
        return 50  # Default fallback
def assess_model_category_appropriateness(
    model_name: str, category: str, performance_data: Dict
) -> Dict:
    """Assess whether a model's category is plausible given its performance.

    Args:
        model_name: Model name (not used by the current checks).
        category: Assigned category; only ``"baseline"`` and ``"commercial"``
            trigger checks.
        performance_data: Results dict with a ``"tracks"`` mapping whose
            entries may hold ``error`` and ``track_averages.quality_score``.
            May be ``None`` or empty, in which case no check is performed.

    Returns:
        Dict with ``category``, ``appropriate``, ``confidence`` and
        ``recommendations``. A baseline whose best quality score exceeds 0.7
        is marked inappropriate (confidence 0.3); a commercial model whose
        ``google_comparable`` quality score is below 0.3 gets a recommendation
        only.
    """
    assessment = {
        "category": category,
        "appropriate": True,
        "confidence": 1.0,
        "recommendations": [],
    }

    # Both checks need performance data; guarding here keeps the commercial
    # branch from calling .get() on None, as the baseline branch already did.
    tracks = performance_data.get("tracks", {}) if performance_data else {}

    if category == "baseline":
        # Baselines shouldn't perform too well
        quality_scores = [
            track_data.get("track_averages", {}).get("quality_score", 0)
            for track_data in tracks.values()
            if not track_data.get("error")
        ]
        if quality_scores and max(quality_scores) > 0.7:  # High performance for baseline
            assessment["appropriate"] = False
            assessment["confidence"] = 0.3
            assessment["recommendations"].append(
                "High performance suggests this might not be a baseline model"
            )
    elif category == "commercial":
        # Commercial models should have good Google-comparable performance.
        # Only judge when that track was actually evaluated: a missing track
        # would otherwise read as quality 0 and be reported as poor performance.
        google_track = tracks.get("google_comparable")
        if google_track and not google_track.get("error"):
            quality = google_track.get("track_averages", {}).get("quality_score", 0)
            if quality < 0.3:  # Poor performance for commercial
                assessment["recommendations"].append(
                    "Low performance unexpected for commercial systems"
                )

    return assessment