fhirflame / tests /test_real_medical_files.py
leksval
initial commit
a963d65
#!/usr/bin/env python3
"""
Real Medical Files Testing
Batch test FhirFlame on real medical files with performance metrics
"""
import os
import sys
import time
import asyncio
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime
# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from src.file_processor import local_processor
from src.fhir_validator import FhirValidator
from src.monitoring import monitor
from tests.download_medical_files import MedicalFileDownloader
# Try to import DICOM processor
try:
from src.dicom_processor import dicom_processor
DICOM_AVAILABLE = True
except ImportError:
DICOM_AVAILABLE = False
dicom_processor = None
class MedicalFileTestFramework:
"""Simple testing framework for medical files"""
def __init__(self):
self.fhir_validator = FhirValidator()
self.downloader = MedicalFileDownloader()
self.results = []
# Performance targets from the plan
self.targets = {
'success_rate': 0.90, # >90% success
'processing_time': 5.0, # <5 seconds per file
'fhir_compliance': 0.95 # >95% compliance
}
def analyze_mistral_ocr_compatibility(self, file_path: str) -> Dict[str, Any]:
"""Analyze if file is compatible with Mistral OCR"""
file_path_lower = file_path.lower()
# Image files - fully compatible
if file_path_lower.endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp')):
return {
'compatible': True,
'confidence': 'high',
'reason': 'Direct image format - ideal for Mistral OCR',
'preprocessing_needed': False
}
# DICOM files - compatible with preprocessing
elif file_path_lower.endswith(('.dcm', '.dicom')):
return {
'compatible': True,
'confidence': 'medium',
'reason': 'DICOM contains images but needs pixel data extraction',
'preprocessing_needed': True
}
# PDF files - compatible with conversion
elif file_path_lower.endswith('.pdf'):
return {
'compatible': True,
'confidence': 'medium',
'reason': 'PDF can be converted to images for OCR',
'preprocessing_needed': True
}
# Text files - not compatible (no OCR needed)
elif file_path_lower.endswith(('.txt', '.text')):
return {
'compatible': False,
'confidence': 'n/a',
'reason': 'Plain text files - no OCR needed, process directly',
'preprocessing_needed': False
}
# Unknown files
else:
return {
'compatible': False,
'confidence': 'unknown',
'reason': 'Unknown file type - cannot determine OCR compatibility',
'preprocessing_needed': False
}
def classify_file(self, file_path: str) -> str:
"""Classify file type"""
file_path_lower = file_path.lower()
if file_path_lower.endswith(('.dcm', '.dicom')):
return 'dicom'
elif file_path_lower.endswith(('.txt', '.text')):
return 'text'
elif file_path_lower.endswith('.pdf'):
return 'pdf'
elif file_path_lower.endswith(('.jpg', '.jpeg', '.png')):
return 'image'
else:
return 'unknown'
async def process_text_file(self, file_path: str) -> Dict[str, Any]:
"""Process text/PDF/image file using existing processor"""
try:
start_time = time.time()
# Read file content
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Convert to bytes for processor
content_bytes = content.encode('utf-8')
# Process with local processor (may use Mistral OCR if enabled)
result = await local_processor.process_document(
document_bytes=content_bytes,
user_id="test-user",
filename=os.path.basename(file_path)
)
processing_time = time.time() - start_time
# Validate FHIR bundle
fhir_validation = self.fhir_validator.validate_fhir_bundle(result['fhir_bundle'])
# Check Mistral OCR compatibility
ocr_compatibility = self.analyze_mistral_ocr_compatibility(file_path)
return {
'status': 'success',
'file_path': file_path,
'file_type': 'text',
'processing_time': processing_time,
'entities_found': result['entities_found'],
'fhir_valid': fhir_validation['is_valid'],
'fhir_compliance': fhir_validation['compliance_score'],
'processor_used': result['processing_mode'],
'mistral_ocr_compatible': ocr_compatibility['compatible'],
'mistral_ocr_notes': ocr_compatibility['reason']
}
except Exception as e:
processing_time = time.time() - start_time
return {
'status': 'error',
'file_path': file_path,
'file_type': 'text',
'processing_time': processing_time,
'error': str(e)
}
async def process_dicom_file(self, file_path: str) -> Dict[str, Any]:
"""Process DICOM file using DICOM processor"""
if not DICOM_AVAILABLE or not dicom_processor:
return {
'status': 'error',
'file_path': file_path,
'file_type': 'dicom',
'processing_time': 0.0,
'error': 'DICOM processor not available - install pydicom',
'mistral_ocr_compatible': True,
'mistral_ocr_notes': 'DICOM images are compatible but need preprocessing'
}
try:
start_time = time.time()
# Process with DICOM processor
result = await dicom_processor.process_dicom_file(file_path)
processing_time = time.time() - start_time
# Check Mistral OCR compatibility
ocr_compatibility = self.analyze_mistral_ocr_compatibility(file_path)
if result['status'] == 'success':
# Validate FHIR bundle
fhir_validation = self.fhir_validator.validate_fhir_bundle(result['fhir_bundle'])
return {
'status': 'success',
'file_path': file_path,
'file_type': 'dicom',
'processing_time': processing_time,
'patient_name': result.get('patient_name', 'Unknown'),
'modality': result.get('modality', 'Unknown'),
'fhir_valid': fhir_validation['is_valid'],
'fhir_compliance': fhir_validation['compliance_score'],
'processor_used': 'dicom_processor',
'mistral_ocr_compatible': ocr_compatibility['compatible'],
'mistral_ocr_notes': ocr_compatibility['reason']
}
else:
return {
'status': 'error',
'file_path': file_path,
'file_type': 'dicom',
'processing_time': processing_time,
'error': result.get('error', 'Unknown error'),
'mistral_ocr_compatible': ocr_compatibility['compatible'],
'mistral_ocr_notes': ocr_compatibility['reason']
}
except Exception as e:
processing_time = time.time() - start_time
ocr_compatibility = self.analyze_mistral_ocr_compatibility(file_path)
return {
'status': 'error',
'file_path': file_path,
'file_type': 'dicom',
'processing_time': processing_time,
'error': str(e),
'mistral_ocr_compatible': ocr_compatibility['compatible'],
'mistral_ocr_notes': ocr_compatibility['reason']
}
async def process_single_file(self, file_path: str) -> Dict[str, Any]:
"""Process a single medical file"""
file_type = self.classify_file(file_path)
print(f"πŸ“„ Processing {os.path.basename(file_path)} ({file_type})...")
if file_type == 'dicom':
return await self.process_dicom_file(file_path)
else:
return await self.process_text_file(file_path)
async def run_batch_test(self, file_limit: int = 20) -> Dict[str, Any]:
"""Run batch test on all medical files"""
print("πŸ₯ FhirFlame Medical File Batch Testing")
print("=" * 50)
# Download/prepare medical files
print("πŸ“₯ Preparing medical files...")
available_files = self.downloader.download_all_files(limit=file_limit)
if not available_files:
print("❌ No medical files available for testing!")
return {"error": "No files available"}
print(f"πŸ“‹ Found {len(available_files)} medical files to test")
# Process each file
start_time = time.time()
self.results = []
for i, file_path in enumerate(available_files, 1):
print(f"\n[{i}/{len(available_files)}] ", end="")
result = await self.process_single_file(file_path)
self.results.append(result)
# Show quick result
status_emoji = "βœ…" if result['status'] == 'success' else "❌"
time_str = f"{result['processing_time']:.2f}s"
ocr_note = "πŸ”OCRβœ…" if result.get('mistral_ocr_compatible') else "πŸ”OCR❌"
print(f"{status_emoji} {time_str} {ocr_note}")
total_time = time.time() - start_time
# Generate summary
summary = self.generate_summary(total_time)
print("\n" + "=" * 50)
print("πŸ“Š BATCH TESTING RESULTS")
print("=" * 50)
return summary
def generate_summary(self, total_time: float) -> Dict[str, Any]:
"""Generate test summary and metrics"""
if not self.results:
return {"error": "No results to summarize"}
# Calculate metrics
total_files = len(self.results)
successful = [r for r in self.results if r['status'] == 'success']
successful_count = len(successful)
failed_count = total_files - successful_count
success_rate = successful_count / total_files if total_files > 0 else 0
# Processing time metrics
processing_times = [r['processing_time'] for r in successful]
avg_processing_time = sum(processing_times) / len(processing_times) if processing_times else 0
max_processing_time = max(processing_times) if processing_times else 0
# FHIR compliance metrics
fhir_compliances = [r.get('fhir_compliance', 0) for r in successful]
avg_fhir_compliance = sum(fhir_compliances) / len(fhir_compliances) if fhir_compliances else 0
# Mistral OCR compatibility analysis
ocr_compatible = [r for r in self.results if r.get('mistral_ocr_compatible', False)]
ocr_incompatible = [r for r in self.results if not r.get('mistral_ocr_compatible', False)]
# File type breakdown
file_types = {}
for result in self.results:
file_type = result.get('file_type', 'unknown')
if file_type not in file_types:
file_types[file_type] = {'total': 0, 'successful': 0, 'ocr_compatible': 0}
file_types[file_type]['total'] += 1
if result['status'] == 'success':
file_types[file_type]['successful'] += 1
if result.get('mistral_ocr_compatible', False):
file_types[file_type]['ocr_compatible'] += 1
# Performance against targets
meets_success_target = success_rate >= self.targets['success_rate']
meets_time_target = avg_processing_time <= self.targets['processing_time']
meets_compliance_target = avg_fhir_compliance >= self.targets['fhir_compliance']
all_targets_met = meets_success_target and meets_time_target and meets_compliance_target
# Print detailed results
print(f"πŸ“‹ Files Processed: {total_files}")
print(f"βœ… Successful: {successful_count} ({success_rate:.1%})")
print(f"❌ Failed: {failed_count}")
print(f"⏱️ Average Processing Time: {avg_processing_time:.2f}s")
print(f"πŸ” Maximum Processing Time: {max_processing_time:.2f}s")
print(f"πŸ“Š Average FHIR Compliance: {avg_fhir_compliance:.1%}")
print(f"πŸ• Total Test Time: {total_time:.2f}s")
print(f"\nπŸ” Mistral OCR Compatibility Analysis:")
print(f" Compatible files: {len(ocr_compatible)}/{total_files} ({len(ocr_compatible)/total_files*100:.0f}%)")
print(f" Incompatible files: {len(ocr_incompatible)}/{total_files} ({len(ocr_incompatible)/total_files*100:.0f}%)")
print(f"\nπŸ“‚ File Type Breakdown:")
for file_type, stats in file_types.items():
success_pct = stats['successful'] / stats['total'] * 100 if stats['total'] > 0 else 0
ocr_pct = stats['ocr_compatible'] / stats['total'] * 100 if stats['total'] > 0 else 0
print(f" {file_type}: {stats['successful']}/{stats['total']} success ({success_pct:.0f}%) | OCR compatible: {stats['ocr_compatible']}/{stats['total']} ({ocr_pct:.0f}%)")
print(f"\n🎯 Performance Targets:")
print(f" Success Rate: {success_rate:.1%} {'βœ…' if meets_success_target else '❌'} (target: {self.targets['success_rate']:.1%})")
print(f" Processing Time: {avg_processing_time:.2f}s {'βœ…' if meets_time_target else '❌'} (target: <{self.targets['processing_time']}s)")
print(f" FHIR Compliance: {avg_fhir_compliance:.1%} {'βœ…' if meets_compliance_target else '❌'} (target: {self.targets['fhir_compliance']:.1%})")
print(f"\nπŸ” Mistral OCR Data Type Support:")
print(f" βœ… Images (PNG, JPG): Direct compatibility")
print(f" βœ… DICOM files: Compatible with preprocessing")
print(f" βœ… PDF files: Compatible with image conversion")
print(f" ❌ Plain text: No OCR needed (process directly)")
print(f"\nπŸ† Overall Result: {'βœ… ALL TARGETS MET' if all_targets_met else '❌ Some targets missed'}")
# Show errors if any
errors = [r for r in self.results if r['status'] == 'error']
if errors:
print(f"\n❌ Errors ({len(errors)}):")
for error in errors[:5]: # Show first 5 errors
filename = os.path.basename(error['file_path'])
print(f" {filename}: {error['error']}")
if len(errors) > 5:
print(f" ... and {len(errors) - 5} more errors")
return {
'total_files': total_files,
'successful_count': successful_count,
'failed_count': failed_count,
'success_rate': success_rate,
'avg_processing_time': avg_processing_time,
'max_processing_time': max_processing_time,
'avg_fhir_compliance': avg_fhir_compliance,
'total_time': total_time,
'file_types': file_types,
'mistral_ocr_compatible_count': len(ocr_compatible),
'mistral_ocr_incompatible_count': len(ocr_incompatible),
'targets_met': {
'success_rate': meets_success_target,
'processing_time': meets_time_target,
'fhir_compliance': meets_compliance_target,
'all_targets': all_targets_met
},
'detailed_results': self.results
}
async def main():
"""Main test function"""
print(f"πŸ• Starting at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Check if DICOM is available
if DICOM_AVAILABLE:
print("βœ… DICOM processing available")
else:
print("⚠️ DICOM processing not available (install pydicom)")
# Check Mistral OCR configuration
mistral_enabled = os.getenv('USE_MISTRAL_FALLBACK', 'false').lower() == 'true'
mistral_key = os.getenv('MISTRAL_API_KEY')
print(f"πŸ” Mistral OCR Status:")
print(f" Enabled: {mistral_enabled}")
print(f" API Key: {'βœ… Set' if mistral_key else '❌ Missing'}")
print(f" Supported: Images, DICOM (preprocessed), PDF (converted)")
print(f" Not needed: Plain text files")
# Run tests
framework = MedicalFileTestFramework()
try:
results = await framework.run_batch_test(file_limit=15)
if 'error' not in results:
print(f"\nπŸ“‹ Summary:")
print(f" {results['successful_count']}/{results['total_files']} files processed successfully")
print(f" {results['mistral_ocr_compatible_count']} files compatible with Mistral OCR")
print(f" Average time: {results['avg_processing_time']:.2f}s per file")
print(f" FHIR compliance: {results['avg_fhir_compliance']:.1%}")
print(f"\nπŸŽ‰ Medical file testing completed!")
return 0
except Exception as e:
print(f"\nπŸ’₯ Testing failed: {e}")
return 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)