File size: 4,977 Bytes
a963d65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/env python3
"""
Test script to verify job cancellation and task management fixes
"""

import sys
import time
import asyncio
from unittest.mock import Mock, patch

# Add the current directory to the path so we can import app
sys.path.insert(0, '.')

def test_cancellation_mechanism():
    """Test the enhanced cancellation mechanism"""
    print("🧪 Testing Job Cancellation and Task Queue Management")
    print("=" * 60)
    
    try:
        # Import the app module
        import app
        
        # Test 1: Basic cancellation flag management
        print("\n1️⃣ Testing basic cancellation flags...")
        
        # Reset flags
        app.cancellation_flags["text_task"] = False
        app.running_tasks["text_task"] = None
        app.active_jobs["text_task"] = None
        
        print(f"   Initial cancellation flag: {app.cancellation_flags['text_task']}")
        print(f"   Initial running task: {app.running_tasks['text_task']}")
        print(f"   Initial active job: {app.active_jobs['text_task']}")
        
        # Test 2: Job manager creation and tracking
        print("\n2️⃣ Testing job creation and tracking...")
        
        # Create a test job
        job_id = app.job_manager.add_processing_job("text", "Test medical text", {"test": True})
        app.active_jobs["text_task"] = job_id
        
        print(f"   Created job ID: {job_id}")
        print(f"   Active tasks count: {app.job_manager.dashboard_state['active_tasks']}")
        print(f"   Active job tracking: {app.active_jobs['text_task']}")
        
        # Test 3: Cancel task functionality
        print("\n3️⃣ Testing cancel_current_task function...")
        
        # Mock a running task
        mock_task = Mock()
        app.running_tasks["text_task"] = mock_task
        
        # Call cancel function
        result = app.cancel_current_task("text_task")
        
        print(f"   Cancel result: {result}")
        print(f"   Cancellation flag after cancel: {app.cancellation_flags['text_task']}")
        print(f"   Running task after cancel: {app.running_tasks['text_task']}")
        print(f"   Active job after cancel: {app.active_jobs['text_task']}")
        print(f"   Active tasks count after cancel: {app.job_manager.dashboard_state['active_tasks']}")
        
        # Verify mock task was cancelled
        mock_task.cancel.assert_called_once()
        
        # Test 4: Job completion tracking
        print("\n4️⃣ Testing job completion tracking...")
        
        # Check job history
        history = app.job_manager.get_jobs_history()
        print(f"   Jobs in history: {len(history)}")
        if history:
            latest_job = history[-1]
            print(f"   Latest job status: {latest_job[2]}")  # Status column
        
        # Test 5: Dashboard metrics
        print("\n5️⃣ Testing dashboard metrics...")
        
        metrics = app.job_manager.get_dashboard_metrics()
        queue_stats = app.job_manager.get_processing_queue()
        
        print(f"   Dashboard metrics: {metrics}")
        print(f"   Queue statistics: {queue_stats}")
        
        print("\n✅ All cancellation mechanism tests passed!")
        pass
        
    except Exception as e:
        print(f"\n❌ Test failed with error: {e}")
        import traceback
        traceback.print_exc()
        assert False, f"Test failed with error: {e}"

def test_task_queue_management():
    """Test task queue management functionality"""
    print("\n🔄 Testing Task Queue Management")
    print("=" * 40)
    
    try:
        import app
        
        # Test queue initialization
        print(f"Text task queue: {app.task_queues['text_task']}")
        print(f"File task queue: {app.task_queues['file_task']}")
        print(f"DICOM task queue: {app.task_queues['dicom_task']}")
        
        # Add some mock tasks to queue
        app.task_queues["text_task"] = ["task1", "task2", "task3"]
        print(f"Added mock tasks to text queue: {len(app.task_queues['text_task'])}")
        
        # Test queue clearing on cancellation
        app.cancel_current_task("text_task")
        print(f"Queue after cancellation: {len(app.task_queues['text_task'])}")
        
        print("✅ Task queue management tests passed!")
        pass
        
    except Exception as e:
        print(f"❌ Task queue test failed: {e}")
        assert False, f"Task queue test failed: {e}"

if __name__ == "__main__":
    print("🔥 FhirFlame Cancellation Mechanism Test Suite")
    print("Testing enhanced job cancellation and task management...")
    
    # Run tests
    test1_passed = test_cancellation_mechanism()
    test2_passed = test_task_queue_management()
    
    if test1_passed and test2_passed:
        print("\n🎉 All tests passed! Cancellation mechanism is working correctly.")
        sys.exit(0)
    else:
        print("\n❌ Some tests failed. Please check the implementation.")
        sys.exit(1)