#!/usr/bin/env python3 """ Quick test to verify batch processing fixes Tests the threading/asyncio conflict resolution """ import sys import os import time import asyncio # Add src to path for imports sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) def test_batch_processing_fix(): """Test the fixed batch processing implementation""" print("šŸ” TESTING BATCH PROCESSING FIXES") print("=" * 50) try: from src.heavy_workload_demo import RealTimeBatchProcessor print("āœ… Successfully imported RealTimeBatchProcessor") # Initialize processor processor = RealTimeBatchProcessor() print("āœ… Processor initialized successfully") # Test 1: Check datasets are available print(f"\nšŸ“‹ Available datasets: {len(processor.medical_datasets)}") for name, docs in processor.medical_datasets.items(): print(f" {name}: {len(docs)} documents") # Test 2: Start small batch processing test print(f"\nšŸ”¬ Starting test batch processing (3 documents)...") success = processor.start_processing( workflow_type="clinical_fhir", batch_size=3, progress_callback=None ) if success: print("āœ… Batch processing started successfully") # Monitor progress for 15 seconds for i in range(15): status = processor.get_status() print(f"Status: {status['status']} - {status.get('processed', 0)}/{status.get('total', 0)}") if status['status'] in ['completed', 'cancelled']: break time.sleep(1) # Final status final_status = processor.get_status() print(f"\nšŸ“Š Final Status: {final_status['status']}") print(f" Processed: {final_status.get('processed', 0)}/{final_status.get('total', 0)}") print(f" Results: {len(final_status.get('results', []))}") if final_status['status'] == 'completed': print("šŸŽ‰ Batch processing completed successfully!") print("āœ… Threading/AsyncIO conflict RESOLVED") else: processor.stop_processing() print("āš ļø Processing didn't complete in test time - but no threading errors!") else: print("āŒ Failed to start batch processing") return False return True except Exception as e: print(f"āŒ Test failed with error: {e}") import traceback traceback.print_exc() return False def test_frontend_integration(): """Test frontend timer integration""" print(f"\nšŸŽ® TESTING FRONTEND INTEGRATION") print("=" * 50) try: from frontend_ui import update_batch_status_realtime, create_empty_results_summary print("āœ… Successfully imported frontend functions") # Test empty status status, log, results = update_batch_status_realtime() print(f"āœ… Real-time status function works: {status[:30]}...") # Test empty results empty_results = create_empty_results_summary() print(f"āœ… Empty results structure: {list(empty_results.keys())}") return True except Exception as e: print(f"āŒ Frontend test failed: {e}") return False if __name__ == "__main__": print("šŸ”„ FHIRFLAME BATCH PROCESSING FIX VERIFICATION") print("=" * 60) # Run tests batch_test = test_batch_processing_fix() frontend_test = test_frontend_integration() print(f"\n" + "=" * 60) print("šŸ“‹ TEST RESULTS SUMMARY") print("=" * 60) print(f"Batch Processing Fix: {'āœ… PASS' if batch_test else 'āŒ FAIL'}") print(f"Frontend Integration: {'āœ… PASS' if frontend_test else 'āŒ FAIL'}") if batch_test and frontend_test: print(f"\nšŸŽ‰ ALL TESTS PASSED!") print("āœ… Threading/AsyncIO conflicts resolved") print("āœ… Real-time UI updates implemented") print("āœ… Batch processing should now work correctly") print("\nšŸš€ Ready to test in the UI!") else: print(f"\nāš ļø Some tests failed - check implementation") print(f"\nTo test in UI:") print(f"1. Start the app: python app.py") print(f"2. Go to 'Batch Processing Demo' tab") print(f"3. Set batch size to 5-10 documents") print(f"4. Click 'Start Live Processing'") print(f"5. Watch for real-time progress updates every 2 seconds")