import io
import json
import uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Tuple

import pandas as pd
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils._errors import EntryNotFoundError
from loguru import logger

from .errors import AuthenticationError, PastDeadlineError, SubmissionError, SubmissionLimitError
from .utils import user_authentication


@dataclass
class Submissions:
    """Track and query competition submissions stored in a Hub dataset repo.

    Each user has one JSON file at ``submission_info/<user_id>.json`` inside the
    dataset repo ``competition_id``. The file holds the user's ``name``, ``id``
    and a ``submissions`` list. Uploaded submission files are stored under
    ``submissions/`` in the same repo.

    Attributes:
        competition_id: Repo id of the dataset that stores the competition data.
        submission_limit: Maximum number of submissions a user may make per
            calendar day. It is compared with and subtracted from integer
            counts, so it must be numeric.
        end_date: Competition deadline; compared against ``datetime.now()``.
        token: Hub token used for every download from and upload to the repo.
    """

    competition_id: str
    submission_limit: int
    end_date: datetime
    token: str

    def __post_init__(self) -> None:
        """Define the column layout of the tables returned by `_get_user_subs`."""
        self.public_sub_columns = [
            "datetime",
            "submission_id",
            "public_score",
            "submission_comment",
            "selected",
            "status",
        ]
        self.private_sub_columns = [
            "datetime",
            "submission_id",
            "public_score",
            "private_score",
            "submission_comment",
            "selected",
            "status",
        ]

    # ------------------------------------------------------------------
    # Storage helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _submission_info_path(user_id: Any) -> str:
        """Return the in-repo path of the submission-info file for ``user_id``."""
        return f"submission_info/{user_id}.json"

    @staticmethod
    def _load_json(path: str) -> Dict[str, Any]:
        """Parse the UTF-8 JSON file at ``path``."""
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _count_submissions_on(submissions: List[Dict[str, Any]], date_str: str) -> int:
        """Count the records in ``submissions`` whose ``"date"`` equals ``date_str``."""
        return sum(1 for sub in submissions if sub["date"] == date_str)

    def _download_submission_info_file(self, user_id: Any) -> str:
        """Download the user's submission-info file and return its local path."""
        return hf_hub_download(
            repo_id=self.competition_id,
            filename=self._submission_info_path(user_id),
            token=self.token,
            repo_type="dataset",
        )

    def _read_user_submission_info(self, user_id: Any) -> Dict[str, Any]:
        """Download and parse the user's submission-info file.

        Download errors (including ``EntryNotFoundError``) propagate unchanged.
        """
        return self._load_json(self._download_submission_info_file(user_id))

    def _upload_user_submission_info(self, user_id: Any, user_submission_info: Dict[str, Any]) -> None:
        """Serialize ``user_submission_info`` as JSON and upload it to the repo."""
        payload = json.dumps(user_submission_info, indent=4).encode("utf-8")
        api = HfApi(token=self.token)
        api.upload_file(
            path_or_fileobj=io.BytesIO(payload),
            path_in_repo=self._submission_info_path(user_id),
            repo_id=self.competition_id,
            repo_type="dataset",
        )

    def _load_or_create_user_submission_info(self, user_info: Dict[str, Any]) -> Dict[str, Any]:
        """Return the user's submission info, creating the file if it is missing.

        Raises:
            Exception: If the first download fails for any reason other than the
                file not existing. The original error is logged and chained.
        """
        user_id = user_info["id"]
        try:
            user_fname = self._download_submission_info_file(user_id)
        except EntryNotFoundError:
            # First contact with this user: create the file, then read it back.
            self._add_new_user(user_info)
            user_fname = self._download_submission_info_file(user_id)
        except Exception as e:
            logger.error(e)
            raise Exception("Hugging Face Hub is unreachable, please try again later.") from e
        return self._load_json(user_fname)

    # ------------------------------------------------------------------
    # Submission bookkeeping
    # ------------------------------------------------------------------
    def _verify_submission(self, bytes_data: bytes) -> bool:
        """Validate an uploaded file. Currently a placeholder that accepts everything."""
        return True

    def _add_new_user(self, user_info: Dict[str, Any]) -> None:
        """Create the submission-info file for a user with an empty submission list.

        Args:
            user_info: Mapping providing at least ``"name"`` and ``"id"``.
        """
        self._upload_user_submission_info(
            user_info["id"],
            {
                "name": user_info["name"],
                "id": user_info["id"],
                "submissions": [],
            },
        )

    def _check_user_submission_limit(self, user_info: Dict[str, Any]) -> bool:
        """Return True while the user is still below today's submission limit."""
        return self._submissions_today(user_info) < self.submission_limit

    def _submissions_today(self, user_info: Dict[str, Any]) -> int:
        """Return how many submissions the user has recorded with today's date.

        The submission-info file is created first if the user has none.
        """
        user_submission_info = self._load_or_create_user_submission_info(user_info)
        todays_date = datetime.now().strftime("%Y-%m-%d")
        return self._count_submissions_on(user_submission_info["submissions"], todays_date)

    def _increment_submissions(self, user_id: Any, submission_id: str, submission_comment: str) -> int:
        """Append a new pending submission record and upload the updated file.

        Args:
            user_id: Id of the submitting user.
            submission_id: Identifier of the new submission.
            submission_comment: Comment stored alongside the record.

        Returns:
            The number of submissions recorded for today, including the new one.
        """
        user_submission_info = self._read_user_submission_info(user_id)

        # Take a single timestamp so "date" and "time" always describe the same
        # instant, even when the call happens around midnight.
        now = datetime.now()
        todays_date = now.strftime("%Y-%m-%d")

        # Default values for a freshly created submission.
        user_submission_info["submissions"].append(
            {
                "date": todays_date,
                "time": now.strftime("%H:%M:%S"),
                "submission_id": submission_id,
                "submission_comment": submission_comment,
                "status": "pending",
                "selected": False,
                "public_score": -1,
                "private_score": -1,
            }
        )
        todays_submissions = self._count_submissions_on(user_submission_info["submissions"], todays_date)

        self._upload_user_submission_info(user_id, user_submission_info)
        return todays_submissions

    def _download_user_subs(self, user_id: Any) -> List[Dict[str, Any]]:
        """Return the ``submissions`` list stored in the user's submission-info file."""
        return self._read_user_submission_info(user_id)["submissions"]

    def update_selected_submissions(self, user_token: str, selected_submission_ids) -> None:
        """Mark exactly the given submissions as selected and upload the result.

        Every record whose ``submission_id`` is in ``selected_submission_ids``
        gets ``selected = True``; all other records get ``selected = False``.

        Raises:
            PastDeadlineError: If the current time is after ``end_date``.
            AuthenticationError: If the user token is rejected (see `_get_user_info`).
        """
        if datetime.now() > self.end_date:
            raise PastDeadlineError("Competition has ended.")

        user_info = self._get_user_info(user_token)
        user_id = user_info["id"]

        user_submission_info = self._read_user_submission_info(user_id)
        for sub in user_submission_info["submissions"]:
            sub["selected"] = sub["submission_id"] in selected_submission_ids

        self._upload_user_submission_info(user_id, user_submission_info)

    # ------------------------------------------------------------------
    # Submission tables
    # ------------------------------------------------------------------
    @staticmethod
    def _with_datetime(user_submissions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return copies of the records, each guaranteed to carry a ``"datetime"`` key.

        Records written by `_increment_submissions` only hold ``"date"`` and
        ``"time"``, while the table layouts select ``"datetime"``; the missing
        value is derived as ``"<date> <time>"``. Records that already have a
        ``"datetime"`` value are left untouched.
        """
        records = []
        for sub in user_submissions:
            record = dict(sub)
            if "datetime" not in record and "date" in record and "time" in record:
                record["datetime"] = f"{record['date']} {record['time']}"
            records.append(record)
        return records

    @staticmethod
    def _expand_score_column(submissions: pd.DataFrame, column: str, prefix: str) -> pd.DataFrame:
        """Replace a dict-valued score column by one ``prefix``-named column per key.

        The frame is returned unchanged when it is empty or when the first row's
        value in ``column`` is not a dict.
        """
        if submissions.empty or not isinstance(submissions[column].iloc[0], dict):
            return submissions
        expanded = submissions[column].apply(pd.Series)
        expanded = expanded.rename(columns=lambda name: prefix + str(name))
        return pd.concat([submissions.drop([column], axis=1), expanded], axis=1)

    def _get_user_subs(self, user_info: Dict[str, Any], private: bool = False) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Build the tables of successful and failed submissions for a user.

        A submission counts as failed when its status is ``"failed"`` or
        ``"error"`` or when a visible score is still ``-1``; everything else is
        successful. With ``private=False`` only the public score is visible and
        the private score is removed from the output.

        Args:
            user_info: Mapping providing at least ``"id"``.
            private: Whether to include (and require) the private score.

        Returns:
            ``(successful_submissions, failed_submissions)``. Both are empty
            frames without columns when the user has no submission-info file or
            no recorded submissions.
        """
        user_id = user_info["id"]
        try:
            user_submissions = self._download_user_subs(user_id)
        except EntryNotFoundError:
            logger.warning("No submissions found for user")
            return pd.DataFrame(), pd.DataFrame()

        if not user_submissions:
            # Nothing recorded yet; there are no columns to select from.
            return pd.DataFrame(), pd.DataFrame()

        submissions_df = pd.DataFrame(self._with_datetime(user_submissions))
        if private:
            submissions_df = submissions_df[self.private_sub_columns]
        else:
            submissions_df = submissions_df.drop(columns=["private_score"])
            submissions_df = submissions_df[self.public_sub_columns]

        not_errored = ~submissions_df["status"].isin(["failed", "error"])
        scored = submissions_df["public_score"] != -1
        if private:
            scored = scored & (submissions_df["private_score"] != -1)
        is_successful = not_errored & scored

        successful_submissions = submissions_df[is_successful].reset_index(drop=True)
        failed_submissions = submissions_df[~is_successful].reset_index(drop=True)

        # Multi-metric scores are stored as dicts; spread them into columns.
        if private:
            successful_submissions = self._expand_score_column(successful_submissions, "private_score", "private_")
        successful_submissions = self._expand_score_column(successful_submissions, "public_score", "public_")
        return successful_submissions, failed_submissions

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def _get_user_info(self, user_token: str) -> Dict[str, Any]:
        """Authenticate ``user_token`` and return the resulting user info.

        Raises:
            AuthenticationError: If the authentication response contains
                ``"error"`` or reports ``emailVerified`` as ``False``.
        """
        user_info = user_authentication(token=user_token)
        if "error" in user_info:
            raise AuthenticationError("Invalid token")

        if user_info["emailVerified"] is False:
            raise AuthenticationError("Please verify your email on Hugging Face Hub")
        return user_info

    def my_submissions(self, user_token: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Return the caller's ``(successful, failed)`` submission tables.

        Private scores are included once the current time has reached ``end_date``.
        """
        user_info = self._get_user_info(user_token)
        private = datetime.now() >= self.end_date
        return self._get_user_subs(user_info, private=private)

    def new_submission(self, user_token: str, uploaded_file, submission_comment: str) -> int:
        """Store a new submission file and record it for the user.

        Args:
            user_token: Token identifying the submitting user.
            uploaded_file: Object exposing ``.file.read()`` and ``.filename``
                (presumably a web-framework upload object; confirm with caller).
            submission_comment: Comment saved with the submission record.

        Returns:
            How many submissions the user has left for today.

        Raises:
            AuthenticationError: If the user token is rejected.
            SubmissionLimitError: If today's limit has already been reached.
            SubmissionError: If the file fails `_verify_submission`.
        """
        user_info = self._get_user_info(user_token)

        if self._check_user_submission_limit(user_info) is False:
            raise SubmissionLimitError("Submission limit reached")

        logger.info(type(uploaded_file))
        bytes_data = uploaded_file.file.read()

        if not self._verify_submission(bytes_data):
            raise SubmissionError("Invalid submission file")

        user_id = user_info["id"]
        submission_id = str(uuid.uuid4())
        file_extension = uploaded_file.filename.split(".")[-1]

        api = HfApi(token=self.token)
        api.upload_file(
            path_or_fileobj=bytes_data,
            path_in_repo=f"submissions/{user_id}-{submission_id}.{file_extension}",
            repo_id=self.competition_id,
            repo_type="dataset",
        )

        # Record the submission together with the caller's comment.
        submissions_made = self._increment_submissions(
            user_id=user_id,
            submission_id=submission_id,
            submission_comment=submission_comment,
        )
        return self.submission_limit - submissions_made