|
|
|
""" |
|
Real Medical Files Testing |
|
Batch test FhirFlame on real medical files with performance metrics |
|
""" |
|
|
|
import os |
|
import sys |
|
import time |
|
import asyncio |
|
from pathlib import Path |
|
from typing import List, Dict, Any |
|
from datetime import datetime |
|
|
|
|
|
# Make the project importable when this file is executed directly as a script.
# The ``src`` directory keeps its original first-place position; the project
# root (the parent of this file's directory) is added right behind it because
# the imports below are written as ``src.<module>`` and ``tests.<module>``,
# which can only be resolved from the root, not from inside ``src``.
_PROJECT_ROOT = os.path.join(os.path.dirname(__file__), '..')
sys.path.insert(0, os.path.join(_PROJECT_ROOT, 'src'))
sys.path.insert(1, _PROJECT_ROOT)
|
|
|
from src.file_processor import local_processor |
|
from src.fhir_validator import FhirValidator |
|
from src.monitoring import monitor |
|
from tests.download_medical_files import MedicalFileDownloader |
|
|
|
|
|
# Optional dependency: the DICOM processor is only importable when its
# requirements are installed. Record availability in a flag and fall back to
# ``None`` so the rest of the module can check before using it.
try:
    from src.dicom_processor import dicom_processor
    DICOM_AVAILABLE = True
except ImportError:
    DICOM_AVAILABLE = False
    dicom_processor = None
|
|
|
class MedicalFileTestFramework:
    """Batch-test harness that runs medical files through the processors.

    Each file is classified by extension, processed by either the DICOM
    processor or the generic document processor, validated as a FHIR bundle,
    and recorded as a result dict in ``self.results``. ``generate_summary``
    then aggregates those results and compares them with ``self.targets``.
    """

    # Extension groups shared by ``classify_file`` and
    # ``analyze_mistral_ocr_compatibility`` so both agree on what is an image.
    _DICOM_EXTENSIONS = ('.dcm', '.dicom')
    _TEXT_EXTENSIONS = ('.txt', '.text')
    _PDF_EXTENSIONS = ('.pdf',)
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif', '.bmp')

    def __init__(self):
        """Create the validator and downloader and set the pass/fail targets."""
        self.fhir_validator = FhirValidator()
        self.downloader = MedicalFileDownloader()
        self.results: List[Dict[str, Any]] = []

        # Thresholds checked by ``generate_summary``: minimum success rate,
        # maximum average processing time in seconds, minimum average FHIR
        # compliance score.
        self.targets = {
            'success_rate': 0.90,
            'processing_time': 5.0,
            'fhir_compliance': 0.95
        }

    def analyze_mistral_ocr_compatibility(self, file_path: str) -> Dict[str, Any]:
        """Report, from the file extension alone, whether Mistral OCR applies.

        Args:
            file_path: Path or file name; only its (case-insensitive)
                extension is inspected, the file is never opened.

        Returns:
            Dict with keys ``compatible`` (bool), ``confidence`` (str),
            ``reason`` (str) and ``preprocessing_needed`` (bool).
        """
        file_path_lower = file_path.lower()

        if file_path_lower.endswith(self._IMAGE_EXTENSIONS):
            return {
                'compatible': True,
                'confidence': 'high',
                'reason': 'Direct image format - ideal for Mistral OCR',
                'preprocessing_needed': False
            }

        if file_path_lower.endswith(self._DICOM_EXTENSIONS):
            return {
                'compatible': True,
                'confidence': 'medium',
                'reason': 'DICOM contains images but needs pixel data extraction',
                'preprocessing_needed': True
            }

        if file_path_lower.endswith(self._PDF_EXTENSIONS):
            return {
                'compatible': True,
                'confidence': 'medium',
                'reason': 'PDF can be converted to images for OCR',
                'preprocessing_needed': True
            }

        if file_path_lower.endswith(self._TEXT_EXTENSIONS):
            return {
                'compatible': False,
                'confidence': 'n/a',
                'reason': 'Plain text files - no OCR needed, process directly',
                'preprocessing_needed': False
            }

        return {
            'compatible': False,
            'confidence': 'unknown',
            'reason': 'Unknown file type - cannot determine OCR compatibility',
            'preprocessing_needed': False
        }

    def classify_file(self, file_path: str) -> str:
        """Classify a file by its (case-insensitive) extension.

        Returns:
            One of ``'dicom'``, ``'text'``, ``'pdf'``, ``'image'`` or
            ``'unknown'``.
        """
        file_path_lower = file_path.lower()

        if file_path_lower.endswith(self._DICOM_EXTENSIONS):
            return 'dicom'
        if file_path_lower.endswith(self._TEXT_EXTENSIONS):
            return 'text'
        if file_path_lower.endswith(self._PDF_EXTENSIONS):
            return 'pdf'
        if file_path_lower.endswith(self._IMAGE_EXTENSIONS):
            return 'image'
        return 'unknown'

    async def process_text_file(self, file_path: str) -> Dict[str, Any]:
        """Process a text, PDF or image file with the generic document processor.

        Args:
            file_path: Path of the file to read and process.

        Returns:
            A result dict. On success it has ``status == 'success'`` plus
            timing, entity, FHIR-validation and OCR-compatibility fields. Any
            exception is caught and reported as ``status == 'error'`` with the
            message under ``error``; nothing is raised to the caller.
        """
        file_type = self.classify_file(file_path)
        ocr_compatibility = self.analyze_mistral_ocr_compatibility(file_path)
        start_time = time.time()

        try:
            if file_type == 'text':
                # Text files keep the decode/encode round trip, so invalid
                # UTF-8 is still reported as an error for them.
                with open(file_path, 'r', encoding='utf-8') as f:
                    content_bytes = f.read().encode('utf-8')
            else:
                # PDFs, images and unknown types are binary: decoding them as
                # UTF-8 would fail before the processor ever saw them.
                with open(file_path, 'rb') as f:
                    content_bytes = f.read()

            result = await local_processor.process_document(
                document_bytes=content_bytes,
                user_id="test-user",
                filename=os.path.basename(file_path)
            )

            # Timing covers reading and processing, not FHIR validation.
            processing_time = time.time() - start_time

            fhir_validation = self.fhir_validator.validate_fhir_bundle(result['fhir_bundle'])

            return {
                'status': 'success',
                'file_path': file_path,
                'file_type': file_type,
                'processing_time': processing_time,
                'entities_found': result['entities_found'],
                'fhir_valid': fhir_validation['is_valid'],
                'fhir_compliance': fhir_validation['compliance_score'],
                'processor_used': result['processing_mode'],
                'mistral_ocr_compatible': ocr_compatibility['compatible'],
                'mistral_ocr_notes': ocr_compatibility['reason']
            }

        except Exception as e:
            # Best-effort batch run: record the failure and keep going. The
            # OCR fields are included so failed files are still counted
            # correctly in the compatibility statistics.
            return {
                'status': 'error',
                'file_path': file_path,
                'file_type': file_type,
                'processing_time': time.time() - start_time,
                'error': str(e),
                'mistral_ocr_compatible': ocr_compatibility['compatible'],
                'mistral_ocr_notes': ocr_compatibility['reason']
            }

    async def process_dicom_file(self, file_path: str) -> Dict[str, Any]:
        """Process a DICOM file with the DICOM processor.

        Args:
            file_path: Path of the DICOM file.

        Returns:
            A result dict with ``status`` of ``'success'`` or ``'error'``.
            When the DICOM processor could not be imported an error result is
            returned immediately with a processing time of ``0.0``.
        """
        if not DICOM_AVAILABLE or not dicom_processor:
            return {
                'status': 'error',
                'file_path': file_path,
                'file_type': 'dicom',
                'processing_time': 0.0,
                'error': 'DICOM processor not available - install pydicom',
                'mistral_ocr_compatible': True,
                'mistral_ocr_notes': 'DICOM images are compatible but need preprocessing'
            }

        # The OCR verdict depends only on the file name, so compute it once
        # for the success, failure and exception paths alike.
        ocr_compatibility = self.analyze_mistral_ocr_compatibility(file_path)
        start_time = time.time()

        try:
            result = await dicom_processor.process_dicom_file(file_path)
            processing_time = time.time() - start_time

            if result['status'] != 'success':
                return {
                    'status': 'error',
                    'file_path': file_path,
                    'file_type': 'dicom',
                    'processing_time': processing_time,
                    'error': result.get('error', 'Unknown error'),
                    'mistral_ocr_compatible': ocr_compatibility['compatible'],
                    'mistral_ocr_notes': ocr_compatibility['reason']
                }

            fhir_validation = self.fhir_validator.validate_fhir_bundle(result['fhir_bundle'])

            return {
                'status': 'success',
                'file_path': file_path,
                'file_type': 'dicom',
                'processing_time': processing_time,
                'patient_name': result.get('patient_name', 'Unknown'),
                'modality': result.get('modality', 'Unknown'),
                'fhir_valid': fhir_validation['is_valid'],
                'fhir_compliance': fhir_validation['compliance_score'],
                'processor_used': 'dicom_processor',
                'mistral_ocr_compatible': ocr_compatibility['compatible'],
                'mistral_ocr_notes': ocr_compatibility['reason']
            }

        except Exception as e:
            # Best-effort batch run: record the failure and keep going.
            return {
                'status': 'error',
                'file_path': file_path,
                'file_type': 'dicom',
                'processing_time': time.time() - start_time,
                'error': str(e),
                'mistral_ocr_compatible': ocr_compatibility['compatible'],
                'mistral_ocr_notes': ocr_compatibility['reason']
            }

    async def process_single_file(self, file_path: str) -> Dict[str, Any]:
        """Announce the file, then dispatch it to the matching processor.

        DICOM files go to ``process_dicom_file``; every other type goes to
        ``process_text_file``.
        """
        file_type = self.classify_file(file_path)

        print(f"Processing {os.path.basename(file_path)} ({file_type})...")

        if file_type == 'dicom':
            return await self.process_dicom_file(file_path)
        return await self.process_text_file(file_path)

    async def run_batch_test(self, file_limit: int = 20) -> Dict[str, Any]:
        """Fetch up to ``file_limit`` files, process them one by one, summarize.

        Args:
            file_limit: Passed to the downloader as its ``limit``.

        Returns:
            The dict produced by ``generate_summary``, or
            ``{"error": "No files available"}`` when the downloader returns
            nothing.
        """
        print("FhirFlame Medical File Batch Testing")
        print("=" * 50)

        print("Preparing medical files...")
        available_files = self.downloader.download_all_files(limit=file_limit)

        if not available_files:
            print("❌ No medical files available for testing!")
            return {"error": "No files available"}

        file_count = len(available_files)
        print(f"Found {file_count} medical files to test")

        start_time = time.time()
        self.results = []

        for i, file_path in enumerate(available_files, 1):
            # No newline here: the processor's own "Processing ..." message
            # continues on the same line as the progress counter.
            print(f"\n[{i}/{file_count}] ", end="")

            result = await self.process_single_file(file_path)
            self.results.append(result)

            status_emoji = "✅" if result['status'] == 'success' else "❌"
            time_str = f"{result['processing_time']:.2f}s"
            ocr_note = "OCR✅" if result.get('mistral_ocr_compatible') else "OCR❌"
            print(f"{status_emoji} {time_str} {ocr_note}")

        total_time = time.time() - start_time

        # The banner has to be printed before generate_summary(), because
        # that method prints the figures the banner introduces.
        print("\n" + "=" * 50)
        print("BATCH TESTING RESULTS")
        print("=" * 50)

        return self.generate_summary(total_time)

    def generate_summary(self, total_time: float) -> Dict[str, Any]:
        """Aggregate ``self.results``, print a report and return the metrics.

        Args:
            total_time: Wall-clock duration of the whole batch in seconds; it
                is reported as-is.

        Returns:
            Dict of counts, averages, the per-file-type breakdown, target
            verdicts under ``targets_met`` and the raw results under
            ``detailed_results``; or ``{"error": "No results to summarize"}``
            when there are no results.
        """
        if not self.results:
            return {"error": "No results to summarize"}

        total_files = len(self.results)
        successful = [r for r in self.results if r['status'] == 'success']
        successful_count = len(successful)
        failed_count = total_files - successful_count
        success_rate = successful_count / total_files

        # Timing and compliance averages cover successful files only; with no
        # successes they are reported as 0.
        processing_times = [r['processing_time'] for r in successful]
        avg_processing_time = sum(processing_times) / len(processing_times) if processing_times else 0
        max_processing_time = max(processing_times) if processing_times else 0

        fhir_compliances = [r.get('fhir_compliance', 0) for r in successful]
        avg_fhir_compliance = sum(fhir_compliances) / len(fhir_compliances) if fhir_compliances else 0

        # OCR compatibility is counted over every result, failed or not.
        ocr_compatible = [r for r in self.results if r.get('mistral_ocr_compatible', False)]
        ocr_incompatible = [r for r in self.results if not r.get('mistral_ocr_compatible', False)]

        file_types: Dict[str, Dict[str, int]] = {}
        for result in self.results:
            stats = file_types.setdefault(
                result.get('file_type', 'unknown'),
                {'total': 0, 'successful': 0, 'ocr_compatible': 0},
            )
            stats['total'] += 1
            if result['status'] == 'success':
                stats['successful'] += 1
            if result.get('mistral_ocr_compatible', False):
                stats['ocr_compatible'] += 1

        meets_success_target = success_rate >= self.targets['success_rate']
        meets_time_target = avg_processing_time <= self.targets['processing_time']
        meets_compliance_target = avg_fhir_compliance >= self.targets['fhir_compliance']
        all_targets_met = meets_success_target and meets_time_target and meets_compliance_target

        print(f"Files Processed: {total_files}")
        print(f"✅ Successful: {successful_count} ({success_rate:.1%})")
        print(f"❌ Failed: {failed_count}")
        print(f"⏱️ Average Processing Time: {avg_processing_time:.2f}s")
        print(f"Maximum Processing Time: {max_processing_time:.2f}s")
        print(f"Average FHIR Compliance: {avg_fhir_compliance:.1%}")
        print(f"Total Test Time: {total_time:.2f}s")

        print("\nMistral OCR Compatibility Analysis:")
        print(f" Compatible files: {len(ocr_compatible)}/{total_files} ({len(ocr_compatible)/total_files*100:.0f}%)")
        print(f" Incompatible files: {len(ocr_incompatible)}/{total_files} ({len(ocr_incompatible)/total_files*100:.0f}%)")

        print("\nFile Type Breakdown:")
        for file_type, stats in file_types.items():
            success_pct = stats['successful'] / stats['total'] * 100 if stats['total'] > 0 else 0
            ocr_pct = stats['ocr_compatible'] / stats['total'] * 100 if stats['total'] > 0 else 0
            print(f" {file_type}: {stats['successful']}/{stats['total']} success ({success_pct:.0f}%) | OCR compatible: {stats['ocr_compatible']}/{stats['total']} ({ocr_pct:.0f}%)")

        print("\n🎯 Performance Targets:")
        print(f" Success Rate: {success_rate:.1%} {'✅' if meets_success_target else '❌'} (target: {self.targets['success_rate']:.1%})")
        print(f" Processing Time: {avg_processing_time:.2f}s {'✅' if meets_time_target else '❌'} (target: <{self.targets['processing_time']}s)")
        print(f" FHIR Compliance: {avg_fhir_compliance:.1%} {'✅' if meets_compliance_target else '❌'} (target: {self.targets['fhir_compliance']:.1%})")

        print("\nMistral OCR Data Type Support:")
        print(" ✅ Images (PNG, JPG): Direct compatibility")
        print(" ✅ DICOM files: Compatible with preprocessing")
        print(" ✅ PDF files: Compatible with image conversion")
        print(" ❌ Plain text: No OCR needed (process directly)")

        print(f"\nOverall Result: {'✅ ALL TARGETS MET' if all_targets_met else '❌ Some targets missed'}")

        # List at most the first five failures to keep the report short.
        errors = [r for r in self.results if r['status'] == 'error']
        if errors:
            print(f"\n❌ Errors ({len(errors)}):")
            for error in errors[:5]:
                filename = os.path.basename(error['file_path'])
                print(f" {filename}: {error['error']}")
            if len(errors) > 5:
                print(f" ... and {len(errors) - 5} more errors")

        return {
            'total_files': total_files,
            'successful_count': successful_count,
            'failed_count': failed_count,
            'success_rate': success_rate,
            'avg_processing_time': avg_processing_time,
            'max_processing_time': max_processing_time,
            'avg_fhir_compliance': avg_fhir_compliance,
            'total_time': total_time,
            'file_types': file_types,
            'mistral_ocr_compatible_count': len(ocr_compatible),
            'mistral_ocr_incompatible_count': len(ocr_incompatible),
            'targets_met': {
                'success_rate': meets_success_target,
                'processing_time': meets_time_target,
                'fhir_compliance': meets_compliance_target,
                'all_targets': all_targets_met
            },
            'detailed_results': self.results
        }
|
|
|
async def main() -> int:
    """Print the environment status, run the batch test and report the outcome.

    Returns:
        ``0`` when the batch run finishes without raising (including the case
        where the batch itself reports an ``error`` such as having no files),
        ``1`` when an exception escapes the run.
    """
    print(f"Starting at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    if DICOM_AVAILABLE:
        print("✅ DICOM processing available")
    else:
        print("⚠️ DICOM processing not available (install pydicom)")

    # Mistral OCR is configured purely through environment variables; it is
    # only reported here, never required.
    mistral_enabled = os.getenv('USE_MISTRAL_FALLBACK', 'false').lower() == 'true'
    mistral_key = os.getenv('MISTRAL_API_KEY')

    print("Mistral OCR Status:")
    print(f" Enabled: {mistral_enabled}")
    print(f" API Key: {'✅ Set' if mistral_key else '❌ Missing'}")
    print(" Supported: Images, DICOM (preprocessed), PDF (converted)")
    print(" Not needed: Plain text files")

    framework = MedicalFileTestFramework()

    try:
        results = await framework.run_batch_test(file_limit=15)

        # The short recap is only meaningful when a real summary came back.
        if 'error' not in results:
            print("\nSummary:")
            print(f" {results['successful_count']}/{results['total_files']} files processed successfully")
            print(f" {results['mistral_ocr_compatible_count']} files compatible with Mistral OCR")
            print(f" Average time: {results['avg_processing_time']:.2f}s per file")
            print(f" FHIR compliance: {results['avg_fhir_compliance']:.1%}")

        print("\nMedical file testing completed!")
        return 0

    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        print(f"\nTesting failed: {e}")
        return 1
|
|
|
# Script entry point: run the async main() and use its return value as the
# process exit code.
if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)