Spaces:
Runtime error
Runtime error
File size: 5,496 Bytes
ef88fd2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
#!/usr/bin/env python3
"""
Script to start multiple GPU workers for the neural OS demo.
Usage: python start_workers.py --num-gpus 4
"""
import argparse
import os
import signal
import subprocess
import sys
import threading
import time
from typing import List
class WorkerManager:
    """Launch, monitor and stop one ``worker.py`` subprocess per GPU.

    Every worker is started with the current Python interpreter and has its
    stdout and stderr merged into a single pipe owned by this process.

    NOTE(review): because the output is piped, something has to read it.
    ``monitor_workers`` does so; if it is never called (or this process
    exits), workers that keep writing may block or see a broken pipe.

    Attributes:
        num_gpus: Number of workers to launch (GPU ids ``0 .. num_gpus - 1``).
        dispatcher_url: Value handed to each worker via ``--dispatcher-url``.
        processes: Handles of the workers launched so far; the list index is
            the GPU id the worker was started with.
    """

    def __init__(self, num_gpus: int, dispatcher_url: str = "http://localhost:8000"):
        self.num_gpus = num_gpus
        self.dispatcher_url = dispatcher_url
        self.processes: List[subprocess.Popen] = []

    def start_workers(self) -> bool:
        """Start all worker processes.

        Workers are launched one after another with a one second pause
        between launches.

        Returns:
            True when every worker was launched. False as soon as one launch
            fails; in that case the workers already started are stopped via
            ``cleanup`` before returning.
        """
        print(f"Starting {self.num_gpus} GPU workers...")
        for gpu_id in range(self.num_gpus):
            try:
                # The port is only reported here; it is not passed on the
                # command line. Presumably worker.py derives the same value
                # from its GPU id -- TODO confirm against worker.py.
                port = 8001 + gpu_id
                print(f"Starting worker for GPU {gpu_id} on port {port}...")

                cmd = [
                    sys.executable, "worker.py",
                    "--gpu-id", str(gpu_id),
                    "--dispatcher-url", self.dispatcher_url,
                ]
                # Merge stderr into stdout so a single reader per worker is
                # enough; text mode with line buffering yields whole lines.
                process = subprocess.Popen(
                    cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    universal_newlines=True,
                    bufsize=1,
                )
                self.processes.append(process)
                print(f"✓ Started worker {gpu_id} (PID: {process.pid})")

                # Small delay between starts
                time.sleep(1)
            except Exception as e:
                # Launch boundary: report the failure, tear down whatever is
                # already running and let the caller decide what to do.
                print(f"✗ Failed to start worker for GPU {gpu_id}: {e}")
                self.cleanup()
                return False

        print(f"\n✓ All {self.num_gpus} workers started successfully!")
        print("Workers are running on ports:", [8001 + i for i in range(self.num_gpus)])
        return True

    def monitor_workers(self) -> None:
        """Echo worker output and report worker exits until none are left.

        Each worker's pipe is drained by its own daemon thread, so a worker
        that prints nothing cannot hold up the output of the others or the
        exit checks. Every worker exit is reported exactly once. The method
        returns when all workers have exited, or after ``cleanup`` has run in
        response to a ``KeyboardInterrupt``.
        """
        print("\nMonitoring workers (Ctrl+C to stop)...")
        print("-" * 50)

        readers = [
            self._start_output_reader(gpu_id, process)
            for gpu_id, process in enumerate(self.processes)
            if process.stdout is not None
        ]
        exited = set()  # GPU ids whose exit has already been reported
        interrupted = False

        try:
            while len(exited) < len(self.processes):
                for gpu_id, process in enumerate(self.processes):
                    if gpu_id in exited or process.poll() is None:
                        continue
                    exited.add(gpu_id)
                    print(f"⚠️ Worker {gpu_id} (PID: {process.pid}) has died!")
                    # Optionally restart it
                if len(exited) < len(self.processes):
                    time.sleep(0.1)
        except KeyboardInterrupt:
            interrupted = True
            print("\n\nReceived interrupt signal, shutting down workers...")
            self.cleanup()

        # Give the readers a moment to flush the last lines. The timeout
        # keeps us from hanging if something else still holds a pipe open.
        for reader in readers:
            reader.join(timeout=1)

        if not interrupted:
            print("All workers have exited; nothing left to monitor.")

    def _start_output_reader(self, gpu_id: int, process: subprocess.Popen) -> threading.Thread:
        """Start a daemon thread that echoes one worker's output."""
        reader = threading.Thread(
            target=self._forward_output,
            args=(gpu_id, process.stdout),
            name=f"worker-{gpu_id}-output",
            daemon=True,  # must never keep the interpreter alive on exit
        )
        reader.start()
        return reader

    @staticmethod
    def _forward_output(gpu_id: int, stream) -> None:
        """Print every line of ``stream`` prefixed with the GPU id until EOF."""
        try:
            for line in stream:
                print(f"[GPU {gpu_id}] {line.strip()}")
        except (OSError, ValueError):
            # The pipe broke or was closed underneath us; there is nothing
            # more to read from this worker.
            return

    def cleanup(self) -> None:
        """Stop every worker that is still running.

        Each live worker is asked to terminate and given five seconds to
        exit; a worker that does not exit in time is killed. Workers that
        have already exited are skipped, so calling this repeatedly is safe.
        """
        print("Stopping all workers...")
        for i, process in enumerate(self.processes):
            if process.poll() is not None:
                continue  # already exited

            print(f"Stopping worker {i} (PID: {process.pid})...")
            try:
                process.terminate()
                # Wait for graceful shutdown
                process.wait(timeout=5)
                print(f"✓ Worker {i} stopped gracefully")
            except subprocess.TimeoutExpired:
                print(f"⚠️ Force killing worker {i}...")
                process.kill()
                process.wait()
            except Exception as e:
                # Keep going: one stubborn worker must not prevent the
                # remaining workers from being stopped.
                print(f"Error stopping worker {i}: {e}")
        print("✓ All workers stopped")
def main():
    """Command-line entry point: validate arguments, launch workers, monitor.

    Exits with status 1 when ``--num-gpus`` is below 1, when ``worker.py`` is
    not present in the current directory, or when the workers fail to start.
    SIGINT and SIGTERM stop all workers and exit with status 0.
    """
    cli = argparse.ArgumentParser(description="Start multiple GPU workers")
    cli.add_argument(
        "--num-gpus",
        type=int,
        required=True,
        help="Number of GPU workers to start",
    )
    cli.add_argument(
        "--dispatcher-url",
        type=str,
        default="http://localhost:8000",
        help="URL of the dispatcher service",
    )
    cli.add_argument(
        "--no-monitor",
        action="store_true",
        help="Start workers but don't monitor them",
    )
    options = cli.parse_args()

    # Pre-flight checks: the GPU count is validated before the worker script
    # is looked up in the current directory.
    problem = None
    if options.num_gpus < 1:
        problem = "Error: Number of GPUs must be at least 1"
    elif not os.path.exists("worker.py"):
        problem = "Error: worker.py not found in current directory"
    if problem is not None:
        print(problem)
        sys.exit(1)

    workers = WorkerManager(options.num_gpus, options.dispatcher_url)

    def shutdown(signum, _frame):
        """Stop every worker, then leave with a success status."""
        print(f"\nReceived signal {signum}, shutting down...")
        workers.cleanup()
        sys.exit(0)

    # Install the same handler for both termination signals before any
    # worker is launched, so an early signal still triggers a clean shutdown.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, shutdown)

    started = workers.start_workers()
    if not started:
        sys.exit(1)

    if options.no_monitor:
        print("Workers started. Use 'ps aux | grep worker.py' to check status.")
        print("To stop workers, use: pkill -f 'python.*worker.py'")
        return

    workers.monitor_workers()
if __name__ == "__main__":
    # Run the command-line entry point only when executed as a script,
    # not when this module is imported.
    main()