"""
Comprehensive Batch Processing Demo Analysis

Deep analysis of Modal scaling implementation and batch processing capabilities.
"""

import asyncio
import sys
import os
import time
import json
from datetime import datetime

sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'fhirflame', 'src'))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'fhirflame'))
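# NOTE (assumption): the path setup above expects a fhirflame/ directory to sit
# next to this script, providing the modules imported below
# (fhirflame.src.heavy_workload_demo, fhirflame.cloud_modal.functions,
# fhirflame.frontend_ui, ...).
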
def test_heavy_workload_demo_import():
    """Test 1: Heavy Workload Demo Import and Initialization"""
    print("TEST 1: Heavy Workload Demo Import")
    print("-" * 50)

    try:
        from fhirflame.src.heavy_workload_demo import ModalContainerScalingDemo, RealTimeBatchProcessor
        print("✅ Successfully imported ModalContainerScalingDemo")
        print("✅ Successfully imported RealTimeBatchProcessor")

        demo = ModalContainerScalingDemo()
        processor = RealTimeBatchProcessor()

        print(f"✅ Modal demo initialized with {len(demo.regions)} regions")
        print(f"✅ Batch processor initialized with {len(processor.medical_datasets)} datasets")

        print(f"   Scaling tiers: {len(demo.scaling_tiers)}")
        print(f"   Workload configs: {len(demo.workload_configs)}")
        print(f"   Default region: {demo.default_region}")

        return True, demo, processor

    except Exception as e:
        print(f"❌ Heavy workload demo import failed: {e}")
        import traceback
        traceback.print_exc()
        return False, None, None

async def test_modal_scaling_simulation(demo):
    """Test 2: Modal Container Scaling Simulation"""
    print("\nTEST 2: Modal Container Scaling Simulation")
    print("-" * 50)

    try:
        # Start the scaling demo, let it run briefly, then sample its statistics.
        result = await demo.start_modal_scaling_demo()
        print(f"✅ Modal scaling demo started: {result}")

        print("Running Modal scaling simulation for 10 seconds...")
        await asyncio.sleep(10)

        stats = demo.get_demo_statistics()
        print(f"Demo Status: {stats['demo_status']}")
        print(f"Active Containers: {stats['active_containers']}")
        print(f"Requests/sec: {stats['requests_per_second']}")
        print(f"Total Processed: {stats['total_requests_processed']}")
        print(f"Concurrent Requests: {stats['concurrent_requests']}")
        print(f"Cost per Request: {stats['cost_per_request']}")
        print(f"Scaling Strategy: {stats['scaling_strategy']}")

        containers = demo.get_container_details()
        print(f"Container Details: {len(containers)} containers active")

        if containers:
            print("   Top 3 Container Details:")
            for i, container in enumerate(containers[:3]):
                print(f"   [{i+1}] {container['Container ID']}: {container['Status']} - {container['Requests/sec']} RPS")

        demo.stop_demo()
        print("✅ Modal scaling demo stopped successfully")

        return True

    except Exception as e:
        print(f"❌ Modal scaling simulation failed: {e}")
        import traceback
        traceback.print_exc()
        return False

def test_batch_processor_datasets(processor):
    """Test 3: Batch Processor Medical Datasets"""
    print("\nTEST 3: Batch Processor Medical Datasets")
    print("-" * 50)

    try:
        datasets = processor.medical_datasets

        for dataset_name, documents in datasets.items():
            print(f"Dataset: {dataset_name}")
            print(f"   Documents: {len(documents)}")

            # Only compute the average and sample when the dataset is non-empty,
            # to avoid a division by zero on an empty document list.
            if documents:
                avg_length = sum(len(doc) for doc in documents) // len(documents)
                print(f"   Avg length: {avg_length} chars")
                sample = documents[0][:100].replace('\n', ' ').strip()
                print(f"   Sample: {sample}...")

        print("✅ All medical datasets validated")
        return True

    except Exception as e:
        print(f"❌ Batch processor dataset test failed: {e}")
        return False

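# The polling loop below assumes RealTimeBatchProcessor exposes a `processing`
# flag and a get_status() dict with 'status' ('processing'/'completed'),
# 'progress', 'processed', 'total' and 'total_time' keys; this mirrors how the
# keys are read here rather than a documented API.
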
async def test_real_time_batch_processing(processor):
    """Test 4: Real-Time Batch Processing"""
    print("\nTEST 4: Real-Time Batch Processing")
    print("-" * 50)

    try:
        workflows_to_test = [
            ("clinical_fhir", 3),
            ("lab_entities", 2),
            ("mixed_workflow", 2)
        ]

        results = {}

        for workflow_type, batch_size in workflows_to_test:
            print(f"\nTesting workflow: {workflow_type} (batch size: {batch_size})")

            success = processor.start_processing(workflow_type, batch_size)

            if not success:
                print(f"❌ Failed to start processing for {workflow_type}")
                continue

            # Poll the processing status every 2 seconds, with a ~30 second safety timeout.
            start_time = time.time()
            while processor.processing:
                status = processor.get_status()
                if status['status'] == 'processing':
                    print(f"   Progress: {status['progress']:.1f}% - {status['processed']}/{status['total']}")
                    await asyncio.sleep(2)
                elif status['status'] == 'completed':
                    break
                else:
                    break

                if time.time() - start_time > 30:
                    processor.stop_processing()
                    break

            final_status = processor.get_status()
            results[workflow_type] = final_status

            if final_status['status'] == 'completed':
                print(f"✅ {workflow_type} completed: {final_status['processed']} documents")
                print(f"   Total time: {final_status['total_time']:.2f}s")
            else:
                print(f"⚠️ {workflow_type} did not complete fully")

        print("\nBatch Processing Summary:")
        for workflow, result in results.items():
            status = result.get('status', 'unknown')
            processed = result.get('processed', 0)
            total_time = result.get('total_time', 0)
            print(f"   {workflow}: {status} - {processed} docs in {total_time:.2f}s")

        return True

    except Exception as e:
        print(f"❌ Real-time batch processing test failed: {e}")
        import traceback
        traceback.print_exc()
        return False

def test_modal_integration_components():
    """Test 5: Modal Integration Components"""
    print("\nTEST 5: Modal Integration Components")
    print("-" * 50)

    try:
        # Modal cost calculation (optional component; only warn if unavailable).
        try:
            from fhirflame.cloud_modal.functions import calculate_real_modal_cost
            print("✅ Modal functions imported successfully")

            cost_1s = calculate_real_modal_cost(1.0, "L4")
            cost_10s = calculate_real_modal_cost(10.0, "L4")

            print(f"   L4 GPU cost (1s): ${cost_1s:.6f}")
            print(f"   L4 GPU cost (10s): ${cost_10s:.6f}")

            if cost_10s > cost_1s:
                print("✅ Cost calculation scaling works correctly")
            else:
                print("⚠️ Cost calculation may have issues")

        except ImportError as e:
            print(f"⚠️ Modal functions not available: {e}")

        # Modal deployment app (optional component).
        try:
            from fhirflame.modal_deployments.fhirflame_modal_app import app, GPU_CONFIGS
            print("✅ Modal deployment app imported successfully")
            print(f"   GPU configs available: {list(GPU_CONFIGS.keys())}")

        except ImportError as e:
            print(f"⚠️ Modal deployment not available: {e}")

        # Enhanced CodeLlama processor and its provider routing.
        try:
            from fhirflame.src.enhanced_codellama_processor import EnhancedCodeLlamaProcessor
            processor = EnhancedCodeLlamaProcessor()
            print("✅ Enhanced CodeLlama processor initialized")
            print(f"   Modal available: {processor.router.modal_available}")
            print(f"   Ollama available: {processor.router.ollama_available}")
            print(f"   HuggingFace available: {processor.router.hf_available}")

        except Exception as e:
            print(f"⚠️ Enhanced CodeLlama processor issues: {e}")

        return True

    except Exception as e:
        print(f"❌ Modal integration test failed: {e}")
        return False

def test_frontend_integration():
    """Test 6: Frontend Integration"""
    print("\nTEST 6: Frontend Integration")
    print("-" * 50)

    try:
        from fhirflame.frontend_ui import heavy_workload_demo, batch_processor
        print("✅ Frontend UI integration working")

        if heavy_workload_demo is not None:
            print("✅ Heavy workload demo available in frontend")
        else:
            print("⚠️ Heavy workload demo not properly initialized in frontend")

        if batch_processor is not None:
            print("✅ Batch processor available in frontend")
        else:
            print("⚠️ Batch processor not properly initialized in frontend")

        return True

    except Exception as e:
        print(f"❌ Frontend integration test failed: {e}")
        return False

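# main() returns 0 when at least 80% of the tests pass and 1 otherwise, so the
# script's exit code can be used directly in CI-style checks.
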
async def main():
    """Main comprehensive test execution"""
    print("FHIRFLAME BATCH PROCESSING COMPREHENSIVE ANALYSIS")
    print("=" * 60)
    print(f"Starting at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    test_results = {}

    # Test 1: import and initialization (critical - the remaining tests need its objects).
    success, demo, processor = test_heavy_workload_demo_import()
    test_results["Heavy Workload Demo Import"] = success

    if not success:
        print("❌ Critical import failure - cannot continue with tests")
        return 1

    # Test 2: Modal container scaling simulation.
    if demo:
        success = await test_modal_scaling_simulation(demo)
        test_results["Modal Scaling Simulation"] = success

    # Test 3: batch processor medical datasets.
    if processor:
        success = test_batch_processor_datasets(processor)
        test_results["Batch Processor Datasets"] = success

    # Test 4: real-time batch processing.
    if processor:
        success = await test_real_time_batch_processing(processor)
        test_results["Real-Time Batch Processing"] = success

    # Test 5: Modal integration components.
    success = test_modal_integration_components()
    test_results["Modal Integration Components"] = success

    # Test 6: frontend integration.
    success = test_frontend_integration()
    test_results["Frontend Integration"] = success

    print("\n" + "=" * 60)
    print("COMPREHENSIVE ANALYSIS RESULTS")
    print("=" * 60)

    passed = sum(1 for result in test_results.values() if result)
    total = len(test_results)

    for test_name, result in test_results.items():
        status = "✅ PASS" if result else "❌ FAIL"
        print(f"{test_name}: {status}")

    print(f"\nOverall Score: {passed}/{total} tests passed ({passed/total*100:.1f}%)")

    print("\nBATCH PROCESSING IMPLEMENTATION ANALYSIS:")
    print("=" * 60)

    if passed >= total * 0.8:
        print("EXCELLENT: Batch processing implementation is comprehensive and working")
        print("✅ Modal scaling demo is properly implemented")
        print("✅ Real-time batch processing is functional")
        print("✅ Integration between components is solid")
        print("✅ Frontend integration is working")
        print("\nREADY FOR PRODUCTION DEMONSTRATION")
    elif passed >= total * 0.6:
        print("GOOD: Batch processing implementation is mostly working")
        print("✅ Core functionality is implemented")
        print("⚠️ Some integration issues may exist")
        print("\nMINOR FIXES RECOMMENDED")
    else:
        print("⚠️ ISSUES DETECTED: Batch processing implementation needs attention")
        print("❌ Critical components may not be working properly")
        print("❌ Integration issues present")
        print("\nSIGNIFICANT FIXES REQUIRED")

    print("\nRECOMMENDATIONS:")

    if not test_results.get("Modal Scaling Simulation", True):
        print("- Fix Modal container scaling simulation")

    if not test_results.get("Real-Time Batch Processing", True):
        print("- Debug real-time batch processing workflow")

    if not test_results.get("Modal Integration Components", True):
        print("- Ensure Modal integration components are properly configured")

    if not test_results.get("Frontend Integration", True):
        print("- Fix frontend UI integration issues")

    print(f"\nAnalysis completed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    return 0 if passed >= total * 0.8 else 1

if __name__ == "__main__":
    try:
        exit_code = asyncio.run(main())
        sys.exit(exit_code)
    except KeyboardInterrupt:
        print("\nAnalysis interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\nAnalysis failed with error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)