File size: 5,496 Bytes
ef88fd2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/env python3
"""
Script to start multiple GPU workers for the neural OS demo.
Usage: python start_workers.py --num-gpus 4
"""

import argparse
import os
import signal
import subprocess
import sys
import threading
import time
from typing import List

class WorkerManager:
    """Start, monitor and stop one ``worker.py`` subprocess per GPU.

    Each worker is launched with the current Python interpreter as
    ``worker.py --gpu-id <n> --dispatcher-url <url>``; its stderr is merged
    into a piped stdout so the manager can echo the output with a
    ``[GPU n]`` prefix while monitoring.

    Args:
        num_gpus: Number of workers to launch (GPU ids ``0 .. num_gpus - 1``).
        dispatcher_url: Value passed to every worker via ``--dispatcher-url``.
        start_delay: Seconds to sleep after launching each worker.

    Attributes:
        processes: ``Popen`` handles of the launched workers, in launch order.
    """

    # First port shown in status messages; GPU n is reported as BASE_PORT + n.
    # NOTE(review): the port is only printed, never passed to worker.py --
    # presumably the worker derives it from its GPU id; confirm.
    BASE_PORT = 8001

    def __init__(self, num_gpus: int, dispatcher_url: str = "http://localhost:8000",
                 start_delay: float = 1.0):
        self.num_gpus = num_gpus
        self.dispatcher_url = dispatcher_url
        self.start_delay = start_delay
        self.processes: List[subprocess.Popen] = []
        # Reader threads keyed by index into ``self.processes``; filled lazily
        # by monitor_workers() so a stream never gets two competing readers.
        self._output_threads = {}

    def start_workers(self):
        """Start all worker processes.

        Returns:
            True when every worker was launched; False if launching any
            worker raised, in which case the already-started workers are
            stopped via cleanup().
        """
        print(f"Starting {self.num_gpus} GPU workers...")

        for gpu_id in range(self.num_gpus):
            try:
                port = self.BASE_PORT + gpu_id
                print(f"Starting worker for GPU {gpu_id} on port {port}...")

                cmd = [
                    sys.executable, "worker.py",
                    "--gpu-id", str(gpu_id),
                    "--dispatcher-url", self.dispatcher_url,
                ]

                # Line-buffered text pipe with stderr folded into stdout so a
                # single reader sees everything the worker prints.
                process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    universal_newlines=True,
                    bufsize=1,
                )

                self.processes.append(process)
                print(f"✓ Started worker {gpu_id} (PID: {process.pid})")

                # Small delay between starts
                time.sleep(self.start_delay)

            except Exception as e:
                print(f"✗ Failed to start worker for GPU {gpu_id}: {e}")
                self.cleanup()
                return False

        print(f"\n✓ All {self.num_gpus} workers started successfully!")
        print("Workers are running on ports:",
              [self.BASE_PORT + i for i in range(self.num_gpus)])
        return True

    @staticmethod
    def _forward_output(index, process):
        """Echo every line of one worker's piped output with a ``[GPU n]`` prefix."""
        stream = process.stdout
        if stream is None:
            return
        try:
            for line in stream:
                # Newline is part of the string so each line is emitted with a
                # single write and cannot interleave with other threads.
                print(f"[GPU {index}] {line.strip()}\n", end="", flush=True)
        except (OSError, ValueError):
            # The pipe was closed underneath us (e.g. during shutdown);
            # there is nothing more to forward.
            pass

    def _start_output_thread(self, index, process):
        """Start (once) the daemon thread that drains one worker's output."""
        thread = self._output_threads.get(index)
        if thread is None:
            thread = threading.Thread(
                target=self._forward_output,
                args=(index, process),
                name=f"worker-{index}-output",
                daemon=True,  # must never keep the interpreter alive on exit
            )
            self._output_threads[index] = thread
            thread.start()
        return thread

    def monitor_workers(self, poll_interval: float = 0.1):
        """Monitor worker processes and print their output.

        Output of each worker is drained by its own background thread, so a
        silent worker cannot block output or death detection of the others.
        Each worker's exit is reported exactly once, and the method returns
        when every worker has exited (or after a KeyboardInterrupt, in which
        case cleanup() is called first).

        Args:
            poll_interval: Seconds to sleep between liveness checks.
        """
        print("\nMonitoring workers (Ctrl+C to stop)...")
        print("-" * 50)

        for index, process in enumerate(self.processes):
            if process.stdout is not None:
                self._start_output_thread(index, process)

        reported_dead = set()
        try:
            while len(reported_dead) < len(self.processes):
                for index, process in enumerate(self.processes):
                    if index in reported_dead or process.poll() is None:
                        continue
                    # Give the reader a moment to flush the worker's last
                    # lines so they appear before the death notice.
                    reader = self._output_threads.get(index)
                    if reader is not None:
                        reader.join(timeout=1.0)
                    reported_dead.add(index)
                    print(f"⚠️  Worker {index} (PID: {process.pid}) has died!")

                if len(reported_dead) < len(self.processes):
                    time.sleep(poll_interval)

        except KeyboardInterrupt:
            print("\n\nReceived interrupt signal, shutting down workers...")
            self.cleanup()
            return

        print("All workers have exited; nothing left to monitor.")

    def cleanup(self):
        """Stop every worker that is still running.

        Each live worker is sent terminate() and given 5 seconds to exit
        before being killed. Calling this again is harmless: processes that
        have already exited are skipped.
        """
        print("Stopping all workers...")

        for i, process in enumerate(self.processes):
            if process.poll() is not None:
                continue  # already exited
            print(f"Stopping worker {i} (PID: {process.pid})...")
            try:
                process.terminate()
                # Wait for graceful shutdown
                process.wait(timeout=5)
                print(f"✓ Worker {i} stopped gracefully")
            except subprocess.TimeoutExpired:
                print(f"⚠️  Force killing worker {i}...")
                process.kill()
                process.wait()
            except Exception as e:
                print(f"Error stopping worker {i}: {e}")

        print("✓ All workers stopped")

def main():
    """Command-line entry point: validate options, launch workers, then monitor.

    Exits with status 1 when ``--num-gpus`` is below 1, when ``worker.py`` is
    not present in the current directory, or when the workers fail to start.
    SIGINT and SIGTERM stop all workers and exit with status 0.
    """
    cli = argparse.ArgumentParser(description="Start multiple GPU workers")
    cli.add_argument(
        "--num-gpus",
        type=int,
        required=True,
        help="Number of GPU workers to start",
    )
    cli.add_argument(
        "--dispatcher-url",
        type=str,
        default="http://localhost:8000",
        help="URL of the dispatcher service",
    )
    cli.add_argument(
        "--no-monitor",
        action="store_true",
        help="Start workers but don't monitor them",
    )
    options = cli.parse_args()

    # Reject invalid invocations before any process is spawned.
    if options.num_gpus < 1:
        print("Error: Number of GPUs must be at least 1")
        sys.exit(1)

    # The workers are launched as "worker.py" relative to the working directory.
    if not os.path.exists("worker.py"):
        print("Error: worker.py not found in current directory")
        sys.exit(1)

    manager = WorkerManager(options.num_gpus, options.dispatcher_url)

    def shutdown(signum, _frame):
        """Stop every worker and exit cleanly when a signal arrives."""
        print(f"\nReceived signal {signum}, shutting down...")
        manager.cleanup()
        sys.exit(0)

    # Install the same handler for Ctrl+C and for a polite kill.
    for watched in (signal.SIGINT, signal.SIGTERM):
        signal.signal(watched, shutdown)

    started = manager.start_workers()
    if not started:
        sys.exit(1)

    if options.no_monitor:
        print("Workers started. Use 'ps aux | grep worker.py' to check status.")
        print("To stop workers, use: pkill -f 'python.*worker.py'")
        return

    manager.monitor_workers()

# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()