da03 committed
Commit fa82766 · 1 Parent(s): 941cf55
Files changed (4)
  1. analyze_analytics.py +241 -0
  2. dispatcher.py +140 -6
  3. start_system.sh +17 -5
  4. tail_workers.py +73 -0
analyze_analytics.py ADDED
@@ -0,0 +1,241 @@
#!/usr/bin/env python3
"""
Analytics Analysis Tool for Neural OS Multi-GPU System

This script analyzes the structured analytics logs to generate reports and insights.
Usage: python analyze_analytics.py [--since HOURS] [--type TYPE]
"""

import json
import argparse
import glob
import time
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import statistics

class AnalyticsAnalyzer:
    def __init__(self, since_hours=24):
        self.since_timestamp = time.time() - (since_hours * 3600)
        self.data = {
            'gpu_metrics': [],
            'connection_events': [],
            'queue_metrics': [],
            'ip_stats': []
        }
        self.load_data()

    def load_data(self):
        """Load all analytics data files"""
        file_types = {
            'gpu_metrics': 'gpu_metrics_*.jsonl',
            'connection_events': 'connection_events_*.jsonl',
            'queue_metrics': 'queue_metrics_*.jsonl',
            'ip_stats': 'ip_stats_*.jsonl'
        }

        for data_type, pattern in file_types.items():
            files = glob.glob(pattern)
            for file_path in files:
                try:
                    with open(file_path, 'r') as f:
                        for line in f:
                            try:
                                record = json.loads(line.strip())
                                if record.get('type') != 'metadata' and record.get('timestamp', 0) >= self.since_timestamp:
                                    self.data[data_type].append(record)
                            except json.JSONDecodeError:
                                continue
                except FileNotFoundError:
                    continue

        print(f"Loaded data from the last {(time.time() - self.since_timestamp) / 3600:.1f} hours:")
        for data_type, records in self.data.items():
            print(f" {data_type}: {len(records)} records")
        print()

    def analyze_gpu_utilization(self):
        """Analyze GPU utilization patterns"""
        print("🖥️ GPU UTILIZATION ANALYSIS")
        print("=" * 40)

        gpu_records = [r for r in self.data['gpu_metrics'] if r.get('type') == 'gpu_status']
        if not gpu_records:
            print("No GPU utilization data found.")
            return

        utilizations = [r['utilization_percent'] for r in gpu_records]
        total_gpus = gpu_records[-1].get('total_gpus', 0)

        print(f"Total GPUs: {total_gpus}")
        print(f"Average utilization: {statistics.mean(utilizations):.1f}%")
        print(f"Peak utilization: {max(utilizations):.1f}%")
        print(f"Minimum utilization: {min(utilizations):.1f}%")
        print(f"Utilization std dev: {statistics.stdev(utilizations) if len(utilizations) > 1 else 0:.1f}%")

        # Utilization distribution
        high_util = sum(1 for u in utilizations if u >= 80)
        med_util = sum(1 for u in utilizations if 40 <= u < 80)
        low_util = sum(1 for u in utilizations if u < 40)

        print(f"\nUtilization distribution:")
        print(f" High (≥80%): {high_util} samples ({high_util/len(utilizations)*100:.1f}%)")
        print(f" Medium (40-79%): {med_util} samples ({med_util/len(utilizations)*100:.1f}%)")
        print(f" Low (<40%): {low_util} samples ({low_util/len(utilizations)*100:.1f}%)")
        print()

    def analyze_connections(self):
        """Analyze connection patterns"""
        print("🔗 CONNECTION ANALYSIS")
        print("=" * 40)

        opens = [r for r in self.data['connection_events'] if r.get('type') == 'connection_open']
        closes = [r for r in self.data['connection_events'] if r.get('type') == 'connection_close']

        if not opens and not closes:
            print("No connection data found.")
            return

        print(f"Total connections opened: {len(opens)}")
        print(f"Total connections closed: {len(closes)}")

        if closes:
            durations = [r['duration'] for r in closes]
            interactions = [r['interactions'] for r in closes]
            reasons = [r['reason'] for r in closes]

            print(f"\nSession durations:")
            print(f" Average: {statistics.mean(durations):.1f}s")
            print(f" Median: {statistics.median(durations):.1f}s")
            print(f" Max: {max(durations):.1f}s")
            print(f" Min: {min(durations):.1f}s")

            print(f"\nInteractions per session:")
            print(f" Average: {statistics.mean(interactions):.1f}")
            print(f" Median: {statistics.median(interactions):.1f}")
            print(f" Max: {max(interactions)}")

            print(f"\nSession end reasons:")
            reason_counts = Counter(reasons)
            for reason, count in reason_counts.most_common():
                print(f" {reason}: {count} ({count/len(closes)*100:.1f}%)")
        print()

    def analyze_queue_performance(self):
        """Analyze queue performance"""
        print("📝 QUEUE PERFORMANCE ANALYSIS")
        print("=" * 40)

        bypasses = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_bypass']
        waits = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_wait']
        statuses = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_status']

        total_users = len(bypasses) + len(waits)
        if total_users == 0:
            print("No queue data found.")
            return

        print(f"Total users processed: {total_users}")
        print(f"Users bypassed queue: {len(bypasses)} ({len(bypasses)/total_users*100:.1f}%)")
        print(f"Users waited in queue: {len(waits)} ({len(waits)/total_users*100:.1f}%)")

        if waits:
            wait_times = [r['wait_time'] for r in waits]
            positions = [r['queue_position'] for r in waits]

            print(f"\nWait time statistics:")
            print(f" Average wait: {statistics.mean(wait_times):.1f}s")
            print(f" Median wait: {statistics.median(wait_times):.1f}s")
            print(f" Max wait: {max(wait_times):.1f}s")
            print(f" Average queue position: {statistics.mean(positions):.1f}")

        if statuses:
            queue_sizes = [r['queue_size'] for r in statuses]
            estimated_waits = [r['estimated_wait'] for r in statuses if r['queue_size'] > 0]

            print(f"\nQueue size statistics:")
            print(f" Average queue size: {statistics.mean(queue_sizes):.1f}")
            print(f" Max queue size: {max(queue_sizes)}")

            if estimated_waits:
                print(f" Average estimated wait: {statistics.mean(estimated_waits):.1f}s")
        print()

    def analyze_ip_usage(self):
        """Analyze IP address usage patterns"""
        print("🌍 IP USAGE ANALYSIS")
        print("=" * 40)

        ip_records = self.data['ip_stats']
        if not ip_records:
            print("No IP usage data found.")
            return

        # Get latest connection counts per IP
        latest_ip_data = {}
        for record in ip_records:
            if record.get('type') == 'ip_update':
                ip = record['ip_address']
                latest_ip_data[ip] = record['connection_count']

        if not latest_ip_data:
            print("No IP connection data found.")
            return

        total_connections = sum(latest_ip_data.values())
        unique_ips = len(latest_ip_data)

        print(f"Total unique IP addresses: {unique_ips}")
        print(f"Total connections: {total_connections}")
        print(f"Average connections per IP: {total_connections/unique_ips:.1f}")

        print(f"\nTop IP addresses by connection count:")
        sorted_ips = sorted(latest_ip_data.items(), key=lambda x: x[1], reverse=True)
        for i, (ip, count) in enumerate(sorted_ips[:10], 1):
            percentage = count / total_connections * 100
            print(f" {i:2d}. {ip}: {count} connections ({percentage:.1f}%)")
        print()

    def generate_summary_report(self):
        """Generate a comprehensive summary report"""
        print("📊 SYSTEM SUMMARY REPORT")
        print("=" * 50)

        # Time range
        start_time = datetime.fromtimestamp(self.since_timestamp)
        end_time = datetime.now()
        duration_hours = (end_time.timestamp() - self.since_timestamp) / 3600

        print(f"Report period: {start_time.strftime('%Y-%m-%d %H:%M:%S')} to {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Duration: {duration_hours:.1f} hours")
        print()

        self.analyze_gpu_utilization()
        self.analyze_connections()
        self.analyze_queue_performance()
        self.analyze_ip_usage()

def main():
    parser = argparse.ArgumentParser(description='Analyze Neural OS analytics data')
    parser.add_argument('--since', type=float, default=24,
                        help='Analyze data from the last N hours (default: 24)')
    parser.add_argument('--type', choices=['gpu', 'connections', 'queue', 'ip', 'summary'],
                        default='summary', help='Type of analysis to perform')

    args = parser.parse_args()

    analyzer = AnalyticsAnalyzer(since_hours=args.since)

    if args.type == 'gpu':
        analyzer.analyze_gpu_utilization()
    elif args.type == 'connections':
        analyzer.analyze_connections()
    elif args.type == 'queue':
        analyzer.analyze_queue_performance()
    elif args.type == 'ip':
        analyzer.analyze_ip_usage()
    else:
        analyzer.generate_summary_report()

if __name__ == '__main__':
    main()
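
Note (editorial sketch, not part of this commit): the analyzer expects one JSON object per line in each *.jsonl file, with a leading metadata record that load_data() skips. A minimal round-trip sketch, using an illustrative file name and made-up numbers; the field names match the gpu_status records that dispatcher.py writes in the diff below:

# sketch_gpu_record.py -- illustrative only
import json
import time

record = {
    "type": "gpu_status",
    "timestamp": time.time(),
    "total_gpus": 8,              # made-up values for illustration
    "active_gpus": 6,
    "available_gpus": 2,
    "utilization_percent": 75.0,
    "avg_utilization_percent": 68.2,
}

# JSON Lines: one object appended per line
with open("gpu_metrics_demo.jsonl", "a") as f:
    f.write(json.dumps(record) + "\n")

# Reading back mirrors load_data()/analyze_gpu_utilization(): parse each line,
# keep only gpu_status records, then aggregate a field.
with open("gpu_metrics_demo.jsonl") as f:
    rows = [json.loads(line) for line in f if line.strip()]
utilizations = [r["utilization_percent"] for r in rows if r.get("type") == "gpu_status"]
print(f"samples: {len(utilizations)}, mean: {sum(utilizations)/len(utilizations):.1f}%")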
dispatcher.py CHANGED
@@ -32,28 +32,75 @@ class SystemAnalytics:
         self.users_waited_in_queue = 0  # Users who had to wait
         self.gpu_utilization_samples = deque(maxlen=100)  # GPU utilization over time
         self.queue_size_samples = deque(maxlen=100)  # Queue size over time
+
+        # File handles for different analytics
         self.log_file = None
-        self._init_log_file()
+        self.gpu_metrics_file = None
+        self.connection_events_file = None
+        self.queue_metrics_file = None
+        self.ip_stats_file = None
+        self._init_log_files()
 
-    def _init_log_file(self):
-        """Initialize the system log file"""
+    def _init_log_files(self):
+        """Initialize all analytics log files"""
         timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-        log_filename = f"system_analytics_{timestamp}.log"
-        self.log_file = log_filename
+
+        # Main human-readable log
+        self.log_file = f"system_analytics_{timestamp}.log"
         self._write_log("="*80)
         self._write_log("NEURAL OS MULTI-GPU SYSTEM ANALYTICS")
         self._write_log("="*80)
         self._write_log(f"System started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
         self._write_log("")
+
+        # Structured data files for analysis
+        self.gpu_metrics_file = f"gpu_metrics_{timestamp}.jsonl"
+        self.connection_events_file = f"connection_events_{timestamp}.jsonl"
+        self.queue_metrics_file = f"queue_metrics_{timestamp}.jsonl"
+        self.ip_stats_file = f"ip_stats_{timestamp}.jsonl"
+
+        # Initialize with headers/metadata
+        self._write_json_log(self.gpu_metrics_file, {
+            "type": "metadata",
+            "timestamp": time.time(),
+            "description": "GPU utilization metrics",
+            "fields": ["timestamp", "total_gpus", "active_gpus", "available_gpus", "utilization_percent"]
+        })
+
+        self._write_json_log(self.connection_events_file, {
+            "type": "metadata",
+            "timestamp": time.time(),
+            "description": "Connection lifecycle events",
+            "fields": ["timestamp", "event_type", "client_id", "ip_address", "duration", "interactions", "reason"]
+        })
+
+        self._write_json_log(self.queue_metrics_file, {
+            "type": "metadata",
+            "timestamp": time.time(),
+            "description": "Queue performance metrics",
+            "fields": ["timestamp", "queue_size", "estimated_wait", "bypass_rate", "avg_wait_time"]
+        })
+
+        self._write_json_log(self.ip_stats_file, {
+            "type": "metadata",
+            "timestamp": time.time(),
+            "description": "IP address usage statistics",
+            "fields": ["timestamp", "ip_address", "connection_count", "total_unique_ips"]
+        })
 
     def _write_log(self, message):
-        """Write message to log file and console"""
+        """Write message to main log file and console"""
         timestamp = datetime.now().strftime("%H:%M:%S")
         log_message = f"[{timestamp}] {message}"
         print(log_message)
         with open(self.log_file, "a") as f:
             f.write(log_message + "\n")
 
+    def _write_json_log(self, filename, data):
+        """Write structured data to JSON lines file"""
+        with open(filename, "a") as f:
+            f.write(json.dumps(data) + "\n")
+
     def log_new_connection(self, client_id: str, ip: str):
         """Log new connection"""
         self.total_connections += 1
@@ -61,8 +108,30 @@ class SystemAnalytics:
         self.ip_addresses[ip] += 1
 
         unique_ips = len(self.ip_addresses)
+        timestamp = time.time()
+
+        # Human-readable log
         self._write_log(f"🔗 NEW CONNECTION: {client_id} from {ip}")
         self._write_log(f" 📊 Total connections: {self.total_connections} | Active: {self.active_connections} | Unique IPs: {unique_ips}")
+
+        # Structured data logs
+        self._write_json_log(self.connection_events_file, {
+            "type": "connection_open",
+            "timestamp": timestamp,
+            "client_id": client_id,
+            "ip_address": ip,
+            "total_connections": self.total_connections,
+            "active_connections": self.active_connections,
+            "unique_ips": unique_ips
+        })
+
+        self._write_json_log(self.ip_stats_file, {
+            "type": "ip_update",
+            "timestamp": timestamp,
+            "ip_address": ip,
+            "connection_count": self.ip_addresses[ip],
+            "total_unique_ips": unique_ips
+        })
 
     def log_connection_closed(self, client_id: str, duration: float, interactions: int, reason: str = "normal"):
         """Log connection closed"""
@@ -71,17 +140,44 @@ class SystemAnalytics:
         self.session_durations.append(duration)
 
         avg_duration = sum(self.session_durations) / len(self.session_durations) if self.session_durations else 0
+        timestamp = time.time()
 
+        # Human-readable log
         self._write_log(f"🚪 CONNECTION CLOSED: {client_id}")
         self._write_log(f" ⏱️ Duration: {duration:.1f}s | Interactions: {interactions} | Reason: {reason}")
         self._write_log(f" 📊 Active connections: {self.active_connections} | Avg session duration: {avg_duration:.1f}s")
+
+        # Structured data log
+        self._write_json_log(self.connection_events_file, {
+            "type": "connection_close",
+            "timestamp": timestamp,
+            "client_id": client_id,
+            "duration": duration,
+            "interactions": interactions,
+            "reason": reason,
+            "active_connections": self.active_connections,
+            "avg_session_duration": avg_duration
+        })
 
     def log_queue_bypass(self, client_id: str):
         """Log when user bypasses queue (gets GPU immediately)"""
         self.users_bypassed_queue += 1
         bypass_rate = (self.users_bypassed_queue / self.total_connections) * 100 if self.total_connections > 0 else 0
+        timestamp = time.time()
+
+        # Human-readable log
         self._write_log(f"⚡ QUEUE BYPASS: {client_id} got GPU immediately")
         self._write_log(f" 📊 Bypass rate: {bypass_rate:.1f}% ({self.users_bypassed_queue}/{self.total_connections})")
+
+        # Structured data log
+        self._write_json_log(self.queue_metrics_file, {
+            "type": "queue_bypass",
+            "timestamp": timestamp,
+            "client_id": client_id,
+            "bypass_rate": bypass_rate,
+            "users_bypassed": self.users_bypassed_queue,
+            "total_connections": self.total_connections
+        })
 
     def log_queue_wait(self, client_id: str, wait_time: float, queue_position: int):
         """Log when user had to wait in queue"""
@@ -90,9 +186,23 @@ class SystemAnalytics:
 
         avg_wait = sum(self.waiting_times) / len(self.waiting_times) if self.waiting_times else 0
         wait_rate = (self.users_waited_in_queue / self.total_connections) * 100 if self.total_connections > 0 else 0
+        timestamp = time.time()
 
+        # Human-readable log
         self._write_log(f"⏳ QUEUE WAIT: {client_id} waited {wait_time:.1f}s (was #{queue_position})")
         self._write_log(f" 📊 Wait rate: {wait_rate:.1f}% | Avg wait time: {avg_wait:.1f}s")
+
+        # Structured data log
+        self._write_json_log(self.queue_metrics_file, {
+            "type": "queue_wait",
+            "timestamp": timestamp,
+            "client_id": client_id,
+            "wait_time": wait_time,
+            "queue_position": queue_position,
+            "wait_rate": wait_rate,
+            "avg_wait_time": avg_wait,
+            "users_waited": self.users_waited_in_queue
+        })
 
     def log_gpu_status(self, total_gpus: int, active_gpus: int, available_gpus: int):
         """Log GPU utilization"""
@@ -100,9 +210,22 @@ class SystemAnalytics:
         self.gpu_utilization_samples.append(utilization)
 
         avg_utilization = sum(self.gpu_utilization_samples) / len(self.gpu_utilization_samples) if self.gpu_utilization_samples else 0
+        timestamp = time.time()
 
+        # Human-readable log
         self._write_log(f"🖥️ GPU STATUS: {active_gpus}/{total_gpus} in use ({utilization:.1f}% utilization)")
         self._write_log(f" 📊 Available: {available_gpus} | Avg utilization: {avg_utilization:.1f}%")
+
+        # Structured data log
+        self._write_json_log(self.gpu_metrics_file, {
+            "type": "gpu_status",
+            "timestamp": timestamp,
+            "total_gpus": total_gpus,
+            "active_gpus": active_gpus,
+            "available_gpus": available_gpus,
+            "utilization_percent": utilization,
+            "avg_utilization_percent": avg_utilization
+        })
 
     def log_worker_registered(self, worker_id: str, gpu_id: int, endpoint: str):
         """Log when a worker registers"""
@@ -122,7 +245,18 @@ class SystemAnalytics:
         self.queue_size_samples.append(queue_size)
 
         avg_queue_size = sum(self.queue_size_samples) / len(self.queue_size_samples) if self.queue_size_samples else 0
+        timestamp = time.time()
+
+        # Always log to structured data for analysis
+        self._write_json_log(self.queue_metrics_file, {
+            "type": "queue_status",
+            "timestamp": timestamp,
+            "queue_size": queue_size,
+            "estimated_wait": estimated_wait,
+            "avg_queue_size": avg_queue_size
+        })
 
+        # Only log to human-readable if there's a queue
         if queue_size > 0:
             self._write_log(f"📝 QUEUE STATUS: {queue_size} users waiting | Est. wait: {estimated_wait:.1f}s")
             self._write_log(f" 📊 Avg queue size: {avg_queue_size:.1f}")
start_system.sh CHANGED
@@ -131,14 +131,26 @@ for ((i=0; i<NUM_GPUS; i++)); do
 done
 echo ""
 echo "📋 Log files:"
-echo " System analytics: system_analytics_*.log (real-time monitoring)"
-echo " Dispatcher: dispatcher.log"
-echo " Workers summary: workers.log"
+echo " 📊 Analytics (human-readable): system_analytics_*.log"
+echo " 🖥️ GPU metrics (JSON): gpu_metrics_*.jsonl"
+echo " 🔗 Connection events (JSON): connection_events_*.jsonl"
+echo " 📝 Queue metrics (JSON): queue_metrics_*.jsonl"
+echo " 🌍 IP statistics (JSON): ip_stats_*.jsonl"
+echo " 🎯 Dispatcher: dispatcher.log"
+echo " 🔧 Workers summary: workers.log"
 for ((i=0; i<NUM_GPUS; i++)); do
-    echo " GPU $i worker: worker_gpu_$i.log"
+    echo " 🖥️ GPU $i worker: worker_gpu_$i.log"
 done
 echo ""
-echo "💡 Monitor system in real-time: tail -f system_analytics_*.log"
+echo "💡 Real-time monitoring:"
+echo " Human-readable: tail -f system_analytics_*.log"
+echo " GPU utilization: tail -f gpu_metrics_*.jsonl"
+echo " Connection events: tail -f connection_events_*.jsonl"
+echo ""
+echo "📈 Data analysis:"
+echo " Summary report: python analyze_analytics.py"
+echo " Last 6 hours: python analyze_analytics.py --since 6"
+echo " GPU analysis only: python analyze_analytics.py --type gpu"
 echo "Press Ctrl+C to stop the system"
 echo "================================"
 
tail_workers.py ADDED
@@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""
Script to tail all worker log files simultaneously.
Usage: python tail_workers.py [--num-gpus N]
"""

import argparse
import os
import time
import sys
from typing import Dict

def tail_all_workers(num_gpus: int):
    """Tail all worker log files simultaneously"""
    print(f"Tailing logs for {num_gpus} GPU workers...")
    print("=" * 60)

    # Keep track of file positions
    log_positions: Dict[int, int] = {}
    for i in range(num_gpus):
        log_positions[i] = 0

    try:
        while True:
            has_new_output = False

            for i in range(num_gpus):
                log_file = f"worker_gpu_{i}.log"

                try:
                    if os.path.exists(log_file):
                        with open(log_file, 'r') as f:
                            # Clamp the -1 "not created yet" sentinel so a file that
                            # appears later is read from the start instead of raising
                            # on a negative seek.
                            f.seek(max(log_positions[i], 0))
                            new_lines = f.readlines()

                            if new_lines:
                                has_new_output = True
                                for line in new_lines:
                                    timestamp = time.strftime("%H:%M:%S")
                                    print(f"[{timestamp}] [GPU {i}] {line.rstrip()}")

                            log_positions[i] = f.tell()
                    else:
                        # File doesn't exist yet, check if we should show a message
                        if log_positions[i] == 0:
                            print(f"[INFO] Waiting for {log_file} to be created...")
                            log_positions[i] = -1  # Mark as checked

                except Exception as e:
                    print(f"[ERROR] Error reading {log_file}: {e}")

            # Only sleep if there was no new output to keep it responsive
            if not has_new_output:
                time.sleep(0.1)

    except KeyboardInterrupt:
        print("\nStopping log monitoring...")

def main():
    parser = argparse.ArgumentParser(description="Tail all worker log files")
    parser.add_argument("--num-gpus", type=int, default=2,
                        help="Number of GPU workers to monitor (default: 2)")

    args = parser.parse_args()

    if args.num_gpus < 1:
        print("Error: Number of GPUs must be at least 1")
        sys.exit(1)

    tail_all_workers(args.num_gpus)

if __name__ == "__main__":
    main()
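
Usage note (editorial, not part of this commit): a typical invocation when start_system.sh launched four workers would be python tail_workers.py --num-gpus 4 (the default is 2); the loop polls the worker_gpu_*.log files every 0.1 s and stops on Ctrl+C.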