File size: 6,973 Bytes
ef88fd2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64a144d
ef88fd2
 
 
 
 
 
 
 
 
 
 
 
 
92199b3
 
ef88fd2
 
92199b3
ef88fd2
 
 
2ff6d31
 
 
 
 
92199b3
 
 
 
 
ef88fd2
 
2ff6d31
ef88fd2
 
92199b3
 
ef88fd2
 
 
92199b3
ef88fd2
 
 
 
 
92199b3
ef88fd2
 
 
 
92199b3
2ff6d31
92199b3
ef88fd2
 
 
 
 
 
 
2ff6d31
 
 
 
 
ef88fd2
 
 
 
 
 
 
2ff6d31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ef88fd2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2ff6d31
 
 
 
 
 
 
ef88fd2
 
 
 
 
 
 
64a144d
ef88fd2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python3
"""
Script to start multiple GPU workers for the neural OS demo.
Usage: python start_workers.py --num-gpus 4
"""

import argparse
import subprocess
import time
import sys
import signal
import os
from typing import List

class WorkerManager:
    """Launch, monitor and stop one ``worker.py`` subprocess per GPU.

    Worker ``i`` is started with ``CUDA_VISIBLE_DEVICES=<i>``, is passed
    ``--worker-address localhost:<8001 + i>`` and has its stdout/stderr
    redirected to ``worker_gpu_<i>.log`` (a path relative to the current
    working directory).
    """

    def __init__(self, num_gpus: int, dispatcher_url: str = "http://localhost:7860"):
        """Store the configuration; no process is started here.

        Args:
            num_gpus: Number of workers to launch (GPU ids ``0..num_gpus-1``).
            dispatcher_url: Value passed to every worker as ``--dispatcher-url``.
        """
        self.num_gpus = num_gpus
        self.dispatcher_url = dispatcher_url
        self.processes: List[subprocess.Popen] = []

    def start_workers(self) -> bool:
        """Start all worker processes.

        Returns:
            True when every worker was spawned. False as soon as spawning one
            of them raises, in which case the workers started so far are
            stopped again via ``cleanup()``.
        """
        print(f"Starting {self.num_gpus} GPU workers...")

        for gpu_id in range(self.num_gpus):
            # Computed before the try block so the error message in the
            # handler below can always refer to worker_address.
            port = 8001 + gpu_id
            worker_address = f"localhost:{port}"
            log_file = f"worker_gpu_{gpu_id}.log"

            try:
                print(f"Starting worker for GPU {gpu_id} on port {port}...")

                # Start worker process with GPU isolation
                cmd = [
                    sys.executable, "worker.py",
                    "--worker-address", worker_address,
                    "--dispatcher-url", self.dispatcher_url
                ]

                # Create (truncate) the log file for this worker
                with open(log_file, 'w') as f:
                    f.write(f"Starting worker for GPU {gpu_id}\n")

                # Set environment variables for GPU isolation
                env = os.environ.copy()
                env['CUDA_VISIBLE_DEVICES'] = str(gpu_id)  # Only show this GPU to the worker
                env['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'  # Consistent GPU ordering

                # The child receives its own copy of the descriptor, so the
                # parent's handle is closed as soon as Popen returns instead
                # of being leaked for the lifetime of the manager.
                with open(log_file, 'a') as log_handle:
                    process = subprocess.Popen(
                        cmd,
                        stdout=log_handle,
                        stderr=subprocess.STDOUT,
                        env=env  # Pass the modified environment
                    )

                self.processes.append(process)
                print(f"✓ Started worker {worker_address} (PID: {process.pid}) - Log: {log_file}")

                # Small delay between starts
                time.sleep(1)

            except Exception as e:
                print(f"✗ Failed to start worker {worker_address}: {e}")
                self.cleanup()
                return False

        print(f"\n✓ All {self.num_gpus} workers started successfully!")
        print("Worker addresses:")
        for i in range(self.num_gpus):
            print(f"  localhost:{8001 + i} - log: worker_gpu_{i}.log")
        return True

    def _print_new_log_lines(self, gpu_id: int, offset: int, flush_partial: bool = False) -> int:
        """Print log content of one worker that appeared after ``offset``.

        Args:
            gpu_id: Worker index; selects ``worker_gpu_<gpu_id>.log``.
            offset: Byte offset up to which the log was already printed.
            flush_partial: When False, a trailing fragment that is not yet
                terminated by a newline is left for a later call so that a
                half-written line is never printed as two lines. When True
                (the writer is gone) the fragment is printed as well.

        Returns:
            The new byte offset to pass to the next call.
        """
        log_file = f"worker_gpu_{gpu_id}.log"
        try:
            # Binary mode keeps offsets plain byte counts and means undecodable
            # bytes cannot abort the read.
            with open(log_file, 'rb') as f:
                f.seek(offset)
                chunk = f.read()
        except OSError:
            # File might be locked or not exist yet; try again on the next poll.
            return offset

        if not flush_partial:
            # Keep only complete lines (rfind returns -1 when there is none).
            chunk = chunk[:chunk.rfind(b"\n") + 1]

        for raw_line in chunk.splitlines():
            text = raw_line.decode("utf-8", errors="replace")
            print(f"[GPU {gpu_id}] {text.strip()}")

        return offset + len(chunk)

    def monitor_workers(self) -> None:
        """Tail the worker logs and report workers that exit.

        Polls every 0.1 s. Each worker death is reported once. The loop ends
        when every started worker has exited (after a final drain of the
        logs) or on ``KeyboardInterrupt``, which also triggers ``cleanup()``.
        """
        print("\nMonitoring workers (Ctrl+C to stop)...")
        print("-" * 50)

        # Byte offset already printed for each log file
        log_positions = {i: 0 for i in range(self.num_gpus)}
        # Indices of workers whose death has already been reported
        dead = set()

        try:
            while True:
                # Check if any process has died (report each death only once)
                for i, process in enumerate(self.processes):
                    if i not in dead and process.poll() is not None:
                        print(f"⚠️  Worker {i} (PID: {process.pid}) has died!")
                        dead.add(i)

                # Print new log output; a dead worker cannot complete its last
                # line any more, so its unterminated tail is flushed too.
                for i in range(self.num_gpus):
                    log_positions[i] = self._print_new_log_lines(
                        i, log_positions[i], flush_partial=i in dead
                    )

                # Nothing left to supervise: stop instead of spinning forever.
                if self.processes and len(dead) == len(self.processes):
                    print("\nAll workers have exited, stopping monitor.")
                    break

                time.sleep(0.1)

        except KeyboardInterrupt:
            print("\n\nReceived interrupt signal, shutting down workers...")
            self.cleanup()

    def cleanup(self) -> None:
        """Stop every worker that is still running.

        Each live process is sent ``terminate()`` and given 5 seconds to exit
        before it is killed. Safe to call more than once: processes that have
        already exited are skipped.
        """
        print("Stopping all workers...")

        for i, process in enumerate(self.processes):
            if process.poll() is None:  # Process is still running
                print(f"Stopping worker {i} (PID: {process.pid})...")
                try:
                    process.terminate()
                    # Wait for graceful shutdown
                    process.wait(timeout=5)
                    print(f"✓ Worker {i} stopped gracefully")
                except subprocess.TimeoutExpired:
                    print(f"⚠️  Force killing worker {i}...")
                    process.kill()
                    process.wait()
                except Exception as e:
                    # Best effort: keep going so the remaining workers are
                    # still stopped.
                    print(f"Error stopping worker {i}: {e}")

            # process.stdout is only set when the process was created with
            # stdout=PIPE; workers spawned by start_workers() write to a log
            # file, so this is normally None.
            if process.stdout is not None:
                try:
                    process.stdout.close()
                except OSError:
                    pass

        print("✓ All workers stopped")

def main():
    """Command-line entry point: start the GPU workers and optionally tail them.

    Exits with status 1 when ``--num-gpus`` is below 1, when ``worker.py`` is
    not present in the current directory, or when the workers fail to start.
    On SIGINT/SIGTERM the workers are stopped and the script exits with
    status 0.
    """
    cli = argparse.ArgumentParser(description="Start multiple GPU workers")
    cli.add_argument(
        "--num-gpus",
        type=int,
        required=True,
        help="Number of GPU workers to start",
    )
    cli.add_argument(
        "--dispatcher-url",
        type=str,
        default="http://localhost:7860",
        help="URL of the dispatcher service",
    )
    cli.add_argument(
        "--no-monitor",
        action="store_true",
        help="Start workers but don't monitor them",
    )
    options = cli.parse_args()

    # Validate the request before anything is spawned.
    if options.num_gpus < 1:
        print("Error: Number of GPUs must be at least 1")
        sys.exit(1)

    # The workers are launched as "worker.py" relative to the working directory.
    if not os.path.exists("worker.py"):
        print("Error: worker.py not found in current directory")
        sys.exit(1)

    manager = WorkerManager(options.num_gpus, options.dispatcher_url)

    def shutdown_on_signal(signum, _frame):
        """Stop all workers, then leave with a success status."""
        print(f"\nReceived signal {signum}, shutting down...")
        manager.cleanup()
        sys.exit(0)

    # Same handler for Ctrl+C and for a plain `kill`.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, shutdown_on_signal)

    started = manager.start_workers()
    if not started:
        sys.exit(1)

    if options.no_monitor:
        print("Workers started. Use 'ps aux | grep worker.py' to check status.")
        print("To stop workers, use: pkill -f 'python.*worker.py'")
        return

    manager.monitor_workers()

# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()