|
import glob |
|
import json |
|
import os |
|
import time |
|
from dataclasses import dataclass |
|
from datetime import datetime |
|
|
|
import pandas as pd |
|
from huggingface_hub import hf_hub_download, snapshot_download |
|
from loguru import logger |
|
|
|
from competitions.enums import SubmissionStatus |
|
|
|
|
|
@dataclass
class Leaderboard:
    """Builds public and private leaderboards for a competition hosted as a
    Hugging Face Hub dataset repo.

    Attributes:
        end_date: Submissions made at or after this time are excluded.
        eval_higher_is_better: Whether larger metric values rank higher.
        max_selected_submissions: Max submissions a user may mark as selected
            for the private leaderboard.
        competition_id: Hub dataset repo id holding ``submission_info/*.json``.
        token: Hub auth token used for all downloads.
        scoring_metric: Name of the metric used for ranking.
    """

    end_date: datetime
    eval_higher_is_better: bool
    max_selected_submissions: int
    competition_id: str
    token: str
    scoring_metric: str

    def __post_init__(self):
        # Columns that are not metric scores and must not be rounded in fetch().
        self.non_score_columns = ["id", "submission_datetime"]

    def _load_submission_infos(self):
        """Download and parse every user's submission_info JSON file.

        Returns:
            list[dict]: One dict per user, each with at least the keys
            ``"id"`` and ``"submissions"``.
        """
        start_time = time.time()
        submissions_folder = snapshot_download(
            repo_id=self.competition_id,
            allow_patterns="submission_info/*.json",
            token=self.token,  # use_auth_token is deprecated; matches hf_hub_download below
            repo_type="dataset",
        )
        logger.info(f"Downloaded submissions in {time.time() - start_time} seconds")
        infos = []
        for path in glob.glob(os.path.join(submissions_folder, "submission_info", "*.json")):
            with open(path, "r", encoding="utf-8") as f:
                infos.append(json.load(f))
        return infos

    def _process_public_lb(self):
        """Compute the public leaderboard: the best submission per user.

        Only SUCCESS submissions made strictly before ``end_date`` are
        considered; each user's best one by the public scoring metric is kept.

        Returns:
            list[dict]: Flat dicts with ``id``, one key per public metric,
            and ``submission_datetime``.
        """
        infos = self._load_submission_infos()
        start_time = time.time()
        submissions = []
        for submission_info in infos:
            valid = [
                sub
                for sub in submission_info["submissions"]
                if sub["status"] == SubmissionStatus.SUCCESS.value
                and datetime.strptime(sub["datetime"], "%Y-%m-%d %H:%M:%S") < self.end_date
            ]
            if not valid:
                continue

            user_id = submission_info["id"]
            user_submissions = []
            for sub in valid:
                # Key order matters: metric keys may in principle collide with
                # "id"; the original semantics let the metric value win.
                _sub = {"id": user_id}
                for k, v in sub["public_score"].items():
                    _sub[k] = v
                _sub["submission_datetime"] = sub["datetime"]
                user_submissions.append(_sub)

            user_submissions.sort(key=lambda x: x[self.scoring_metric], reverse=self.eval_higher_is_better)
            submissions.append(user_submissions[0])
        logger.info(f"Processed submissions in {time.time() - start_time} seconds")
        return submissions

    def _process_private_lb(self):
        """Compute the private leaderboard: one submission per user.

        Only SUCCESS submissions are considered. NOTE(review): unlike the
        public path there is no end_date cut here; fetch() filters rows by
        date afterwards, which can drop a user whose chosen submission is
        past the deadline instead of falling back to an earlier one —
        presumably intentional, verify against competition rules.

        Selection rules per user:
          * no submissions selected  -> best by the *public* metric (what the
            user saw on the public board);
          * 1..max_selected selected -> best *selected* one by the private metric;
          * more than max_selected   -> user is skipped with a warning.

        Returns:
            list[dict]: Flat dicts with ``id``, private metric keys (prefix
            stripped), and ``submission_datetime``.
        """
        infos = self._load_submission_infos()
        start_time = time.time()
        submissions = []
        for submission_info in infos:
            successful = [
                sub for sub in submission_info["submissions"] if sub["status"] == SubmissionStatus.SUCCESS.value
            ]
            if not successful:
                continue

            user_id = submission_info["id"]
            user_submissions = []
            for sub in successful:
                _sub = {
                    "id": user_id,
                    "selected": sub["selected"],
                }
                for k, v in sub["public_score"].items():
                    _sub[f"public_{k}"] = v
                for k, v in sub["private_score"].items():
                    _sub[f"private_{k}"] = v
                _sub["submission_datetime"] = sub["datetime"]
                user_submissions.append(_sub)

            selected_count = sum(1 for sub in user_submissions if sub["selected"])
            if selected_count == 0:
                # No explicit selection: rank by what the user saw publicly.
                sort_key = f"public_{self.scoring_metric}"
            elif selected_count <= self.max_selected_submissions:
                user_submissions = [sub for sub in user_submissions if sub["selected"]]
                sort_key = f"private_{self.scoring_metric}"
            else:
                logger.warning(
                    f"User {user_id} has more than {self.max_selected_submissions} selected submissions. Skipping user..."
                )
                continue

            user_submissions.sort(key=lambda x: x[sort_key], reverse=self.eval_higher_is_better)
            best_user_submission = user_submissions[0]

            # Expose only private scores, with the "private_" prefix stripped.
            best_user_submission = {k: v for k, v in best_user_submission.items() if not k.startswith("public_")}
            best_user_submission = {k.replace("private_", ""): v for k, v in best_user_submission.items()}
            best_user_submission.pop("selected")
            submissions.append(best_user_submission)
        logger.info(f"Processed submissions in {time.time() - start_time} seconds")
        return submissions

    def fetch(self, private=False):
        """Return the requested leaderboard as a ranked, display-ready DataFrame.

        Args:
            private: Build the private board if True, else the public board.

        Returns:
            pd.DataFrame: Columns ordered as ``rank``, ``id``, metric columns,
            ``submission_datetime`` (formatted string); empty frame when there
            are no submissions.
        """
        submissions = self._process_private_lb() if private else self._process_public_lb()

        if not submissions:
            return pd.DataFrame()

        df = pd.DataFrame(submissions)
        df["submission_datetime"] = pd.to_datetime(df["submission_datetime"], format="%Y-%m-%d %H:%M:%S")

        # Drop anything at/after the deadline (the private path is not
        # pre-filtered by date).
        df = df[df["submission_datetime"] < self.end_date].reset_index(drop=True)

        # The original code branched on `private` here, but both branches were
        # identical — only the metric direction matters. Ties on the metric
        # are broken by earliest submission.
        df = df.sort_values(
            by=[self.scoring_metric, "submission_datetime"],
            ascending=[not self.eval_higher_is_better, True],
        )

        # Round only score columns for display ("rank" is added afterwards,
        # so it is never rounded).
        for col in df.columns:
            if col in self.non_score_columns:
                continue
            df[col] = df[col].round(4)

        df = df.reset_index(drop=True)
        df["rank"] = df.index + 1

        df["submission_datetime"] = df["submission_datetime"].dt.strftime("%Y-%m-%d %H:%M:%S")

        # Column order: rank first, submission_datetime last, everything else
        # in its existing order.
        columns = df.columns.tolist()
        columns.remove("submission_datetime")
        columns.append("submission_datetime")
        columns.remove("rank")
        df = df[["rank"] + columns]

        # Replace raw team ids with display names from teams.json.
        team_metadata_path = hf_hub_download(
            repo_id=self.competition_id,
            filename="teams.json",
            token=self.token,
            repo_type="dataset",
        )
        with open(team_metadata_path, "r", encoding="utf-8") as f:
            team_metadata = json.load(f)

        df["id"] = df["id"].apply(lambda x: team_metadata[x]["name"])

        return df
|
|