# src/utils.py

import re
import datetime
from typing import Dict, List, Tuple, Set, Optional, Union

import numpy as np
import pandas as pd
from scipy import stats

from config import (
    ALL_UG40_LANGUAGES,
    GOOGLE_SUPPORTED_LANGUAGES,
    LANGUAGE_NAMES,
    EVALUATION_TRACKS,
    MODEL_CATEGORIES,
    STATISTICAL_CONFIG,
    METRICS_CONFIG,
    SAMPLE_SIZE_RECOMMENDATIONS,
)


def get_all_language_pairs() -> List[Tuple[str, str]]:
    """Get all possible UG40 language pairs."""
    pairs = []
    for src in ALL_UG40_LANGUAGES:
        for tgt in ALL_UG40_LANGUAGES:
            if src != tgt:
                pairs.append((src, tgt))
    return pairs


def get_google_comparable_pairs() -> List[Tuple[str, str]]:
    """Get language pairs that can be compared with Google Translate."""
    pairs = []
    for src in GOOGLE_SUPPORTED_LANGUAGES:
        for tgt in GOOGLE_SUPPORTED_LANGUAGES:
            if src != tgt:
                pairs.append((src, tgt))
    return pairs


def get_track_language_pairs(track: str) -> List[Tuple[str, str]]:
    """Get language pairs for a specific evaluation track."""
    if track not in EVALUATION_TRACKS:
        return []

    track_languages = EVALUATION_TRACKS[track]["languages"]
    pairs = []
    for src in track_languages:
        for tgt in track_languages:
            if src != tgt:
                pairs.append((src, tgt))
    return pairs


def format_language_pair(src: str, tgt: str) -> str:
    """Format language pair for display."""
    src_name = LANGUAGE_NAMES.get(src, src.upper())
    tgt_name = LANGUAGE_NAMES.get(tgt, tgt.upper())
    return f"{src_name} → {tgt_name}"


def validate_language_code(lang: str) -> bool:
    """Validate if language code is supported."""
    return lang in ALL_UG40_LANGUAGES


def create_submission_id() -> str:
    """Create unique submission ID with timestamp and random component."""
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    random_suffix = str(np.random.randint(1000, 9999))
    return f"sub_{timestamp}_{random_suffix}"


def sanitize_model_name(name: str) -> str:
    """Sanitize model name for display and storage with enhanced validation."""
    if not name or not isinstance(name, str):
        return "Anonymous_Model"

    # Remove special characters, limit length
    name = re.sub(r"[^\w\-.]", "_", name.strip())

    # Remove multiple consecutive underscores
    name = re.sub(r"_+", "_", name)

    # Remove leading/trailing underscores
    name = name.strip("_")

    # Ensure minimum length
    if len(name) < 3:
        name = f"Model_{name}"

    # Check for reserved names
    reserved_names = ["admin", "test", "baseline", "google", "system"]
    if name.lower() in reserved_names:
        name = f"User_{name}"

    return name[:50]  # Limit to 50 characters
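
# Illustrative behaviour of the sanitizer above:
#   sanitize_model_name("my model!! v2")  -> "my_model_v2"
#   sanitize_model_name("google")         -> "User_google"   (reserved name)
#   sanitize_model_name("")               -> "Anonymous_Model"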


def format_metric_value(
    value: float,
    metric: str,
    include_ci: bool = False,
    ci_lower: Optional[float] = None,
    ci_upper: Optional[float] = None,
) -> str:
    """Format metric value for display with optional confidence intervals."""
    if value is None or pd.isna(value):
        return "N/A"

    try:
        precision = METRICS_CONFIG["display_precision"]

        if metric == "coverage_rate":
            formatted = f"{value:.{precision}%}"
        elif metric in ["bleu"]:
            formatted = f"{value:.2f}"
        elif metric in ["cer", "wer"] and value > 1:
            # Cap error rates at 1.0 for display
            formatted = f"{min(value, 1.0):.{precision}f}"
        else:
            formatted = f"{value:.{precision}f}"

        # Add confidence interval if requested
        if include_ci and ci_lower is not None and ci_upper is not None:
            ci_str = f" [{ci_lower:.{precision}f}, {ci_upper:.{precision}f}]"
            formatted += ci_str

        return formatted
    except (ValueError, TypeError):
        return str(value)
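
# Illustrative formatting (assumes METRICS_CONFIG["display_precision"] == 4):
#   format_metric_value(23.456, "bleu")          -> "23.46"
#   format_metric_value(0.8532, "coverage_rate") -> "85.3200%"
#   format_metric_value(0.71, "chrf", include_ci=True, ci_lower=0.68, ci_upper=0.74)
#       -> "0.7100 [0.6800, 0.7400]"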


def calculate_effect_size(values1: List[float], values2: List[float]) -> float:
    """Calculate Cohen's d effect size between two groups."""
    if len(values1) < 2 or len(values2) < 2:
        return 0.0

    try:
        values1 = np.array(values1)
        values2 = np.array(values2)

        # Remove NaN values
        values1 = values1[~np.isnan(values1)]
        values2 = values2[~np.isnan(values2)]

        if len(values1) < 2 or len(values2) < 2:
            return 0.0

        # Calculate pooled standard deviation
        n1, n2 = len(values1), len(values2)
        pooled_std = np.sqrt(
            ((n1 - 1) * np.var(values1, ddof=1) + (n2 - 1) * np.var(values2, ddof=1))
            / (n1 + n2 - 2)
        )

        if pooled_std == 0:
            return 0.0

        # Cohen's d
        effect_size = (np.mean(values1) - np.mean(values2)) / pooled_std
        return abs(effect_size)
    except Exception:
        return 0.0
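
# Worked example: two groups with means 0.75 and 0.70 and a pooled standard
# deviation of 0.10 give |d| = |0.75 - 0.70| / 0.10 = 0.5, which
# interpret_effect_size() below would label "medium" assuming the conventional
# Cohen cut-offs of 0.2 / 0.5 / 0.8 (the actual thresholds come from
# STATISTICAL_CONFIG["effect_size_thresholds"]).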


def interpret_effect_size(effect_size: float) -> str:
    """Interpret effect size according to Cohen's conventions."""
    thresholds = STATISTICAL_CONFIG["effect_size_thresholds"]

    if effect_size < thresholds["small"]:
        return "negligible"
    elif effect_size < thresholds["medium"]:
        return "small"
    elif effect_size < thresholds["large"]:
        return "medium"
    else:
        return "large"


def calculate_statistical_power(
    effect_size: float, n1: int, n2: int, alpha: float = 0.05
) -> float:
    """Estimate statistical power for given effect size and sample sizes."""
    if n1 < 2 or n2 < 2:
        return 0.0

    try:
        # Simplified power calculation for a two-sample t-test.
        # This is an approximation: a shifted central t distribution stands in
        # for the noncentral t.
        df = n1 + n2 - 2
        pooled_se = np.sqrt((1 / n1) + (1 / n2))

        # Critical t-value
        t_critical = stats.t.ppf(1 - alpha / 2, df)

        # Non-centrality parameter
        ncp = effect_size / pooled_se

        # Power (approximate)
        power = (
            1
            - stats.t.cdf(t_critical, df, loc=ncp)
            + stats.t.cdf(-t_critical, df, loc=ncp)
        )
        return min(1.0, max(0.0, power))
    except Exception:
        return 0.0
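
# Illustrative value: with effect_size=0.5 and 50 samples per group at
# alpha=0.05, the approximation above returns roughly 0.70, in line with the
# textbook power of a two-sided two-sample t-test for a medium effect.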


def get_track_statistics(test_data: pd.DataFrame) -> Dict[str, Dict]:
    """Get comprehensive statistics about test data coverage for each track."""
    track_stats = {}

    for track_name, track_config in EVALUATION_TRACKS.items():
        track_languages = track_config["languages"]

        # Filter test data to track languages
        track_data = test_data[
            (test_data["source_language"].isin(track_languages))
            & (test_data["target_language"].isin(track_languages))
        ]

        if track_data.empty:
            track_stats[track_name] = {
                "total_samples": 0,
                "language_pairs": 0,
                "samples_per_pair": {},
                "coverage_matrix": {},
                "adequacy_assessment": "insufficient",
            }
            continue

        # Calculate pair-wise statistics
        pair_counts = {}
        for src in track_languages:
            for tgt in track_languages:
                if src == tgt:
                    continue
                pair_data = track_data[
                    (track_data["source_language"] == src)
                    & (track_data["target_language"] == tgt)
                ]
                pair_key = f"{src}_to_{tgt}"
                pair_counts[pair_key] = len(pair_data)

        # Calculate adequacy
        min_required = track_config["min_samples_per_pair"]
        adequate_pairs = sum(1 for count in pair_counts.values() if count >= min_required)
        total_possible_pairs = len(track_languages) * (len(track_languages) - 1)
        adequacy_rate = adequate_pairs / max(total_possible_pairs, 1)

        if adequacy_rate >= 0.8:
            adequacy = "excellent"
        elif adequacy_rate >= 0.6:
            adequacy = "good"
        elif adequacy_rate >= 0.4:
            adequacy = "fair"
        else:
            adequacy = "insufficient"

        track_stats[track_name] = {
            "total_samples": len(track_data),
            "language_pairs": len([k for k, v in pair_counts.items() if v > 0]),
            "samples_per_pair": pair_counts,
            "coverage_matrix": pair_counts,
            "adequacy_assessment": adequacy,
            "adequacy_rate": adequacy_rate,
            "min_samples_per_pair": min_required,
        }

    return track_stats


def validate_submission_completeness_scientific(
    predictions: pd.DataFrame, test_set: pd.DataFrame, track: Optional[str] = None
) -> Dict:
    """Enhanced validation with track-specific analysis."""
    if predictions.empty or test_set.empty:
        return {
            "is_complete": False,
            "missing_count": len(test_set) if not test_set.empty else 0,
            "extra_count": len(predictions) if not predictions.empty else 0,
            "missing_ids": [],
            "coverage": 0.0,
            "track_analysis": {},
        }

    # If track specified, filter to track languages
    if track and track in EVALUATION_TRACKS:
        track_languages = EVALUATION_TRACKS[track]["languages"]
        test_set = test_set[
            (test_set["source_language"].isin(track_languages))
            & (test_set["target_language"].isin(track_languages))
        ]

    try:
        required_ids = set(test_set["sample_id"].astype(str))
        provided_ids = set(predictions["sample_id"].astype(str))

        missing_ids = required_ids - provided_ids
        extra_ids = provided_ids - required_ids
        matching_ids = provided_ids & required_ids

        base_result = {
            "is_complete": len(missing_ids) == 0,
            "missing_count": len(missing_ids),
            "extra_count": len(extra_ids),
            "missing_ids": list(missing_ids)[:10],
            "coverage": len(matching_ids) / len(required_ids) if required_ids else 0.0,
        }

        # Add track-specific analysis if requested
        if track:
            track_analysis = analyze_track_coverage(predictions, test_set, track)
            base_result["track_analysis"] = track_analysis

        return base_result
    except Exception as e:
        print(f"Error in submission completeness validation: {e}")
        return {
            "is_complete": False,
            "missing_count": 0,
            "extra_count": 0,
            "missing_ids": [],
            "coverage": 0.0,
            "track_analysis": {},
        }


def analyze_track_coverage(
    predictions: pd.DataFrame, test_set: pd.DataFrame, track: str
) -> Dict:
    """Analyze coverage for a specific track."""
    if track not in EVALUATION_TRACKS:
        return {"error": f"Unknown track: {track}"}

    track_config = EVALUATION_TRACKS[track]
    track_languages = track_config["languages"]

    # Filter test set to track languages
    track_test_set = test_set[
        (test_set["source_language"].isin(track_languages))
        & (test_set["target_language"].isin(track_languages))
    ]

    if track_test_set.empty:
        return {"error": f"No test data available for {track} track"}

    # Merge with predictions
    merged = track_test_set.merge(
        predictions, on="sample_id", how="left", suffixes=("", "_pred")
    )

    # Analyze by language pair
    pair_analysis = {}
    for src in track_languages:
        for tgt in track_languages:
            if src == tgt:
                continue
            pair_data = merged[
                (merged["source_language"] == src)
                & (merged["target_language"] == tgt)
            ]
            if len(pair_data) > 0:
                covered = pair_data["prediction"].notna().sum()
                pair_analysis[f"{src}_to_{tgt}"] = {
                    "total": len(pair_data),
                    "covered": covered,
                    "coverage_rate": covered / len(pair_data),
                    "meets_minimum": covered >= track_config["min_samples_per_pair"],
                }

    # Overall track statistics
    total_pairs = len(pair_analysis)
    adequate_pairs = sum(1 for info in pair_analysis.values() if info["meets_minimum"])

    return {
        "track_name": track_config["name"],
        "total_language_pairs": total_pairs,
        "adequate_pairs": adequate_pairs,
        "adequacy_rate": adequate_pairs / max(total_pairs, 1),
        "pair_analysis": pair_analysis,
        "overall_adequate": adequate_pairs >= total_pairs * 0.8,  # 80% of pairs adequate
    }
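
# Illustrative return shape (the language codes and counts below are made up):
#   {
#       "track_name": "...",
#       "total_language_pairs": 12,
#       "adequate_pairs": 10,
#       "adequacy_rate": 0.83,
#       "pair_analysis": {
#           "lug_to_eng": {"total": 500, "covered": 480,
#                          "coverage_rate": 0.96, "meets_minimum": True},
#           ...
#       },
#       "overall_adequate": True,
#   }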


def calculate_language_pair_coverage_scientific(
    predictions: pd.DataFrame, test_set: pd.DataFrame
) -> Dict:
    """Calculate comprehensive language pair coverage with statistical metrics."""
    if predictions.empty or test_set.empty:
        return {}

    try:
        # Merge to get language info
        merged = test_set.merge(
            predictions, on="sample_id", how="left", suffixes=("", "_pred")
        )

        coverage = {}
        for src in ALL_UG40_LANGUAGES:
            for tgt in ALL_UG40_LANGUAGES:
                if src == tgt:
                    continue
                pair_data = merged[
                    (merged["source_language"] == src)
                    & (merged["target_language"] == tgt)
                ]
                if len(pair_data) > 0:
                    predicted_count = pair_data["prediction"].notna().sum()
                    coverage_rate = predicted_count / len(pair_data)

                    # Determine which tracks include this pair
                    tracks_included = []
                    for track_name, track_config in EVALUATION_TRACKS.items():
                        if src in track_config["languages"] and tgt in track_config["languages"]:
                            tracks_included.append(track_name)

                    coverage[f"{src}_{tgt}"] = {
                        "total": len(pair_data),
                        "predicted": predicted_count,
                        "coverage": coverage_rate,
                        "display_name": format_language_pair(src, tgt),
                        "tracks_included": tracks_included,
                        "google_comparable": (
                            src in GOOGLE_SUPPORTED_LANGUAGES
                            and tgt in GOOGLE_SUPPORTED_LANGUAGES
                        ),
                        "statistical_adequacy": {
                            track: predicted_count >= EVALUATION_TRACKS[track]["min_samples_per_pair"]
                            for track in tracks_included
                        },
                    }

        return coverage
    except Exception as e:
        print(f"Error calculating language pair coverage: {e}")
        return {}


def safe_divide(numerator: float, denominator: float, default: float = 0.0) -> float:
    """Safely divide two numbers, handling edge cases."""
    try:
        if denominator == 0 or pd.isna(denominator) or pd.isna(numerator):
            return default
        result = numerator / denominator
        if pd.isna(result) or not np.isfinite(result):
            return default
        return float(result)
    except (TypeError, ValueError, ZeroDivisionError):
        return default
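
# Illustrative behaviour:
#   safe_divide(3, 4)               -> 0.75
#   safe_divide(1, 0)               -> 0.0   (default)
#   safe_divide(1, 0, default=-1.0) -> -1.0
#   safe_divide(float("nan"), 2)    -> 0.0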


def clean_text_for_evaluation(text: str) -> str:
    """Clean text for evaluation, handling common encoding issues."""
    if not isinstance(text, str):
        return str(text) if text is not None else ""

    # Remove extra whitespace
    text = re.sub(r"\s+", " ", text.strip())

    # Handle common encoding issues
    text = text.replace("\u00a0", " ")  # Non-breaking space
    text = text.replace("\u2019", "'")  # Right single quotation mark
    text = text.replace("\u201c", '"')  # Left double quotation mark
    text = text.replace("\u201d", '"')  # Right double quotation mark

    return text
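
# Illustrative cleaning:
#   clean_text_for_evaluation("Hello\u2019s   \u201cworld\u201d")
#       -> 'Hello\'s "world"'
#   clean_text_for_evaluation(None)  -> ""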


def get_model_summary_stats_scientific(
    model_results: Dict, track: Optional[str] = None
) -> Dict:
    """Extract comprehensive summary statistics from model evaluation results."""
    if not model_results or "tracks" not in model_results:
        return {}

    tracks = model_results["tracks"]

    # If specific track requested
    if track and track in tracks:
        track_data = tracks[track]
        if track_data.get("error"):
            return {"error": f"No valid data for {track} track"}

        track_averages = track_data.get("track_averages", {})
        track_statistics = track_data.get("track_statistics", {})
        summary = track_data.get("summary", {})

        # Named track_summary to avoid shadowing the scipy.stats import above
        track_summary = {
            "track": track,
            "track_name": EVALUATION_TRACKS[track]["name"],
            "quality_score": track_averages.get("quality_score", 0.0),
            "bleu": track_averages.get("bleu", 0.0),
            "chrf": track_averages.get("chrf", 0.0),
            "total_samples": summary.get("total_samples", 0),
            "language_pairs": summary.get("language_pairs_evaluated", 0),
            "statistical_adequacy": summary.get("total_samples", 0) >= 100,  # Simple threshold
        }

        # Add confidence intervals if available
        if "quality_score" in track_statistics:
            quality_stats = track_statistics["quality_score"]
            track_summary["confidence_interval"] = [
                quality_stats.get("ci_lower", 0.0),
                quality_stats.get("ci_upper", 0.0),
            ]

        return track_summary

    # Otherwise, return summary across all tracks
    all_tracks_summary = {
        "tracks_evaluated": len([t for t in tracks.values() if not t.get("error")]),
        "total_tracks": len(EVALUATION_TRACKS),
        "by_track": {},
    }

    for track_name, track_data in tracks.items():
        if not track_data.get("error"):
            track_averages = track_data.get("track_averages", {})
            summary = track_data.get("summary", {})
            all_tracks_summary["by_track"][track_name] = {
                "quality_score": track_averages.get("quality_score", 0.0),
                "samples": summary.get("total_samples", 0),
                "pairs": summary.get("language_pairs_evaluated", 0),
            }

    return all_tracks_summary


def generate_model_identifier_scientific(
    model_name: str, author: str, category: str
) -> str:
    """Generate a unique scientific identifier for a model."""
    clean_name = sanitize_model_name(model_name)
    clean_author = re.sub(r"[^\w\-]", "_", author.strip())[:20] if author else "Anonymous"
    clean_category = category[:10] if category in MODEL_CATEGORIES else "community"
    timestamp = datetime.datetime.now().strftime("%m%d_%H%M")
    return f"{clean_category}_{clean_name}_{clean_author}_{timestamp}"


def validate_dataframe_structure_enhanced(
    df: pd.DataFrame, required_columns: List[str], track: Optional[str] = None
) -> Tuple[bool, List[str]]:
    """Enhanced DataFrame structure validation with track-specific checks."""
    if df.empty:
        return False, ["DataFrame is empty"]

    issues = []

    # Check required columns
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        issues.append(f"Missing columns: {', '.join(missing_columns)}")

    # Check for track-specific requirements
    if track and track in EVALUATION_TRACKS:
        track_config = EVALUATION_TRACKS[track]
        min_samples = track_config.get("min_samples_per_pair", 10)

        # Check sample size adequacy (at least 5 pairs' worth of data)
        if len(df) < min_samples * 5:
            issues.append(f"Insufficient samples for {track} track (minimum ~{min_samples * 5})")

    # Check data types
    if "sample_id" in df.columns:
        if df["sample_id"].dtype != "object":
            try:
                df["sample_id"] = df["sample_id"].astype(str)
            except Exception:
                issues.append("Cannot convert sample_id to string")

    return len(issues) == 0, issues


def format_duration(seconds: float) -> str:
    """Format duration in seconds to human-readable format."""
    if seconds < 60:
        return f"{seconds:.1f}s"
    elif seconds < 3600:
        return f"{seconds / 60:.1f}m"
    else:
        return f"{seconds / 3600:.1f}h"


def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Truncate text to specified length with suffix."""
    if not isinstance(text, str):
        text = str(text)
    if len(text) <= max_length:
        return text
    return text[: max_length - len(suffix)] + suffix
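
# Illustrative truncation (the suffix counts toward max_length):
#   truncate_text("abcdefghij", max_length=8)  -> "abcde..."
#   truncate_text("short", max_length=8)       -> "short"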


def calculate_sample_size_recommendation(
    desired_power: float = 0.8, effect_size: float = 0.5, alpha: float = 0.05
) -> int:
    """Calculate recommended sample size for statistical analysis."""
    try:
        # Simplified sample size calculation for a two-sample t-test
        # This is an approximation using Cohen's conventions
        z_alpha = stats.norm.ppf(1 - alpha / 2)
        z_beta = stats.norm.ppf(desired_power)

        # Sample size per group
        n_per_group = 2 * ((z_alpha + z_beta) / effect_size) ** 2

        # Round up to nearest integer
        return max(10, int(np.ceil(n_per_group)))
    except Exception:
        return 50  # Default fallback
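
# Worked example with the defaults (power=0.8, effect_size=0.5, alpha=0.05):
#   z_alpha ≈ 1.96, z_beta ≈ 0.84
#   n_per_group = 2 * ((1.96 + 0.84) / 0.5) ** 2 ≈ 62.8  -> 63 per group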


def assess_model_category_appropriateness(
    model_name: str, category: str, performance_data: Dict
) -> Dict:
    """Assess if the detected/assigned model category is appropriate."""
    assessment = {
        "category": category,
        "appropriate": True,
        "confidence": 1.0,
        "recommendations": [],
    }

    # Check for category mismatches based on performance
    if category == "baseline" and performance_data:
        # Baselines shouldn't perform too well
        quality_scores = []
        for track_data in performance_data.get("tracks", {}).values():
            if not track_data.get("error"):
                quality_scores.append(track_data.get("track_averages", {}).get("quality_score", 0))

        if quality_scores and max(quality_scores) > 0.7:  # High performance for baseline
            assessment["appropriate"] = False
            assessment["confidence"] = 0.3
            assessment["recommendations"].append(
                "High performance suggests this might not be a baseline model"
            )

    # Check for commercial model expectations
    if category == "commercial" and performance_data:
        # Commercial models should have good Google-comparable performance
        google_track = performance_data.get("tracks", {}).get("google_comparable", {})
        if not google_track.get("error"):
            quality = google_track.get("track_averages", {}).get("quality_score", 0)
            if quality < 0.3:  # Poor performance for commercial
                assessment["recommendations"].append(
                    "Low performance unexpected for commercial systems"
                )

    return assessment
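

if __name__ == "__main__":
    # Minimal smoke-test sketch for the config-independent helpers above.
    # The sample values are illustrative only and make no assumptions about
    # the contents of config.py.
    print(sanitize_model_name("my model!! v2"))                       # my_model_v2
    print(create_submission_id())                                     # e.g. sub_20250101_120000_4821
    d = calculate_effect_size([0.8, 0.9, 0.7], [0.6, 0.65, 0.7])
    print(f"Cohen's d: {d:.2f}")
    print(f"power: {calculate_statistical_power(0.5, 50, 50):.2f}")   # ~0.70
    print(f"n per group: {calculate_sample_size_recommendation()}")   # 63
    print(safe_divide(1, 0))                                          # 0.0
    print(format_duration(5400))                                      # 1.5h
    print(truncate_text("a very long description of a model", 20))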