# src/utils.py
import re
import datetime

import pandas as pd
import numpy as np

from typing import Dict, List, Tuple, Set, Optional, Union

from config import (
    ALL_UG40_LANGUAGES,
    GOOGLE_SUPPORTED_LANGUAGES,
    LANGUAGE_NAMES,
    EVALUATION_TRACKS,
    MODEL_CATEGORIES,
    METRICS_CONFIG,
)


def get_all_language_pairs() -> List[Tuple[str, str]]:
    """Get all possible UG40 language pairs."""
    pairs = []
    for src in ALL_UG40_LANGUAGES:
        for tgt in ALL_UG40_LANGUAGES:
            if src != tgt:
                pairs.append((src, tgt))
    return pairs


def get_google_comparable_pairs() -> List[Tuple[str, str]]:
    """Get language pairs that can be compared with Google Translate."""
    pairs = []
    for src in GOOGLE_SUPPORTED_LANGUAGES:
        for tgt in GOOGLE_SUPPORTED_LANGUAGES:
            if src != tgt:
                pairs.append((src, tgt))
    return pairs


def get_track_language_pairs(track: str) -> List[Tuple[str, str]]:
    """Get language pairs for a specific evaluation track."""
    if track not in EVALUATION_TRACKS:
        return []

    track_languages = EVALUATION_TRACKS[track]["languages"]
    pairs = []
    for src in track_languages:
        for tgt in track_languages:
            if src != tgt:
                pairs.append((src, tgt))
    return pairs


def format_language_pair(src: str, tgt: str) -> str:
    """Format language pair for display."""
    src_name = LANGUAGE_NAMES.get(src, src.upper())
    tgt_name = LANGUAGE_NAMES.get(tgt, tgt.upper())
    return f"{src_name} → {tgt_name}"


def validate_language_code(lang: str) -> bool:
    """Validate if language code is supported."""
    return lang in ALL_UG40_LANGUAGES


def create_submission_id() -> str:
    """Create unique submission ID with timestamp and random component."""
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    random_suffix = str(np.random.randint(1000, 9999))
    return f"sub_{timestamp}_{random_suffix}"


def sanitize_model_name(name: str) -> str:
    """Sanitize model name for display and storage."""
    if not name or not isinstance(name, str):
        return "Anonymous_Model"

    # Remove special characters, limit length
    name = re.sub(r"[^\w\-.]", "_", name.strip())
    # Remove multiple consecutive underscores
    name = re.sub(r"_+", "_", name)
    # Remove leading/trailing underscores
    name = name.strip("_")

    # Ensure minimum length
    if len(name) < 3:
        name = f"Model_{name}"

    # Check for reserved names
    reserved_names = ["admin", "test", "baseline", "google", "system"]
    if name.lower() in reserved_names:
        name = f"User_{name}"

    return name[:50]  # Limit to 50 characters


def format_metric_value(
    value: float, metric: str, precision: Optional[int] = None
) -> str:
    """Format metric value for display."""
    if value is None or pd.isna(value):
        return "N/A"

    try:
        if precision is None:
            precision = METRICS_CONFIG["display_precision"]

        if metric == "coverage_rate":
            return f"{value:.1%}"
        elif metric in ["bleu"]:
            return f"{value:.2f}"
        elif metric in ["cer", "wer"] and value > 1:
            # Cap error rates at 1.0 for display
            return f"{min(value, 1.0):.{precision}f}"
        else:
            return f"{value:.{precision}f}"
    except (ValueError, TypeError):
        return str(value)


def safe_divide(numerator: float, denominator: float, default: float = 0.0) -> float:
    """Safely divide two numbers, handling edge cases."""
    try:
        if denominator == 0 or pd.isna(denominator) or pd.isna(numerator):
            return default
        result = numerator / denominator
        if pd.isna(result) or not np.isfinite(result):
            return default
        return float(result)
    except (TypeError, ValueError, ZeroDivisionError):
        return default


def clean_text_for_evaluation(text: str) -> str:
    """Clean text for evaluation, handling common encoding issues."""
    if not isinstance(text, str):
        return str(text) if text is not None else ""

    # Remove extra whitespace
    text = re.sub(r"\s+", " ", text.strip())

    # Handle common encoding issues
    text = text.replace("\u00a0", " ")  # Non-breaking space
    text = text.replace("\u2019", "'")  # Right single quotation mark
    text = text.replace("\u201c", '"')  # Left double quotation mark
    text = text.replace("\u201d", '"')  # Right double quotation mark

    return text


def validate_dataframe_structure(
    df: pd.DataFrame, required_columns: List[str], track: Optional[str] = None
) -> Tuple[bool, List[str]]:
    """Validate DataFrame structure."""
    if df.empty:
        return False, ["DataFrame is empty"]

    issues = []

    # Check required columns
    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        issues.append(f"Missing columns: {', '.join(missing_columns)}")

    # Check data types
    if "sample_id" in df.columns:
        if not df["sample_id"].dtype == "object":
            try:
                df["sample_id"] = df["sample_id"].astype(str)
            except Exception:
                issues.append("Cannot convert sample_id to string")

    return len(issues) == 0, issues


def calculate_track_coverage(
    predictions: pd.DataFrame, test_set: pd.DataFrame, track: str
) -> Dict:
    """Calculate coverage statistics for a specific track."""
    if track not in EVALUATION_TRACKS:
        return {"error": f"Unknown track: {track}"}

    track_config = EVALUATION_TRACKS[track]
    track_languages = track_config["languages"]

    # Filter test set to track languages
    track_test_set = test_set[
        (test_set["source_language"].isin(track_languages))
        & (test_set["target_language"].isin(track_languages))
    ]

    if track_test_set.empty:
        return {"error": f"No test data available for {track} track"}

    # Calculate coverage
    pred_ids = set(predictions["sample_id"].astype(str))
    test_ids = set(track_test_set["sample_id"].astype(str))
    matching_ids = pred_ids & test_ids
    coverage_rate = len(matching_ids) / len(test_ids)

    # Analyze by language pair
    pair_analysis = {}
    for src in track_languages:
        for tgt in track_languages:
            if src == tgt:
                continue

            pair_test_data = track_test_set[
                (track_test_set["source_language"] == src)
                & (track_test_set["target_language"] == tgt)
            ]

            if len(pair_test_data) > 0:
                pair_test_ids = set(pair_test_data["sample_id"].astype(str))
                pair_matching = pred_ids & pair_test_ids

                pair_analysis[f"{src}_to_{tgt}"] = {
                    "total": len(pair_test_data),
                    "covered": len(pair_matching),
                    "coverage_rate": len(pair_matching) / len(pair_test_data),
                }

    return {
        "track_name": track_config["name"],
        "total_samples": len(track_test_set),
        "covered_samples": len(matching_ids),
        "coverage_rate": coverage_rate,
        "pair_analysis": pair_analysis,
    }


def generate_model_identifier(model_name: str, author: str, category: str) -> str:
    """Generate a unique identifier for a model."""
    clean_name = sanitize_model_name(model_name)
    clean_author = (
        re.sub(r"[^\w\-]", "_", author.strip())[:20] if author else "Anonymous"
    )
    clean_category = category[:10] if category in MODEL_CATEGORIES else "community"
    timestamp = datetime.datetime.now().strftime("%m%d_%H%M")
    return f"{clean_category}_{clean_name}_{clean_author}_{timestamp}"


def format_duration(seconds: float) -> str:
    """Format duration in seconds to human-readable format."""
    if seconds < 60:
        return f"{seconds:.1f}s"
    elif seconds < 3600:
        return f"{seconds/60:.1f}m"
    else:
        return f"{seconds/3600:.1f}h"


def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Truncate text to specified length with suffix."""
    if not isinstance(text, str):
        text = str(text)

    if len(text) <= max_length:
        return text

    return text[: max_length - len(suffix)] + suffix


def get_language_pair_display_name(src: str, tgt: str) -> str:
    """Get display name for a language pair."""
    src_name = LANGUAGE_NAMES.get(src, src.upper())
    tgt_name = LANGUAGE_NAMES.get(tgt, tgt.upper())
    return f"{src_name} → {tgt_name}"


def validate_submission_completeness(
    predictions: pd.DataFrame, test_set: pd.DataFrame, track: Optional[str] = None
) -> Dict:
    """Validate submission completeness."""
    if predictions.empty or test_set.empty:
        return {
            "is_complete": False,
            "missing_count": len(test_set) if not test_set.empty else 0,
            "extra_count": len(predictions) if not predictions.empty else 0,
            "missing_ids": [],
            "coverage": 0.0,
        }

    # If track specified, filter to track languages
    if track and track in EVALUATION_TRACKS:
        track_languages = EVALUATION_TRACKS[track]["languages"]
        test_set = test_set[
            (test_set["source_language"].isin(track_languages))
            & (test_set["target_language"].isin(track_languages))
        ]

    try:
        required_ids = set(test_set["sample_id"].astype(str))
        provided_ids = set(predictions["sample_id"].astype(str))

        missing_ids = required_ids - provided_ids
        extra_ids = provided_ids - required_ids
        matching_ids = provided_ids & required_ids

        return {
            "is_complete": len(missing_ids) == 0,
            "missing_count": len(missing_ids),
            "extra_count": len(extra_ids),
            "missing_ids": list(missing_ids)[:10],
            "coverage": len(matching_ids) / len(required_ids) if required_ids else 0.0,
        }
    except Exception as e:
        print(f"Error in submission completeness validation: {e}")
        return {
            "is_complete": False,
            "missing_count": 0,
            "extra_count": 0,
            "missing_ids": [],
            "coverage": 0.0,
        }


def get_model_summary_stats(model_results: Dict, track: Optional[str] = None) -> Dict:
    """Extract summary statistics from model evaluation results."""
    if not model_results or "tracks" not in model_results:
        return {}

    tracks = model_results["tracks"]

    # If specific track requested
    if track and track in tracks:
        track_data = tracks[track]
        if track_data.get("error"):
            return {"error": f"No valid data for {track} track"}

        track_averages = track_data.get("track_averages", {})
        summary = track_data.get("summary", {})

        stats = {
            "track": track,
            "track_name": EVALUATION_TRACKS[track]["name"],
            "quality_score": track_averages.get("quality_score", 0.0),
            "bleu": track_averages.get("bleu", 0.0),
            "chrf": track_averages.get("chrf", 0.0),
            "total_samples": summary.get("total_samples", 0),
            "language_pairs": summary.get("language_pairs_evaluated", 0),
        }
        return stats

    # Otherwise, return summary across all tracks
    all_tracks_summary = {
        "tracks_evaluated": len([t for t in tracks.values() if not t.get("error")]),
        "total_tracks": len(EVALUATION_TRACKS),
        "by_track": {},
    }

    for track_name, track_data in tracks.items():
        if not track_data.get("error"):
            track_averages = track_data.get("track_averages", {})
            summary = track_data.get("summary", {})

            all_tracks_summary["by_track"][track_name] = {
                "quality_score": track_averages.get("quality_score", 0.0),
                "samples": summary.get("total_samples", 0),
                "pairs": summary.get("language_pairs_evaluated", 0),
            }

    return all_tracks_summary
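

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module): a quick smoke test
    # of the helpers whose behaviour does not depend on the values imported from
    # `config`. The sample inputs below are illustrative assumptions only.
    print(sanitize_model_name("  my cool model!! "))  # -> "my_cool_model"
    print(create_submission_id())                     # e.g. "sub_20240101_120000_1234"
    print(format_duration(5025))                      # -> "1.4h"
    print(truncate_text("x" * 200, max_length=20))    # -> 17 "x"s followed by "..."
    print(safe_divide(1, 0, default=-1.0))            # -> -1.0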