import sys
import os
import pickle
import json
import threading
import time
import io
import enum
import hugsim_env
from collections import deque, OrderedDict
from datetime import datetime
from typing import Any, Dict

sys.path.append(os.getcwd())

from fastapi import FastAPI, Body, Header, Depends, HTTPException
from fastapi.responses import HTMLResponse, Response
from omegaconf import OmegaConf
from huggingface_hub import HfApi, hf_hub_download
import open3d as o3d
import numpy as np
import gymnasium
import uvicorn

from sim.utils.sim_utils import traj2control, traj_transform_to_global
from sim.utils.score_calculator import hugsim_evaluate

HF_TOKEN = os.getenv('HF_TOKEN', None)
COMPETITION_ID = os.getenv('COMPETITION_ID', None)
hf_api = HfApi(token=HF_TOKEN)

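# HF_TOKEN and COMPETITION_ID are expected to be provided through the Space's
# environment; every Hub interaction below (token lookup, submission-info
# updates, client Space deletion) relies on these two values.
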
class SubmissionStatus(enum.Enum):
    PENDING = 0
    QUEUED = 1
    PROCESSING = 2
    SUCCESS = 3
    FAILED = 4

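# Submission lifecycle: a submission normally moves PENDING -> QUEUED -> PROCESSING
# and ends in SUCCESS or FAILED. This server only writes SUCCESS (see
# execute_action_endpoint below, once the closed-loop episode finishes); the
# earlier transitions are assumed to be handled by the competition queueing side.
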
def get_token_info(token: str) -> Dict[str, Any]:
    token_info_path = hf_hub_download(
        repo_id=COMPETITION_ID,
        filename=f"token_data_info/{token}.json",
        repo_type="dataset",
        token=token
    )
    with open(token_info_path, 'r') as f:
        token_info = json.load(f)
    return token_info

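# For reference, the token-info JSON is expected to contain at least the fields
# read elsewhere in this module (the values below are illustrative only):
# {
#     "team_id": "team-0000",
#     "submission_id": "submission-0000",
#     "client_space_id": "org/client-space"
# }
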
def download_submission_info(team_id: str) -> Dict[str, Any]:
    """
    Download the submission info from Hugging Face Hub.

    Args:
        team_id (str): The team ID.

    Returns:
        Dict[str, Any]: The submission info.
    """
    submission_info_path = hf_hub_download(
        repo_id=COMPETITION_ID,
        filename=f"submission_info/{team_id}.json",
        repo_type="dataset",
        token=HF_TOKEN
    )
    with open(submission_info_path, 'r') as f:
        submission_info = json.load(f)
    return submission_info

def upload_submission_info(team_id: str, user_submission_info: Dict[str, Any]):
    user_submission_info_json = json.dumps(user_submission_info, indent=4)
    user_submission_info_json_bytes = user_submission_info_json.encode("utf-8")
    user_submission_info_json_buffer = io.BytesIO(user_submission_info_json_bytes)
    hf_api.upload_file(
        path_or_fileobj=user_submission_info_json_buffer,
        path_in_repo=f"submission_info/{team_id}.json",
        repo_id=COMPETITION_ID,
        repo_type="dataset",
    )

def update_submission_status(team_id: str, submission_id: str, status: int):
    user_submission_info = download_submission_info(team_id)
    for submission in user_submission_info["submissions"]:
        if submission["submission_id"] == submission_id:
            submission["status"] = status
            break
    upload_submission_info(team_id, user_submission_info)

def delete_client_space(client_space_id: str):
    hf_api.delete_repo(
        repo_id=client_space_id,
        repo_type="space"
    )

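# Once an episode finishes, the submitting team's client Space is deleted (see
# execute_action_endpoint) so it stops consuming hardware; this assumes HF_TOKEN
# has delete rights on that repo.
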
class FifoDict:
    def __init__(self, max_size: int):
        self.max_size = max_size
        self._order_dict = OrderedDict()
        self.locker = threading.Lock()

    def push(self, key: str, value: Any):
        with self.locker:
            if key in self._order_dict:
                self._order_dict.move_to_end(key)
                return
            if len(self._order_dict) >= self.max_size:
                self._order_dict.popitem(last=False)
            self._order_dict[key] = value

    def get(self, key: str) -> Any:
        # Take the same lock as push() so reads cannot race with evictions.
        with self.locker:
            return self._order_dict.get(key, None)

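# _result_dict below uses this class as a bounded idempotency cache: responses
# from execute_action_endpoint are stored per transaction_id, so a retried
# request returns the cached pickled payload instead of stepping the simulator
# a second time.
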
class EnvHandler:
    def __init__(self, cfg, output):
        self.cfg = cfg
        self.output = output
        self.env = gymnasium.make('hugsim_env/HUGSim-v0', cfg=cfg, output=output)
        self._lock = threading.Lock()
        self.reset_env()

    def close(self):
        """
        Close the environment and release resources.
        """
        self.env.close()
        self._log("Environment closed.")

    def reset_env(self):
        """
        Reset the environment and initialize variables.
        """
        self._cnt = 0
        self._done = False
        self._save_data = {'type': 'closeloop', 'frames': []}
        self._obs, self._info = self.env.reset()
        self._log_list = deque(maxlen=100)
        self._log("Environment reset complete.")

    def get_current_state(self):
        """
        Get the current state of the environment.
        """
        return {
            "obs": self._obs,
            "info": self._info,
        }

    def has_done(self) -> bool:
        """
        Check if the episode is done.

        Returns:
            bool: True if the episode is done, False otherwise.
        """
        return self._done

    def log_list(self) -> deque:
        """
        Get the log list.

        Returns:
            deque: The log list containing recent log messages.
        """
        return self._log_list
    def execute_action(self, plan_traj: np.ndarray) -> bool:
        """
        Execute the action based on the planned trajectory.

        Args:
            plan_traj (np.ndarray): The planned trajectory to follow.

        Returns:
            bool: True if the episode is done, False otherwise.
        """
        acc, steer_rate = traj2control(plan_traj, self._info)
        action = {'acc': acc, 'steer_rate': steer_rate}
        self._log("Executing action:", action)
        self._obs, _, terminated, truncated, self._info = self.env.step(action)
        self._cnt += 1
        self._done = terminated or truncated or self._cnt > 400
        imu_plan_traj = plan_traj[:, [1, 0]]
        imu_plan_traj[:, 1] *= -1
        global_traj = traj_transform_to_global(imu_plan_traj, self._info['ego_box'])
        self._save_data['frames'].append({
            'time_stamp': self._info['timestamp'],
            'is_key_frame': True,
            'ego_box': self._info['ego_box'],
            'obj_boxes': self._info['obj_boxes'],
            'obj_names': ['car' for _ in self._info['obj_boxes']],
            'planned_traj': {
                'traj': global_traj,
                'timestep': 0.5
            },
            'collision': self._info['collision'],
            'rc': self._info['rc']
        })
        if not self._done:
            return False
        with open(os.path.join(self.output, 'data.pkl'), 'wb') as wf:
            pickle.dump([self._save_data], wf)
        ground_xyz = np.asarray(o3d.io.read_point_cloud(os.path.join(self.output, 'ground.ply')).points)
        scene_xyz = np.asarray(o3d.io.read_point_cloud(os.path.join(self.output, 'scene.ply')).points)
        results = hugsim_evaluate([self._save_data], ground_xyz, scene_xyz)
        with open(os.path.join(self.output, 'eval.json'), 'w') as f:
            json.dump(results, f)
        self._log("Evaluation results saved.")
        return True
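    # Note on execute_action: each call advances the simulator by one step, the
    # episode is capped at 400 steps, and on termination the rollout is dumped to
    # <output>/data.pkl and scored with hugsim_evaluate against the ground.ply /
    # scene.ply point clouds already present in the output directory.
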
    def _log(self, *messages):
        log_message = f"[{str(datetime.now())}] " + " ".join([str(msg) for msg in messages]) + "\n"
        with self._lock:
            self._log_list.append(log_message)

class EnvHandlerManager:
    def __init__(self):
        self._env_handlers = {}
        self._lock = threading.Lock()

    def _generate_env_handler(self, env_id: str):
        base_path = os.path.join(os.path.dirname(__file__), 'docker', "web_server_config", 'nuscenes_base.yaml')
        scenario_path = os.path.join(os.path.dirname(__file__), 'docker', "web_server_config", 'scene-0383-medium-00.yaml')
        camera_path = os.path.join(os.path.dirname(__file__), 'docker', "web_server_config", 'nuscenes_camera.yaml')
        kinematic_path = os.path.join(os.path.dirname(__file__), 'docker', "web_server_config", 'kinematic.yaml')
        scenario_config = OmegaConf.load(scenario_path)
        base_config = OmegaConf.load(base_path)
        camera_config = OmegaConf.load(camera_path)
        kinematic_config = OmegaConf.load(kinematic_path)
        cfg = OmegaConf.merge(
            {"scenario": scenario_config},
            {"base": base_config},
            {"camera": camera_config},
            {"kinematic": kinematic_config}
        )
        model_path = os.path.join(cfg.base.model_base, cfg.scenario.scene_name)
        model_config = OmegaConf.load(os.path.join(model_path, 'cfg.yaml'))
        model_config.update({"model_path": "/app/app_datas/PAMI2024/release/ss/scenes/nuscenes/scene-0383"})
        cfg.update(model_config)
        cfg.base.output_dir = "/app/app_datas/env_output"
        output = os.path.join(cfg.base.output_dir, f"{env_id}_hugsim_env")
        os.makedirs(output, exist_ok=True)
        return EnvHandler(cfg, output)
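    # The merged OmegaConf tree exposes cfg.scenario.*, cfg.base.*, cfg.camera.*
    # and cfg.kinematic.*, with the per-scene cfg.yaml merged on top; model_path
    # and base.output_dir are then overridden with the container paths
    # hard-coded above.
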
    def get_env_handler(self, env_id: str) -> EnvHandler:
        """
        Get the environment handler for the given environment ID.

        Args:
            env_id (str): The environment ID.

        Returns:
            EnvHandler: The environment handler instance.
        """
        with self._lock:
            if env_id not in self._env_handlers:
                self._env_handlers[env_id] = self._generate_env_handler(env_id)
            return self._env_handlers[env_id]

app = FastAPI()
_result_dict = FifoDict(max_size=100)
env_manager = EnvHandlerManager()

def _get_env_handler(auth_token: str = Header(...)) -> EnvHandler:
    try:
        token_info = get_token_info(auth_token)
    except Exception:
        raise HTTPException(status_code=401)
    return env_manager.get_env_handler(token_info["submission_id"])

def _load_numpy_ndarray_json_str(json_str: str) -> np.ndarray:
    """
    Load a numpy ndarray from a JSON string.
    """
    data = json.loads(json_str)
    return np.array(data["data"], dtype=data["dtype"]).reshape(data["shape"])

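# The client is therefore expected to serialize trajectories as something like
# json.dumps({"data": traj.tolist(), "dtype": str(traj.dtype), "shape": traj.shape}),
# i.e. the array values plus dtype and shape metadata.
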
# Route paths below are inferred from the endpoint function names; adjust them
# if the deployed Space registers different paths.
@app.post("/reset")
def reset_endpoint(env_handler: EnvHandler = Depends(_get_env_handler)):
    """
    Reset the environment.
    """
    env_handler.reset_env()
    return {"success": True}

@app.get("/get_current_state")
def get_current_state_endpoint(env_handler: EnvHandler = Depends(_get_env_handler)):
    """
    Get the current state of the environment.
    """
    state = env_handler.get_current_state()
    return Response(content=pickle.dumps(state), media_type="application/octet-stream")

@app.post("/execute_action")
def execute_action_endpoint(
    plan_traj: str = Body(..., embed=True),
    transaction_id: str = Body(..., embed=True),
    auth_token: str = Header(...),
    env_handler: EnvHandler = Depends(_get_env_handler)
):
    """
    Execute the action based on the planned trajectory.

    Args:
        plan_traj (str): The planned trajectory in JSON format.
        transaction_id (str): The unique transaction ID for caching results.
        auth_token (str): The submission auth token passed in the request header.
        env_handler (EnvHandler): The environment handler instance.

    Returns:
        Response: The response containing the execution result.
    """
    cache_result = _result_dict.get(transaction_id)
    if cache_result is not None:
        return Response(content=cache_result, media_type="application/octet-stream")
    if env_handler.has_done():
        # The episode already finished; report done without stepping again.
        result = pickle.dumps({"done": True, "state": None})
        _result_dict.push(transaction_id, result)
        return Response(content=result, media_type="application/octet-stream")
    plan_traj = _load_numpy_ndarray_json_str(plan_traj)
    done = env_handler.execute_action(plan_traj)
    if done:
        token_info = get_token_info(auth_token)
        env_manager.get_env_handler(token_info["submission_id"]).close()
        delete_client_space(token_info["client_space_id"])
        update_submission_status(token_info["team_id"], token_info["submission_id"], SubmissionStatus.SUCCESS.value)
        result = pickle.dumps({"done": done, "state": None})
        _result_dict.push(transaction_id, result)
        return Response(content=result, media_type="application/octet-stream")
    state = env_handler.get_current_state()
    result = pickle.dumps({"done": done, "state": state})
    _result_dict.push(transaction_id, result)
    return Response(content=result, media_type="application/octet-stream")

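# A minimal client-side call, assuming the route path above and FastAPI's
# default header-name mapping (auth_token -> "auth-token"); illustrative only:
#
#   resp = requests.post(
#       f"{server_url}/execute_action",
#       headers={"auth-token": token},
#       json={"plan_traj": plan_traj_json, "transaction_id": transaction_id},
#   )
#   payload = pickle.loads(resp.content)  # {"done": bool, "state": dict | None}
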
@app.get("/")
def main_page_endpoint(env_handler: EnvHandler = Depends(_get_env_handler)):
    """
    Main page endpoint to display logs.
    """
    log_str = "\n".join(env_handler.log_list())
    html_content = f"""
    <html>
    <body>
    <pre>{log_str}</pre>
    <script>
    setTimeout(function() {{
        window.location.reload();
    }}, 5000);
    </script>
    </body>
    </html>
    """
    return HTMLResponse(content=html_content)

if __name__ == "__main__":
    uvicorn.run(app, port=7860, workers=1)