Abhishek Thakur
fix org
695a2f7
raw
history blame
9.44 kB
import io
import json
import os
import shlex
import subprocess
import traceback
import requests
from huggingface_hub import HfApi, hf_hub_download
from loguru import logger
from competitions.enums import SubmissionStatus
from competitions.params import EvalParams
from . import HF_URL
def user_authentication(token):
    """Resolve a Hugging Face token into basic user information.

    Tokens starting with ``hf_oauth`` are checked against the
    ``/oauth/userinfo`` endpoint; every other token is checked against
    ``/api/whoami-v2``. Tokens starting with ``hf_`` are sent as a bearer
    ``Authorization`` header, anything else is sent as a ``token`` cookie.

    Args:
        token: The token string to authenticate.

    Returns:
        A dict with the keys ``id``, ``name`` and ``orgs`` (a list of
        organization names). If the JSON response contains an ``"error"``
        key, the response is returned unchanged instead.

    Raises:
        Exception: If the request times out or the connection fails.
    """
    is_oauth = token.startswith("hf_oauth")
    if is_oauth:
        _api_url = HF_URL + "/oauth/userinfo"
    else:
        _api_url = HF_URL + "/api/whoami-v2"

    headers = {}
    cookies = {}
    if token.startswith("hf_"):
        headers["Authorization"] = f"Bearer {token}"
    else:
        cookies = {"token": token}

    try:
        response = requests.get(
            _api_url,
            headers=headers,
            cookies=cookies,
            timeout=3,
        )
    # requests.ConnectionError does not derive from the builtin
    # ConnectionError, so it has to be listed explicitly; the builtin is kept
    # for backward compatibility.
    except (requests.Timeout, requests.ConnectionError, ConnectionError) as err:
        logger.error(f"Failed to request whoami-v2 - {repr(err)}")
        raise Exception("Hugging Face Hub is unreachable, please try again later.") from err

    resp = response.json()
    if "error" in resp:
        # Hand the error payload back to the caller untouched.
        return resp

    user_info = {}
    if is_oauth:
        user_info["id"] = resp["sub"]
        user_info["name"] = resp["preferred_username"]
        user_info["orgs"] = [org["preferred_username"] for org in resp["orgs"]]
    else:
        user_info["id"] = resp["id"]
        user_info["name"] = resp["name"]
        user_info["orgs"] = [org["name"] for org in resp["orgs"]]
    return user_info
def make_clickable_user(user_id):
    """Return an HTML anchor that opens the user's Hugging Face page in a new tab.

    Args:
        user_id: The user identifier, used both in the URL and as link text.

    Returns:
        The ``<a>`` element as a string.
    """
    profile_url = "".join(("https://huggingface.co/", user_id))
    return '<a target="_blank" href="{0}">{1}</a>'.format(profile_url, user_id)
def run_evaluation(params, local=False, wait=False):
    """Launch ``competitions.evaluate`` in a subprocess for the given parameters.

    Args:
        params: JSON string describing the evaluation parameters. A
            double-encoded JSON string (a JSON string containing JSON) is
            decoded a second time.
        local: When false, ``params.output_path`` is overridden with
            ``/tmp/model`` before the parameters are saved.
        wait: When true, block until the subprocess has exited.

    Returns:
        The PID of the spawned process.
    """
    params = json.loads(params)
    if isinstance(params, str):
        params = json.loads(params)
    params = EvalParams(**params)
    if not local:
        params.output_path = "/tmp/model"
    params.save(output_dir=params.output_path)

    cmd = [
        "python",
        "-m",
        "competitions.evaluate",
        "--config",
        os.path.join(params.output_path, "params.json"),
    ]
    cmd = [str(c) for c in cmd]
    logger.info(cmd)

    # Pass the argv list straight to Popen. Joining it into one string and
    # re-splitting it with shlex would break a config path that contains
    # whitespace or quote characters.
    process = subprocess.Popen(cmd, env=os.environ.copy())
    if wait:
        process.wait()
    return process.pid
def pause_space(params):
    """Pause the current Space when it is a ``comp-`` Space.

    Does nothing unless the ``SPACE_ID`` environment variable is set and the
    part after its last ``/`` starts with ``comp-``.

    Args:
        params: Object exposing a ``token`` attribute used to build the API client.
    """
    space_id = os.environ.get("SPACE_ID")
    if space_id is None:
        return
    if not space_id.split("/")[-1].startswith("comp-"):
        return
    logger.info("Pausing space...")
    client = HfApi(token=params.token)
    client.pause_space(repo_id=space_id)
def delete_space(params):
    """Delete the current Space when it is a ``comp-`` Space.

    Does nothing unless the ``SPACE_ID`` environment variable is set and the
    part after its last ``/`` starts with ``comp-``.

    Args:
        params: Object exposing a ``token`` attribute used to build the API client.
    """
    space_id = os.environ.get("SPACE_ID")
    if space_id is None:
        return
    if not space_id.split("/")[-1].startswith("comp-"):
        return
    logger.info("Deleting space...")
    client = HfApi(token=params.token)
    client.delete_repo(repo_id=space_id, repo_type="space")
def download_submission_info(params):
    """Fetch and parse the team's submission-info JSON from the competition dataset repo.

    Args:
        params: Object exposing ``competition_id``, ``team_id`` and ``token``.

    Returns:
        The parsed content of ``submission_info/<team_id>.json``.
    """
    remote_name = f"submission_info/{params.team_id}.json"
    local_path = hf_hub_download(
        repo_id=params.competition_id,
        filename=remote_name,
        token=params.token,
        repo_type="dataset",
    )
    with open(local_path, "r", encoding="utf-8") as handle:
        return json.load(handle)
def upload_submission_info(params, user_submission_info):
    """Serialize the submission info and upload it to the competition dataset repo.

    Args:
        params: Object exposing ``competition_id``, ``team_id`` and ``token``.
        user_submission_info: JSON-serializable object written to
            ``submission_info/<team_id>.json``.
    """
    # Encode as indented UTF-8 JSON and wrap it in an in-memory file object.
    payload = io.BytesIO(json.dumps(user_submission_info, indent=4).encode("utf-8"))
    client = HfApi(token=params.token)
    client.upload_file(
        path_or_fileobj=payload,
        path_in_repo=f"submission_info/{params.team_id}.json",
        repo_id=params.competition_id,
        repo_type="dataset",
    )
def update_submission_status(params, status):
    """Set the status of the current submission and re-upload the submission info.

    The first entry of ``"submissions"`` whose ``submission_id`` equals
    ``params.submission_id`` is updated. The info is uploaded even when no
    entry matches.

    Args:
        params: Object exposing ``submission_id`` plus the attributes needed
            to download and upload the submission info.
        status: Value stored under the entry's ``"status"`` key.
    """
    info = download_submission_info(params)
    target = next(
        (entry for entry in info["submissions"] if entry["submission_id"] == params.submission_id),
        None,
    )
    if target is not None:
        target["status"] = status
    upload_submission_info(params, info)
def update_submission_score(params, public_score, private_score):
    """Record the scores of the current submission and mark it as done.

    The first entry of ``"submissions"`` whose ``submission_id`` equals
    ``params.submission_id`` receives both scores and the status ``"done"``.
    The info is uploaded even when no entry matches.

    Args:
        params: Object exposing ``submission_id`` plus the attributes needed
            to download and upload the submission info.
        public_score: Value stored under ``"public_score"``.
        private_score: Value stored under ``"private_score"``.
    """
    info = download_submission_info(params)
    target = next(
        (entry for entry in info["submissions"] if entry["submission_id"] == params.submission_id),
        None,
    )
    if target is not None:
        target["public_score"] = public_score
        target["private_score"] = private_score
        target["status"] = "done"
    upload_submission_info(params, info)
def monitor(func):
    """Decorate ``func`` so that a failure marks the submission as failed.

    The wrapper takes ``params`` from the ``params`` keyword argument or,
    when that is absent, from the first positional argument. If ``func``
    raises, the traceback is logged, the submission status is set to
    ``SubmissionStatus.FAILED.value``, the Space is paused, and the wrapper
    returns ``None`` instead of propagating the exception.

    Args:
        func: The callable to wrap.

    Returns:
        The wrapped callable, carrying ``func``'s name and docstring.
    """
    import functools

    # functools.wraps keeps func's __name__/__doc__ on the wrapper so logs and
    # introspection refer to the real function.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        params = kwargs.get("params", None)
        if params is None and len(args) > 0:
            params = args[0]

        try:
            return func(*args, **kwargs)
        except Exception as e:
            error_message = f"""{func.__name__} has failed due to an exception: {traceback.format_exc()}"""
            logger.error(error_message)
            logger.error(str(e))
            update_submission_status(params, SubmissionStatus.FAILED.value)
            pause_space(params)

    return wrapper
def uninstall_requirements(requirements_fname):
    """Uninstall the packages marked with a single leading ``-`` in a requirements file.

    Lines starting with exactly one ``-`` are written, without that dash, to
    ``uninstall.txt`` in the current working directory, which is then passed
    to ``pip uninstall -r uninstall.txt -y``. Lines starting with ``--`` are
    pip options (``install_requirements`` keeps them for installation) and
    are not treated as packages to remove.

    Does nothing when ``requirements_fname`` does not exist. When no line is
    marked for removal, ``uninstall.txt`` is still written (empty) but pip is
    not invoked.

    Args:
        requirements_fname: Path to the requirements file.
    """
    if not os.path.exists(requirements_fname):
        return

    # Collect the "-package" entries, skipping "--option" lines.
    with open(requirements_fname, "r", encoding="utf-8") as src:
        uninstall_list = [
            line[1:]
            for line in src
            if line.startswith("-") and not line.startswith("--")
        ]

    # create an uninstall.txt
    with open("uninstall.txt", "w", encoding="utf-8") as dst:
        dst.writelines(uninstall_list)

    if not uninstall_list:
        # pip has nothing to act on with an empty requirements list.
        return

    pipe = subprocess.Popen(
        [
            "pip",
            "uninstall",
            "-r",
            "uninstall.txt",
            "-y",
        ],
    )
    pipe.wait()
    logger.info("Requirements uninstalled.")
    return
def install_requirements(requirements_fname):
    """Install the packages listed in a requirements file with pip.

    Lines starting with a single ``-`` are dropped, while lines starting with
    ``--`` are kept. The remaining lines are written to ``install.txt`` in the
    current working directory and installed with ``pip install -r install.txt``.
    Logs a message and returns when the file does not exist.

    Args:
        requirements_fname: Path to the requirements file.
    """
    if not os.path.exists(requirements_fname):
        logger.info("No requirements.txt found. Skipping requirements installation.")
        return

    with open(requirements_fname, "r", encoding="utf-8") as src:
        # Keep "--option" lines and plain requirement lines; drop "-pkg" lines.
        kept_lines = [
            entry
            for entry in src
            if entry.startswith("--") or not entry.startswith("-")
        ]

    with open("install.txt", "w", encoding="utf-8") as dst:
        dst.writelines(kept_lines)

    pip_command = ["pip", "install", "-r", "install.txt"]
    installer = subprocess.Popen(pip_command)
    installer.wait()
    logger.info("Requirements installed.")
def can_user_submit_before_start(user_token, competition_organization):
    """Tell whether the user belongs to the competition's organization.

    Args:
        user_token: Token passed to ``user_authentication``.
        competition_organization: Organization name to look for.

    Returns:
        True if one of the ``"orgs"`` entries in the authentication result
        equals ``competition_organization``, otherwise False. A result
        without an ``"orgs"`` key yields False.
    """
    memberships = user_authentication(token=user_token).get("orgs", [])
    return any(org == competition_organization for org in memberships)
def get_team_name(user_token, competition_id, hf_token):
    """Look up the name of the team the authenticated user belongs to.

    Args:
        user_token: Token passed to ``user_authentication`` to resolve the user id.
        competition_id: Dataset repo id holding ``user_team.json`` and ``teams.json``.
        hf_token: Token used to download files from the dataset repo.

    Returns:
        The ``"name"`` of the user's team from ``teams.json``, or None when
        the user id has no entry in ``user_team.json``.
    """

    def _fetch_json(filename):
        # Download one file from the competition dataset repo and parse it.
        local_path = hf_hub_download(
            repo_id=competition_id,
            filename=filename,
            token=hf_token,
            repo_type="dataset",
        )
        with open(local_path, "r", encoding="utf-8") as handle:
            return json.load(handle)

    user_id = user_authentication(token=user_token)["id"]

    user_to_team = _fetch_json("user_team.json")
    if user_id not in user_to_team:
        return None

    # teams.json is only fetched once the user is known to be on a team.
    teams = _fetch_json("teams.json")
    return teams[user_to_team[user_id]]["name"]
def update_team_name(user_token, new_team_name, competition_id, hf_token):
    """Rename the authenticated user's team and upload the updated ``teams.json``.

    Args:
        user_token: Token passed to ``user_authentication`` to resolve the user id.
        new_team_name: Name stored under the team's ``"name"`` key.
        competition_id: Dataset repo id holding ``user_team.json`` and ``teams.json``.
        hf_token: Token used to download from and upload to the dataset repo.

    Returns:
        ``new_team_name``.

    Raises:
        Exception: If the user id has no entry in ``user_team.json``.
    """

    def _fetch_json(filename):
        # Download one file from the competition dataset repo and parse it.
        local_path = hf_hub_download(
            repo_id=competition_id,
            filename=filename,
            token=hf_token,
            repo_type="dataset",
        )
        with open(local_path, "r", encoding="utf-8") as handle:
            return json.load(handle)

    user_id = user_authentication(token=user_token)["id"]

    user_to_team = _fetch_json("user_team.json")
    if user_id not in user_to_team:
        raise Exception("User is not part of a team")
    team_id = user_to_team[user_id]

    teams = _fetch_json("teams.json")
    teams[team_id]["name"] = new_team_name

    # Re-encode the whole mapping as indented UTF-8 JSON for upload.
    payload = io.BytesIO(json.dumps(teams, indent=4).encode("utf-8"))
    client = HfApi(token=hf_token)
    client.upload_file(
        path_or_fileobj=payload,
        path_in_repo="teams.json",
        repo_id=competition_id,
        repo_type="dataset",
    )
    return new_team_name