da03 commited on
Commit
4036b4b
·
1 Parent(s): 38d7454
Files changed (2) hide show
  1. dispatcher.py +66 -6
  2. static/index.html +20 -3
dispatcher.py CHANGED
@@ -228,21 +228,81 @@ class SessionManager:
228
  session = self.sessions.get(session_id)
229
  if session and session.status == SessionStatus.QUEUED:
230
  try:
231
- # Calculate estimated wait time
232
- active_sessions_count = len(self.active_sessions)
233
- avg_session_time = self.MAX_SESSION_TIME_WITH_QUEUE if active_sessions_count > 0 else 30.0
234
- estimated_wait = (i + 1) * avg_session_time / max(len(self.workers), 1)
235
 
236
  await session.websocket.send_json({
237
  "type": "queue_update",
238
  "position": i + 1,
239
  "total_waiting": len(self.session_queue),
240
  "estimated_wait_seconds": estimated_wait,
241
- "active_sessions": active_sessions_count
 
242
  })
243
  except Exception as e:
244
  logger.error(f"Failed to send queue update to session {session_id}: {e}")
245
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
246
  async def handle_user_activity(self, session_id: str):
247
  """Update user activity timestamp"""
248
  session = self.sessions.get(session_id)
@@ -428,7 +488,7 @@ async def periodic_queue_update():
428
  while True:
429
  try:
430
  await session_manager.update_queue_info()
431
- await asyncio.sleep(5) # Update every 5 seconds
432
  except Exception as e:
433
  logger.error(f"Error in periodic queue update: {e}")
434
 
 
228
  session = self.sessions.get(session_id)
229
  if session and session.status == SessionStatus.QUEUED:
230
  try:
231
+ # Calculate dynamic estimated wait time
232
+ estimated_wait = self._calculate_dynamic_wait_time(i + 1)
 
 
233
 
234
  await session.websocket.send_json({
235
  "type": "queue_update",
236
  "position": i + 1,
237
  "total_waiting": len(self.session_queue),
238
  "estimated_wait_seconds": estimated_wait,
239
+ "active_sessions": len(self.active_sessions),
240
+ "available_workers": len([w for w in self.workers.values() if w.is_available])
241
  })
242
  except Exception as e:
243
  logger.error(f"Failed to send queue update to session {session_id}: {e}")
244
 
245
+ def _calculate_dynamic_wait_time(self, position_in_queue: int) -> float:
246
+ """Calculate dynamic estimated wait time based on current session progress"""
247
+ current_time = time.time()
248
+ available_workers = len([w for w in self.workers.values() if w.is_available])
249
+
250
+ # If there are available workers, no wait time
251
+ if available_workers > 0:
252
+ return 0
253
+
254
+ # Calculate remaining time for active sessions
255
+ min_remaining_time = float('inf')
256
+ active_session_times = []
257
+
258
+ for session_id in self.active_sessions:
259
+ session = self.sessions.get(session_id)
260
+ if session and session.last_activity:
261
+ if session.max_session_time:
262
+ # Session has time limit (queue exists)
263
+ elapsed = current_time - session.last_activity
264
+ remaining = session.max_session_time - elapsed
265
+ remaining = max(0, remaining) # Don't go negative
266
+ else:
267
+ # No time limit, estimate based on average usage
268
+ elapsed = current_time - session.last_activity
269
+ # Assume sessions without time limits will run for average of 45 seconds more
270
+ remaining = max(45 - elapsed, 10) # Minimum 10 seconds
271
+
272
+ active_session_times.append(remaining)
273
+ min_remaining_time = min(min_remaining_time, remaining)
274
+
275
+ # If no active sessions found, use default
276
+ if not active_session_times:
277
+ min_remaining_time = 30.0
278
+
279
+ # Calculate estimated wait time based on position
280
+ num_workers = len(self.workers)
281
+ if num_workers == 0:
282
+ return 999 # No workers available
283
+
284
+ if position_in_queue <= num_workers:
285
+ # User will get a worker as soon as current sessions end
286
+ return min_remaining_time
287
+ else:
288
+ # User needs to wait for multiple session cycles
289
+ cycles_to_wait = (position_in_queue - 1) // num_workers
290
+ remaining_in_current_cycle = (position_in_queue - 1) % num_workers + 1
291
+
292
+ # Time for complete cycles (use average session time)
293
+ avg_session_time = self.MAX_SESSION_TIME_WITH_QUEUE if len(self.session_queue) > 0 else 45.0
294
+ full_cycles_time = cycles_to_wait * avg_session_time
295
+
296
+ # Time for current partial cycle
297
+ if remaining_in_current_cycle <= len(active_session_times):
298
+ # Sort session times to get when the Nth worker will be free
299
+ sorted_times = sorted(active_session_times)
300
+ current_cycle_time = sorted_times[remaining_in_current_cycle - 1]
301
+ else:
302
+ current_cycle_time = min_remaining_time
303
+
304
+ return full_cycles_time + current_cycle_time
305
+
306
  async def handle_user_activity(self, session_id: str):
307
  """Update user activity timestamp"""
308
  session = self.sessions.get(session_id)
 
488
  while True:
489
  try:
490
  await session_manager.update_queue_info()
491
+ await asyncio.sleep(2) # Update every 2 seconds for responsive experience
492
  except Exception as e:
493
  logger.error(f"Error in periodic queue update: {e}")
494
 
static/index.html CHANGED
@@ -282,10 +282,27 @@
282
  } else if (data.type === "queue_update") {
283
  console.log(`Queue update: Position ${data.position}/${data.total_waiting}, estimated wait: ${data.estimated_wait_seconds.toFixed(1)} seconds`);
284
  const waitSeconds = Math.ceil(data.estimated_wait_seconds);
285
- const waitText = waitSeconds === 1 ? "1 second" : `${waitSeconds} seconds`;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
286
  showConnectionStatus(
287
- `Position ${data.position} in queue`,
288
- `Estimated wait time: ${waitText}`
289
  );
290
  } else if (data.type === "session_start") {
291
  console.log("Session started, clearing queue display");
 
282
  } else if (data.type === "queue_update") {
283
  console.log(`Queue update: Position ${data.position}/${data.total_waiting}, estimated wait: ${data.estimated_wait_seconds.toFixed(1)} seconds`);
284
  const waitSeconds = Math.ceil(data.estimated_wait_seconds);
285
+
286
+ let waitText;
287
+ if (waitSeconds === 0) {
288
+ waitText = "Starting soon...";
289
+ } else if (waitSeconds === 1) {
290
+ waitText = "1 second";
291
+ } else if (waitSeconds < 60) {
292
+ waitText = `${waitSeconds} seconds`;
293
+ } else {
294
+ const minutes = Math.floor(waitSeconds / 60);
295
+ const seconds = waitSeconds % 60;
296
+ waitText = `${minutes}m ${seconds}s`;
297
+ }
298
+
299
+ const statusText = data.available_workers > 0 ?
300
+ "Processing queue..." :
301
+ `Position ${data.position} in queue`;
302
+
303
  showConnectionStatus(
304
+ statusText,
305
+ `Estimated wait: ${waitText} (${data.active_sessions} active sessions)`
306
  );
307
  } else if (data.type === "session_start") {
308
  console.log("Session started, clearing queue display");