da03 commited on
Commit
b9e6b75
·
1 Parent(s): fa82766
Files changed (3) hide show
  1. analyze_analytics.py +19 -3
  2. dispatcher.py +81 -58
  3. static/index.html +14 -6
analyze_analytics.py CHANGED
@@ -129,6 +129,7 @@ class AnalyticsAnalyzer:
129
  bypasses = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_bypass']
130
  waits = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_wait']
131
  statuses = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_status']
 
132
 
133
  total_users = len(bypasses) + len(waits)
134
  if total_users == 0:
@@ -151,14 +152,29 @@ class AnalyticsAnalyzer:
151
 
152
  if statuses:
153
  queue_sizes = [r['queue_size'] for r in statuses]
154
- estimated_waits = [r['estimated_wait'] for r in statuses if r['queue_size'] > 0]
 
 
 
 
 
 
 
155
 
156
  print(f"\nQueue size statistics:")
157
  print(f" Average queue size: {statistics.mean(queue_sizes):.1f}")
158
  print(f" Max queue size: {max(queue_sizes)}")
159
 
160
- if estimated_waits:
161
- print(f" Average estimated wait: {statistics.mean(estimated_waits):.1f}s")
 
 
 
 
 
 
 
 
162
  print()
163
 
164
  def analyze_ip_usage(self):
 
129
  bypasses = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_bypass']
130
  waits = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_wait']
131
  statuses = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_status']
132
+ limit_applications = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_limits_applied']
133
 
134
  total_users = len(bypasses) + len(waits)
135
  if total_users == 0:
 
152
 
153
  if statuses:
154
  queue_sizes = [r['queue_size'] for r in statuses]
155
+ # Handle both old 'estimated_wait' and new 'maximum_wait' fields for backwards compatibility
156
+ maximum_waits = []
157
+ for r in statuses:
158
+ if r['queue_size'] > 0:
159
+ if 'maximum_wait' in r:
160
+ maximum_waits.append(r['maximum_wait'])
161
+ elif 'estimated_wait' in r:
162
+ maximum_waits.append(r['estimated_wait'])
163
 
164
  print(f"\nQueue size statistics:")
165
  print(f" Average queue size: {statistics.mean(queue_sizes):.1f}")
166
  print(f" Max queue size: {max(queue_sizes)}")
167
 
168
+ if maximum_waits:
169
+ print(f" Average maximum wait: {statistics.mean(maximum_waits):.1f}s")
170
+ print(f" Peak maximum wait: {max(maximum_waits):.1f}s")
171
+
172
+ if limit_applications:
173
+ total_affected = sum(r['affected_sessions'] for r in limit_applications)
174
+ print(f"\nQueue limit applications:")
175
+ print(f" Times limits applied to existing sessions: {len(limit_applications)}")
176
+ print(f" Total sessions affected: {total_affected}")
177
+ print(f" Average sessions affected per application: {total_affected/len(limit_applications):.1f}")
178
  print()
179
 
180
  def analyze_ip_usage(self):
dispatcher.py CHANGED
@@ -240,7 +240,24 @@ class SystemAnalytics:
240
  self._write_log(f"⚠️ CRITICAL: No GPU workers available! {queue_size} users waiting")
241
  self._write_log(" Please check worker processes and GPU availability")
242
 
243
- def log_queue_status(self, queue_size: int, estimated_wait: float):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
244
  """Log queue status"""
245
  self.queue_size_samples.append(queue_size)
246
 
@@ -252,13 +269,13 @@ class SystemAnalytics:
252
  "type": "queue_status",
253
  "timestamp": timestamp,
254
  "queue_size": queue_size,
255
- "estimated_wait": estimated_wait,
256
  "avg_queue_size": avg_queue_size
257
  })
258
 
259
  # Only log to human-readable if there's a queue
260
  if queue_size > 0:
261
- self._write_log(f"📝 QUEUE STATUS: {queue_size} users waiting | Est. wait: {estimated_wait:.1f}s")
262
  self._write_log(f" 📊 Avg queue size: {avg_queue_size:.1f}")
263
 
264
  def log_periodic_summary(self):
@@ -371,11 +388,51 @@ class SessionManager:
371
 
372
  async def add_session_to_queue(self, session: UserSession):
373
  """Add a session to the queue"""
 
 
 
374
  self.sessions[session.session_id] = session
375
  self.session_queue.append(session.session_id)
376
  session.status = SessionStatus.QUEUED
377
  session.queue_start_time = time.time()
378
  logger.info(f"Added session {session.session_id} to queue. Queue size: {len(self.session_queue)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
379
 
380
  async def process_queue(self):
381
  """Process the session queue"""
@@ -571,14 +628,14 @@ class SessionManager:
571
  session = self.sessions.get(session_id)
572
  if session and session.status == SessionStatus.QUEUED:
573
  try:
574
- # Calculate dynamic estimated wait time
575
- estimated_wait = self._calculate_dynamic_wait_time(i + 1)
576
 
577
  await session.websocket.send_json({
578
  "type": "queue_update",
579
  "position": i + 1,
580
  "total_waiting": len(self.session_queue),
581
- "estimated_wait_seconds": estimated_wait,
582
  "active_sessions": len(self.active_sessions),
583
  "available_workers": len([w for w in self.workers.values() if w.is_available])
584
  })
@@ -587,69 +644,35 @@ class SessionManager:
587
 
588
  # Log queue status if there's a queue
589
  if self.session_queue:
590
- estimated_wait = self._calculate_dynamic_wait_time(1)
591
- analytics.log_queue_status(len(self.session_queue), estimated_wait)
592
 
593
- def _calculate_dynamic_wait_time(self, position_in_queue: int) -> float:
594
- """Calculate dynamic estimated wait time based on current session progress"""
595
- current_time = time.time()
596
  available_workers = len([w for w in self.workers.values() if w.is_available])
597
 
598
  # If there are available workers, no wait time
599
  if available_workers > 0:
600
  return 0
601
 
602
- # Calculate remaining time for active sessions
603
- min_remaining_time = float('inf')
604
- active_session_times = []
605
-
606
- for session_id in self.active_sessions:
607
- session = self.sessions.get(session_id)
608
- if session and session.last_activity:
609
- if session.max_session_time:
610
- # Session has time limit (queue exists)
611
- elapsed = current_time - session.last_activity
612
- remaining = session.max_session_time - elapsed
613
- remaining = max(0, remaining) # Don't go negative
614
- else:
615
- # No time limit, estimate based on average usage
616
- elapsed = current_time - session.last_activity
617
- # Assume sessions without time limits will run for average of 45 seconds more
618
- remaining = max(45 - elapsed, 10) # Minimum 10 seconds
619
-
620
- active_session_times.append(remaining)
621
- min_remaining_time = min(min_remaining_time, remaining)
622
-
623
- # If no active sessions found, use default
624
- if not active_session_times:
625
- min_remaining_time = 30.0
626
-
627
- # Calculate estimated wait time based on position
628
  num_workers = len(self.workers)
629
  if num_workers == 0:
630
  return 999 # No workers available
631
 
632
- if position_in_queue <= num_workers:
633
- # User will get a worker as soon as current sessions end
634
- return min_remaining_time
635
- else:
636
- # User needs to wait for multiple session cycles
637
- cycles_to_wait = (position_in_queue - 1) // num_workers
638
- remaining_in_current_cycle = (position_in_queue - 1) % num_workers + 1
639
-
640
- # Time for complete cycles (use average session time)
641
- avg_session_time = self.MAX_SESSION_TIME_WITH_QUEUE if len(self.session_queue) > 0 else 45.0
642
- full_cycles_time = cycles_to_wait * avg_session_time
643
-
644
- # Time for current partial cycle
645
- if remaining_in_current_cycle <= len(active_session_times):
646
- # Sort session times to get when the Nth worker will be free
647
- sorted_times = sorted(active_session_times)
648
- current_cycle_time = sorted_times[remaining_in_current_cycle - 1]
649
- else:
650
- current_cycle_time = min_remaining_time
651
-
652
- return full_cycles_time + current_cycle_time
653
 
654
  async def handle_user_activity(self, session_id: str):
655
  """Update user activity timestamp and reset warning flags"""
 
240
  self._write_log(f"⚠️ CRITICAL: No GPU workers available! {queue_size} users waiting")
241
  self._write_log(" Please check worker processes and GPU availability")
242
 
243
+ def log_queue_limits_applied(self, affected_sessions: int, queue_size: int):
244
+ """Log when time limits are applied to existing sessions due to queue formation"""
245
+ timestamp = time.time()
246
+
247
+ # Human-readable log
248
+ self._write_log(f"🕐 QUEUE LIMITS APPLIED: {affected_sessions} existing sessions now have 60s limits")
249
+ self._write_log(f" 📊 Reason: Queue formed with {queue_size} waiting users")
250
+
251
+ # Structured data log
252
+ self._write_json_log(self.queue_metrics_file, {
253
+ "type": "queue_limits_applied",
254
+ "timestamp": timestamp,
255
+ "affected_sessions": affected_sessions,
256
+ "queue_size": queue_size,
257
+ "time_limit_applied": 60.0
258
+ })
259
+
260
+ def log_queue_status(self, queue_size: int, maximum_wait: float):
261
  """Log queue status"""
262
  self.queue_size_samples.append(queue_size)
263
 
 
269
  "type": "queue_status",
270
  "timestamp": timestamp,
271
  "queue_size": queue_size,
272
+ "maximum_wait": maximum_wait,
273
  "avg_queue_size": avg_queue_size
274
  })
275
 
276
  # Only log to human-readable if there's a queue
277
  if queue_size > 0:
278
+ self._write_log(f"📝 QUEUE STATUS: {queue_size} users waiting | Max wait: {maximum_wait:.1f}s")
279
  self._write_log(f" 📊 Avg queue size: {avg_queue_size:.1f}")
280
 
281
  def log_periodic_summary(self):
 
388
 
389
  async def add_session_to_queue(self, session: UserSession):
390
  """Add a session to the queue"""
391
+ # Check if queue was empty before adding this session
392
+ was_queue_empty = len(self.session_queue) == 0
393
+
394
  self.sessions[session.session_id] = session
395
  self.session_queue.append(session.session_id)
396
  session.status = SessionStatus.QUEUED
397
  session.queue_start_time = time.time()
398
  logger.info(f"Added session {session.session_id} to queue. Queue size: {len(self.session_queue)}")
399
+
400
+ # If queue was empty and now has users, apply time limits to existing active sessions
401
+ if was_queue_empty and len(self.session_queue) > 0:
402
+ await self.apply_queue_limits_to_existing_sessions()
403
+
404
+ async def apply_queue_limits_to_existing_sessions(self):
405
+ """Apply 60-second time limits to existing unlimited sessions when queue forms"""
406
+ current_time = time.time()
407
+ affected_sessions = 0
408
+
409
+ for session_id in list(self.active_sessions.keys()):
410
+ session = self.sessions.get(session_id)
411
+ if session and session.max_session_time is None: # Currently unlimited
412
+ # Give them 60 seconds from now
413
+ session.max_session_time = 60.0
414
+ session.last_activity = current_time # Reset activity timer to start 60s countdown
415
+ session.session_warning_sent = False # Reset warning flag
416
+ affected_sessions += 1
417
+
418
+ # Notify the user about the new time limit
419
+ try:
420
+ queue_size = len(self.session_queue)
421
+ user_text = "user" if queue_size == 1 else "users"
422
+ message = f"{queue_size} other {user_text} waiting. You have 60 seconds to finish."
423
+
424
+ await session.websocket.send_json({
425
+ "type": "queue_limit_applied",
426
+ "message": message,
427
+ "time_remaining": 60.0,
428
+ "queue_size": queue_size
429
+ })
430
+ logger.info(f"Applied 60s time limit to existing session {session_id} due to queue formation ({queue_size} users waiting)")
431
+ except Exception as e:
432
+ logger.error(f"Failed to notify session {session_id} about queue limit: {e}")
433
+
434
+ if affected_sessions > 0:
435
+ analytics.log_queue_limits_applied(affected_sessions, len(self.session_queue))
436
 
437
  async def process_queue(self):
438
  """Process the session queue"""
 
628
  session = self.sessions.get(session_id)
629
  if session and session.status == SessionStatus.QUEUED:
630
  try:
631
+ # Calculate maximum possible wait time
632
+ maximum_wait = self._calculate_maximum_wait_time(i + 1)
633
 
634
  await session.websocket.send_json({
635
  "type": "queue_update",
636
  "position": i + 1,
637
  "total_waiting": len(self.session_queue),
638
+ "maximum_wait_seconds": maximum_wait,
639
  "active_sessions": len(self.active_sessions),
640
  "available_workers": len([w for w in self.workers.values() if w.is_available])
641
  })
 
644
 
645
  # Log queue status if there's a queue
646
  if self.session_queue:
647
+ maximum_wait = self._calculate_maximum_wait_time(1)
648
+ analytics.log_queue_status(len(self.session_queue), maximum_wait)
649
 
650
+ def _calculate_maximum_wait_time(self, position_in_queue: int) -> float:
651
+ """Calculate maximum possible wait time (worst case scenario)"""
 
652
  available_workers = len([w for w in self.workers.values() if w.is_available])
653
 
654
  # If there are available workers, no wait time
655
  if available_workers > 0:
656
  return 0
657
 
658
+ # Calculate maximum wait time based on position and worker count
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
659
  num_workers = len(self.workers)
660
  if num_workers == 0:
661
  return 999 # No workers available
662
 
663
+ # When queue exists, each session is limited to 60 seconds maximum
664
+ # Calculate how many "waves" of users need to complete before this user gets GPU
665
+ waves_to_wait = (position_in_queue - 1) // num_workers
666
+ position_in_final_wave = (position_in_queue - 1) % num_workers + 1
667
+
668
+ # Maximum time per wave is 60 seconds (session time limit when queue exists)
669
+ max_session_time = self.MAX_SESSION_TIME_WITH_QUEUE
670
+
671
+ # Total maximum wait = (complete waves * 60s) + 60s for final wave
672
+ # The +1 wave accounts for current active sessions finishing
673
+ maximum_wait = (waves_to_wait + 1) * max_session_time
674
+
675
+ return maximum_wait
 
 
 
 
 
 
 
 
676
 
677
  async def handle_user_activity(self, session_id: str):
678
  """Update user activity timestamp and reset warning flags"""
static/index.html CHANGED
@@ -280,20 +280,24 @@
280
  console.log("Server detected user activity, resetting timeout");
281
  stopTimeoutCountdown();
282
  } else if (data.type === "queue_update") {
283
- console.log(`Queue update: Position ${data.position}/${data.total_waiting}, estimated wait: ${data.estimated_wait_seconds.toFixed(1)} seconds`);
284
- const waitSeconds = Math.ceil(data.estimated_wait_seconds);
285
 
286
  let waitText;
287
  if (waitSeconds === 0) {
288
  waitText = "Starting soon...";
289
  } else if (waitSeconds === 1) {
290
- waitText = "1 second";
291
  } else if (waitSeconds < 60) {
292
- waitText = `${waitSeconds} seconds`;
293
  } else {
294
  const minutes = Math.floor(waitSeconds / 60);
295
  const seconds = waitSeconds % 60;
296
- waitText = `${minutes}m ${seconds}s`;
 
 
 
 
297
  }
298
 
299
  const statusText = data.available_workers > 0 ?
@@ -302,7 +306,7 @@
302
 
303
  showConnectionStatus(
304
  statusText,
305
- `Estimated wait: ${waitText} (${data.active_sessions} active sessions)`
306
  );
307
  } else if (data.type === "session_start") {
308
  console.log("Session started, clearing queue display");
@@ -323,6 +327,10 @@
323
  } else if (data.type === "time_limit_removed") {
324
  console.log("Time limit removed - queue became empty");
325
  stopTimeoutCountdown();
 
 
 
 
326
  }
327
  };
328
  }
 
280
  console.log("Server detected user activity, resetting timeout");
281
  stopTimeoutCountdown();
282
  } else if (data.type === "queue_update") {
283
+ console.log(`Queue update: Position ${data.position}/${data.total_waiting}, maximum wait: ${data.maximum_wait_seconds.toFixed(1)} seconds`);
284
+ const waitSeconds = Math.ceil(data.maximum_wait_seconds);
285
 
286
  let waitText;
287
  if (waitSeconds === 0) {
288
  waitText = "Starting soon...";
289
  } else if (waitSeconds === 1) {
290
+ waitText = "1 second max";
291
  } else if (waitSeconds < 60) {
292
+ waitText = `${waitSeconds} seconds max`;
293
  } else {
294
  const minutes = Math.floor(waitSeconds / 60);
295
  const seconds = waitSeconds % 60;
296
+ if (seconds === 0) {
297
+ waitText = `${minutes} minutes max`;
298
+ } else {
299
+ waitText = `${minutes}m ${seconds}s max`;
300
+ }
301
  }
302
 
303
  const statusText = data.available_workers > 0 ?
 
306
 
307
  showConnectionStatus(
308
  statusText,
309
+ `Maximum wait: ${waitText} (${data.active_sessions} active sessions)`
310
  );
311
  } else if (data.type === "session_start") {
312
  console.log("Session started, clearing queue display");
 
327
  } else if (data.type === "time_limit_removed") {
328
  console.log("Time limit removed - queue became empty");
329
  stopTimeoutCountdown();
330
+ } else if (data.type === "queue_limit_applied") {
331
+ console.log(`Queue limit applied: ${data.message}, ${data.time_remaining} seconds remaining`);
332
+ setTimeoutMessage(`⏰ ${data.message} Time remaining: <span id="timeoutCountdown">${Math.ceil(data.time_remaining)}</span> seconds.`);
333
+ startTimeoutCountdown(Math.ceil(data.time_remaining));
334
  }
335
  };
336
  }