da03 commited on
Commit
5850e24
·
1 Parent(s): bad20e3
Files changed (1) hide show
  1. dispatcher.py +43 -12
dispatcher.py CHANGED
@@ -721,31 +721,62 @@ class SessionManager:
721
  analytics.log_queue_status(len(self.session_queue), maximum_wait)
722
 
723
  def _calculate_maximum_wait_time(self, position_in_queue: int) -> float:
724
- """Calculate maximum possible wait time (worst case scenario)"""
725
  available_workers = len([w for w in self.workers.values() if w.is_available])
726
 
727
  # If there are available workers, no wait time
728
  if available_workers > 0:
729
  return 0
730
 
731
- # Calculate maximum wait time based on position and worker count
732
  num_workers = len(self.workers)
733
  if num_workers == 0:
734
  return 999 # No workers available
735
 
736
- # When queue exists, each session is limited to 60 seconds maximum
737
- # Calculate how many "waves" of users need to complete before this user gets GPU
738
- waves_to_wait = (position_in_queue - 1) // num_workers
739
- position_in_final_wave = (position_in_queue - 1) % num_workers + 1
 
 
 
 
 
 
 
 
 
 
 
 
 
740
 
741
- # Maximum time per wave is 60 seconds (session time limit when queue exists)
742
- max_session_time = self.MAX_SESSION_TIME_WITH_QUEUE
743
 
744
- # Total maximum wait = (complete waves * 60s) + 60s for final wave
745
- # The +1 wave accounts for current active sessions finishing
746
- maximum_wait = (waves_to_wait + 1) * max_session_time
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
747
 
748
- return maximum_wait
749
 
750
  async def handle_user_activity(self, session_id: str):
751
  """Update user activity timestamp and reset warning flags"""
 
721
  analytics.log_queue_status(len(self.session_queue), maximum_wait)
722
 
723
  def _calculate_maximum_wait_time(self, position_in_queue: int) -> float:
724
+ """Calculate realistic wait time based on actual session remaining times"""
725
  available_workers = len([w for w in self.workers.values() if w.is_available])
726
 
727
  # If there are available workers, no wait time
728
  if available_workers > 0:
729
  return 0
730
 
731
+ # Calculate wait time based on position and worker count
732
  num_workers = len(self.workers)
733
  if num_workers == 0:
734
  return 999 # No workers available
735
 
736
+ # Get actual remaining times for active sessions
737
+ current_time = time.time()
738
+ active_session_remaining_times = []
739
+
740
+ for session_id in self.active_sessions:
741
+ session = self.sessions.get(session_id)
742
+ if session:
743
+ # Only consider session time limit (hard limit), not idle timeout
744
+ # Users can reset idle timeout by moving mouse, but session limit is fixed
745
+ if session.max_session_time and session.session_limit_start_time:
746
+ elapsed = current_time - session.session_limit_start_time
747
+ remaining_time = max(0, session.max_session_time - elapsed)
748
+ else:
749
+ # No session limit set (shouldn't happen when queue exists, but fallback)
750
+ remaining_time = self.MAX_SESSION_TIME_WITH_QUEUE
751
+
752
+ active_session_remaining_times.append(remaining_time)
753
 
754
+ # Sort remaining times (shortest first)
755
+ active_session_remaining_times.sort()
756
 
757
+ # Calculate when this user will get a worker
758
+ if position_in_queue <= len(active_session_remaining_times):
759
+ # User will get a worker when the Nth session ends
760
+ estimated_wait = active_session_remaining_times[position_in_queue - 1]
761
+ logger.info(f"Queue position {position_in_queue}: Will get worker when session #{position_in_queue} ends in {estimated_wait:.1f}s")
762
+ logger.info(f"Active session remaining times: {[f'{t:.1f}s' for t in active_session_remaining_times]}")
763
+ else:
764
+ # More people in queue than active sessions
765
+ # Need to wait for multiple "waves" to complete
766
+ full_waves = (position_in_queue - 1) // num_workers
767
+ position_in_final_wave = (position_in_queue - 1) % num_workers
768
+
769
+ # Wait for current sessions to end, then full waves, then partial wave
770
+ if active_session_remaining_times:
771
+ first_wave_time = max(active_session_remaining_times)
772
+ else:
773
+ first_wave_time = self.MAX_SESSION_TIME_WITH_QUEUE
774
+
775
+ additional_wave_time = full_waves * self.MAX_SESSION_TIME_WITH_QUEUE
776
+ estimated_wait = first_wave_time + additional_wave_time
777
+ logger.info(f"Queue position {position_in_queue}: Need {full_waves} full waves + current sessions. Wait: {estimated_wait:.1f}s")
778
 
779
+ return estimated_wait
780
 
781
  async def handle_user_activity(self, session_id: str):
782
  """Update user activity timestamp and reset warning flags"""