|
|
|
""" |
|
π₯ FhirFlame Integrated Workflow Test |
|
Complete integration test: Mistral OCR β CodeLlama Agent β FHIR Generation |
|
""" |
|
|
|
import asyncio |
|
import os |
|
import sys |
|
import time |
|
import base64 |
|
from datetime import datetime |
|
|
|
|
|
# Make the project importable when this file is executed directly.
# The original entry (<this dir>/../src) stays first on the path. The imports
# below are written as ``src.<module>``, which resolves from the directory that
# *contains* ``src``, so that parent directory is added right after it.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
from src.workflow_orchestrator import workflow_orchestrator |
|
from src.monitoring import monitor |
|
from src.fhir_validator import FhirValidator |
|
|
|
def create_medical_document_pdf_bytes() -> bytes:
    """Return a fixed, hard-coded PDF byte string used as a mock document.

    Nothing is read from disk: the value is a PDF 1.4 skeleton holding a
    catalog, a page tree, a single page with a ``[0 0 612 792]`` media box,
    an xref table and a trailer. There is no newline after ``%%EOF``.
    """
    pdf_lines = (
        b'%PDF-1.4',
        # Object 1: document catalog
        b'1 0 obj',
        b'<<',
        b'/Type /Catalog',
        b'/Pages 2 0 R',
        b'>>',
        b'endobj',
        # Object 2: page tree
        b'2 0 obj',
        b'<<',
        b'/Type /Pages',
        b'/Kids [3 0 R]',
        b'/Count 1',
        b'>>',
        b'endobj',
        # Object 3: the only page
        b'3 0 obj',
        b'<<',
        b'/Type /Page',
        b'/Parent 2 0 R',
        b'/MediaBox [0 0 612 792]',
        b'>>',
        b'endobj',
        # Cross-reference table; every entry keeps its trailing space
        b'xref',
        b'0 4',
        b'0000000000 65535 f ',
        b'0000000010 00000 n ',
        b'0000000079 00000 n ',
        b'0000000173 00000 n ',
        b'trailer',
        b'<<',
        b'/Size 4',
        b'/Root 1 0 R',
        b'>>',
        b'startxref',
        b'253',
        b'%%EOF',
    )
    return b'\n'.join(pdf_lines)
|
|
|
def create_medical_image_bytes() -> bytes:
    """Return a fixed, hard-coded PNG-style byte string used as a mock image.

    The value starts with the PNG file signature and contains ``IHDR``,
    ``IDAT`` and ``IEND`` tags. It is a test fixture only; the bytes are
    reproduced verbatim and are not checked for being a decodable image.
    """
    segments = (
        b'\x89PNG\r\n\x1a\n',                                       # file signature
        b'\x00\x00\x00\rIHDR',                                      # length field + IHDR tag
        b'\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00',    # bytes following the IHDR tag
        b'\x1f\x15\xc4\x89',
        b'\x00\x00\x00\rIDAT',                                      # length field + IDAT tag
        b'x\xdac\x00\x01\x00\x00\x05\x00\x01\r\n-\xdb',             # bytes following the IDAT tag
        b'\x00\x00\x00\x00IEND',                                    # length field + IEND tag
        b'\xaeB`\x82',
    )
    return b''.join(segments)
|
|
|
# Free-text clinical note fed straight into the workflow by the direct-text case.
_FOLLOW_UP_NOTE_TEXT = """
MEDICAL RECORD - PATIENT: SARAH JOHNSON
DOB: 1985-03-15 | MRN: MR789456

CHIEF COMPLAINT: Follow-up for Type 2 Diabetes

CURRENT MEDICATIONS:
- Metformin 1000mg twice daily
- Glipizide 5mg once daily
- Lisinopril 10mg daily for hypertension

VITAL SIGNS:
- Blood Pressure: 135/82 mmHg
- Weight: 172 lbs
- HbA1c: 7.2%

ASSESSMENT: Type 2 Diabetes - needs optimization
PLAN: Increase Metformin to 1500mg twice daily
"""


async def _run_document_case() -> None:
    """Send the mock PDF through ``process_complete_workflow`` and print the result."""
    document_bytes = create_medical_document_pdf_bytes()
    print(f"📋 Document: Medical report PDF ({len(document_bytes)} bytes)")

    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments the way a time.time() difference can.
    started = time.perf_counter()
    result = await workflow_orchestrator.process_complete_workflow(
        document_bytes=document_bytes,
        user_id="test-integration-user",
        filename="medical_report.pdf",
        document_type="clinical_report",
    )
    processing_time = time.perf_counter() - started

    print(f"✅ Workflow completed in {processing_time:.2f}s")
    print(f"📊 Processing pipeline: {result['workflow_metadata']['stages_completed']}")
    print(f"🔍 OCR used: {result['workflow_metadata']['mistral_ocr_used']}")
    print(f"📝 Text extracted: {result['text_extraction']['full_text_length']} chars")
    print(f"🎯 Entities found: {result['medical_analysis']['entities_found']}")
    print(f"📈 Quality score: {result['medical_analysis']['quality_score']:.2f}")

    extraction_method = result['text_extraction']['extraction_method']
    print(f"🔬 Extraction method: {extraction_method}")

    if result.get('fhir_validation'):
        # The workflow already attached a validation report; just display it.
        fhir_validation = result['fhir_validation']
        print(f"📋 FHIR validation: {'✅ Valid' if fhir_validation['is_valid'] else '❌ Invalid'}")
        print(f"📊 Compliance score: {fhir_validation['compliance_score']:.1%}")
        print(f"🔬 Validation level: {fhir_validation['validation_level']}")
    elif result.get('fhir_bundle'):
        # No report was attached but a bundle exists: validate it here instead.
        validator = FhirValidator()
        fhir_validation = validator.validate_fhir_bundle(result['fhir_bundle'])
        print(f"📋 FHIR validation (fallback): {'✅ Valid' if fhir_validation['is_valid'] else '❌ Invalid'}")
        print(f"📊 Compliance score: {fhir_validation['compliance_score']:.1%}")

    if result['text_extraction']['extracted_text']:
        preview = result['text_extraction']['extracted_text'][:200]
        print("\n📄 Extracted text preview:")
        print(f" {preview}...")


async def _run_text_case() -> None:
    """Send plain text through ``process_complete_workflow`` and report whether OCR ran."""
    started = time.perf_counter()
    result = await workflow_orchestrator.process_complete_workflow(
        medical_text=_FOLLOW_UP_NOTE_TEXT,
        user_id="test-text-user",
        document_type="follow_up_note",
    )
    processing_time = time.perf_counter() - started

    print(f"✅ Text workflow completed in {processing_time:.2f}s")
    print(f"🔍 OCR used: {result['workflow_metadata']['mistral_ocr_used']}")
    print(f"🎯 Entities found: {result['medical_analysis']['entities_found']}")
    print(f"📈 Quality score: {result['medical_analysis']['quality_score']:.2f}")

    # OCR being used here is only reported as a warning; it does not fail the case.
    if not result['workflow_metadata']['mistral_ocr_used']:
        print("✅ Correctly bypassed OCR for direct text input")
    else:
        print("⚠️ OCR was unexpectedly used for direct text")


async def _run_image_case() -> None:
    """Send the mock PNG through ``process_medical_document_with_ocr`` and print the result."""
    image_bytes = create_medical_image_bytes()
    print(f"🖼️ Document: Medical image PNG ({len(image_bytes)} bytes)")

    started = time.perf_counter()
    result = await workflow_orchestrator.process_medical_document_with_ocr(
        document_bytes=image_bytes,
        user_id="test-image-user",
        filename="lab_report.png",
    )
    processing_time = time.perf_counter() - started

    print(f"✅ Image workflow completed in {processing_time:.2f}s")
    print(f"🔍 OCR processing: {result['workflow_metadata']['mistral_ocr_used']}")
    print(f"📊 Pipeline: {' → '.join(result['workflow_metadata']['stages_completed'])}")

    analysis = result['medical_analysis']
    print(f"🤖 Medical AI model: {analysis.get('model_used', 'Unknown')}")

    if 'source_metadata' in analysis:
        print("✅ OCR metadata properly passed to medical analysis")


async def test_complete_workflow_integration() -> bool:
    """Run the document, direct-text and image workflow cases in order.

    Prints the orchestrator's reported configuration, then runs each case.
    The cases only print what the workflow returned; a case counts as failed
    when it raises (for example because an expected result key is missing).

    Returns:
        True when all three cases complete without raising. False as soon as
        one case raises; the remaining cases are then not run.
    """
    print("🔥 FhirFlame Complete Workflow Integration Test")
    print("=" * 60)
    print(f"🕐 Starting at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    status = workflow_orchestrator.get_workflow_status()
    print("\n🔧 Workflow Configuration:")
    print(f" Mistral OCR: {'✅ Enabled' if status['mistral_ocr_enabled'] else '❌ Disabled'}")
    print(f" API Key: {'✅ Set' if status['mistral_api_key_configured'] else '❌ Missing'}")
    print(f" CodeLlama: {'✅ Ready' if status['codellama_processor_ready'] else '❌ Not Ready'}")
    print(f" Monitoring: {'✅ Active' if status['monitoring_enabled'] else '❌ Disabled'}")
    print(f" Pipeline: {' → '.join(status['workflow_components'])}")

    # (banner, coroutine function, label used in the failure message)
    cases = (
        ("\n📄 TEST CASE 1: Document OCR → Agent Workflow", _run_document_case, "Document workflow"),
        ("\n📝 TEST CASE 2: Direct Text → Agent Workflow", _run_text_case, "Text workflow"),
        ("\n🖼️ TEST CASE 3: Medical Image → OCR → Agent Workflow", _run_image_case, "Image workflow"),
    )
    for banner, run_case, label in cases:
        print(banner)
        print("-" * 50)
        try:
            await run_case()
        except Exception as e:
            # Any error inside a case is reported and ends the run.
            print(f"❌ {label} test failed: {e}")
            return False

    return True
|
|
|
async def test_workflow_error_handling() -> bool:
    """Feed a non-document byte string to the workflow and report the outcome.

    Calls ``process_complete_workflow`` with ``b'invalid document content'``
    under the filename ``invalid.doc``. If the call returns, the extraction
    method from the result is printed; if it raises, the exception is printed.

    Returns:
        Always True; neither outcome is treated as a failure.
        NOTE(review): this means the check can never fail the suite. Confirm
        that accepting both outcomes is intended.
    """
    print("\n🛠️ TESTING ERROR HANDLING & FALLBACKS")
    print("-" * 50)

    try:
        invalid_bytes = b'invalid document content'
        result = await workflow_orchestrator.process_complete_workflow(
            document_bytes=invalid_bytes,
            user_id="test-error-user",
            filename="invalid.doc",
        )
        print("✅ Error handling test: Processed with fallback")
        print(f"🔄 Fallback mode: {result['text_extraction']['extraction_method']}")
    except Exception as e:
        # A raised error is reported as a warning only (see the note above).
        print(f"⚠️ Error handling test: {e}")

    return True
|
|
|
async def main() -> int:
    """Run both workflow checks, print a summary and return an exit status.

    Returns:
        0 when ``test_complete_workflow_integration`` and
        ``test_workflow_error_handling`` both return a truthy value; 1 when
        either does not, or when an exception escapes while running them.
    """
    try:
        print("🚀 Starting comprehensive workflow integration tests...")

        integration_success = await test_complete_workflow_integration()
        error_handling_success = await test_workflow_error_handling()

        print("\n🎯 INTEGRATION TEST SUMMARY")
        print("=" * 60)
        for label, passed in (
            ("Workflow Integration", integration_success),
            ("Error Handling", error_handling_success),
        ):
            # The icon follows the verdict so a failure is never shown with a
            # success mark next to it.
            icon, verdict = ("✅", "PASSED") if passed else ("❌", "FAILED")
            print(f"{icon} {label}: {verdict}")

        # Only shown when the monitor exposes a truthy ``langfuse`` attribute.
        if monitor.langfuse:
            print("\n📊 Langfuse Monitoring Summary:")
            print(f" Session ID: {monitor.session_id}")
            print(" Events logged: ✅")
            print(" Workflow traces: ✅")

        success = integration_success and error_handling_success
        if success:
            print("\n🎉 All integration tests PASSED!")
            print("✅ Mistral OCR output is properly integrated with agent workflow")
            return 0

        print("\n💥 Some integration tests FAILED!")
        return 1

    except Exception as e:
        # Top-level boundary: report the error and signal failure to the caller.
        print(f"\n💥 Integration test suite failed: {e}")
        return 1
|
|
|
if __name__ == "__main__":
    # Run the async entry point to completion and use its return value as the
    # process exit status.
    sys.exit(asyncio.run(main()))