|
|
|
""" |
|
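Test script that verifies the job cancellation and task management fixes.

Exercises the cancellation flags, the running-task and active-job tracking,
and the task queues exposed by the ``app`` module. It can also be run
directly as a script, in which case the exit status reflects the outcome.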
|
""" |
|
|
|
import sys |
|
import time |
|
import asyncio |
|
from unittest.mock import Mock, patch |
|
|
|
|
|
sys.path.insert(0, '.') |
|
|
|
def test_cancellation_mechanism():
    """Exercise the job cancellation flow exposed by the ``app`` module.

    The test walks through five printed stages:

    1. Reset the ``"text_task"`` entries of ``app.cancellation_flags``,
       ``app.running_tasks`` and ``app.active_jobs``.
    2. Register a job through ``app.job_manager.add_processing_job`` and
       record its ID in ``app.active_jobs``.
    3. Install a ``Mock`` as the running task, call
       ``app.cancel_current_task("text_task")`` and verify that the mock's
       ``cancel()`` was invoked exactly once.
    4. Print the size of the job history and, if it is non-empty, element
       ``[2]`` of its last entry (labelled "status" in the output).
    5. Print the dashboard metrics and the processing-queue statistics.

    Returns:
        None. Success is signalled by returning normally.

    Raises:
        AssertionError: If any stage raises. The original exception is
            printed together with its traceback and chained as the cause.
    """
    print("🧪 Testing Job Cancellation and Task Queue Management")
    print("=" * 60)

    try:
        # Imported inside the try so that an import failure is reported
        # through the same failure path as any other error.
        import app

        print("\n1️⃣ Testing basic cancellation flags...")

        # Start from a known state for the "text_task" slot.
        app.cancellation_flags["text_task"] = False
        app.running_tasks["text_task"] = None
        app.active_jobs["text_task"] = None

        print(f" Initial cancellation flag: {app.cancellation_flags['text_task']}")
        print(f" Initial running task: {app.running_tasks['text_task']}")
        print(f" Initial active job: {app.active_jobs['text_task']}")

        print("\n2️⃣ Testing job creation and tracking...")

        job_id = app.job_manager.add_processing_job("text", "Test medical text", {"test": True})
        app.active_jobs["text_task"] = job_id

        print(f" Created job ID: {job_id}")
        print(f" Active tasks count: {app.job_manager.dashboard_state['active_tasks']}")
        print(f" Active job tracking: {app.active_jobs['text_task']}")

        print("\n3️⃣ Testing cancel_current_task function...")

        # A Mock stands in for the running task so the cancel() call can be
        # verified afterwards.
        mock_task = Mock()
        app.running_tasks["text_task"] = mock_task

        result = app.cancel_current_task("text_task")

        print(f" Cancel result: {result}")
        print(f" Cancellation flag after cancel: {app.cancellation_flags['text_task']}")
        print(f" Running task after cancel: {app.running_tasks['text_task']}")
        print(f" Active job after cancel: {app.active_jobs['text_task']}")
        print(f" Active tasks count after cancel: {app.job_manager.dashboard_state['active_tasks']}")

        # The only hard check in this test: cancellation must have been
        # forwarded to the running task exactly once.
        mock_task.cancel.assert_called_once()

        print("\n4️⃣ Testing job completion tracking...")

        history = app.job_manager.get_jobs_history()
        print(f" Jobs in history: {len(history)}")
        if history:
            latest_job = history[-1]
            # NOTE(review): index 2 is assumed to hold the job status;
            # confirm against the job manager's history row layout.
            print(f" Latest job status: {latest_job[2]}")

        print("\n5️⃣ Testing dashboard metrics...")

        metrics = app.job_manager.get_dashboard_metrics()
        queue_stats = app.job_manager.get_processing_queue()

        print(f" Dashboard metrics: {metrics}")
        print(f" Queue statistics: {queue_stats}")

        print("\n✅ All cancellation mechanism tests passed!")

    except Exception as e:
        print(f"\n❌ Test failed with error: {e}")
        import traceback

        traceback.print_exc()
        # Raise explicitly instead of using ``assert False``: assert
        # statements are removed under ``python -O``, which would turn every
        # failure into a silent pass.
        raise AssertionError(f"Test failed with error: {e}") from e
|
|
|
def test_task_queue_management():
    """Exercise the task queues exposed by the ``app`` module.

    Prints the ``text_task``, ``file_task`` and ``dicom_task`` entries of
    ``app.task_queues``, replaces the text queue with three placeholder
    entries, calls ``app.cancel_current_task("text_task")`` and prints the
    length of the text queue afterwards. No assertion is made about the
    resulting queue contents; the test only checks that none of these steps
    raises.

    Returns:
        None. Success is signalled by returning normally.

    Raises:
        AssertionError: If any step raises. The original exception is
            chained as the cause.
    """
    print("\n🔄 Testing Task Queue Management")
    print("=" * 40)

    try:
        # Imported inside the try so that an import failure is reported
        # through the same failure path as any other error.
        import app

        print(f"Text task queue: {app.task_queues['text_task']}")
        print(f"File task queue: {app.task_queues['file_task']}")
        print(f"DICOM task queue: {app.task_queues['dicom_task']}")

        # Seed the text queue with placeholder entries before cancelling.
        app.task_queues["text_task"] = ["task1", "task2", "task3"]
        print(f"Added mock tasks to text queue: {len(app.task_queues['text_task'])}")

        app.cancel_current_task("text_task")
        print(f"Queue after cancellation: {len(app.task_queues['text_task'])}")

        print("✅ Task queue management tests passed!")

    except Exception as e:
        print(f"❌ Task queue test failed: {e}")
        # Raise explicitly instead of using ``assert False``: assert
        # statements are removed under ``python -O``, which would turn every
        # failure into a silent pass.
        raise AssertionError(f"Task queue test failed: {e}") from e
|
|
|
if __name__ == "__main__":
    # Script entry point: run both tests and turn their outcome into the
    # process exit status (0 = all passed, 1 = at least one failed).
    print("🔥 FhirFlame Cancellation Mechanism Test Suite")
    print("Testing enhanced job cancellation and task management...")

    outcomes = []
    for test_func in (test_cancellation_mechanism, test_task_queue_management):
        try:
            returned = test_func()
        except AssertionError:
            # The tests signal failure by raising AssertionError. Catching
            # it here lets the remaining tests and the summary still run.
            outcomes.append(False)
        else:
            # The tests return None on success, so a normal return counts
            # as a pass; only an explicit False is treated as a failure.
            outcomes.append(returned is not False)

    if all(outcomes):
        print("\n🎉 All tests passed! Cancellation mechanism is working correctly.")
        sys.exit(0)
    else:
        print("\n❌ Some tests failed. Please check the implementation.")
        sys.exit(1)