da03 committed on
Commit
2ff6d31
·
1 Parent(s): c74f490
Files changed (2) hide show
  1. start_system.sh +4 -1
  2. start_workers.py +38 -12
start_system.sh CHANGED
@@ -131,7 +131,10 @@ done
131
  echo ""
132
  echo "📋 Log files:"
133
  echo " Dispatcher: dispatcher.log"
134
- echo " Workers: workers.log"
 
 
 
135
  echo ""
136
  echo "Press Ctrl+C to stop the system"
137
  echo "================================"
 
131
  echo ""
132
  echo "📋 Log files:"
133
  echo " Dispatcher: dispatcher.log"
134
+ echo " Workers summary: workers.log"
135
+ for ((i=0; i<NUM_GPUS; i++)); do
136
+ echo " GPU $i worker: worker_gpu_$i.log"
137
+ done
138
  echo ""
139
  echo "Press Ctrl+C to stop the system"
140
  echo "================================"
start_workers.py CHANGED
@@ -34,16 +34,21 @@ class WorkerManager:
34
  "--dispatcher-url", self.dispatcher_url
35
  ]
36
 
 
 
 
 
 
37
  process = subprocess.Popen(
38
  cmd,
39
- stdout=subprocess.PIPE,
40
  stderr=subprocess.STDOUT,
41
  universal_newlines=True,
42
  bufsize=1
43
  )
44
 
45
  self.processes.append(process)
46
- print(f"✓ Started worker {gpu_id} (PID: {process.pid})")
47
 
48
  # Small delay between starts
49
  time.sleep(1)
@@ -55,6 +60,9 @@ class WorkerManager:
55
 
56
  print(f"\n✓ All {self.num_gpus} workers started successfully!")
57
  print("Workers are running on ports:", [8001 + i for i in range(self.num_gpus)])
 
 
 
58
  return True
59
 
60
  def monitor_workers(self):
@@ -62,6 +70,11 @@ class WorkerManager:
62
  print("\nMonitoring workers (Ctrl+C to stop)...")
63
  print("-" * 50)
64
 
 
 
 
 
 
65
  try:
66
  while True:
67
  # Check if any process has died
@@ -69,16 +82,22 @@ class WorkerManager:
69
  if process.poll() is not None:
70
  print(f"⚠️ Worker {i} (PID: {process.pid}) has died!")
71
  # Optionally restart it
72
-
73
- # Print output from processes
74
- for i, process in enumerate(self.processes):
75
- if process.stdout and process.stdout.readable():
76
- try:
77
- line = process.stdout.readline()
78
- if line:
79
- print(f"[GPU {i}] {line.strip()}")
80
- except:
81
- pass
 
 
 
 
 
 
82
 
83
  time.sleep(0.1)
84
 
@@ -104,6 +123,13 @@ class WorkerManager:
104
  process.wait()
105
  except Exception as e:
106
  print(f"Error stopping worker {i}: {e}")
 
 
 
 
 
 
 
107
 
108
  print("✓ All workers stopped")
109
 
 
34
  "--dispatcher-url", self.dispatcher_url
35
  ]
36
 
37
+ # Create log file for this worker
38
+ log_file = f"worker_gpu_{gpu_id}.log"
39
+ with open(log_file, 'w') as f:
40
+ f.write(f"Starting worker for GPU {gpu_id}\n")
41
+
42
  process = subprocess.Popen(
43
  cmd,
44
+ stdout=open(log_file, 'a'),
45
  stderr=subprocess.STDOUT,
46
  universal_newlines=True,
47
  bufsize=1
48
  )
49
 
50
  self.processes.append(process)
51
+ print(f"✓ Started worker {gpu_id} (PID: {process.pid}) - Log: {log_file}")
52
 
53
  # Small delay between starts
54
  time.sleep(1)
 
60
 
61
  print(f"\n✓ All {self.num_gpus} workers started successfully!")
62
  print("Workers are running on ports:", [8001 + i for i in range(self.num_gpus)])
63
+ print("Worker log files:")
64
+ for i in range(self.num_gpus):
65
+ print(f" GPU {i}: worker_gpu_{i}.log")
66
  return True
67
 
68
  def monitor_workers(self):
 
70
  print("\nMonitoring workers (Ctrl+C to stop)...")
71
  print("-" * 50)
72
 
73
+ # Keep track of file positions for each log file
74
+ log_positions = {}
75
+ for i in range(self.num_gpus):
76
+ log_positions[i] = 0
77
+
78
  try:
79
  while True:
80
  # Check if any process has died
 
82
  if process.poll() is not None:
83
  print(f"⚠️ Worker {i} (PID: {process.pid}) has died!")
84
  # Optionally restart it
85
+
86
+ # Read new lines from log files
87
+ for i in range(self.num_gpus):
88
+ log_file = f"worker_gpu_{i}.log"
89
+ try:
90
+ if os.path.exists(log_file):
91
+ with open(log_file, 'r') as f:
92
+ f.seek(log_positions[i])
93
+ new_lines = f.readlines()
94
+ log_positions[i] = f.tell()
95
+
96
+ for line in new_lines:
97
+ print(f"[GPU {i}] {line.strip()}")
98
+ except Exception as e:
99
+ # File might be locked or not exist yet
100
+ pass
101
 
102
  time.sleep(0.1)
103
 
 
123
  process.wait()
124
  except Exception as e:
125
  print(f"Error stopping worker {i}: {e}")
126
+
127
+ # Close stdout file handle if it's still open
128
+ try:
129
+ if hasattr(process, 'stdout') and process.stdout:
130
+ process.stdout.close()
131
+ except:
132
+ pass
133
 
134
  print("✓ All workers stopped")
135