File size: 10,093 Bytes
4fa2f10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# src/validation.py
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
import json
import io
from config import PREDICTION_FORMAT

def validate_file_format(file_content: bytes, filename: str) -> Dict:
    """Validate an uploaded prediction file's format and structure.

    Parses the file (CSV / TSV / JSON, chosen by extension, case-insensitive),
    then checks required columns, non-emptiness, missing values, and duplicate
    sample IDs.

    Args:
        file_content: Raw bytes of the uploaded file.
        filename: Original filename; only its extension is used.

    Returns:
        On success: {'valid': True, 'dataframe': df, 'row_count': int,
        'columns': list}. On failure: {'valid': False, 'error': str}.
        Never raises — parse errors are reported via the error dict.
    """
    try:
        # Extension check is case-insensitive so "PREDS.CSV" is accepted too.
        lowered = filename.lower()
        if lowered.endswith('.csv'):
            df = pd.read_csv(io.BytesIO(file_content))
        elif lowered.endswith('.tsv'):
            df = pd.read_csv(io.BytesIO(file_content), sep='\t')
        elif lowered.endswith('.json'):
            data = json.loads(file_content.decode('utf-8'))
            df = pd.DataFrame(data)
        else:
            return {
                'valid': False,
                'error': f"Unsupported file type. Use: {', '.join(PREDICTION_FORMAT['file_types'])}"
            }

        # Required columns must all be present before any value checks.
        missing_cols = set(PREDICTION_FORMAT['required_columns']) - set(df.columns)
        if missing_cols:
            return {
                'valid': False,
                'error': f"Missing required columns: {', '.join(missing_cols)}"
            }

        # An empty file parses fine but carries no predictions.
        if len(df) == 0:
            return {
                'valid': False,
                'error': "File is empty"
            }

        # Every row needs an identifier ...
        if df['sample_id'].isna().any():
            return {
                'valid': False,
                'error': "Missing sample_id values found"
            }

        # ... and a prediction value.
        if df['prediction'].isna().any():
            na_count = df['prediction'].isna().sum()
            return {
                'valid': False,
                'error': f"Missing prediction values found ({na_count} empty predictions)"
            }

        # Duplicate IDs would make scoring ambiguous.
        duplicates = df['sample_id'].duplicated()
        if duplicates.any():
            dup_count = duplicates.sum()
            return {
                'valid': False,
                'error': f"Duplicate sample_id values found ({dup_count} duplicates)"
            }

        return {
            'valid': True,
            'dataframe': df,
            'row_count': len(df),
            'columns': list(df.columns)
        }

    except Exception as e:
        # Validation boundary: any parser/decoder failure becomes an error dict.
        return {
            'valid': False,
            'error': f"Error parsing file: {str(e)}"
        }

def validate_predictions_content(predictions: pd.DataFrame) -> Dict:
    """Validate the textual quality of prediction content.

    Args:
        predictions: DataFrame with a 'prediction' column (assumed non-null;
            upstream format validation rejects NaN predictions).

    Returns:
        Dict with 'has_issues' (bool), 'issues' (hard problems), 'warnings'
        (soft problems), and 'quality_score' in [0, 1] (each issue costs 0.2,
        each warning 0.1).
    """
    issues: List[str] = []
    warnings: List[str] = []

    # Work on a string view so non-string columns (e.g. all-numeric
    # predictions parsed by pandas) do not crash the .str accessor.
    text = predictions['prediction'].astype(str)

    # Hard issue: predictions that are empty after stripping whitespace.
    empty_predictions = text.str.strip().eq('').sum()
    if empty_predictions > 0:
        issues.append(f"{empty_predictions} empty predictions found")

    lengths = text.str.len()  # computed once, reused by both length checks

    # Warn if more than 10% of predictions are suspiciously short.
    short_predictions = (lengths < 3).sum()
    if short_predictions > len(predictions) * 0.1:
        warnings.append(f"{short_predictions} very short predictions (< 3 characters)")

    # Warn on any suspiciously long prediction.
    long_predictions = (lengths > 500).sum()
    if long_predictions > 0:
        warnings.append(f"{long_predictions} very long predictions (> 500 characters)")

    # Warn if more than half of the prediction texts are duplicates.
    duplicate_predictions = text.duplicated().sum()
    if duplicate_predictions > len(predictions) * 0.5:
        warnings.append(f"{duplicate_predictions} duplicate prediction texts")

    # Warn on predictions containing characters outside basic Latin text
    # and common punctuation (digits and accented letters count as unusual).
    non_text_pattern = r'^[A-Za-z\s\'".,!?;:()\-]+$'
    non_text_count = (~text.str.match(non_text_pattern, na=False)).sum()
    if non_text_count > 0:
        warnings.append(f"{non_text_count} predictions contain unusual characters")

    return {
        'has_issues': len(issues) > 0,
        'issues': issues,
        'warnings': warnings,
        'quality_score': max(0, 1.0 - len(issues) * 0.2 - len(warnings) * 0.1)
    }

def validate_against_test_set(predictions: pd.DataFrame, test_set: pd.DataFrame) -> Dict:
    """Validate predictions against the official test set.

    Args:
        predictions: DataFrame with a 'sample_id' column.
        test_set: DataFrame with 'sample_id', 'source_language' and
            'target_language' columns.

    Returns:
        Dict with overall coverage, missing/extra/matching counts,
        completeness flag, per-language-pair coverage, and samples of
        missing/extra IDs for display. An empty test set yields 0.0
        coverage instead of raising ZeroDivisionError.
    """
    # IDs are compared as strings so int vs. str sample_ids still match.
    pred_ids = set(predictions['sample_id'].astype(str))
    test_ids = set(test_set['sample_id'].astype(str))

    missing_ids = test_ids - pred_ids
    extra_ids = pred_ids - test_ids
    matching_ids = pred_ids & test_ids

    # Guard against an empty test set (avoids division by zero).
    coverage = len(matching_ids) / len(test_ids) if test_ids else 0.0

    # Vectorized per-language-pair coverage: one isin() pass plus a groupby,
    # instead of a Python-level iterrows() loop. sort=False keeps pairs in
    # order of first appearance, matching the row-by-row construction.
    covered_mask = test_set['sample_id'].astype(str).isin(pred_ids)
    pair_labels = (
        test_set['source_language'].astype(str)
        + '_'
        + test_set['target_language'].astype(str)
    )
    pair_coverage = {}
    for pair_key, group in covered_mask.groupby(pair_labels, sort=False):
        total = int(group.size)
        covered = int(group.sum())
        pair_coverage[pair_key] = {
            'total': total,
            'covered': covered,
            'coverage_rate': covered / total,
        }

    return {
        'overall_coverage': coverage,
        'missing_count': len(missing_ids),
        'extra_count': len(extra_ids),
        'matching_count': len(matching_ids),
        'is_complete': coverage == 1.0,
        'pair_coverage': pair_coverage,
        'missing_ids_sample': list(missing_ids)[:10],  # First 10 for display
        'extra_ids_sample': list(extra_ids)[:10]
    }

def generate_validation_report(
    format_result: Dict,
    content_result: Dict, 
    test_set_result: Dict,
    model_name: str = ""
) -> str:
    """Build a human-readable Markdown validation report.

    Args:
        format_result: Output of validate_file_format().
        content_result: Output of validate_predictions_content().
        test_set_result: Output of validate_against_test_set().
        model_name: Optional label for the report header.

    Returns:
        A Markdown-formatted report string. If the file format is invalid,
        only the header and the format error are included.
    """
    lines = [
        f"# Validation Report: {model_name or 'Submission'}",
        f"Generated: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}",
        "",
    ]

    # Guard clause: with an unparseable file, nothing else can be reported.
    if not format_result['valid']:
        lines.append("❌ **File Format**: Invalid")
        lines.append(f"   - Error: {format_result['error']}")
        return "\n".join(lines)

    lines.append("βœ… **File Format**: Valid")
    lines.append(f"   - Rows: {format_result['row_count']:,}")
    lines.append(f"   - Columns: {', '.join(format_result['columns'])}")

    # Content quality section: hard issues first, then soft warnings.
    if content_result['has_issues']:
        lines.append("⚠️ **Content Quality**: Issues Found")
        lines.extend(f"   - ❌ {issue}" for issue in content_result['issues'])
    else:
        lines.append("βœ… **Content Quality**: Good")
    lines.extend(f"   - ⚠️ {warning}" for warning in content_result['warnings'])

    # Test set coverage section.
    coverage = test_set_result['overall_coverage']
    if coverage == 1.0:
        lines.append("βœ… **Test Set Coverage**: Complete")
    elif coverage >= 0.95:
        lines.append("⚠️ **Test Set Coverage**: Nearly Complete")
    else:
        lines.append("❌ **Test Set Coverage**: Incomplete")

    expected_total = test_set_result['matching_count'] + test_set_result['missing_count']
    lines.append(f"   - Coverage: {coverage:.1%} ({test_set_result['matching_count']:,} / {expected_total:,})")

    if test_set_result['missing_count'] > 0:
        lines.append(f"   - Missing: {test_set_result['missing_count']:,} samples")
    if test_set_result['extra_count'] > 0:
        lines.append(f"   - Extra: {test_set_result['extra_count']:,} samples")

    # List language pairs that are not fully covered (at most five).
    pair_cov = test_set_result['pair_coverage']
    incomplete_pairs = [key for key, stats in pair_cov.items() if stats['coverage_rate'] < 1.0]
    if incomplete_pairs:
        lines.append("")
        lines.append("**Incomplete Language Pairs:**")
        for pair in incomplete_pairs[:5]:
            stats = pair_cov[pair]
            src, tgt = pair.split('_')
            lines.append(f"   - {src}β†’{tgt}: {stats['covered']}/{stats['total']} ({stats['coverage_rate']:.1%})")
        overflow = len(incomplete_pairs) - 5
        if overflow > 0:
            lines.append(f"   - ... and {overflow} more pairs")

    # Final verdict.
    lines.append("")
    if format_result['valid'] and coverage >= 0.95 and not content_result['has_issues']:
        lines.append("πŸŽ‰ **Overall**: Ready for evaluation!")
    elif format_result['valid'] and coverage >= 0.8:
        lines.append("⚠️ **Overall**: Can be evaluated with warnings")
    else:
        lines.append("❌ **Overall**: Please fix issues before submission")

    return "\n".join(lines)

def validate_submission_complete(file_content: bytes, filename: str, test_set: pd.DataFrame, model_name: str = "") -> Dict:
    """Run the full validation pipeline on a submission.

    Chains format, content, and test-set validation, then renders a
    human-readable report. A submission is valid when the file parses,
    content has no hard issues, and coverage is at least 95%.

    Args:
        file_content: Raw bytes of the uploaded file.
        filename: Original filename (used for type detection).
        test_set: Official test set DataFrame.
        model_name: Optional label for the report.

    Returns:
        Dict with 'valid', 'coverage', 'report', 'predictions', and
        'pair_coverage'. On a format failure, 'predictions' is None and
        only 'valid' and 'report' carry information.
    """
    format_result = validate_file_format(file_content, filename)

    # Bail out early: without a parseable file there is nothing else to check.
    if not format_result['valid']:
        return {
            'valid': False,
            'report': generate_validation_report(format_result, {}, {}, model_name),
            'predictions': None
        }

    predictions = format_result['dataframe']
    content_result = validate_predictions_content(predictions)
    coverage_result = validate_against_test_set(predictions, test_set)

    # Format is already known valid at this point; the remaining gates are
    # content quality and test-set coverage.
    passed = (
        not content_result['has_issues']
        and coverage_result['overall_coverage'] >= 0.95
    )

    return {
        'valid': passed,
        'coverage': coverage_result['overall_coverage'],
        'report': generate_validation_report(format_result, content_result, coverage_result, model_name),
        'predictions': predictions,
        'pair_coverage': coverage_result['pair_coverage']
    }