# src/utils.py import re import datetime import pandas as pd import numpy as np from typing import Dict, List, Tuple, Set, Optional, Union from scipy import stats from config import ( ALL_UG40_LANGUAGES, GOOGLE_SUPPORTED_LANGUAGES, LANGUAGE_NAMES, EVALUATION_TRACKS, MODEL_CATEGORIES, STATISTICAL_CONFIG, METRICS_CONFIG, SAMPLE_SIZE_RECOMMENDATIONS, ) def get_all_language_pairs() -> List[Tuple[str, str]]: """Get all possible UG40 language pairs.""" pairs = [] for src in ALL_UG40_LANGUAGES: for tgt in ALL_UG40_LANGUAGES: if src != tgt: pairs.append((src, tgt)) return pairs def get_google_comparable_pairs() -> List[Tuple[str, str]]: """Get language pairs that can be compared with Google Translate.""" pairs = [] for src in GOOGLE_SUPPORTED_LANGUAGES: for tgt in GOOGLE_SUPPORTED_LANGUAGES: if src != tgt: pairs.append((src, tgt)) return pairs def get_track_language_pairs(track: str) -> List[Tuple[str, str]]: """Get language pairs for a specific evaluation track.""" if track not in EVALUATION_TRACKS: return [] track_languages = EVALUATION_TRACKS[track]["languages"] pairs = [] for src in track_languages: for tgt in track_languages: if src != tgt: pairs.append((src, tgt)) return pairs def format_language_pair(src: str, tgt: str) -> str: """Format language pair for display.""" src_name = LANGUAGE_NAMES.get(src, src.upper()) tgt_name = LANGUAGE_NAMES.get(tgt, tgt.upper()) return f"{src_name} → {tgt_name}" def validate_language_code(lang: str) -> bool: """Validate if language code is supported.""" return lang in ALL_UG40_LANGUAGES def create_submission_id() -> str: """Create unique submission ID with timestamp and random component.""" timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") random_suffix = str(np.random.randint(1000, 9999)) return f"sub_{timestamp}_{random_suffix}" def sanitize_model_name(name: str) -> str: """Sanitize model name for display and storage with enhanced validation.""" if not name or not isinstance(name, str): return "Anonymous_Model" # Remove special characters, limit length name = re.sub(r"[^\w\-.]", "_", name.strip()) # Remove multiple consecutive underscores name = re.sub(r"_+", "_", name) # Remove leading/trailing underscores name = name.strip("_") # Ensure minimum length if len(name) < 3: name = f"Model_{name}" # Check for reserved names reserved_names = ["admin", "test", "baseline", "google", "system"] if name.lower() in reserved_names: name = f"User_{name}" return name[:50] # Limit to 50 characters def format_metric_value(value: float, metric: str, include_ci: bool = False, ci_lower: float = None, ci_upper: float = None) -> str: """Format metric value for display with optional confidence intervals.""" if pd.isna(value) or value is None: return "N/A" try: precision = METRICS_CONFIG["display_precision"] if metric == "coverage_rate": formatted = f"{value:.{precision}%}" elif metric in ["bleu"]: formatted = f"{value:.2f}" elif metric in ["cer", "wer"] and value > 1: # Cap error rates at 1.0 for display formatted = f"{min(value, 1.0):.{precision}f}" else: formatted = f"{value:.{precision}f}" # Add confidence interval if requested if include_ci and ci_lower is not None and ci_upper is not None: ci_str = f" [{ci_lower:.{precision}f}, {ci_upper:.{precision}f}]" formatted += ci_str return formatted except (ValueError, TypeError): return str(value) def calculate_effect_size(values1: List[float], values2: List[float]) -> float: """Calculate Cohen's d effect size between two groups.""" if len(values1) < 2 or len(values2) < 2: return 0.0 try: values1 = np.array(values1) values2 = np.array(values2) # Remove NaN values values1 = values1[~np.isnan(values1)] values2 = values2[~np.isnan(values2)] if len(values1) < 2 or len(values2) < 2: return 0.0 # Calculate pooled standard deviation n1, n2 = len(values1), len(values2) pooled_std = np.sqrt( ((n1 - 1) * np.var(values1, ddof=1) + (n2 - 1) * np.var(values2, ddof=1)) / (n1 + n2 - 2) ) if pooled_std == 0: return 0.0 # Cohen's d effect_size = (np.mean(values1) - np.mean(values2)) / pooled_std return abs(effect_size) except Exception: return 0.0 def interpret_effect_size(effect_size: float) -> str: """Interpret effect size according to Cohen's conventions.""" thresholds = STATISTICAL_CONFIG["effect_size_thresholds"] if effect_size < thresholds["small"]: return "negligible" elif effect_size < thresholds["medium"]: return "small" elif effect_size < thresholds["large"]: return "medium" else: return "large" def calculate_statistical_power( effect_size: float, n1: int, n2: int, alpha: float = 0.05 ) -> float: """Estimate statistical power for given effect size and sample sizes.""" if n1 < 2 or n2 < 2: return 0.0 try: # Simplified power calculation using t-test # This is an approximation df = n1 + n2 - 2 pooled_se = np.sqrt((1/n1) + (1/n2)) # Critical t-value t_critical = stats.t.ppf(1 - alpha/2, df) # Non-centrality parameter ncp = effect_size / pooled_se # Power (approximate) power = 1 - stats.t.cdf(t_critical, df, loc=ncp) + stats.t.cdf(-t_critical, df, loc=ncp) return min(1.0, max(0.0, power)) except Exception: return 0.0 def get_track_statistics(test_data: pd.DataFrame) -> Dict[str, Dict]: """Get comprehensive statistics about test data coverage for each track.""" track_stats = {} for track_name, track_config in EVALUATION_TRACKS.items(): track_languages = track_config["languages"] # Filter test data to track languages track_data = test_data[ (test_data["source_language"].isin(track_languages)) & (test_data["target_language"].isin(track_languages)) ] if track_data.empty: track_stats[track_name] = { "total_samples": 0, "language_pairs": 0, "samples_per_pair": {}, "coverage_matrix": {}, "adequacy_assessment": "insufficient", } continue # Calculate pair-wise statistics pair_counts = {} for src in track_languages: for tgt in track_languages: if src == tgt: continue pair_data = track_data[ (track_data["source_language"] == src) & (track_data["target_language"] == tgt) ] pair_key = f"{src}_to_{tgt}" pair_counts[pair_key] = len(pair_data) # Calculate adequacy min_required = track_config["min_samples_per_pair"] adequate_pairs = sum(1 for count in pair_counts.values() if count >= min_required) total_possible_pairs = len(track_languages) * (len(track_languages) - 1) adequacy_rate = adequate_pairs / max(total_possible_pairs, 1) if adequacy_rate >= 0.8: adequacy = "excellent" elif adequacy_rate >= 0.6: adequacy = "good" elif adequacy_rate >= 0.4: adequacy = "fair" else: adequacy = "insufficient" track_stats[track_name] = { "total_samples": len(track_data), "language_pairs": len([k for k, v in pair_counts.items() if v > 0]), "samples_per_pair": pair_counts, "coverage_matrix": pair_counts, "adequacy_assessment": adequacy, "adequacy_rate": adequacy_rate, "min_samples_per_pair": min_required, } return track_stats def validate_submission_completeness_scientific( predictions: pd.DataFrame, test_set: pd.DataFrame, track: str = None ) -> Dict: """Enhanced validation with track-specific analysis.""" if predictions.empty or test_set.empty: return { "is_complete": False, "missing_count": len(test_set) if not test_set.empty else 0, "extra_count": len(predictions) if not predictions.empty else 0, "missing_ids": [], "coverage": 0.0, "track_analysis": {}, } # If track specified, filter to track languages if track and track in EVALUATION_TRACKS: track_languages = EVALUATION_TRACKS[track]["languages"] test_set = test_set[ (test_set["source_language"].isin(track_languages)) & (test_set["target_language"].isin(track_languages)) ] try: required_ids = set(test_set["sample_id"].astype(str)) provided_ids = set(predictions["sample_id"].astype(str)) missing_ids = required_ids - provided_ids extra_ids = provided_ids - required_ids matching_ids = provided_ids & required_ids base_result = { "is_complete": len(missing_ids) == 0, "missing_count": len(missing_ids), "extra_count": len(extra_ids), "missing_ids": list(missing_ids)[:10], "coverage": len(matching_ids) / len(required_ids) if required_ids else 0.0, } # Add track-specific analysis if requested if track: track_analysis = analyze_track_coverage(predictions, test_set, track) base_result["track_analysis"] = track_analysis return base_result except Exception as e: print(f"Error in submission completeness validation: {e}") return { "is_complete": False, "missing_count": 0, "extra_count": 0, "missing_ids": [], "coverage": 0.0, "track_analysis": {}, } def analyze_track_coverage( predictions: pd.DataFrame, test_set: pd.DataFrame, track: str ) -> Dict: """Analyze coverage for a specific track.""" if track not in EVALUATION_TRACKS: return {"error": f"Unknown track: {track}"} track_config = EVALUATION_TRACKS[track] track_languages = track_config["languages"] # Filter test set to track languages track_test_set = test_set[ (test_set["source_language"].isin(track_languages)) & (test_set["target_language"].isin(track_languages)) ] if track_test_set.empty: return {"error": f"No test data available for {track} track"} # Merge with predictions merged = track_test_set.merge(predictions, on="sample_id", how="left", suffixes=("", "_pred")) # Analyze by language pair pair_analysis = {} for src in track_languages: for tgt in track_languages: if src == tgt: continue pair_data = merged[ (merged["source_language"] == src) & (merged["target_language"] == tgt) ] if len(pair_data) > 0: covered = pair_data["prediction"].notna().sum() pair_analysis[f"{src}_to_{tgt}"] = { "total": len(pair_data), "covered": covered, "coverage_rate": covered / len(pair_data), "meets_minimum": covered >= track_config["min_samples_per_pair"], } # Overall track statistics total_pairs = len(pair_analysis) adequate_pairs = sum(1 for info in pair_analysis.values() if info["meets_minimum"]) return { "track_name": track_config["name"], "total_language_pairs": total_pairs, "adequate_pairs": adequate_pairs, "adequacy_rate": adequate_pairs / max(total_pairs, 1), "pair_analysis": pair_analysis, "overall_adequate": adequate_pairs >= total_pairs * 0.8, # 80% of pairs adequate } def calculate_language_pair_coverage_scientific( predictions: pd.DataFrame, test_set: pd.DataFrame ) -> Dict: """Calculate comprehensive language pair coverage with statistical metrics.""" if predictions.empty or test_set.empty: return {} try: # Merge to get language info merged = test_set.merge(predictions, on="sample_id", how="left", suffixes=("", "_pred")) coverage = {} for src in ALL_UG40_LANGUAGES: for tgt in ALL_UG40_LANGUAGES: if src == tgt: continue pair_data = merged[ (merged["source_language"] == src) & (merged["target_language"] == tgt) ] if len(pair_data) > 0: predicted_count = pair_data["prediction"].notna().sum() coverage_rate = predicted_count / len(pair_data) # Determine which tracks include this pair tracks_included = [] for track_name, track_config in EVALUATION_TRACKS.items(): if src in track_config["languages"] and tgt in track_config["languages"]: tracks_included.append(track_name) coverage[f"{src}_{tgt}"] = { "total": len(pair_data), "predicted": predicted_count, "coverage": coverage_rate, "display_name": format_language_pair(src, tgt), "tracks_included": tracks_included, "google_comparable": ( src in GOOGLE_SUPPORTED_LANGUAGES and tgt in GOOGLE_SUPPORTED_LANGUAGES ), "statistical_adequacy": { track: predicted_count >= EVALUATION_TRACKS[track]["min_samples_per_pair"] for track in tracks_included }, } return coverage except Exception as e: print(f"Error calculating language pair coverage: {e}") return {} def safe_divide(numerator: float, denominator: float, default: float = 0.0) -> float: """Safely divide two numbers, handling edge cases.""" try: if denominator == 0 or pd.isna(denominator) or pd.isna(numerator): return default result = numerator / denominator if pd.isna(result) or not np.isfinite(result): return default return float(result) except (TypeError, ValueError, ZeroDivisionError): return default def clean_text_for_evaluation(text: str) -> str: """Clean text for evaluation, handling common encoding issues.""" if not isinstance(text, str): return str(text) if text is not None else "" # Remove extra whitespace text = re.sub(r"\s+", " ", text.strip()) # Handle common encoding issues text = text.replace("\u00a0", " ") # Non-breaking space text = text.replace("\u2019", "'") # Right single quotation mark text = text.replace("\u201c", '"') # Left double quotation mark text = text.replace("\u201d", '"') # Right double quotation mark return text def get_model_summary_stats_scientific(model_results: Dict, track: str = None) -> Dict: """Extract comprehensive summary statistics from model evaluation results.""" if not model_results or "tracks" not in model_results: return {} tracks = model_results["tracks"] # If specific track requested if track and track in tracks: track_data = tracks[track] if track_data.get("error"): return {"error": f"No valid data for {track} track"} track_averages = track_data.get("track_averages", {}) track_statistics = track_data.get("track_statistics", {}) summary = track_data.get("summary", {}) stats = { "track": track, "track_name": EVALUATION_TRACKS[track]["name"], "quality_score": track_averages.get("quality_score", 0.0), "bleu": track_averages.get("bleu", 0.0), "chrf": track_averages.get("chrf", 0.0), "total_samples": summary.get("total_samples", 0), "language_pairs": summary.get("language_pairs_evaluated", 0), "statistical_adequacy": summary.get("total_samples", 0) >= 100, # Simple threshold } # Add confidence intervals if available if "quality_score" in track_statistics: quality_stats = track_statistics["quality_score"] stats["confidence_interval"] = [ quality_stats.get("ci_lower", 0.0), quality_stats.get("ci_upper", 0.0), ] return stats # Otherwise, return summary across all tracks all_tracks_summary = { "tracks_evaluated": len([t for t in tracks.values() if not t.get("error")]), "total_tracks": len(EVALUATION_TRACKS), "by_track": {}, } for track_name, track_data in tracks.items(): if not track_data.get("error"): track_averages = track_data.get("track_averages", {}) summary = track_data.get("summary", {}) all_tracks_summary["by_track"][track_name] = { "quality_score": track_averages.get("quality_score", 0.0), "samples": summary.get("total_samples", 0), "pairs": summary.get("language_pairs_evaluated", 0), } return all_tracks_summary def generate_model_identifier_scientific( model_name: str, author: str, category: str ) -> str: """Generate a unique scientific identifier for a model.""" clean_name = sanitize_model_name(model_name) clean_author = re.sub(r"[^\w\-]", "_", author.strip())[:20] if author else "Anonymous" clean_category = category[:10] if category in MODEL_CATEGORIES else "community" timestamp = datetime.datetime.now().strftime("%m%d_%H%M") return f"{clean_category}_{clean_name}_{clean_author}_{timestamp}" def validate_dataframe_structure_enhanced( df: pd.DataFrame, required_columns: List[str], track: str = None ) -> Tuple[bool, List[str]]: """Enhanced DataFrame structure validation with track-specific checks.""" if df.empty: return False, ["DataFrame is empty"] issues = [] # Check required columns missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: issues.append(f"Missing columns: {', '.join(missing_columns)}") # Check for track-specific requirements if track and track in EVALUATION_TRACKS: track_config = EVALUATION_TRACKS[track] min_samples = track_config.get("min_samples_per_pair", 10) # Check sample size adequacy if len(df) < min_samples * 5: # At least 5 pairs worth of data issues.append(f"Insufficient samples for {track} track (minimum ~{min_samples * 5})") # Check data types if "sample_id" in df.columns: if not df["sample_id"].dtype == "object": try: df["sample_id"] = df["sample_id"].astype(str) except Exception: issues.append("Cannot convert sample_id to string") return len(issues) == 0, issues def format_duration(seconds: float) -> str: """Format duration in seconds to human-readable format.""" if seconds < 60: return f"{seconds:.1f}s" elif seconds < 3600: return f"{seconds/60:.1f}m" else: return f"{seconds/3600:.1f}h" def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str: """Truncate text to specified length with suffix.""" if not isinstance(text, str): text = str(text) if len(text) <= max_length: return text return text[: max_length - len(suffix)] + suffix def calculate_sample_size_recommendation( desired_power: float = 0.8, effect_size: float = 0.5, alpha: float = 0.05 ) -> int: """Calculate recommended sample size for statistical analysis.""" try: # Simplified sample size calculation for t-test # This is an approximation using Cohen's conventions z_alpha = stats.norm.ppf(1 - alpha / 2) z_beta = stats.norm.ppf(desired_power) # Sample size per group n_per_group = 2 * ((z_alpha + z_beta) / effect_size) ** 2 # Round up to nearest integer return max(10, int(np.ceil(n_per_group))) except Exception: return 50 # Default fallback def assess_model_category_appropriateness( model_name: str, category: str, performance_data: Dict ) -> Dict: """Assess if the detected/assigned model category is appropriate.""" assessment = { "category": category, "appropriate": True, "confidence": 1.0, "recommendations": [], } # Check for category mismatches based on performance if category == "baseline" and performance_data: # Baselines shouldn't perform too well quality_scores = [] for track_data in performance_data.get("tracks", {}).values(): if not track_data.get("error"): quality_scores.append(track_data.get("track_averages", {}).get("quality_score", 0)) if quality_scores and max(quality_scores) > 0.7: # High performance for baseline assessment["appropriate"] = False assessment["confidence"] = 0.3 assessment["recommendations"].append( "High performance suggests this might not be a baseline model" ) # Check for commercial model expectations if category == "commercial": # Commercial models should have good Google-comparable performance google_track = performance_data.get("tracks", {}).get("google_comparable", {}) if not google_track.get("error"): quality = google_track.get("track_averages", {}).get("quality_score", 0) if quality < 0.3: # Poor performance for commercial assessment["recommendations"].append( "Low performance unexpected for commercial systems" ) return assessment