import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Union

import pandas as pd
from huggingface_hub import hf_hub_download
from sklearn.metrics import f1_score

from src.display.formatting import styled_error, styled_message, styled_warning
from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, TOKEN, QUEUE_REPO, RESULTS_REPO
from src.submission.check_validity import (
    already_submitted_models,
)

REQUESTED_MODELS = None
GROUND_TRUTH = None


class GroundTruth:
    def __init__(self, csv_path: Union[str, Path]):
        self.csv_path = Path(csv_path)
        self._data = None
        self._load_ground_truth()

    def _load_ground_truth(self):
        """Load the headerless CSV of (image_name, class) pairs."""
        try:
            # Read the CSV without a header; name and type the two columns explicitly.
            # usecols=[0, 1] both restricts and validates the column count: pandas
            # raises if the file has fewer than two columns.
            self._data = pd.read_csv(
                self.csv_path,
                header=None,
                names=['image_id', 'true_label'],
                dtype={'image_id': str, 'true_label': int},
                usecols=[0, 1],
            )
        except FileNotFoundError:
            raise RuntimeError(f"Ground truth file not found at {self.csv_path}")
        except pd.errors.ParserError:
            raise RuntimeError("Invalid CSV file format")

    def get_ground_truth(self) -> pd.DataFrame:
        """Return the ground truth DataFrame."""
        if self._data is None:
            self._load_ground_truth()
        return self._data.copy()  # Return a copy to prevent external modification

    @property
    def labels(self) -> dict:
        """Return a {image_id: true_label} dictionary."""
        return dict(zip(self._data['image_id'], self._data['true_label']))


def get_ground_truth():
    """Download the hidden test labels, load them into memory, then delete the local copy."""
    global GROUND_TRUTH
    csv_path = hf_hub_download(
        repo_id=QUEUE_REPO,
        filename='test_hidden.csv',
        repo_type="dataset",
        local_dir="./",
        token=TOKEN,
    )
    GROUND_TRUTH = GroundTruth(csv_path)
    os.remove(csv_path)


def process_submission(
    student_id: str,
    model_name: str,
    csv_upload,  # Gradio file object; the uploaded file's path is available as .name
):
    """Validate a submission, score it against the ground truth, and upload the request and result files."""
    global REQUESTED_MODELS
    if not REQUESTED_MODELS:
        REQUESTED_MODELS = already_submitted_models(EVAL_REQUESTS_PATH)

    global GROUND_TRUTH
    if not GROUND_TRUTH:
        get_ground_truth()

    submitted_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    if not student_id.strip():
        return styled_error("Submission failed! Student ID cannot be empty!")

    if csv_upload is None:
        return styled_error("Submission failed! CSV file cannot be empty!")

    # Input looks good; create the eval request.
    print("Adding new eval request")

    eval_entry = {
        "student_id": student_id,
        "model_name": model_name,
        "submitted_time": submitted_time,
    }

    # Check for a duplicate submission. The key must not include the timestamp,
    # otherwise it is always unique and the check can never fire.
    if f"{student_id}_{model_name}" in REQUESTED_MODELS:
        return styled_warning("This model has already been submitted.")

    print("Creating eval request file")
    OUT_DIR = f"{EVAL_REQUESTS_PATH}/{student_id}"
    os.makedirs(OUT_DIR, exist_ok=True)
    out_path = os.path.join(OUT_DIR, f"request_{student_id}_{model_name}_{submitted_time}.json")

    with open(out_path, "w") as f:
        f.write(json.dumps(eval_entry))

    print("Uploading eval request file")
    API.upload_file(
        path_or_fileobj=out_path,
        path_in_repo=out_path.split("eval/")[1],  # Keep only the path relative to the eval/ root
        repo_id=QUEUE_REPO,
        repo_type="dataset",
        commit_message=f"Add {student_id}_{model_name} to eval queue at {submitted_time}",
    )

    # Now compute the metrics.
    # Read the submitted predictions; the first row is treated as a header and
    # the column names are replaced with the ones used internally.
    df = pd.read_csv(csv_upload.name)
    df.columns = ['image_id', 'prediction']
    # Match the ground truth dtype so the merge on image_id works.
    df['image_id'] = df['image_id'].astype(str)

    # Validate the CSV format: one prediction per ground-truth row.
    true_labels = GROUND_TRUTH.get_ground_truth()
    if df.shape[0] != true_labels.shape[0]:
        return styled_error("CSV must contain the same number of rows as the ground truth")

    print("Uploading the submitted CSV file")
    csv_path = os.path.join(OUT_DIR, f"submission_{student_id}_{model_name}_{submitted_time}.csv")
    API.upload_file(
        path_or_fileobj=csv_upload.name,
        path_in_repo=csv_path.split("eval/")[1],
        repo_id=QUEUE_REPO,
        repo_type="dataset",
        commit_message=f"Add {student_id}_{model_name} submitted CSV at {submitted_time}",
    )

    # Merge predictions with the ground truth on image_id.
    merged = df.merge(true_labels, on='image_id')

    # Calculate the metrics.
    accuracy = (merged['prediction'] == merged['true_label']).mean()
    f1 = f1_score(merged['true_label'], merged['prediction'], average='weighted')

    print("Creating eval result file")
    OUT_DIR = f"{EVAL_RESULTS_PATH}/{student_id}"
    os.makedirs(OUT_DIR, exist_ok=True)
    out_path = os.path.join(OUT_DIR, f"result_{student_id}_{model_name}_{submitted_time}.json")

    result_entry = {
        "config": {
            "student_id": student_id,
            "model_name": model_name,
        },
        "results": {
            "classification": {
                "accuracy": accuracy,
                "f1": f1,
            }
        },
    }

    with open(out_path, "w") as f:
        f.write(json.dumps(result_entry))

    print("Uploading eval result file")
    API.upload_file(
        path_or_fileobj=out_path,
        path_in_repo=out_path.split("eval/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"Add {student_id}_{model_name} results at {submitted_time}",
    )

    print(f"Submission successful! Accuracy: {accuracy:.2%}, F1: {f1:.2%}")
    return styled_message(f"Submission successful! Accuracy: {accuracy:.2%}, F1: {f1:.2%}")
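

# --- Usage sketch (an assumption, not part of the app wiring) ---
# A minimal local smoke test. In the real Space, process_submission is bound to a
# Gradio upload widget; the code above only reads the upload via its `.name`
# attribute, so that object is stubbed here with SimpleNamespace. The student ID,
# model name, and CSV path below are hypothetical placeholders.
if __name__ == "__main__":
    from types import SimpleNamespace

    fake_upload = SimpleNamespace(name="example_submission.csv")  # hypothetical local file
    print(process_submission("s1234567", "resnet50-baseline", fake_upload))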