import base64
import glob
import io
import json
import os
import shutil
import time
import uuid
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import tenacity
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from loguru import logger

from competitions.enums import ErrorMessage, SubmissionStatus
from competitions.info import CompetitionInfo
from competitions.utils import dockerfile_modifier, server_manager, space_cleaner, user_token_api

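# When "true" (the default), evaluation servers are paused whenever no
# submission is pending, queued, or processing, and started again right before
# a new submission space is created.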
IS_AUTO_PAUSE_HUGSIM_SERVER = os.getenv("IS_AUTO_PAUSE_HUGSIM_SERVER", "true").lower() == "true"


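# JobRunner polls the competition dataset repo for submissions, provisions a
# dedicated evaluation Space per submission, and keeps each submission's status
# (PENDING -> QUEUED / FAILED) in sync with the repo.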
@dataclass
class JobRunner:
    competition_id: str
    token: str
    output_path: str

    def __post_init__(self):
        self.competition_info = CompetitionInfo(competition_id=self.competition_id, autotrain_token=self.token)
        self.competition_id = self.competition_info.competition_id
        self.competition_type = self.competition_info.competition_type
        self.metric = self.competition_info.metric
        self.submission_id_col = self.competition_info.submission_id_col
        self.submission_cols = self.competition_info.submission_cols
        self.submission_rows = self.competition_info.submission_rows
        self.time_limit = self.competition_info.time_limit
        self.dataset = self.competition_info.dataset
        self.submission_filenames = self.competition_info.submission_filenames

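    # Download every team's `submission_info/*.json` from the competition
    # dataset repo and flatten the per-team records into a single list of dicts.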
    def _get_all_submissions(self) -> List[Dict[str, Any]]:
        submission_jsons = snapshot_download(
            repo_id=self.competition_id,
            allow_patterns="submission_info/*.json",
            token=self.token,
            repo_type="dataset",
        )
        submission_jsons = glob.glob(os.path.join(submission_jsons, "submission_info/*.json"))
        all_submissions = []
        for _json_path in submission_jsons:
            with open(_json_path, "r", encoding="utf-8") as f:
                _json = json.load(f)
            team_id = _json["id"]
            for sub in _json["submissions"]:
                all_submissions.append(
                    {
                        "team_id": team_id,
                        "submission_id": sub["submission_id"],
                        "datetime": sub["datetime"],
                        "status": sub["status"],
                        "submission_repo": sub["submission_repo"],
                        "space_id": sub["space_id"],
                        "server_url": sub["server_url"],
                        "hardware": sub["hardware"],
                    }
                )
        return all_submissions

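    # Keep only PENDING submissions, returned as a DataFrame sorted
    # oldest-first, or None when nothing is waiting.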
    def _get_pending_subs(self, submissions: List[Dict[str, Any]]) -> Optional[pd.DataFrame]:
        pending_submissions = []
        for sub in submissions:
            if sub["status"] == SubmissionStatus.PENDING.value:
                pending_submissions.append(sub)
        if len(pending_submissions) == 0:
            return None
        logger.info(f"Found {len(pending_submissions)} pending submissions.")
        pending_submissions = pd.DataFrame(pending_submissions)
        pending_submissions["datetime"] = pd.to_datetime(pending_submissions["datetime"])
        pending_submissions = pending_submissions.sort_values("datetime")
        pending_submissions = pending_submissions.reset_index(drop=True)
        return pending_submissions

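    # Count QUEUED/PROCESSING submissions per evaluation server; the run loop
    # uses this to avoid scheduling onto a busy server.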
    def _get_server_active_count(self, submissions: List[Dict[str, Any]]) -> Dict[str, int]:
        server_active_count = defaultdict(int)
        for sub in submissions:
            if sub["status"] in {SubmissionStatus.PROCESSING.value, SubmissionStatus.QUEUED.value}:
                server_active_count[sub["server_url"]] += 1
        return server_active_count

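    # Flip a submission's status to QUEUED in the team's submission_info JSON
    # and upload the updated file back to the dataset repo.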
    def _queue_submission(self, team_id, submission_id):
        team_fname = hf_hub_download(
            repo_id=self.competition_id,
            filename=f"submission_info/{team_id}.json",
            token=self.token,
            repo_type="dataset",
        )
        with open(team_fname, "r", encoding="utf-8") as f:
            team_submission_info = json.load(f)

        for submission in team_submission_info["submissions"]:
            if submission["submission_id"] == submission_id:
                submission["status"] = SubmissionStatus.QUEUED.value
                break

        team_submission_info_json = json.dumps(team_submission_info, indent=4)
        team_submission_info_json_bytes = team_submission_info_json.encode("utf-8")
        team_submission_info_json_buffer = io.BytesIO(team_submission_info_json_bytes)
        api = HfApi(token=self.token)
        api.upload_file(
            path_or_fileobj=team_submission_info_json_buffer,
            path_in_repo=f"submission_info/{team_id}.json",
            repo_id=self.competition_id,
            repo_type="dataset",
        )

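    # Record a FAILED status plus an error message in the team's
    # submission_info JSON and upload the updated file.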
    def mark_submission_failed(self, team_id: str, submission_id: str, error_message: str):
        team_fname = hf_hub_download(
            repo_id=self.competition_id,
            filename=f"submission_info/{team_id}.json",
            token=self.token,
            repo_type="dataset",
        )
        with open(team_fname, "r", encoding="utf-8") as f:
            team_submission_info = json.load(f)

        for submission in team_submission_info["submissions"]:
            if submission["submission_id"] == submission_id:
                submission["status"] = SubmissionStatus.FAILED.value
                submission["error_message"] = error_message

        team_submission_info_json = json.dumps(team_submission_info, indent=4)
        team_submission_info_json_bytes = team_submission_info_json.encode("utf-8")
        team_submission_info_json_buffer = io.BytesIO(team_submission_info_json_bytes)

        api = HfApi(token=self.token)
        api.upload_file(
            path_or_fileobj=team_submission_info_json_buffer,
            path_in_repo=f"submission_info/{team_id}.json",
            repo_id=self.competition_id,
            repo_type="dataset",
        )

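    # Build the YAML front matter that Hugging Face Spaces expects in README.md.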
    def _create_readme(self, project_name: str) -> str:
        _readme = "---\n"
        _readme += f"title: {project_name}\n"
        _readme += "emoji: 🚀\n"
        _readme += "colorFrom: green\n"
        _readme += "colorTo: indigo\n"
        _readme += "sdk: docker\n"
        _readme += "pinned: false\n"
        _readme += "---\n"
        return _readme

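    # Provision one evaluation Space end to end: upload the submission context
    # keyed by a fresh client token, create a private docker Space on the
    # requested hardware, inject the token and server host as secrets, stage the
    # participant's (modified) Dockerfile plus network_filter.so, and queue the
    # submission.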
    def create_space(self, team_id, submission_id, submission_repo, space_id, server_url, hardware):
        user_token = user_token_api.get(team_id)

        api = HfApi(token=self.token)
        params = {
            "space_id": space_id,
            "client_space_id": space_id,
            "competition_id": self.competition_id,
            "team_id": team_id,
            "submission_id": submission_id,
            "output_path": self.output_path,
            "submission_repo": submission_repo,
            "time_limit": self.time_limit,
            "dataset": self.dataset,
            "submission_filenames": self.submission_filenames,
        }
        token_info_json = json.dumps(params, indent=4)
        token_info_json_bytes = token_info_json.encode("utf-8")
        token_info_json_buffer = io.BytesIO(token_info_json_bytes)

        # One-off client token; its submission context is uploaded under
        # token_data_info/ and the token itself goes to the space as a secret.
        client_token = uuid.uuid4().hex + uuid.uuid4().hex

        api.upload_file(
            path_or_fileobj=token_info_json_buffer,
            path_in_repo=f"token_data_info/{client_token}.json",
            repo_id=self.competition_id,
            repo_type="dataset",
        )
        api.create_repo(
            repo_id=space_id,
            repo_type="space",
            space_sdk="docker",
            space_hardware=hardware,
            private=True,
        )

        api.add_space_secret(repo_id=space_id, key="HUGSIM_API_TOKEN", value=client_token)
        api.add_space_secret(repo_id=space_id, key="HUGSIM_SERVER_HOST", value=server_url)
        client_code_local_dir = f"/tmp/data/client_repo/{space_id}"
        # Pin the download to the latest commit so the evaluated Dockerfile
        # cannot change between listing and snapshot.
        client_commits = api.list_repo_commits(submission_repo, repo_type="model", token=user_token)
        api.snapshot_download(
            repo_id=submission_repo,
            repo_type="model",
            revision=client_commits[0].commit_id,
            token=user_token,
            local_dir=client_code_local_dir,
            allow_patterns=["Dockerfile"],
        )
        with open(f"{client_code_local_dir}/README.md", "w", encoding="utf-8") as f:
            f.write(self._create_readme(space_id))
        shutil.copyfile("./other_files/network_filter.so", os.path.join(client_code_local_dir, "network_filter.so"))
        for filename in os.listdir(client_code_local_dir):
            if filename.lower() == "dockerfile":
                filepath = os.path.join(client_code_local_dir, filename)
                with open(filepath, "r", encoding="utf-8") as f:
                    dockerfile_content = f.read()
                modified_dockerfile = dockerfile_modifier.modify_dockerfile_content(
                    dockerfile_content,
                    submission_repo,
                    base64.b64encode(user_token.encode()).decode(),
                )[0]
                with open(filepath, "w", encoding="utf-8") as f:
                    f.write(modified_dockerfile)
        try:
            api.upload_folder(
                repo_id=space_id,
                repo_type="space",
                folder_path=client_code_local_dir,
            )
        finally:
            shutil.rmtree(client_code_local_dir, ignore_errors=True)
        self._queue_submission(team_id, submission_id)

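    # Main scheduling loop; tenacity restarts it 15 seconds after any uncaught
    # exception. Every 100 iterations it runs space cleanup for QUEUED/PROCESSING
    # submissions and pauses servers when nothing is active; each iteration it
    # launches at most one pending submission, oldest first, and only onto a
    # free evaluation server.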
    @tenacity.retry(stop=tenacity.stop_never, wait=tenacity.wait_fixed(15))
    def run(self):
        cur = 0
        while True:
            time.sleep(5)
            # cur cycles through 1..10000 so the modulo check below fires once
            # every 100 iterations.
            if cur == 10000:
                cur = 0
            cur += 1
            all_submissions = self._get_all_submissions()

            if cur % 100 == 1:
                logger.info("Cleaning up spaces...")
                ready_submissions_count = 0
                for space in all_submissions:
                    if space["status"] in {SubmissionStatus.QUEUED.value, SubmissionStatus.PROCESSING.value}:
                        logger.info(f"Cleaning up space {space['space_id']} for submission {space['submission_id']}")
                        space_cleaner.clean_space(
                            space["space_id"],
                            space["team_id"],
                            space["submission_id"],
                        )
                    # Servers are still needed while any submission is pending,
                    # queued, or processing.
                    if space["status"] in {
                        SubmissionStatus.PENDING.value,
                        SubmissionStatus.QUEUED.value,
                        SubmissionStatus.PROCESSING.value,
                    }:
                        ready_submissions_count += 1
                if ready_submissions_count == 0:
                    if IS_AUTO_PAUSE_HUGSIM_SERVER:
                        server_manager.pause_all_servers()

            pending_submissions = self._get_pending_subs(all_submissions)
            if pending_submissions is None:
                continue
            first_pending_sub = pending_submissions.iloc[0]
            server_active_count = self._get_server_active_count(all_submissions)

            # Each evaluation server handles one submission at a time; skip this
            # round if the oldest pending submission's server is busy.
            if server_active_count[first_pending_sub["server_url"]] >= 1:
                continue
            try:
                if IS_AUTO_PAUSE_HUGSIM_SERVER:
                    server_manager.start_all_servers()
                self.create_space(
                    first_pending_sub["team_id"],
                    first_pending_sub["submission_id"],
                    first_pending_sub["submission_repo"],
                    first_pending_sub["space_id"],
                    first_pending_sub["server_url"],
                    first_pending_sub["hardware"],
                )
            except Exception as e:
                logger.error(f"Failed to create space for {first_pending_sub['submission_id']}: {e}")
                self.mark_submission_failed(
                    first_pending_sub["team_id"],
                    first_pending_sub["submission_id"],
                    ErrorMessage.FAILED_TO_CREATE_SPACE.value,
                )
                try:
                    space_cleaner.delete_space(first_pending_sub["space_id"])
                except Exception as e:
                    logger.error(f"Failed to delete space {first_pending_sub['space_id']}: {e}")
                logger.error(f"Marked submission {first_pending_sub['submission_id']} as failed.")
                continue