File size: 18,172 Bytes
a963d65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
#!/usr/bin/env python3
"""
Real Medical Files Testing
Batch test FhirFlame on real medical files with performance metrics
"""

import os
import sys
import time
import asyncio
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime

# Add src to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))

from src.file_processor import local_processor
from src.fhir_validator import FhirValidator
from src.monitoring import monitor
from tests.download_medical_files import MedicalFileDownloader

# Try to import DICOM processor
try:
    from src.dicom_processor import dicom_processor
    DICOM_AVAILABLE = True
except ImportError:
    DICOM_AVAILABLE = False
    dicom_processor = None

class MedicalFileTestFramework:
    """Simple testing framework for medical files"""
    
    def __init__(self):
        self.fhir_validator = FhirValidator()
        self.downloader = MedicalFileDownloader()
        self.results = []
        
        # Performance targets from the plan
        self.targets = {
            'success_rate': 0.90,    # >90% success
            'processing_time': 5.0,  # <5 seconds per file
            'fhir_compliance': 0.95  # >95% compliance
        }
    
    def analyze_mistral_ocr_compatibility(self, file_path: str) -> Dict[str, Any]:
        """Analyze if file is compatible with Mistral OCR"""
        file_path_lower = file_path.lower()
        
        # Image files - fully compatible
        if file_path_lower.endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp')):
            return {
                'compatible': True,
                'confidence': 'high',
                'reason': 'Direct image format - ideal for Mistral OCR',
                'preprocessing_needed': False
            }
        
        # DICOM files - compatible with preprocessing
        elif file_path_lower.endswith(('.dcm', '.dicom')):
            return {
                'compatible': True,
                'confidence': 'medium',
                'reason': 'DICOM contains images but needs pixel data extraction',
                'preprocessing_needed': True
            }
        
        # PDF files - compatible with conversion
        elif file_path_lower.endswith('.pdf'):
            return {
                'compatible': True,
                'confidence': 'medium', 
                'reason': 'PDF can be converted to images for OCR',
                'preprocessing_needed': True
            }
        
        # Text files - not compatible (no OCR needed)
        elif file_path_lower.endswith(('.txt', '.text')):
            return {
                'compatible': False,
                'confidence': 'n/a',
                'reason': 'Plain text files - no OCR needed, process directly',
                'preprocessing_needed': False
            }
        
        # Unknown files
        else:
            return {
                'compatible': False,
                'confidence': 'unknown',
                'reason': 'Unknown file type - cannot determine OCR compatibility',
                'preprocessing_needed': False
            }
    
    def classify_file(self, file_path: str) -> str:
        """Classify file type"""
        file_path_lower = file_path.lower()
        
        if file_path_lower.endswith(('.dcm', '.dicom')):
            return 'dicom'
        elif file_path_lower.endswith(('.txt', '.text')):
            return 'text'
        elif file_path_lower.endswith('.pdf'):
            return 'pdf'
        elif file_path_lower.endswith(('.jpg', '.jpeg', '.png')):
            return 'image'
        else:
            return 'unknown'
    
    async def process_text_file(self, file_path: str) -> Dict[str, Any]:
        """Process text/PDF/image file using existing processor"""
        try:
            start_time = time.time()
            
            # Read file content
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            # Convert to bytes for processor
            content_bytes = content.encode('utf-8')
            
            # Process with local processor (may use Mistral OCR if enabled)
            result = await local_processor.process_document(
                document_bytes=content_bytes,
                user_id="test-user",
                filename=os.path.basename(file_path)
            )
            
            processing_time = time.time() - start_time
            
            # Validate FHIR bundle
            fhir_validation = self.fhir_validator.validate_fhir_bundle(result['fhir_bundle'])
            
            # Check Mistral OCR compatibility
            ocr_compatibility = self.analyze_mistral_ocr_compatibility(file_path)
            
            return {
                'status': 'success',
                'file_path': file_path,
                'file_type': 'text',
                'processing_time': processing_time,
                'entities_found': result['entities_found'],
                'fhir_valid': fhir_validation['is_valid'],
                'fhir_compliance': fhir_validation['compliance_score'],
                'processor_used': result['processing_mode'],
                'mistral_ocr_compatible': ocr_compatibility['compatible'],
                'mistral_ocr_notes': ocr_compatibility['reason']
            }
            
        except Exception as e:
            processing_time = time.time() - start_time
            return {
                'status': 'error',
                'file_path': file_path,
                'file_type': 'text',
                'processing_time': processing_time,
                'error': str(e)
            }
    
    async def process_dicom_file(self, file_path: str) -> Dict[str, Any]:
        """Process DICOM file using DICOM processor"""
        if not DICOM_AVAILABLE or not dicom_processor:
            return {
                'status': 'error',
                'file_path': file_path,
                'file_type': 'dicom',
                'processing_time': 0.0,
                'error': 'DICOM processor not available - install pydicom',
                'mistral_ocr_compatible': True,
                'mistral_ocr_notes': 'DICOM images are compatible but need preprocessing'
            }
        
        try:
            start_time = time.time()
            
            # Process with DICOM processor
            result = await dicom_processor.process_dicom_file(file_path)
            
            processing_time = time.time() - start_time
            
            # Check Mistral OCR compatibility
            ocr_compatibility = self.analyze_mistral_ocr_compatibility(file_path)
            
            if result['status'] == 'success':
                # Validate FHIR bundle
                fhir_validation = self.fhir_validator.validate_fhir_bundle(result['fhir_bundle'])
                
                return {
                    'status': 'success',
                    'file_path': file_path,
                    'file_type': 'dicom',
                    'processing_time': processing_time,
                    'patient_name': result.get('patient_name', 'Unknown'),
                    'modality': result.get('modality', 'Unknown'),
                    'fhir_valid': fhir_validation['is_valid'],
                    'fhir_compliance': fhir_validation['compliance_score'],
                    'processor_used': 'dicom_processor',
                    'mistral_ocr_compatible': ocr_compatibility['compatible'],
                    'mistral_ocr_notes': ocr_compatibility['reason']
                }
            else:
                return {
                    'status': 'error',
                    'file_path': file_path,
                    'file_type': 'dicom',
                    'processing_time': processing_time,
                    'error': result.get('error', 'Unknown error'),
                    'mistral_ocr_compatible': ocr_compatibility['compatible'],
                    'mistral_ocr_notes': ocr_compatibility['reason']
                }
                
        except Exception as e:
            processing_time = time.time() - start_time
            ocr_compatibility = self.analyze_mistral_ocr_compatibility(file_path)
            return {
                'status': 'error',
                'file_path': file_path,
                'file_type': 'dicom',
                'processing_time': processing_time,
                'error': str(e),
                'mistral_ocr_compatible': ocr_compatibility['compatible'],
                'mistral_ocr_notes': ocr_compatibility['reason']
            }
    
    async def process_single_file(self, file_path: str) -> Dict[str, Any]:
        """Process a single medical file"""
        file_type = self.classify_file(file_path)
        
        print(f"πŸ“„ Processing {os.path.basename(file_path)} ({file_type})...")
        
        if file_type == 'dicom':
            return await self.process_dicom_file(file_path)
        else:
            return await self.process_text_file(file_path)
    
    async def run_batch_test(self, file_limit: int = 20) -> Dict[str, Any]:
        """Run batch test on all medical files"""
        print("πŸ₯ FhirFlame Medical File Batch Testing")
        print("=" * 50)
        
        # Download/prepare medical files
        print("πŸ“₯ Preparing medical files...")
        available_files = self.downloader.download_all_files(limit=file_limit)
        
        if not available_files:
            print("❌ No medical files available for testing!")
            return {"error": "No files available"}
        
        print(f"πŸ“‹ Found {len(available_files)} medical files to test")
        
        # Process each file
        start_time = time.time()
        self.results = []
        
        for i, file_path in enumerate(available_files, 1):
            print(f"\n[{i}/{len(available_files)}] ", end="")
            
            result = await self.process_single_file(file_path)
            self.results.append(result)
            
            # Show quick result
            status_emoji = "βœ…" if result['status'] == 'success' else "❌"
            time_str = f"{result['processing_time']:.2f}s"
            ocr_note = "πŸ”OCRβœ…" if result.get('mistral_ocr_compatible') else "πŸ”OCR❌"
            print(f"{status_emoji} {time_str} {ocr_note}")
        
        total_time = time.time() - start_time
        
        # Generate summary
        summary = self.generate_summary(total_time)
        
        print("\n" + "=" * 50)
        print("πŸ“Š BATCH TESTING RESULTS")
        print("=" * 50)
        
        return summary
    
    def generate_summary(self, total_time: float) -> Dict[str, Any]:
        """Generate test summary and metrics"""
        if not self.results:
            return {"error": "No results to summarize"}
        
        # Calculate metrics
        total_files = len(self.results)
        successful = [r for r in self.results if r['status'] == 'success']
        successful_count = len(successful)
        failed_count = total_files - successful_count
        
        success_rate = successful_count / total_files if total_files > 0 else 0
        
        # Processing time metrics
        processing_times = [r['processing_time'] for r in successful]
        avg_processing_time = sum(processing_times) / len(processing_times) if processing_times else 0
        max_processing_time = max(processing_times) if processing_times else 0
        
        # FHIR compliance metrics
        fhir_compliances = [r.get('fhir_compliance', 0) for r in successful]
        avg_fhir_compliance = sum(fhir_compliances) / len(fhir_compliances) if fhir_compliances else 0
        
        # Mistral OCR compatibility analysis
        ocr_compatible = [r for r in self.results if r.get('mistral_ocr_compatible', False)]
        ocr_incompatible = [r for r in self.results if not r.get('mistral_ocr_compatible', False)]
        
        # File type breakdown
        file_types = {}
        for result in self.results:
            file_type = result.get('file_type', 'unknown')
            if file_type not in file_types:
                file_types[file_type] = {'total': 0, 'successful': 0, 'ocr_compatible': 0}
            file_types[file_type]['total'] += 1
            if result['status'] == 'success':
                file_types[file_type]['successful'] += 1
            if result.get('mistral_ocr_compatible', False):
                file_types[file_type]['ocr_compatible'] += 1
        
        # Performance against targets
        meets_success_target = success_rate >= self.targets['success_rate']
        meets_time_target = avg_processing_time <= self.targets['processing_time']
        meets_compliance_target = avg_fhir_compliance >= self.targets['fhir_compliance']
        
        all_targets_met = meets_success_target and meets_time_target and meets_compliance_target
        
        # Print detailed results
        print(f"πŸ“‹ Files Processed: {total_files}")
        print(f"βœ… Successful: {successful_count} ({success_rate:.1%})")
        print(f"❌ Failed: {failed_count}")
        print(f"⏱️  Average Processing Time: {avg_processing_time:.2f}s")
        print(f"πŸ” Maximum Processing Time: {max_processing_time:.2f}s")
        print(f"πŸ“Š Average FHIR Compliance: {avg_fhir_compliance:.1%}")
        print(f"πŸ• Total Test Time: {total_time:.2f}s")
        
        print(f"\nπŸ” Mistral OCR Compatibility Analysis:")
        print(f"   Compatible files: {len(ocr_compatible)}/{total_files} ({len(ocr_compatible)/total_files*100:.0f}%)")
        print(f"   Incompatible files: {len(ocr_incompatible)}/{total_files} ({len(ocr_incompatible)/total_files*100:.0f}%)")
        
        print(f"\nπŸ“‚ File Type Breakdown:")
        for file_type, stats in file_types.items():
            success_pct = stats['successful'] / stats['total'] * 100 if stats['total'] > 0 else 0
            ocr_pct = stats['ocr_compatible'] / stats['total'] * 100 if stats['total'] > 0 else 0
            print(f"   {file_type}: {stats['successful']}/{stats['total']} success ({success_pct:.0f}%) | OCR compatible: {stats['ocr_compatible']}/{stats['total']} ({ocr_pct:.0f}%)")
        
        print(f"\n🎯 Performance Targets:")
        print(f"   Success Rate: {success_rate:.1%} {'βœ…' if meets_success_target else '❌'} (target: {self.targets['success_rate']:.1%})")
        print(f"   Processing Time: {avg_processing_time:.2f}s {'βœ…' if meets_time_target else '❌'} (target: <{self.targets['processing_time']}s)")
        print(f"   FHIR Compliance: {avg_fhir_compliance:.1%} {'βœ…' if meets_compliance_target else '❌'} (target: {self.targets['fhir_compliance']:.1%})")
        
        print(f"\nπŸ” Mistral OCR Data Type Support:")
        print(f"   βœ… Images (PNG, JPG): Direct compatibility")
        print(f"   βœ… DICOM files: Compatible with preprocessing") 
        print(f"   βœ… PDF files: Compatible with image conversion")
        print(f"   ❌ Plain text: No OCR needed (process directly)")
        
        print(f"\nπŸ† Overall Result: {'βœ… ALL TARGETS MET' if all_targets_met else '❌ Some targets missed'}")
        
        # Show errors if any
        errors = [r for r in self.results if r['status'] == 'error']
        if errors:
            print(f"\n❌ Errors ({len(errors)}):")
            for error in errors[:5]:  # Show first 5 errors
                filename = os.path.basename(error['file_path'])
                print(f"   {filename}: {error['error']}")
            if len(errors) > 5:
                print(f"   ... and {len(errors) - 5} more errors")
        
        return {
            'total_files': total_files,
            'successful_count': successful_count,
            'failed_count': failed_count,
            'success_rate': success_rate,
            'avg_processing_time': avg_processing_time,
            'max_processing_time': max_processing_time,
            'avg_fhir_compliance': avg_fhir_compliance,
            'total_time': total_time,
            'file_types': file_types,
            'mistral_ocr_compatible_count': len(ocr_compatible),
            'mistral_ocr_incompatible_count': len(ocr_incompatible),
            'targets_met': {
                'success_rate': meets_success_target,
                'processing_time': meets_time_target,
                'fhir_compliance': meets_compliance_target,
                'all_targets': all_targets_met
            },
            'detailed_results': self.results
        }

async def main():
    """Main test function"""
    print(f"πŸ• Starting at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    # Check if DICOM is available
    if DICOM_AVAILABLE:
        print("βœ… DICOM processing available")
    else:
        print("⚠️  DICOM processing not available (install pydicom)")
    
    # Check Mistral OCR configuration
    mistral_enabled = os.getenv('USE_MISTRAL_FALLBACK', 'false').lower() == 'true'
    mistral_key = os.getenv('MISTRAL_API_KEY')
    
    print(f"πŸ” Mistral OCR Status:")
    print(f"   Enabled: {mistral_enabled}")
    print(f"   API Key: {'βœ… Set' if mistral_key else '❌ Missing'}")
    print(f"   Supported: Images, DICOM (preprocessed), PDF (converted)")
    print(f"   Not needed: Plain text files")
    
    # Run tests
    framework = MedicalFileTestFramework()
    
    try:
        results = await framework.run_batch_test(file_limit=15)
        
        if 'error' not in results:
            print(f"\nπŸ“‹ Summary:")
            print(f"   {results['successful_count']}/{results['total_files']} files processed successfully")
            print(f"   {results['mistral_ocr_compatible_count']} files compatible with Mistral OCR")
            print(f"   Average time: {results['avg_processing_time']:.2f}s per file")
            print(f"   FHIR compliance: {results['avg_fhir_compliance']:.1%}")
            
        print(f"\nπŸŽ‰ Medical file testing completed!")
        return 0
        
    except Exception as e:
        print(f"\nπŸ’₯ Testing failed: {e}")
        return 1

if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)