import base64
import glob
import io
import json
import os
import shutil
import time
import uuid
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import pandas as pd
import tenacity
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from loguru import logger

from competitions.enums import SubmissionStatus, ErrorMessage
from competitions.info import CompetitionInfo
from competitions.utils import user_token_api, space_cleaner, dockerfile_modifier, server_manager
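
# When enabled, HUGSIM simulation servers are paused while no submission is
# queued or processing, and restarted just before a new evaluation Space is
# created (see run() below).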
IS_AUTO_PAUSE_HUGSIM_SERVER = os.getenv("IS_AUTO_PAUSE_HUGSIM_SERVER", "true").lower() == "true"


@dataclass
class JobRunner:
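    """Schedules competition submissions for evaluation.

    Polls the competition dataset repo for pending submissions and, one per
    simulation server at a time, evaluates each by building a private Docker
    Space from the participant's submission repository.
    """
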
    competition_id: str
    token: str
    output_path: str

    def __post_init__(self):
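        # Resolve competition metadata once up front; these attributes are
        # used when scheduling and creating evaluation Spaces.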
        self.competition_info = CompetitionInfo(competition_id=self.competition_id, autotrain_token=self.token)
        self.competition_id = self.competition_info.competition_id
        self.competition_type = self.competition_info.competition_type
        self.metric = self.competition_info.metric
        self.submission_id_col = self.competition_info.submission_id_col
        self.submission_cols = self.competition_info.submission_cols
        self.submission_rows = self.competition_info.submission_rows
        self.time_limit = self.competition_info.time_limit
        self.dataset = self.competition_info.dataset
        self.submission_filenames = self.competition_info.submission_filenames

    def _get_all_submissions(self) -> List[Dict[str, Any]]:
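        # Each submission_info/<team_id>.json is assumed to look roughly like
        # this (illustrative shape, inferred from the fields read below):
        # {
        #     "id": "<team_id>",
        #     "submissions": [
        #         {"submission_id": "...", "datetime": "...", "status": ...,
        #          "submission_repo": "...", "space_id": "...",
        #          "server_url": "...", "hardware": "..."}
        #     ]
        # }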
        # snapshot_download returns the local directory containing the snapshot.
        snapshot_dir = snapshot_download(
            repo_id=self.competition_id,
            allow_patterns="submission_info/*.json",
            token=self.token,
            repo_type="dataset",
        )
        submission_jsons = glob.glob(os.path.join(snapshot_dir, "submission_info/*.json"))
        all_submissions = []
        for _json_path in submission_jsons:
            with open(_json_path, "r", encoding="utf-8") as f:
                _json = json.load(f)
            team_id = _json["id"]
            for sub in _json["submissions"]:
                all_submissions.append(
                    {
                        "team_id": team_id,
                        "submission_id": sub["submission_id"],
                        "datetime": sub["datetime"],
                        "status": sub["status"],
                        "submission_repo": sub["submission_repo"],
                        "space_id": sub["space_id"],
                        "server_url": sub["server_url"],
                        "hardware": sub["hardware"],
                    }
                )
        return all_submissions

    def _get_pending_subs(self, submissions: List[Dict[str, Any]]) -> Optional[pd.DataFrame]:
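        # Returns pending submissions as a DataFrame sorted oldest-first, or
        # None when there is nothing to schedule.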
        pending_submissions = [sub for sub in submissions if sub["status"] == SubmissionStatus.PENDING.value]
        if len(pending_submissions) == 0:
            return None
        logger.info(f"Found {len(pending_submissions)} pending submissions.")
        pending_submissions = pd.DataFrame(pending_submissions)
        pending_submissions["datetime"] = pd.to_datetime(pending_submissions["datetime"])
        pending_submissions = pending_submissions.sort_values("datetime")
        pending_submissions = pending_submissions.reset_index(drop=True)
        return pending_submissions

    def _get_server_active_count(self, submissions: List[Dict[str, Any]]) -> Dict[str, int]:
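        # Tally QUEUED/PROCESSING submissions per simulation server, e.g.
        # {"https://sim-a.example.com": 2} (URL is hypothetical). Servers with
        # no active submissions simply have no entry (defaultdict of int).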
        server_active_count = defaultdict(int)
        for sub in submissions:
            if sub["status"] in {SubmissionStatus.PROCESSING.value, SubmissionStatus.QUEUED.value}:
                server_active_count[sub["server_url"]] += 1
        return server_active_count

    def _queue_submission(self, team_id, submission_id):
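        # Download the team's submission_info JSON, flip this submission's
        # status to QUEUED, and upload the file back to the dataset repo.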
        team_fname = hf_hub_download(
            repo_id=self.competition_id,
            filename=f"submission_info/{team_id}.json",
            token=self.token,
            repo_type="dataset",
        )
        with open(team_fname, "r", encoding="utf-8") as f:
            team_submission_info = json.load(f)
        for submission in team_submission_info["submissions"]:
            if submission["submission_id"] == submission_id:
                submission["status"] = SubmissionStatus.QUEUED.value
                break
        team_submission_info_json = json.dumps(team_submission_info, indent=4)
        team_submission_info_json_buffer = io.BytesIO(team_submission_info_json.encode("utf-8"))
        api = HfApi(token=self.token)
        api.upload_file(
            path_or_fileobj=team_submission_info_json_buffer,
            path_in_repo=f"submission_info/{team_id}.json",
            repo_id=self.competition_id,
            repo_type="dataset",
        )

    def mark_submission_failed(self, team_id: str, submission_id: str, error_message: str):
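        # Same read-modify-write pattern as _queue_submission, but marks the
        # submission FAILED and records the error message.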
        team_fname = hf_hub_download(
            repo_id=self.competition_id,
            filename=f"submission_info/{team_id}.json",
            token=self.token,
            repo_type="dataset",
        )
        with open(team_fname, "r", encoding="utf-8") as f:
            team_submission_info = json.load(f)
        for submission in team_submission_info["submissions"]:
            if submission["submission_id"] == submission_id:
                submission["status"] = SubmissionStatus.FAILED.value
                submission["error_message"] = error_message
                break
        team_submission_info_json = json.dumps(team_submission_info, indent=4)
        team_submission_info_json_buffer = io.BytesIO(team_submission_info_json.encode("utf-8"))
        api = HfApi(token=self.token)
        api.upload_file(
            path_or_fileobj=team_submission_info_json_buffer,
            path_in_repo=f"submission_info/{team_id}.json",
            repo_id=self.competition_id,
            repo_type="dataset",
        )

    def _create_readme(self, project_name: str) -> str:
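        # The returned string is the YAML front matter that Hugging Face
        # Spaces reads for Space configuration; `sdk: docker` matches the
        # Docker Space created in create_space().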
        _readme = "---\n"
        _readme += f"title: {project_name}\n"
        _readme += "emoji: 🚀\n"
        _readme += "colorFrom: green\n"
        _readme += "colorTo: indigo\n"
        _readme += "sdk: docker\n"
        _readme += "pinned: false\n"
        _readme += "---\n"
        return _readme

    def create_space(self, team_id, submission_id, submission_repo, space_id, server_url, hardware):
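        # End-to-end Space creation for one submission:
        #   1. upload job params under a one-off client token to the dataset repo,
        #   2. create a private Docker Space and set its HUGSIM secrets,
        #   3. fetch the participant's Dockerfile pinned to their latest commit,
        #      rewrite it, add README + network filter, upload, then queue.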
        user_token = user_token_api.get(team_id)
        api = HfApi(token=self.token)
        params = {
            "space_id": space_id,
            "client_space_id": space_id,
            "competition_id": self.competition_id,
            "team_id": team_id,
            "submission_id": submission_id,
            "output_path": self.output_path,
            "submission_repo": submission_repo,
            "time_limit": self.time_limit,
            "dataset": self.dataset,
            "submission_filenames": self.submission_filenames,
        }
        token_info_json = json.dumps(params, indent=4)
        token_info_json_buffer = io.BytesIO(token_info_json.encode("utf-8"))
        client_token = uuid.uuid4().hex + uuid.uuid4().hex
        api.upload_file(
            path_or_fileobj=token_info_json_buffer,
            path_in_repo=f"token_data_info/{client_token}.json",
            repo_id=self.competition_id,
            repo_type="dataset",
        )
        api.create_repo(
            repo_id=space_id,
            repo_type="space",
            space_sdk="docker",
            space_hardware=hardware,
            private=True,
        )
        api.add_space_secret(repo_id=space_id, key="HUGSIM_API_TOKEN", value=client_token)
        api.add_space_secret(repo_id=space_id, key="HUGSIM_SERVER_HOST", value=server_url)
        client_code_local_dir = f"/tmp/data/client_repo/{space_id}"
        client_commits = api.list_repo_commits(submission_repo, repo_type="model", token=user_token)
        api.snapshot_download(
            repo_id=submission_repo,
            repo_type="model",
            revision=client_commits[0].commit_id,
            token=user_token,
            local_dir=client_code_local_dir,
            allow_patterns=["Dockerfile"],
        )
        with open(f"{client_code_local_dir}/README.md", "w", encoding="utf-8") as f:
            f.write(self._create_readme(space_id))
        shutil.copyfile("./other_files/network_filter.so", os.path.join(client_code_local_dir, "network_filter.so"))
        for filename in os.listdir(client_code_local_dir):
            if filename.lower() == "dockerfile":
                filepath = os.path.join(client_code_local_dir, filename)
                with open(filepath, "r", encoding="utf-8") as f:
                    dockerfile_content = f.read()
                modified_content = dockerfile_modifier.modify_dockerfile_content(
                    dockerfile_content, submission_repo, base64.b64encode(user_token.encode()).decode()
                )[0]
                with open(filepath, "w", encoding="utf-8") as f:
                    f.write(modified_content)
        try:
            api.upload_folder(
                repo_id=space_id,
                repo_type="space",
                folder_path=client_code_local_dir,
            )
        finally:
            shutil.rmtree(client_code_local_dir, ignore_errors=True)
        self._queue_submission(team_id, submission_id)

    @tenacity.retry(stop=tenacity.stop_never, wait=tenacity.wait_fixed(15))
    def run(self):
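        # Main polling loop. The tenacity decorator above restarts run()
        # 15 seconds after any uncaught exception, forever.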
        cur = 0
        while True:
            time.sleep(5)
            # Wrap the iteration counter so `cur % 100` stays well-defined.
            if cur == 10000:
                cur = 0
            cur += 1
            all_submissions = self._get_all_submissions()
            # Clean up spaces every 100 iterations
            if cur % 100 == 1:
                logger.info("Cleaning up spaces...")
                ready_submissions_count = 0
                for space in all_submissions:
                    # QUEUED/PROCESSING submissions are both cleaned up and
                    # counted as still occupying a server.
                    if space["status"] in {SubmissionStatus.QUEUED.value, SubmissionStatus.PROCESSING.value}:
                        logger.info(f"Cleaning up space {space['space_id']} for submission {space['submission_id']}")
                        space_cleaner.clean_space(
                            space["space_id"],
                            space["team_id"],
                            space["submission_id"],
                        )
                        ready_submissions_count += 1
                if ready_submissions_count == 0 and IS_AUTO_PAUSE_HUGSIM_SERVER:
                    server_manager.pause_all_servers()
            pending_submissions = self._get_pending_subs(all_submissions)
            if pending_submissions is None:
                continue
            first_pending_sub = pending_submissions.iloc[0]
            server_active_count = self._get_server_active_count(all_submissions)
            # Run at most one submission per simulation server at a time.
            if server_active_count[first_pending_sub["server_url"]] >= 1:
                continue
            try:
                if IS_AUTO_PAUSE_HUGSIM_SERVER:
                    server_manager.start_all_servers()
                self.create_space(
                    first_pending_sub["team_id"],
                    first_pending_sub["submission_id"],
                    first_pending_sub["submission_repo"],
                    first_pending_sub["space_id"],
                    first_pending_sub["server_url"],
                    first_pending_sub["hardware"],
                )
            except Exception as e:
                logger.error(f"Failed to create space for {first_pending_sub['submission_id']}: {e}")
                # Mark the submission as failed and remove its half-created Space.
                self.mark_submission_failed(
                    first_pending_sub["team_id"],
                    first_pending_sub["submission_id"],
                    ErrorMessage.FAILED_TO_CREATE_SPACE.value,
                )
                try:
                    space_cleaner.delete_space(first_pending_sub["space_id"])
                except Exception as e:
                    logger.error(f"Failed to delete space {first_pending_sub['space_id']}: {e}")
                logger.error(f"Marked submission {first_pending_sub['submission_id']} as failed.")
                continue
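

# A minimal launch sketch (values here are hypothetical; the real entrypoint
# and configuration may live elsewhere in the repo):
if __name__ == "__main__":
    runner = JobRunner(
        competition_id="my-org/my-competition",  # hypothetical competition dataset repo
        token=os.environ["HF_TOKEN"],  # token with write access to the competition repo
        output_path="/tmp/outputs",  # hypothetical local output directory
    )
    runner.run()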