# Spaces:
# Runtime error
# Runtime error
import json | |
import os | |
from datetime import datetime, timezone | |
from sklearn.metrics import f1_score | |
import pandas as pd | |
from pathlib import Path | |
from typing import Union | |
from huggingface_hub import hf_hub_download | |
from src.display.formatting import styled_error, styled_message, styled_warning | |
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, TOKEN, QUEUE_REPO, RESULTS_REPO | |
from src.submission.check_validity import ( | |
already_submitted_models, | |
) | |
REQUESTED_MODELS = None | |
GROUND_TRUTH = None | |
class GroundTruth:
    """Loads and serves hidden ground-truth labels from a headerless CSV.

    The CSV must provide at least two columns — image name and integer class
    label — of which only the first two are read.
    """

    def __init__(self, csv_path: Union[str, Path]):
        self.csv_path = Path(csv_path)
        self._data = None  # DataFrame with columns image_id / true_label
        self._load_ground_truth()

    def _load_ground_truth(self):
        """Load the headerless CSV into ``self._data``.

        Raises:
            RuntimeError: if the file is missing, unparseable, or does not
                satisfy the two-column integer-label layout.
        """
        try:
            # Read without a header; name and type the first two columns.
            self._data = pd.read_csv(
                self.csv_path,
                header=None,
                names=['image_id', 'true_label'],
                dtype={'image_id': str, 'true_label': int},
                usecols=[0, 1],  # only the first two columns matter
            )
        except FileNotFoundError:
            raise RuntimeError(f"Ground truth file not found at {self.csv_path}")
        except pd.errors.ParserError:
            raise RuntimeError("Invalid CSV file format")
        except ValueError:
            # pandas raises ValueError when usecols/dtype cannot be satisfied
            # (fewer than 2 columns, or non-integer labels). The original
            # checked shape[1] != 2 AFTER usecols=[0, 1], which is dead code:
            # a successful read always yields exactly 2 columns, and a failed
            # one raised an uncaught ValueError instead.
            raise RuntimeError("CSV must contain exactly 2 columns: image_name and class")

    def get_ground_truth(self) -> pd.DataFrame:
        """Return a defensive copy of the ground-truth DataFrame."""
        if self._data is None:
            self._load_ground_truth()
        return self._data.copy()  # copy prevents callers mutating the cache

    def labels(self) -> dict:
        """Return a mapping of {image_id: true_label}."""
        # Lazy-load guard added for consistency with get_ground_truth().
        if self._data is None:
            self._load_ground_truth()
        return dict(zip(self._data['image_id'], self._data['true_label']))
def get_ground_truth():
    """Download the hidden test CSV and cache it in the GROUND_TRUTH global.

    The downloaded file is deleted immediately after loading so the hidden
    labels do not linger on local disk.
    """
    global GROUND_TRUTH
    downloaded_path = hf_hub_download(
        repo_id=QUEUE_REPO,
        filename='./test_hidden.csv',
        repo_type="dataset",
        local_dir="./",
        token=TOKEN,
    )
    GROUND_TRUTH = GroundTruth(downloaded_path)
    # GroundTruth has already read the CSV into memory; remove the copy.
    os.remove(downloaded_path)
def process_submission(
    student_id: str,
    model_name: str,
    csv_upload: str,
):
    """Validate, record, score, and publish a student submission.

    Writes a request JSON to the eval queue, uploads the submitted CSV,
    scores it against the hidden ground truth (accuracy + weighted F1),
    and uploads a result JSON to the results repo.

    Args:
        student_id: identifier of the submitting student; must be non-empty.
        model_name: name of the model being submitted.
        csv_upload: uploaded file object exposing a ``.name`` path to a CSV
            with image-id and prediction columns.

    Returns:
        A styled HTML string describing success or the failure reason.
    """
    global REQUESTED_MODELS
    if not REQUESTED_MODELS:
        REQUESTED_MODELS = already_submitted_models(EVAL_REQUESTS_PATH)
    global GROUND_TRUTH
    if not GROUND_TRUTH:
        get_ground_truth()

    submitted_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    # --- input validation -------------------------------------------------
    if not student_id.strip():
        return styled_error("Submission failed! Student ID cannot be empty!")

    if csv_upload is None:
        return styled_error("Submission failed! CSV file cannot be empty!")

    # Seems good, creating the eval
    print("Adding new eval request")

    eval_entry = {
        "student_id": student_id,
        "model_name": model_name,
        "submitted_time": submitted_time,
    }

    # Check for duplicate submission
    if f"{student_id}_{model_name}_{submitted_time}" in REQUESTED_MODELS:
        return styled_warning("This model has been already submitted.")

    print("Creating eval request file")  # fixed log typo ("reqest")

    OUT_DIR = f"{EVAL_REQUESTS_PATH}/{student_id}"
    os.makedirs(OUT_DIR, exist_ok=True)
    out_path = os.path.join(OUT_DIR, f"request_{student_id}_{model_name}_{submitted_time}.json")

    with open(out_path, "w") as f:
        f.write(json.dumps(eval_entry))

    print("Uploading eval request file")
    API.upload_file(
        path_or_fileobj=out_path,
        path_in_repo=out_path.split("eval/")[1],
        repo_id=QUEUE_REPO,
        repo_type="dataset",
        commit_message=f"Add {student_id}_{model_name} to eval queue at {submitted_time}",
    )

    # --- scoring ----------------------------------------------------------
    # Read submitted predictions (first row is treated as a header by pandas).
    df = pd.read_csv(csv_upload.name)

    # BUG FIX: the original assigned column names unconditionally, which
    # raises an unhandled ValueError when the CSV has a different column
    # count. Validate first and return a styled error instead.
    if df.shape[1] != 2:
        return styled_error("CSV must contain exactly 2 columns: image_id and prediction")
    df.columns = ['image_id', 'prediction']

    # BUG FIX: the original condition was `== gt_rows + 1`, which only
    # rejected the exactly-one-extra-row case (and returned a bare string
    # where every other failure path uses the styled helpers). Any row-count
    # mismatch must be rejected.
    if df.shape[0] != GROUND_TRUTH.get_ground_truth().shape[0]:
        return styled_error("CSV must contain the same number of rows as the ground truth")

    print("Uploading the submitted CSV file")
    csv_path = os.path.join(OUT_DIR, f"submission_{student_id}_{model_name}_{submitted_time}.csv")
    API.upload_file(
        path_or_fileobj=csv_upload.name,
        path_in_repo=csv_path.split("eval/")[1],
        repo_id=QUEUE_REPO,
        repo_type="dataset",
        commit_message=f"Add {student_id}_{model_name} submitted CSV at {submitted_time}",
    )

    # Get ground truth labels
    true_labels = GROUND_TRUTH.get_ground_truth()

    # Merge predictions with ground truth.
    # NOTE(review): an inner join silently drops rows whose image_id has no
    # match, shrinking the scored sample — confirm this is intended grading
    # behavior rather than an error condition.
    merged = df.merge(true_labels, on='image_id')

    # Cast to plain Python floats so json.dumps() below is guaranteed to
    # serialize them (pandas/sklearn return numpy scalar types).
    accuracy = float((merged['prediction'] == merged['true_label']).mean())
    f1 = float(f1_score(merged['true_label'], merged['prediction'], average='weighted'))

    print("Creating eval result file")
    OUT_DIR = f"{EVAL_RESULTS_PATH}/{student_id}"
    os.makedirs(OUT_DIR, exist_ok=True)
    out_path = os.path.join(OUT_DIR, f"result_{student_id}_{model_name}_{submitted_time}.json")

    result_entry = {
        "config": {
            "student_id": student_id,
            "model_name": model_name,
        },
        "results": {
            "classification": {
                "accuracy": accuracy,
                "f1": f1,
            }
        },
    }
    with open(out_path, "w") as f:
        f.write(json.dumps(result_entry))

    print("Uploading eval result file")
    API.upload_file(
        path_or_fileobj=out_path,
        path_in_repo=out_path.split("eval/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        # BUG FIX: the original commit message said "to eval queue" even
        # though this upload targets the results repo.
        commit_message=f"Add {student_id}_{model_name} result at {submitted_time}",
    )

    print(f"Submission successful! Accuracy: {accuracy:.2%}, F1: {f1:.2%}")
    return styled_message(f"Submission successful! Accuracy: {accuracy:.2%}, F1: {f1:.2%}")