Libra-1995 committed on
Commit
7747993
·
1 Parent(s): 61703cc

feat: update timeout feature

Browse files
Files changed (1) hide show
  1. web_server.py +82 -8
web_server.py CHANGED
@@ -8,9 +8,10 @@ import enum
8
  import hugsim_env
9
  import subprocess as sp
10
  import shutil
 
11
  from collections import deque, OrderedDict
12
- from datetime import datetime
13
- from typing import Any, Dict, Optional, List
14
  from dataclasses import dataclass
15
  sys.path.append(os.getcwd())
16
 
@@ -150,6 +151,32 @@ def delete_client_space(client_space_id: str):
150
  print(f"Failed to delete space {client_space_id}. It may not exist or already deleted.")
151
 
152
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
153
  class FifoDict:
154
  def __init__(self, max_size: int):
155
  self.max_size = max_size
@@ -186,6 +213,8 @@ class EnvHandler:
186
  This can include multiple scene and configurations.
187
  """
188
  def __init__(self, scene_list: List[SceneConfig], base_output: str):
 
 
189
  self._lock = threading.Lock()
190
  self.scene_list = scene_list
191
  self.base_output = base_output
@@ -228,6 +257,7 @@ class EnvHandler:
228
  """
229
  Reset the environment and initialize variables.
230
  """
 
231
  self._log_list = deque(maxlen=100)
232
  self._done = False
233
  self._score_list = []
@@ -238,11 +268,30 @@ class EnvHandler:
238
  """
239
  Get the current state of the environment.
240
  """
 
241
  return {
242
  "obs": self._obs,
243
  "info": self._info,
244
  }
245
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
246
  @property
247
  def has_done(self) -> bool:
248
  """
@@ -278,6 +327,7 @@ class EnvHandler:
278
  Returns:
279
  bool: True if the episode is done, False otherwise.
280
  """
 
281
  acc, steer_rate = traj2control(plan_traj, self._info)
282
  action = {'acc': acc, 'steer_rate': steer_rate}
283
  self._log("Executing action:", action)
@@ -350,8 +400,9 @@ class EnvHandlerManager:
350
  def __init__(self):
351
  self._env_handlers = {}
352
  self._lock = threading.Lock()
 
353
 
354
- def _get_scene_list(self, env_id: str, base_output: str) -> List[SceneConfig]:
355
  """
356
  Load the scene configurations from the YAML files.
357
  Returns:
@@ -385,7 +436,7 @@ class EnvHandlerManager:
385
 
386
  def _generate_env_handler(self, env_id: str):
387
  base_output = "/app/app_datas/env_output"
388
- scene_list = self._get_scene_list(env_id, base_output)
389
  output = os.path.join(base_output, f"{env_id}_hugsim_env")
390
  os.makedirs(output, exist_ok=True)
391
  return EnvHandler(scene_list, base_output=output)
@@ -426,6 +477,27 @@ class EnvHandlerManager:
426
  if env is not None:
427
  env.close()
428
  torch.cuda.empty_cache()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
429
 
430
 
431
  app = FastAPI()
@@ -447,10 +519,12 @@ def _get_env_handler(
447
  raise HTTPException(status_code=401)
448
 
449
  submission_id = token_info["submission_id"]
450
- if not env_manager.exists_env_handler(submission_id):
451
- update_submission_data(token_info["team_id"], submission_id, {"status": SubmissionStatus.PROCESSING.value})
 
 
452
 
453
- env_handler = env_manager.get_env_handler(submission_id)
454
  if env_handler is None:
455
  raise HTTPException(status_code=404, detail="Environment handler already closed.")
456
  return env_handler
@@ -516,7 +590,7 @@ def execute_action_endpoint(
516
  execute_result = env_handler.execute_action(plan_traj)
517
  if execute_result.done:
518
  token_info = get_token_info(auth_token)
519
- env_manager.close_env_handler(token_info["submission_id"])
520
  delete_client_space(token_info["client_space_id"])
521
  final_score = env_handler.calculate_score()
522
  update_submission_data(token_info["team_id"], token_info["submission_id"], {"status": SubmissionStatus.SUCCESS.value, "score": final_score})
 
8
  import hugsim_env
9
  import subprocess as sp
10
  import shutil
11
+ import time
12
  from collections import deque, OrderedDict
13
+ from datetime import datetime, timezone
14
+ from typing import Any, Dict, Optional, List, Tuple
15
  from dataclasses import dataclass
16
  sys.path.append(os.getcwd())
17
 
 
151
  print(f"Failed to delete space {client_space_id}. It may not exist or already deleted.")
152
 
153
 
154
+ def get_env_id(team_id: str, submission_id: str) -> str:
155
+ """
156
+ Generate a unique environment ID based on team ID and submission ID.
157
+ Args:
158
+ team_id (str): The team ID.
159
+ submission_id (str): The submission ID.
160
+ Returns:
161
+ str: The unique environment ID.
162
+ """
163
+ return f"{team_id}____{submission_id}"
164
+
165
+
166
+ def parse_env_id(env_id: str) -> Tuple[str, str]:
167
+ """
168
+ Parse the environment ID to extract team ID and submission ID.
169
+ Args:
170
+ env_id (str): The environment ID.
171
+ Returns:
172
+ Tuple[str, str]: A (team ID, submission ID) pair.
173
+ """
174
+ parts = env_id.split('____')
175
+ if len(parts) != 2:
176
+ raise ValueError("Invalid environment ID format.")
177
+ return parts[0], parts[1]
178
+
179
+
180
  class FifoDict:
181
  def __init__(self, max_size: int):
182
  self.max_size = max_size
 
213
  This can include multiple scene and configurations.
214
  """
215
  def __init__(self, scene_list: List[SceneConfig], base_output: str):
216
+ self._created_time = datetime.now(timezone.utc)
217
+ self._last_active_time = datetime.now(timezone.utc)
218
  self._lock = threading.Lock()
219
  self.scene_list = scene_list
220
  self.base_output = base_output
 
257
  """
258
  Reset the environment and initialize variables.
259
  """
260
+ self._last_active_time = datetime.now(timezone.utc)
261
  self._log_list = deque(maxlen=100)
262
  self._done = False
263
  self._score_list = []
 
268
  """
269
  Get the current state of the environment.
270
  """
271
+ self._last_active_time = datetime.now(timezone.utc)
272
  return {
273
  "obs": self._obs,
274
  "info": self._info,
275
  }
276
 
277
+ @property
278
+ def created_time(self) -> datetime:
279
+ """
280
+ Get the creation time of the environment handler.
281
+ Returns:
282
+ datetime: The creation time.
283
+ """
284
+ return self._created_time
285
+
286
+ @property
287
+ def last_active_time(self) -> datetime:
288
+ """
289
+ Get the last active time of the environment handler.
290
+ Returns:
291
+ datetime: The last active time.
292
+ """
293
+ return self._last_active_time
294
+
295
  @property
296
  def has_done(self) -> bool:
297
  """
 
327
  Returns:
328
  bool: True if the episode is done, False otherwise.
329
  """
330
+ self._last_active_time = datetime.now(timezone.utc)
331
  acc, steer_rate = traj2control(plan_traj, self._info)
332
  action = {'acc': acc, 'steer_rate': steer_rate}
333
  self._log("Executing action:", action)
 
400
  def __init__(self):
401
  self._env_handlers = {}
402
  self._lock = threading.Lock()
403
+ threading.Thread(target=self._clean_expired_env_handlers, daemon=True).start()
404
 
405
+ def _get_scene_list(self, base_output: str) -> List[SceneConfig]:
406
  """
407
  Load the scene configurations from the YAML files.
408
  Returns:
 
436
 
437
  def _generate_env_handler(self, env_id: str):
438
  base_output = "/app/app_datas/env_output"
439
+ scene_list = self._get_scene_list(base_output)
440
  output = os.path.join(base_output, f"{env_id}_hugsim_env")
441
  os.makedirs(output, exist_ok=True)
442
  return EnvHandler(scene_list, base_output=output)
 
477
  if env is not None:
478
  env.close()
479
  torch.cuda.empty_cache()
480
+
481
+ def _clean_expired_env_handlers(self):
482
+ """
483
+ Clean up expired environment handlers based on the last active time.
484
+ """
485
+ while 1:
486
+ try:
487
+ current_time = datetime.now(timezone.utc)
488
+ with self._lock:
489
+ expired_env_ids = [
490
+ env_id
491
+ for env_id, handler in self._env_handlers.items()
492
+ if handler and ((current_time - handler.created_time).total_seconds() > 3600 * 1.5 or (current_time - handler.last_active_time).total_seconds() > 180)
493
+ ]
494
+ for env_id in expired_env_ids:
495
+ self.close_env_handler(env_id)
496
+ team_id, submission_id = parse_env_id(env_id)
497
+ update_submission_data(team_id, submission_id, {"status": SubmissionStatus.FAILED.value, "error_message": "SPACE_TIMEOUT"})
498
+ except Exception as e:
499
+ print(f"Error in cleaning expired environment handlers: {e}")
500
+ time.sleep(15)
501
 
502
 
503
  app = FastAPI()
 
519
  raise HTTPException(status_code=401)
520
 
521
  submission_id = token_info["submission_id"]
522
+ team_id = token_info["team_id"]
523
+ env_id = get_env_id(team_id, submission_id)
524
+ if not env_manager.exists_env_handler(env_id):
525
+ update_submission_data(team_id, submission_id, {"status": SubmissionStatus.PROCESSING.value})
526
 
527
+ env_handler = env_manager.get_env_handler(env_id)
528
  if env_handler is None:
529
  raise HTTPException(status_code=404, detail="Environment handler already closed.")
530
  return env_handler
 
590
  execute_result = env_handler.execute_action(plan_traj)
591
  if execute_result.done:
592
  token_info = get_token_info(auth_token)
593
+ env_manager.close_env_handler(get_env_id(token_info["team_id"], token_info["submission_id"]))
594
  delete_client_space(token_info["client_space_id"])
595
  final_score = env_handler.calculate_score()
596
  update_submission_data(token_info["team_id"], token_info["submission_id"], {"status": SubmissionStatus.SUCCESS.value, "score": final_score})