File size: 4,969 Bytes
a963d65 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
#!/usr/bin/env python3
"""
Test the Processing Queue Implementation
Quick test to verify the processing queue interface works
"""
import sys
import os
# Make the sibling ``src`` directory importable; appended (not inserted) so
# already-installed packages keep precedence over anything found there.
_src_dir = os.path.join(os.path.dirname(__file__), 'src')
sys.path.append(_src_dir)
def test_processing_queue():
    """Exercise the processing queue API exposed by ``frontend_ui``.

    Checks, in order: the queue components can be imported, the imported
    ``processing_queue`` object already holds at least one job, ``add_job``
    returns a job carrying the given document name, ``update_job`` leaves the
    job marked successful with the given entity count,
    ``get_queue_dataframe`` returns something non-empty, and
    ``get_session_statistics`` contains the expected keys.

    Note that the checks call ``add_job``/``update_job`` on the imported
    ``processing_queue`` object itself, not on a fresh instance.

    Returns:
        bool: True when every step succeeded; False when any import, call or
        assertion raised. The error and its traceback are printed rather
        than re-raised so the caller can keep running other tests.
    """
    print("🧪 Testing Processing Queue Implementation...")
    try:
        # Imported inside the try so a missing/broken module is reported as a
        # failed test. ProcessingQueue and ProcessingJob are not used below;
        # importing them is itself the check that they exist.
        from frontend_ui import ProcessingQueue, ProcessingJob, processing_queue
        print("✅ Successfully imported processing queue components")

        # Test queue initialization
        assert len(processing_queue.jobs) > 0, "Queue should have demo data"
        print(f"✅ Queue initialized with {len(processing_queue.jobs)} demo jobs")

        # Test adding a new job
        test_job = processing_queue.add_job("test_document.pdf", "Text Processing")
        assert test_job.document_name == "test_document.pdf", \
            "New job should keep the document name it was created with"
        print("✅ Successfully added new job to queue")

        # Test updating job completion
        # Positional args after the job: success flag, a model label
        # (presumably the AI model name - confirm against update_job), and
        # the entity count checked just below.
        processing_queue.update_job(test_job, True, "Test AI Model", 5)
        assert test_job.success, "Job should be marked successful"
        assert test_job.entities_found == 5, "Job should record 5 entities"
        print("✅ Successfully updated job completion status")

        # Test getting queue as DataFrame
        df = processing_queue.get_queue_dataframe()
        assert len(df) > 0, "DataFrame should have data"
        print(f"✅ Successfully generated DataFrame with {len(df)} rows")

        # Test getting session statistics
        stats = processing_queue.get_session_statistics()
        assert "total_processed" in stats, "Statistics should report total_processed"
        assert "avg_processing_time" in stats, "Statistics should report avg_processing_time"
        print("✅ Successfully generated session statistics")

        print("\n🎉 All processing queue tests passed!")
        return True
    except Exception as e:
        # Broad catch is deliberate: this is the reporting boundary for the
        # script, and the full traceback is still printed for diagnosis.
        print(f"❌ Processing queue test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def test_gradio_interface():
    """Check that the processing queue tab can be built inside a Gradio app.

    Opens a ``gr.Blocks`` context, calls ``create_processing_queue_tab`` from
    ``frontend_ui`` inside it, and verifies the returned collection contains
    the ``"queue_df"`` and ``"stats_json"`` entries.

    Returns:
        bool: True when the tab was created and both entries are present;
        False when any import, call or assertion raised. The error and its
        traceback are printed rather than re-raised.
    """
    print("\n🎨 Testing Gradio Interface Creation...")
    try:
        # Imported here so that a missing gradio install is reported as a
        # failed test instead of aborting the whole script at import time.
        import gradio as gr
        from frontend_ui import create_processing_queue_tab

        # Test creating the processing queue tab
        # The Blocks object itself is not needed; only the context matters.
        with gr.Blocks():
            queue_components = create_processing_queue_tab()
            assert "queue_df" in queue_components, \
                "Tab components should include queue_df"
            assert "stats_json" in queue_components, \
                "Tab components should include stats_json"
        print("✅ Successfully created processing queue Gradio interface")
        return True
    except Exception as e:
        # Broad catch is deliberate: this is the reporting boundary for the
        # script, and the full traceback is still printed for diagnosis.
        print(f"❌ Gradio interface test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def test_integration_functions():
    """Check the workflow integration helpers exposed by ``frontend_ui``.

    Calls ``integrate_with_workflow`` and verifies the returned job carries
    the given document name, then calls ``complete_workflow_job`` on that job
    and verifies it ends up marked successful.

    Returns:
        bool: True when both steps succeeded; False when any import, call or
        assertion raised. The error and its traceback are printed rather
        than re-raised.
    """
    print("\n🔗 Testing Workflow Integration...")
    try:
        # Imported inside the try so an import failure counts as a failed test.
        from frontend_ui import integrate_with_workflow, complete_workflow_job

        # Test integration
        job = integrate_with_workflow("integration_test.txt", "Integration Test")
        assert job.document_name == "integration_test.txt", \
            "Job should keep the document name it was created with"
        print("✅ Successfully integrated with workflow")

        # Test completion
        # Positional args after the job: success flag, then a model label and
        # a count (presumably entities found - confirm against
        # complete_workflow_job).
        complete_workflow_job(job, True, "Integration AI", 10)
        assert job.success, "Job should be marked successful"
        print("✅ Successfully completed workflow job")
        return True
    except Exception as e:
        # Broad catch is deliberate: this is the reporting boundary for the
        # script, and the full traceback is still printed for diagnosis.
        print(f"❌ Integration test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Run every test function in this script and print a summary.

    Each test is called in turn; a test counts as passed only when it returns
    a truthy value. An exception escaping a test is caught and reported as a
    failure so the remaining tests still run.

    Returns:
        int: 0 when all tests passed, 1 otherwise (used as the process exit
        status by the ``__main__`` guard).
    """
    print("🔥 FhirFlame Processing Queue Test Suite")
    print("=" * 50)

    tests = [
        ("Processing Queue Core", test_processing_queue),
        ("Gradio Interface", test_gradio_interface),
        ("Workflow Integration", test_integration_functions),
    ]

    passed = 0
    total = len(tests)

    for test_name, test_func in tests:
        print(f"\n🧪 Running {test_name}...")
        try:
            if test_func():
                passed += 1
                print(f"✅ {test_name} passed")
            else:
                print(f"❌ {test_name} failed")
        except Exception as e:
            # Safety net: the test functions catch their own errors, but one
            # misbehaving test must not stop the rest of the suite.
            print(f"❌ {test_name} failed with exception: {e}")

    print(f"\n📊 Test Results: {passed}/{total} tests passed")

    if passed == total:
        print("🎉 All tests passed! Processing Queue is ready!")
        print("\n🚀 To see the processing queue in action:")
        print(" 1. Run: python app.py")
        print(" 2. Navigate to the '🔄 Processing Queue' tab")
        print(" 3. Click 'Add Demo Job' to see real-time updates")
        return 0

    print("❌ Some tests failed. Check the output above for details.")
    return 1
if __name__ == "__main__":
    # Use sys.exit rather than the site-provided exit() helper: the helper is
    # meant for interactive sessions and is absent when Python runs with -S.
    sys.exit(main())