#!/usr/bin/env python3
"""
Validation script to compare optimized vs original mapper output
Compares the following columns: 出力_科目, 出力_中科目, 出力_標準名称, 出力_項目名, 出力_標準単位
"""
import os
import sys
from datetime import datetime
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd

# Add the meisai-check-ai directory to Python path
sys.path.append(os.path.join(os.path.dirname(__file__), 'meisai-check-ai'))
class OptimizationValidator:
    """Validate that an optimized mapper produces the same output as the original.

    Loads a reference ("original") CSV and compares five output columns
    (出力_科目, 出力_中科目, 出力_標準名称, 出力_項目名, 出力_標準単位) against an
    optimized run, reporting per-column and overall accuracy.  NaN cells are
    treated as empty strings for comparison purposes.
    """

    def __init__(self, original_file_path: str):
        """
        Initialize validator with original output file

        Args:
            original_file_path: Path to outputData_original.csv
        """
        self.original_file_path = original_file_path
        # Output columns whose values must match between the two runs.
        self.comparison_columns = [
            '出力_科目',
            '出力_中科目',
            '出力_標準名称',
            '出力_項目名',
            '出力_標準単位'
        ]

    def load_original_data(self) -> pd.DataFrame:
        """Load original output data; logs and re-raises on failure."""
        try:
            df_original = pd.read_csv(self.original_file_path)
            print(f"✓ Loaded original data: {len(df_original)} rows")
            return df_original
        except Exception as e:
            print(f"✗ Error loading original data: {e}")
            raise

    @staticmethod
    def _aligned_column(df_original: pd.DataFrame, df_optimized: pd.DataFrame,
                        col: str) -> Tuple[pd.Series, pd.Series]:
        """Return *col* from both frames, NaN -> '' and truncated to the common
        row count, with a clean positional index.

        BUG FIX: comparing two pandas Series of different lengths (or with
        differing indices) raises ValueError — the original code warned about a
        length mismatch and then crashed on the very next comparison.  Aligning
        here also makes positional ``.iloc`` access safe for report sampling.
        """
        n = min(len(df_original), len(df_optimized))
        original_values = df_original[col].fillna('').reset_index(drop=True).iloc[:n]
        optimized_values = df_optimized[col].fillna('').reset_index(drop=True).iloc[:n]
        return original_values, optimized_values

    def _print_summary(self, results: Dict) -> None:
        """Print the final validation summary block to stdout."""
        print("\n" + "=" * 50)
        print("🎯 VALIDATION SUMMARY")
        print("=" * 50)
        print(f"Overall accuracy: {results['summary']['overall_accuracy']}%")
        print(f"Perfect match: {'Yes' if results['summary']['perfect_match'] else 'No'}")
        print(f"Total differences: {results['summary']['total_differences']}")

    def compare_dataframes(self, df_original: pd.DataFrame, df_optimized: pd.DataFrame) -> Dict:
        """
        Compare original vs optimized dataframes

        Args:
            df_original: Reference dataframe
            df_optimized: Dataframe produced by the optimized mapper

        Returns:
            Dict with per-column difference details and an overall summary
            (total differences, accuracy percentage, perfect-match flag)
        """
        results = {
            'total_rows': len(df_original),
            'columns_compared': self.comparison_columns,
            'differences': {},
            'summary': {}
        }
        # Check if dataframes have same length
        if len(df_original) != len(df_optimized):
            results['length_mismatch'] = {
                'original': len(df_original),
                'optimized': len(df_optimized)
            }
            print(f"⚠ Warning: Different number of rows - Original: {len(df_original)}, Optimized: {len(df_optimized)}")
        # Guard against ZeroDivisionError on an empty reference frame.
        denominator = max(len(df_original), 1)
        # Compare each column
        for col in self.comparison_columns:
            if col not in df_original.columns:
                results['differences'][col] = "Column not found in original data"
                continue
            if col not in df_optimized.columns:
                results['differences'][col] = "Column not found in optimized data"
                continue
            original_values, optimized_values = self._aligned_column(df_original, df_optimized, col)
            # Compare values
            differences = original_values != optimized_values
            diff_count = int(differences.sum())
            results['differences'][col] = {
                'total_differences': diff_count,
                'accuracy_percentage': round((1 - diff_count / denominator) * 100, 2),
                'different_indices': differences[differences].index.tolist()[:10]  # Show first 10 different indices
            }
            if diff_count > 0:
                print(f"⚠ {col}: {diff_count} differences ({results['differences'][col]['accuracy_percentage']}% accuracy)")
            else:
                print(f"✓ {col}: Perfect match (100% accuracy)")
        # Overall summary
        total_differences = sum(
            results['differences'][col]['total_differences']
            for col in self.comparison_columns
            if isinstance(results['differences'][col], dict)
        )
        overall_accuracy = round((1 - total_differences / (denominator * len(self.comparison_columns))) * 100, 2)
        results['summary'] = {
            'total_differences': total_differences,
            'overall_accuracy': overall_accuracy,
            'perfect_match': total_differences == 0
        }
        return results

    def generate_difference_report(self, df_original: pd.DataFrame, df_optimized: pd.DataFrame,
                                   output_file: str = None) -> str:
        """
        Generate detailed difference report

        Args:
            df_original: Original dataframe
            df_optimized: Optimized dataframe
            output_file: Optional output file path

        Returns:
            Report string
        """
        report_lines = []
        report_lines.append("=" * 80)
        report_lines.append("OPTIMIZATION VALIDATION REPORT")
        report_lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report_lines.append("=" * 80)
        # Basic info
        report_lines.append(f"Original data rows: {len(df_original)}")
        report_lines.append(f"Optimized data rows: {len(df_optimized)}")
        report_lines.append(f"Columns compared: {', '.join(self.comparison_columns)}")
        report_lines.append("")
        # Guard against ZeroDivisionError on an empty reference frame.
        denominator = max(len(df_original), 1)
        # Accumulate the grand total in the same pass instead of recomputing
        # every column comparison a second time (as the original code did).
        total_differences = 0
        # Compare each column
        for col in self.comparison_columns:
            if col not in df_original.columns or col not in df_optimized.columns:
                report_lines.append(f"❌ {col}: Column missing")
                continue
            original_values, optimized_values = self._aligned_column(df_original, df_optimized, col)
            differences = original_values != optimized_values
            diff_count = int(differences.sum())
            total_differences += diff_count
            accuracy = round((1 - diff_count / denominator) * 100, 2)
            status = "✅" if diff_count == 0 else "⚠️"
            report_lines.append(f"{status} {col}: {diff_count} differences ({accuracy}% accuracy)")
            if diff_count > 0:
                # Show some examples of differences.  After _aligned_column the
                # index is a clean RangeIndex, so labels equal positions and
                # .iloc is safe (the original mixed labels with .iloc).
                diff_indices = differences[differences].index[:5]
                report_lines.append("    Sample differences (first 5):")
                for idx in diff_indices:
                    orig_val = str(original_values.iloc[idx])[:50]
                    opt_val = str(optimized_values.iloc[idx])[:50]
                    report_lines.append(f"      Row {idx}: '{orig_val}' → '{opt_val}'")
            report_lines.append("")
        # Overall summary
        total_comparisons = denominator * len(self.comparison_columns)
        overall_accuracy = round((1 - total_differences / total_comparisons) * 100, 2)
        report_lines.append("=" * 80)
        report_lines.append("OVERALL RESULTS:")
        report_lines.append(f"Total differences: {total_differences}")
        report_lines.append(f"Overall accuracy: {overall_accuracy}%")
        report_lines.append(f"Perfect match: {'Yes' if total_differences == 0 else 'No'}")
        report_lines.append("=" * 80)
        report_text = "\n".join(report_lines)
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report_text)
            print(f"📄 Report saved to: {output_file}")
        return report_text

    def validate_optimization(self, optimized_mapper_function, input_data: pd.DataFrame,
                              report_file: str = None) -> bool:
        """
        Run full validation process

        Args:
            optimized_mapper_function: Function that takes input_data and returns optimized output
            input_data: Input dataframe to process
            report_file: Optional report file path

        Returns:
            True if validation passes (100% accuracy)
        """
        print("🔍 Starting optimization validation...")
        # Load original data
        df_original = self.load_original_data()
        # Run optimized mapper
        print("🚀 Running optimized mapper...")
        try:
            df_optimized = optimized_mapper_function(input_data)
            print(f"✓ Optimized processing completed: {len(df_optimized)} rows")
        except Exception as e:
            print(f"✗ Error in optimized processing: {e}")
            return False
        # Compare results
        print("📊 Comparing results...")
        results = self.compare_dataframes(df_original, df_optimized)
        # Generate report
        if report_file:
            self.generate_difference_report(df_original, df_optimized, report_file)
        # Print summary
        self._print_summary(results)
        return results['summary']['perfect_match']

    def compare_two_files(self, optimized_file_path: str, report_file: str = None) -> bool:
        """
        Compare two CSV files directly

        Args:
            optimized_file_path: Path to optimized output CSV
            report_file: Optional report file path

        Returns:
            True if validation passes (100% accuracy)
        """
        print("🔍 Starting file comparison validation...")
        # Load original data
        df_original = self.load_original_data()
        # Load optimized data
        try:
            df_optimized = pd.read_csv(optimized_file_path)
            print(f"✓ Loaded optimized data: {len(df_optimized)} rows")
        except Exception as e:
            print(f"✗ Error loading optimized data: {e}")
            return False
        # Compare results
        print("📊 Comparing results...")
        results = self.compare_dataframes(df_original, df_optimized)
        # Generate report
        if report_file:
            self.generate_difference_report(df_original, df_optimized, report_file)
        # Print summary
        self._print_summary(results)
        return results['summary']['perfect_match']
def main():
    """Demonstrate the validator on the sample data files."""
    # Example paths - adjust these to your local setup.
    reference_path = "data/outputData_original.csv"
    api_output_path = "data/outputData_api.csv"

    if not os.path.exists(reference_path):
        print(f"❌ Original file not found: {reference_path}")
        print("Please ensure outputData_original.csv exists in the current directory")
        return

    # Initialize validator against the reference output.
    validator = OptimizationValidator(reference_path)

    def example_optimized_mapper(input_data):
        """Stand-in for a real optimized mapper: copy the input and fill each
        expected output column from its source column (dummy demo data)."""
        df_result = input_data.copy()
        source_for = {
            '出力_科目': '科目',
            '出力_中科目': '中科目',
            '出力_標準名称': '名称',
            '出力_項目名': '名称',
            '出力_標準単位': '単位',
        }
        for out_col, src_col in source_for.items():
            df_result[out_col] = df_result.get(src_col, '')
        return df_result

    if not os.path.exists(api_output_path):
        print(f"❌ Input file not found: {api_output_path}")
        print("You can also compare two CSV files directly:")
        print("validator.compare_two_files('optimized_output.csv', 'report.txt')")
        return

    # Load input data and run the full validation pass.
    input_data = pd.read_csv(api_output_path)
    passed = validator.validate_optimization(
        example_optimized_mapper,
        input_data,
        "optimization_validation_report.txt"
    )
    if passed:
        print("🎉 Validation PASSED! Optimization maintains accuracy.")
    else:
        print("❌ Validation FAILED! Check the report for details.")


if __name__ == "__main__":
    main()