File size: 10,351 Bytes
a963d65 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
#!/usr/bin/env python3
"""
π₯ FhirFlame Integrated Workflow Test
Complete integration test: Mistral OCR β CodeLlama Agent β FHIR Generation
"""
import asyncio
import os
import sys
import time
import base64
from datetime import datetime
# Add src to path (from tests directory)
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from src.workflow_orchestrator import workflow_orchestrator
from src.monitoring import monitor
from src.fhir_validator import FhirValidator
def create_medical_document_pdf_bytes() -> bytes:
"""Create mock PDF document bytes for testing"""
# This is a minimal PDF header - in real scenarios this would be actual PDF bytes
pdf_header = b'%PDF-1.4\n1 0 obj\n<<\n/Type /Catalog\n/Pages 2 0 R\n>>\nendobj\n2 0 obj\n<<\n/Type /Pages\n/Kids [3 0 R]\n/Count 1\n>>\nendobj\n3 0 obj\n<<\n/Type /Page\n/Parent 2 0 R\n/MediaBox [0 0 612 792]\n>>\nendobj\nxref\n0 4\n0000000000 65535 f \n0000000010 00000 n \n0000000079 00000 n \n0000000173 00000 n \ntrailer\n<<\n/Size 4\n/Root 1 0 R\n>>\nstartxref\n253\n%%EOF'
return pdf_header
def create_medical_image_bytes() -> bytes:
"""Create mock medical image bytes for testing"""
# Simple PNG header for a 1x1 pixel image
png_bytes = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\rIDATx\xdac\x00\x01\x00\x00\x05\x00\x01\r\n-\xdb\x00\x00\x00\x00IEND\xaeB`\x82'
return png_bytes
async def test_complete_workflow_integration():
"""Test complete workflow: Document OCR β Medical Analysis β FHIR Generation"""
print("π₯ FhirFlame Complete Workflow Integration Test")
print("=" * 60)
print(f"π Starting at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Check workflow status
status = workflow_orchestrator.get_workflow_status()
print(f"\nπ§ Workflow Configuration:")
print(f" Mistral OCR: {'β
Enabled' if status['mistral_ocr_enabled'] else 'β Disabled'}")
print(f" API Key: {'β
Set' if status['mistral_api_key_configured'] else 'β Missing'}")
print(f" CodeLlama: {'β
Ready' if status['codellama_processor_ready'] else 'β Not Ready'}")
print(f" Monitoring: {'β
Active' if status['monitoring_enabled'] else 'β Disabled'}")
print(f" Pipeline: {' β '.join(status['workflow_components'])}")
# Test Case 1: Document with OCR Processing
print(f"\nπ TEST CASE 1: Document OCR β Agent Workflow")
print("-" * 50)
try:
document_bytes = create_medical_document_pdf_bytes()
print(f"π Document: Medical report PDF ({len(document_bytes)} bytes)")
start_time = time.time()
# Process complete workflow
result = await workflow_orchestrator.process_complete_workflow(
document_bytes=document_bytes,
user_id="test-integration-user",
filename="medical_report.pdf",
document_type="clinical_report"
)
processing_time = time.time() - start_time
# Display results
print(f"β
Workflow completed in {processing_time:.2f}s")
print(f"π Processing pipeline: {result['workflow_metadata']['stages_completed']}")
print(f"π OCR used: {result['workflow_metadata']['mistral_ocr_used']}")
print(f"π Text extracted: {result['text_extraction']['full_text_length']} chars")
print(f"π― Entities found: {result['medical_analysis']['entities_found']}")
print(f"π Quality score: {result['medical_analysis']['quality_score']:.2f}")
# Show extraction method
extraction_method = result['text_extraction']['extraction_method']
print(f"π¬ Extraction method: {extraction_method}")
# Display FHIR validation results from workflow
if result.get('fhir_validation'):
fhir_validation = result['fhir_validation']
print(f"π FHIR validation: {'β
Valid' if fhir_validation['is_valid'] else 'β Invalid'}")
print(f"π Compliance score: {fhir_validation['compliance_score']:.1%}")
print(f"π¬ Validation level: {fhir_validation['validation_level']}")
elif result.get('fhir_bundle'):
# Fallback validation if not done in workflow
validator = FhirValidator()
fhir_validation = validator.validate_fhir_bundle(result['fhir_bundle'])
print(f"π FHIR validation (fallback): {'β
Valid' if fhir_validation['is_valid'] else 'β Invalid'}")
print(f"π Compliance score: {fhir_validation['compliance_score']:.1%}")
# Display extracted text preview
if result['text_extraction']['extracted_text']:
preview = result['text_extraction']['extracted_text'][:200]
print(f"\nπ Extracted text preview:")
print(f" {preview}...")
except Exception as e:
print(f"β Document workflow test failed: {e}")
return False
# Test Case 2: Direct Text Processing
print(f"\nπ TEST CASE 2: Direct Text β Agent Workflow")
print("-" * 50)
try:
medical_text = """
MEDICAL RECORD - PATIENT: SARAH JOHNSON
DOB: 1985-03-15 | MRN: MR789456
CHIEF COMPLAINT: Follow-up for Type 2 Diabetes
CURRENT MEDICATIONS:
- Metformin 1000mg twice daily
- Glipizide 5mg once daily
- Lisinopril 10mg daily for hypertension
VITAL SIGNS:
- Blood Pressure: 135/82 mmHg
- Weight: 172 lbs
- HbA1c: 7.2%
ASSESSMENT: Type 2 Diabetes - needs optimization
PLAN: Increase Metformin to 1500mg twice daily
"""
start_time = time.time()
result = await workflow_orchestrator.process_complete_workflow(
medical_text=medical_text,
user_id="test-text-user",
document_type="follow_up_note"
)
processing_time = time.time() - start_time
print(f"β
Text workflow completed in {processing_time:.2f}s")
print(f"π OCR used: {result['workflow_metadata']['mistral_ocr_used']}")
print(f"π― Entities found: {result['medical_analysis']['entities_found']}")
print(f"π Quality score: {result['medical_analysis']['quality_score']:.2f}")
# Check that OCR was NOT used for direct text
if not result['workflow_metadata']['mistral_ocr_used']:
print("β
Correctly bypassed OCR for direct text input")
else:
print("β οΈ OCR was unexpectedly used for direct text")
except Exception as e:
print(f"β Text workflow test failed: {e}")
return False
# Test Case 3: Image Document Processing
print(f"\nπΌοΈ TEST CASE 3: Medical Image β OCR β Agent Workflow")
print("-" * 50)
try:
image_bytes = create_medical_image_bytes()
print(f"πΌοΈ Document: Medical image PNG ({len(image_bytes)} bytes)")
start_time = time.time()
result = await workflow_orchestrator.process_medical_document_with_ocr(
document_bytes=image_bytes,
user_id="test-image-user",
filename="lab_report.png"
)
processing_time = time.time() - start_time
print(f"β
Image workflow completed in {processing_time:.2f}s")
print(f"π OCR processing: {result['workflow_metadata']['mistral_ocr_used']}")
print(f"π Pipeline: {' β '.join(result['workflow_metadata']['stages_completed'])}")
# Check integration metadata
medical_metadata = result['medical_analysis'].get('model_used', 'Unknown')
print(f"π€ Medical AI model: {medical_metadata}")
if 'source_metadata' in result.get('medical_analysis', {}):
print("β
OCR metadata properly passed to medical analysis")
except Exception as e:
print(f"β Image workflow test failed: {e}")
return False
return True
async def test_workflow_error_handling():
"""Test workflow error handling and fallbacks"""
print(f"\nπ οΈ TESTING ERROR HANDLING & FALLBACKS")
print("-" * 50)
try:
# Test with invalid document bytes
invalid_bytes = b'invalid document content'
result = await workflow_orchestrator.process_complete_workflow(
document_bytes=invalid_bytes,
user_id="test-error-user",
filename="invalid.doc"
)
print(f"β
Error handling test: Processed with fallback")
print(f"π Fallback mode: {result['text_extraction']['extraction_method']}")
except Exception as e:
print(f"β οΈ Error handling test: {e}")
return True
async def main():
"""Main test execution"""
try:
# Run integration tests
print("π Starting comprehensive workflow integration tests...")
# Test 1: Complete workflow integration
integration_success = await test_complete_workflow_integration()
# Test 2: Error handling
error_handling_success = await test_workflow_error_handling()
# Summary
print(f"\nπ― INTEGRATION TEST SUMMARY")
print("=" * 60)
print(f"β
Workflow Integration: {'PASSED' if integration_success else 'FAILED'}")
print(f"β
Error Handling: {'PASSED' if error_handling_success else 'FAILED'}")
# Check monitoring
if monitor.langfuse:
print(f"\nπ Langfuse Monitoring Summary:")
print(f" Session ID: {monitor.session_id}")
print(f" Events logged: β
")
print(f" Workflow traces: β
")
success = integration_success and error_handling_success
if success:
print(f"\nπ All integration tests PASSED!")
print(f"β
Mistral OCR output is properly integrated with agent workflow")
return 0
else:
print(f"\nπ₯ Some integration tests FAILED!")
return 1
except Exception as e:
print(f"\nπ₯ Integration test suite failed: {e}")
return 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code) |