"""FastAPI application for a competition: HTML pages plus JSON endpoints.

Importing this module has side effects: it downloads and (re)installs the
competition's optional ``requirements.txt``, starts the job-runner thread and
a watchdog thread that restarts it, and builds the FastAPI ``app``.
"""

import datetime
import os
import threading
import time
import io
import traceback
import json
from typing import Dict, Any, List, Optional
from urllib.parse import urlencode

from fastapi import Depends, FastAPI, Form, HTTPException, Request, UploadFile, Body, Query
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import disable_progress_bars
from huggingface_hub.utils._errors import EntryNotFoundError
from loguru import logger
from pydantic import BaseModel, Field
from requests.exceptions import RequestException
from filelock import FileLock, Timeout as FileLockTimeout

from competitions import __version__, utils
from competitions.errors import AuthenticationError, PastDeadlineError, SubmissionError, SubmissionLimitError
from competitions.info import CompetitionInfo
from competitions.leaderboard import Leaderboard
from competitions.oauth import attach_oauth
from competitions.runner import JobRunner
from competitions.submissions import Submissions
from competitions.text import SUBMISSION_SELECTION_TEXT, SUBMISSION_TEXT
from competitions.utils import team_file_api, submission_api, leaderboard_api, error_log_api
from competitions.enums import SubmissionStatus, ErrorMessage


HF_TOKEN = os.environ.get("HF_TOKEN", None)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
COMPETITION_ID = os.environ.get("COMPETITION_ID")
OUTPUT_PATH = os.environ.get("OUTPUT_PATH", "/tmp/model")
START_DATE = os.environ.get("START_DATE", "2000-12-31")
DISABLE_PUBLIC_LB = int(os.environ.get("DISABLE_PUBLIC_LB", 0))

disable_progress_bars()

# The competition dataset repo may ship a requirements.txt; a missing file is
# not an error, it simply means there is nothing extra to install.
try:
    REQUIREMENTS_FNAME = hf_hub_download(
        repo_id=COMPETITION_ID,
        filename="requirements.txt",
        token=HF_TOKEN,
        repo_type="dataset",
    )
except EntryNotFoundError:
    REQUIREMENTS_FNAME = None

if REQUIREMENTS_FNAME:
    logger.info("Uninstalling and installing requirements")
    utils.uninstall_requirements(REQUIREMENTS_FNAME)
    utils.install_requirements(REQUIREMENTS_FNAME)


class LeaderboardRequest(BaseModel):
    """Request body carrying a leaderboard selector."""

    lb: str


class UpdateSelectedSubmissionsRequest(BaseModel):
    """Request body for ``/update_selected_submissions``."""

    # Comma-separated submission ids; split and stripped by the handler.
    submission_ids: str


class UpdateTeamNameRequest(BaseModel):
    """Request body for ``/update_team_name``."""

    new_team_name: str


# Reference JSON-schema fragment for the optional student fields of a team
# member (kept verbatim, including the "majar" spelling used on the wire):
#
# "majar": {
#     "type": "string",
#     "title": "Majar",
#     "minLength": 1,
# },
# "degree": {
#     "type": "string",
#     "title": "Degree",
#     "enum": ["Bachelor", "Master", "PhD"]
# },
# "advising_professor": {
#     "type": "string",
#     "title": "Advising Professor",
#     "minLength": 1,
# },
# "grade": {
#     "type": "string",
#     "title": "Grade",
#     "minLength": 1,
# },
# "expected_graduation_year": {
#     "type": "string",
#     "title": "Expected Graduation Year",
#     "minLength": 1,
# }


class TeamMember(BaseModel):
    """One member of a team as submitted through the registration form."""

    name: str = Field(..., max_length=50, description="Name of the team member")
    email: str = Field(..., max_length=100, description="Email of the team member")
    institution: str = Field(..., max_length=100, description="Institution of the team member")
    is_student: bool = Field(..., description="Is the team member a student?")
    # NOTE: "majar" is the field name clients send; renaming it would break the API.
    majar: Optional[str] = Field(None, max_length=50, description="Major of the team member")
    degree: Optional[str] = Field(None, description="Degree of the team member")
    advising_professor: Optional[str] = Field(None, max_length=50, description="Advising professor of the team member")
    grade: Optional[str] = Field(None, max_length=50, description="Grade of the team member")
    expected_graduation_year: Optional[str] = Field(None, max_length=4, description="Expected graduation year of the team member")


class RegisterRequest(BaseModel):
    """Request body for ``/register`` and ``/update_team_info``."""

    team_name: str = Field(..., max_length=20, description="Name of the team")
    team_members: List[TeamMember] = Field(min_length=1, max_length=100, description="List of team members")


def run_job_runner():
    """Build a ``JobRunner`` for this competition and run it (thread target)."""
    job_runner = JobRunner(
        competition_id=COMPETITION_ID,
        token=HF_TOKEN,
        output_path=OUTPUT_PATH,
    )
    job_runner.run()


def start_job_runner_thread():
    """Start ``run_job_runner`` in a new (non-daemon) thread and return the thread."""
    thread = threading.Thread(target=run_job_runner)
    # thread.daemon = True
    thread.start()
    return thread


def watchdog(job_runner_thread):
    """Poll the job-runner thread every 10 seconds and restart it if it died.

    :param job_runner_thread: the thread initially being supervised.
    """
    while True:
        if not job_runner_thread.is_alive():
            logger.warning("Job runner thread stopped. Restarting...")
            job_runner_thread = start_job_runner_thread()
        time.sleep(10)


job_runner_thread = start_job_runner_thread()
watchdog_thread = threading.Thread(target=watchdog, args=(job_runner_thread,))
watchdog_thread.daemon = True
watchdog_thread.start()


app = FastAPI()
attach_oauth(app)

static_path = os.path.join(BASE_DIR, "static")
app.mount("/static", StaticFiles(directory=static_path), name="static")
templates_path = os.path.join(BASE_DIR, "templates")
templates = Jinja2Templates(directory=templates_path)


@app.get("/", response_class=HTMLResponse)
def read_form(request: Request):
    """
    Render the landing page (``index.html``).

    :param request: incoming request, passed to the template context.
    :return: the rendered template response.
    :raises HTTPException: 500 when ``HF_TOKEN`` is not configured.
    """
    if HF_TOKEN is None:
        # Must be raised: a *returned* HTTPException is not turned into an error response.
        raise HTTPException(status_code=500, detail="HF_TOKEN is not set.")
    competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)
    context = {
        "request": request,
        "logo": competition_info.logo_url,
        "competition_type": competition_info.competition_type,
        "version": __version__,
        "rules_available": competition_info.rules is not None,
    }
    return templates.TemplateResponse("index.html", context)


@app.get("/login_status", response_class=JSONResponse)
def use_oauth(request: Request, user_token: str = Depends(utils.user_authentication)):
    """
    Report login, admin, registration and white-list status for the caller.

    :return: ``{"response": {...}}`` where every flag is a boolean; all flags
        are ``False`` when no user token is available.
    """
    result = {
        "is_login": False,
        "is_admin": False,
        "is_registered": False,
        "is_white_team": False,
    }
    comp_org = COMPETITION_ID.split("/")[0]
    if user_token:
        result["is_login"] = True
        result["is_admin"] = utils.is_user_admin(user_token, comp_org)
        team_info = utils.team_file_api.get_team_info(user_token)
        result["is_registered"] = team_info is not None
        # bool(...) so an unregistered user yields False rather than None.
        result["is_white_team"] = bool(team_info) and team_info["id"] in team_file_api.get_team_white_list()
    return {"response": result}


@app.get("/logout", response_class=HTMLResponse)
def user_logout(request: Request):
    """Endpoint that logs out the user (e.g. delete cookie session)."""
    if "oauth_info" in request.session:
        request.session.pop("oauth_info", None)

    competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)
    context = {
        "request": request,
        "logo": competition_info.logo_url,
        "competition_type": competition_info.competition_type,
        # The landing page handler passes "version" to this same template;
        # "__version__" is kept as well so nothing relying on the old key breaks.
        "version": __version__,
        "__version__": __version__,
        "rules_available": competition_info.rules is not None,
    }

    return templates.TemplateResponse("index.html", context)


@app.get("/competition_info", response_class=JSONResponse)
def get_comp_info(request: Request):
    """Return the competition description as ``{"response": ...}``."""
    competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)
    info = competition_info.competition_desc
    resp = {"response": info}
    return resp


@app.get("/dataset_info", response_class=JSONResponse)
def get_dataset_info(request: Request):
    """Return the dataset description as ``{"response": ...}``."""
    competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)
    info = competition_info.dataset_desc
    resp = {"response": info}
    return resp


@app.get("/rules", response_class=JSONResponse)
def get_rules(request: Request):
    """Return the competition rules, or a placeholder message when there are none."""
    competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)
    if competition_info.rules is not None:
        return {"response": competition_info.rules}
    return {"response": "No rules available."}


@app.get("/submission_info", response_class=JSONResponse)
def get_submission_info(request: Request, user_token: str = Depends(utils.user_authentication)):
    """Return the submission description, or a prompt to register when the caller has no team."""
    if utils.team_file_api.get_team_info(user_token) is None:
        return {"response": "Please register your team first."}
    competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)
    info = competition_info.submission_desc
    resp = {"response": info}
    return resp


@app.post("/leaderboard", response_class=JSONResponse)
def fetch_leaderboard(_: str = Depends(utils.user_authentication)):
    """Return the leaderboard rendered as a markdown table, or a placeholder when empty."""
    df = leaderboard_api.get_leaderboard()
    if len(df) == 0:
        return {"response": "No teams yet. Why not make a submission?"}
    resp = {"response": df.to_markdown(index=False)}
    return resp


@app.post("/my_submissions", response_class=JSONResponse)
def my_submissions(request: Request, user_token: str = Depends(utils.user_authentication)):
    """
    List the caller's submissions together with the help texts shown in the UI.

    :return: ``{"response": {"submissions", "submission_text", "error", "team_name"}}``;
        ``submissions`` is an empty string when there is nothing to show.
    :raises HTTPException: 403 when the caller has no registered team or the
        team is not on the white list.
    """
    competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)
    if user_token is None:
        return {
            "response": {
                "submissions": "",
                "submission_text": SUBMISSION_TEXT.format(competition_info.submission_limit),
                "error": "**Invalid token. Please login.**",
                "team_name": "",
            }
        }
    team_info = utils.team_file_api.get_team_info(user_token)
    # get_team_info returns None for unregistered users (see /login_status);
    # subscripting it would otherwise fail with a TypeError.
    if team_info is None:
        raise HTTPException(status_code=403, detail="Please register your team first.")
    if team_info["id"] not in team_file_api.get_team_white_list():
        raise HTTPException(status_code=403, detail="You are not allowed to access this resource.")

    sub = Submissions(
        end_date=competition_info.end_date,
        submission_limit=competition_info.submission_limit,
        competition_id=COMPETITION_ID,
        token=HF_TOKEN,
        competition_type=competition_info.competition_type,
        hardware=competition_info.hardware,
    )
    try:
        subs = sub.my_submissions(user_token)
    except AuthenticationError:
        return {
            "response": {
                "submissions": "",
                "submission_text": SUBMISSION_TEXT.format(competition_info.submission_limit),
                "error": "**Invalid token. Please login.**",
                "team_name": "",
            }
        }
    subs = subs.to_dict(orient="records")
    error = ""
    if len(subs) == 0:
        error = "**You have not made any submissions yet.**"
        subs = ""
    submission_text = SUBMISSION_TEXT.format(competition_info.submission_limit)
    submission_selection_text = SUBMISSION_SELECTION_TEXT.format(competition_info.selection_limit)

    team_name = team_info["name"]

    # Only failed runs of these kinds get a link to their error log.
    errors_with_log = {ErrorMessage.RUNTIME_ERROR, ErrorMessage.BUILD_SPACE_FAILED}
    for submission in subs:
        if submission["error_message"] in errors_with_log:
            log_token = error_log_api.generate_log_token(submission["submission_id"])
            submission["log_file_url"] = "/error_log?" + urlencode({"token": log_token})
        else:
            submission["log_file_url"] = ""

    resp = {
        "response": {
            "submissions": subs,
            "submission_text": submission_text + submission_selection_text,
            "error": error,
            "team_name": team_name,
        }
    }
    return resp


@app.post("/new_submission", response_class=JSONResponse)
def new_submission(
    hub_model: str = Form(...),
    submission_comment: str = Form(None),
    user_token: str = Depends(utils.user_authentication),
):
    """
    Create a new submission for the caller's team.

    Every outcome, success or failure, is reported as ``{"response": <message>}``.
    A per-team, non-blocking file lock prevents two submissions of the same team
    from being created concurrently.

    :param hub_model: form field forwarded to ``Submissions.new_submission``.
    :param submission_comment: optional free-text comment (defaults to ``""``).
    """
    if submission_comment is None:
        submission_comment = ""

    if user_token is None:
        return {"response": "Invalid token. Please login."}

    todays_date = datetime.datetime.now()
    start_date = datetime.datetime.strptime(START_DATE, "%Y-%m-%d")
    if todays_date < start_date:
        # Before the start date only admins of the competition org may submit.
        comp_org = COMPETITION_ID.split("/")[0]
        if not utils.is_user_admin(user_token, comp_org):
            return {"response": "Competition has not started yet!"}

    team_info = team_file_api.get_team_info(user_token)
    # Unregistered users have no team; avoid a TypeError on None["id"].
    if team_info is None:
        return {"response": "Please register your team first."}
    team_id = team_info["id"]
    if team_id not in team_file_api.get_team_white_list():
        return {"response": "You are not allowed to make submissions."}

    team_submission_limit_dict = team_file_api.get_team_submission_limit()

    # Make sure the lock directory exists before the lock file is created in it.
    os.makedirs("./submission_lock", exist_ok=True)
    lock = FileLock(f"./submission_lock/{team_id}.lock", blocking=False)
    try:
        with lock:
            in_flight = [SubmissionStatus.QUEUED, SubmissionStatus.PENDING, SubmissionStatus.PROCESSING]
            if submission_api.exists_submission_info(team_id) and submission_api.count_by_status(team_id, in_flight) > 0:
                return {"response": "Another submission is being processed. Please wait a moment."}
            competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)
            # A per-team limit, when configured, overrides the competition-wide one.
            submission_limit = team_submission_limit_dict.get(team_id, competition_info.submission_limit)
            sub = Submissions(
                end_date=competition_info.end_date,
                submission_limit=submission_limit,
                competition_id=COMPETITION_ID,
                token=HF_TOKEN,
                competition_type=competition_info.competition_type,
                hardware=competition_info.hardware,
            )
            if competition_info.competition_type == "script":
                resp = sub.new_submission(user_token, hub_model, submission_comment)
                return {"response": f"Success! You have {resp} submissions remaining today."}
            return {"response": "Invalid competition type"}
    except RequestException:
        # loguru formats with "{}" placeholders; without one the traceback is dropped.
        logger.error("Hugging Face Hub is unreachable, please try again later: {}", traceback.format_exc())
        return {"response": "Hugging Face Hub is unreachable, please try again later"}
    except AuthenticationError:
        return {"response": "Invalid token"}
    except PastDeadlineError:
        return {"response": "Competition has ended"}
    except SubmissionError:
        return {"response": "Invalid submission file"}
    except SubmissionLimitError:
        return {"response": "Submission limit reached"}
    except FileLockTimeout:
        # Raised immediately because the lock is non-blocking and already held.
        return {"response": "Another submission is being processed. Please wait a moment."}


@app.post("/update_selected_submissions", response_class=JSONResponse)
def update_selected_submissions(
    request: Request, body: UpdateSelectedSubmissionsRequest, user_token: str = Depends(utils.user_authentication)
):
    """
    Replace the caller's selected submissions.

    :param body: carries a comma-separated list of submission ids.
    :return: ``{"success": bool, "error": str}``; fails when more ids are given
        than the competition's selection limit allows.
    """
    submission_ids = body.submission_ids

    if user_token is None:
        return {"success": False, "error": "Invalid token, please login."}

    competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)
    sub = Submissions(
        end_date=competition_info.end_date,
        submission_limit=competition_info.submission_limit,
        competition_id=COMPETITION_ID,
        token=HF_TOKEN,
        competition_type=competition_info.competition_type,
        hardware=competition_info.hardware,
    )
    submission_ids = [s.strip() for s in submission_ids.split(",")]
    if len(submission_ids) > competition_info.selection_limit:
        return {
            "success": False,
            "error": f"Please select at most {competition_info.selection_limit} submissions.",
        }
    sub.update_selected_submissions(user_token=user_token, selected_submission_ids=submission_ids)
    return {"success": True, "error": ""}


@app.post("/update_team_name", response_class=JSONResponse)
def update_team_name(
    request: Request, body: UpdateTeamNameRequest, user_token: str = Depends(utils.user_authentication)
):
    """
    Rename the caller's team.

    :return: ``{"success": bool, "error": str}``; any exception raised by the
        team API is reported through ``error`` instead of propagating.
    """
    new_team_name = body.new_team_name

    if user_token is None:
        return {"success": False, "error": "Invalid token. Please login."}

    if str(new_team_name).strip() == "":
        return {"success": False, "error": "Team name cannot be empty."}

    try:
        utils.team_file_api.update_team_name(user_token, new_team_name)
        return {"success": True, "error": ""}
    except Exception as e:
        return {"success": False, "error": str(e)}


@app.post("/admin/comp_info", response_class=JSONResponse)
def admin_comp_info(request: Request, user_token: str = Depends(utils.user_authentication)):
    """
    Return the competition configuration and markdown texts for admins.

    :return: ``{"response": {"config": {...}, "markdowns": {...}}}``, or a 403
        JSON response when the caller is not an admin of the competition org.
    """
    comp_org = COMPETITION_ID.split("/")[0]
    user_is_admin = utils.is_user_admin(user_token, comp_org)
    if not user_is_admin:
        # FastAPI does not treat "body, status" tuples like Flask does; the
        # status code has to be set on an explicit response object.
        return JSONResponse(status_code=403, content={"response": "You are not an admin."})

    competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)

    markdowns = {
        "competition_desc": competition_info.competition_desc,
        "rules": competition_info.rules,
        "submission_desc": competition_info.submission_desc,
        "dataset_desc": competition_info.dataset_desc,
    }
    if markdowns["rules"] is None:
        markdowns["rules"] = "No rules available."

    config = {
        "SUBMISSION_LIMIT": competition_info.submission_limit,
        "SELECTION_LIMIT": competition_info.selection_limit,
        "END_DATE": competition_info.end_date.strftime("%Y-%m-%d"),
        "EVAL_HIGHER_IS_BETTER": competition_info.eval_higher_is_better,
        "SUBMISSION_COLUMNS": competition_info.submission_columns_raw,
        "SUBMISSION_ID_COLUMN": competition_info.submission_id_col,
        "LOGO": competition_info.logo_url,
        "COMPETITION_TYPE": competition_info.competition_type,
        "EVAL_METRIC": competition_info.metric,
        "SUBMISSION_ROWS": competition_info.submission_rows,
        "TIME_LIMIT": competition_info.time_limit,
        "DATASET": competition_info.dataset,
        "SUBMISSION_FILENAMES": competition_info.submission_filenames,
        "SCORING_METRIC": competition_info.scoring_metric,
        "HARDWARE": competition_info.hardware,
    }

    return {"response": {"config": config, "markdowns": markdowns}}


@app.post("/admin/update_comp_info", response_class=JSONResponse)
def update_comp_info(data: Dict[str, Any] = Body(...), user_token: str = Depends(utils.user_authentication)):
    """
    Update the competition configuration and markdown texts (admins only).

    :param data: JSON body with a ``config`` mapping (only known keys are
        accepted) and a ``markdowns`` mapping.
    :return: ``{"success": True}`` on success; a 403 response for non-admins,
        a 400 response when ``config``/``markdowns`` are missing,
        ``{"success": False, "error": ...}`` for an unknown config key, and a
        500 response when the update itself fails.
    """
    comp_org = COMPETITION_ID.split("/")[0]
    user_is_admin = utils.is_user_admin(user_token, comp_org)
    if not user_is_admin:
        return JSONResponse(status_code=403, content={"response": "You are not an admin."})

    competition_info = CompetitionInfo(competition_id=COMPETITION_ID, autotrain_token=HF_TOKEN)

    if "config" not in data or "markdowns" not in data:
        return JSONResponse(
            status_code=400,
            content={"success": False, "error": "Request body must contain 'config' and 'markdowns'."},
        )
    config = data["config"]
    markdowns = data["markdowns"]

    valid_keys = {
        "SUBMISSION_LIMIT",
        "SELECTION_LIMIT",
        "END_DATE",
        "EVAL_HIGHER_IS_BETTER",
        "SUBMISSION_COLUMNS",
        "SUBMISSION_ID_COLUMN",
        "LOGO",
        "COMPETITION_TYPE",
        "EVAL_METRIC",
        "SUBMISSION_ROWS",
        "TIME_LIMIT",
        "DATASET",
        "SUBMISSION_FILENAMES",
        "SCORING_METRIC",
        "HARDWARE",
    }

    for key in config:
        if key not in valid_keys:
            return {"success": False, "error": f"Invalid key: {key}"}

    try:
        competition_info.update_competition_info(config, markdowns, HF_TOKEN)
    except Exception as e:
        logger.error(e)
        return JSONResponse(status_code=500, content={"success": False})

    return {"success": True}


@app.get("/register_template_file")
def register_example_file():
    """Serve the participant-information spreadsheet template as a download."""
    template_name = "Competition Participants Information Collection Template.xlsx"
    file_path = os.path.join(BASE_DIR, "static", template_name)
    return FileResponse(file_path, media_type="application/octet-stream", filename=template_name)


@app.post("/register")
def register(
    request: RegisterRequest,
    user_token: str = Depends(utils.user_authentication)
):
    """
    Register a new team for the caller.

    :return: ``{"success": bool, "response": str}``; fails when the caller is
        not logged in or already has a team.
    """
    if user_token is None:
        return {"success": False, "response": "Please login."}

    team_info = utils.team_file_api.get_team_info(user_token)
    if team_info is not None:
        return {"success": False, "response": "You have already registered your team."}

    utils.team_file_api.create_team(
        user_token,
        request.team_name,
        request.model_dump()
    )

    return {"success": True, "response": "Team created successfully."}


@app.post("/update_team_info")
def update_team_info(
    request: RegisterRequest,
    user_token: str = Depends(utils.user_authentication)
):
    """
    Update the caller's team information.

    :return: ``{"success": bool, "response": str}``; fails when the caller is
        not logged in.
    """
    if user_token is None:
        return {"success": False, "response": "Please login."}

    utils.team_file_api.update_team(
        user_token,
        request.team_name,
        request.model_dump()
    )

    return {"success": True, "response": "Update team info success."}


@app.get("/error_log")
def get_error_log(token: str = Query(..., description="Token to access the error log file.")):
    """
    Stream the error log identified by ``token`` as a plain-text attachment.

    :raises HTTPException: 500 when the log cannot be fetched; the underlying
        error is logged, not exposed to the client.
    """
    try:
        log_content = error_log_api.get_log_by_token(token)
        log_file = io.BytesIO(log_content.encode('utf-8'))
        return StreamingResponse(
            log_file,
            media_type="text/plain",
            headers={"Content-Disposition": "attachment; filename=error_log.txt"}
        )
    except Exception as e:
        logger.error(f"Error while fetching log file: {e}")
        raise HTTPException(status_code=500, detail="Internal server error.")


@app.get("/register_page", response_class=HTMLResponse)
def register_page(request: Request):
    """
    This function is used to render the registration HTML page.

    :raises HTTPException: 500 when ``HF_TOKEN`` is not configured.
    """
    if HF_TOKEN is None:
        raise HTTPException(status_code=500, detail="HF_TOKEN is not set.")
    context = {
        "request": request,
        "version": __version__,
    }
    return templates.TemplateResponse("register_page.html", context)


@app.get("/update_team_info_page", response_class=HTMLResponse)
def update_team_info_page(request: Request, user_token: str = Depends(utils.user_authentication)):
    """
    This function is used to render the update team info HTML page.

    :raises HTTPException: 403 when the caller is not logged in or has not
        registered a team yet.
    """
    if user_token is None:
        raise HTTPException(status_code=403, detail="Please login to access this page.")
    team_info = utils.team_file_api.get_team_info(user_token)
    if team_info is None:
        raise HTTPException(status_code=403, detail="You have not registered your team yet.")
    # Debug-level log instead of print(): keeps team data out of stdout by default.
    logger.debug("Rendering update page for team info: {}", team_info)
    context = {
        "request": request,
        "team_info_json_string": json.dumps(team_info),
        "version": __version__,
    }
    return templates.TemplateResponse("update_team_info_page.html", context)