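"""Submission handling for the leaderboard Space: validates uploaded solution
files and pushes them to the private submissions dataset on the Hub."""
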
import time
from datetime import datetime, timezone

import pandas as pd
from datasets import Dataset
from pandas.api.types import is_integer_dtype

from app import is_valid
from src.datamodel.data import F1Data
from src.display.formatting import styled_error, styled_message
from src.display.utils import ModelType
from src.envs import SUBMISSIONS_REPO
from src.logger import get_logger
from validation.validate import is_submission_file_valid

logger = get_logger(__name__)


def _validate_all_submissions_present(
    lbdb: F1Data,
    pd_ds: pd.DataFrame,
) -> None:
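    """Validate that the uploaded frame has exactly the expected schema.

    Expects one record per code problem with an integer ``problem_id`` and a
    string ``solution``, covering every problem exactly once. Raises
    ValueError on any mismatch.
    """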
logger.info(f"Validating DS size {len(pd_ds)} columns {pd_ds.columns} set {set(pd_ds.columns)}") | |
expected_cols = ["problem_id", "solution"] | |
if set(pd_ds.columns) != set(expected_cols): | |
return ValueError(f"Expected attributes: {expected_cols}, Got: {pd_ds.columns.tolist()}") | |
if not is_integer_dtype(pd_ds["problem_id"]): | |
return ValueError("problem_id must be str convertible to int") | |
if any(type(v) is not str for v in pd_ds["solution"]): | |
return ValueError("solution must be of type str") | |
submitted_ids = set(pd_ds.problem_id.astype(str)) | |
if submitted_ids != lbdb.code_problem_ids: | |
missing = lbdb.code_problem_ids - submitted_ids | |
unknown = submitted_ids - lbdb.code_problem_ids | |
raise ValueError(f"Mismatched problem IDs: {len(missing)} missing, {len(unknown)} unknown") | |
if len(pd_ds) > len(lbdb.code_problem_ids): | |
return ValueError("Duplicate problem IDs exist in uploaded file") | |


def add_new_solutions(
    lbdb: F1Data,
    system_name: str,
    org: str,
    sys_type: str,
    submission_path: str,
    ensure_all_present: bool = False,
):
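    """Validate a user submission and push it to the private submissions repo.

    Returns a styled status message for display in the UI.
    """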
    logger.info(
        f"Adding new submission! {system_name=}, {org=}, {sys_type=} and {submission_path=}",
    )
    # Double-check inputs that should already have been validated upstream.
    for val in [system_name, org, sys_type]:
        assert is_valid(val)
    assert is_submission_file_valid(submission_path)
    sys_type = ModelType.from_str(sys_type).name
    try:
        submission_df = pd.read_json(submission_path, lines=True)
        if ensure_all_present:
            _validate_all_submissions_present(lbdb=lbdb, pd_ds=submission_df)
    except Exception:
        logger.warning("Failed to parse submission DF!", exc_info=True)
        # Use the same generic message as for external errors to avoid leaking details.
        return styled_error("An error occurred. Please try again later.")
    submission_id = f"{system_name}_{org}_{sys_type}_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}"
    # Submission looks good; create the eval record.
    logger.info(f"Adding new submission: {submission_id}")
    submission_ts = time.time_ns()

    def add_info(row):
        # Attach submission metadata to every row of the uploaded dataset.
        return {
            **row,
            "system_name": system_name,
            "organization": org,
            "system_type": sys_type,
            "submission_id": submission_id,
            "submission_ts": submission_ts,
        }

    ds = Dataset.from_pandas(submission_df).map(add_info)
    ds.push_to_hub(
        SUBMISSIONS_REPO,
        submission_id,
        private=True,
    )
    return styled_message(
        "Your request has been submitted to the evaluation queue!\n"
        + "Results may take up to 24 hours to be processed and shown on the leaderboard."
    )