# evaluator.py
import json
import logging
import random
from typing import List

import requests

from models import Question, QuestionAnswerPair, Results
from settings import Settings

logger = logging.getLogger(__name__)

# Local mirror of the question list, used when the API cannot be reached.
_QUESTIONS_CACHE_FILE = "questions.json"
# Timeouts (seconds) for the read-only endpoints and for the submit endpoint.
_REQUEST_TIMEOUT = 10
_SUBMIT_TIMEOUT = 60


class Evaluator:
    """Client for the scoring API: fetches questions and submits answers.

    Questions fetched from the API are mirrored to a local ``questions.json``
    file, which is used as a fallback when the API request fails.
    """

    def __init__(self, settings: Settings):
        """Store the settings object used for API URLs and submission data."""
        self.settings = settings

    def _endpoint(self, path: str) -> str:
        """Return the scoring API URL for *path*, joined with exactly one slash.

        The base URL is stripped of trailing slashes first so that a base such
        as ``https://host/`` does not produce ``https://host//questions``.
        """
        base = str(self.settings.scoring_api_base_url).rstrip("/")
        return f"{base}/{path.lstrip('/')}"

    def _cache_questions(self, questions: list[Question]) -> None:
        """Write *questions* to the local cache file (best effort).

        A failure to write the cache is logged and otherwise ignored: the
        questions were already fetched successfully, so the caller should
        still receive them.
        """
        try:
            with open(_QUESTIONS_CACHE_FILE, "w") as f:
                json.dump([question.model_dump() for question in questions], f, indent=4)
        except OSError as e:
            logger.warning(f"Could not save questions to local file 'questions.json': {e}")

    def _load_cached_questions(self) -> list[Question]:
        """Load questions from the local cache file.

        Returns:
            list[Question]: The cached questions, or an empty list when the
            file is missing or does not contain valid JSON.
        """
        try:
            with open(_QUESTIONS_CACHE_FILE, "r") as f:
                questions = [Question(**question) for question in json.load(f)]
        except FileNotFoundError:
            logger.error("Local 'questions.json' not found. Cannot retrieve questions.")
            return []
        except json.JSONDecodeError as json_e:
            logger.error(f"Error decoding local 'questions.json': {json_e}")
            return []
        logger.info(f"Successfully loaded {len(questions)} questions from local file.")
        return questions

    def get_questions(self) -> list[Question]:
        """Get all questions from the scoring API, falling back to the local file.

        On a successful fetch the questions are also saved to
        ``questions.json`` for later fallback/debugging.

        Returns:
            list[Question]: The questions from the API, or from the local
            file if the request fails. Empty if neither source is usable.
        """
        url = self._endpoint("questions")
        try:
            response = requests.get(url, timeout=_REQUEST_TIMEOUT)
            response.raise_for_status()
            questions = [Question(**question) for question in response.json()]
        except requests.exceptions.RequestException as e:
            # Covers network failures, HTTP error statuses (e.g. rate limits)
            # and undecodable response bodies.
            logger.warning(
                f"Error fetching questions from API ({e}). "
                f"Attempting to read from local file 'questions.json'."
            )
            return self._load_cached_questions()

        self._cache_questions(questions)
        logger.info(f"Successfully fetched {len(questions)} questions from API.")
        return questions

    def get_one_question(self, task_id=None) -> Question:
        """Get the question with the given task id, or a random one.

        When ``task_id`` is given, the full question list is searched for it.
        If it is not given (or not found), a random question is requested
        from the API; if that request fails, a random question is picked from
        the full question list instead.

        Args:
            task_id: Identifier of the question to return. Falsy values mean
                "pick a random question".

        Returns:
            Question: The requested question, or a random one.

        Raises:
            ValueError: If no question could be obtained from any source.
        """
        questions: list[Question] = []
        if task_id:
            # The whole list is needed to look up a specific task id.
            questions = self.get_questions()
            for question in questions:
                if question.task_id == task_id:
                    logger.info(f"Found specific question for task_id: {task_id}")
                    return question
            logger.warning(f"Task ID {task_id} not found in fetched questions. Attempting random.")

        try:
            response = requests.get(self._endpoint("random-question"), timeout=_REQUEST_TIMEOUT)
            response.raise_for_status()
            question = Question(**response.json())
        except requests.exceptions.RequestException as e:
            logger.warning(
                f"Error fetching random question from API ({e}). "
                f"Attempting to get random from local file."
            )
        else:
            logger.info(f"Successfully fetched random question from API: {question.task_id}")
            return question

        # Reuse the list fetched during the task_id lookup, if any, rather
        # than hitting the API a second time.
        if not questions:
            questions = self.get_questions()
        if not questions:
            logger.error("No questions available from API or local file.")
            raise ValueError("No questions available to retrieve.")

        random_question = random.choice(questions)
        logger.info(f"Loaded random question from local file: {random_question.task_id}")
        return random_question

    def _read_answer_file(self, username: str) -> List[QuestionAnswerPair]:
        """Read the question/answer pairs from a user-specific answer file.

        Args:
            username: User whose ``answers_<username>.json`` file is read.

        Returns:
            List[QuestionAnswerPair]: One pair per entry in the file.

        Raises:
            FileNotFoundError: If the user's answer file does not exist.
        """
        file_name = f"answers_{username}.json"
        try:
            with open(file_name, "r") as f:
                raw_pairs = json.load(f)
        except FileNotFoundError:
            # Re-raise with a message that names the user; `from None` keeps
            # the log free of the redundant low-level traceback.
            raise FileNotFoundError(
                f"Answer file '{file_name}' not found for user '{username}'."
            ) from None
        return [QuestionAnswerPair(**pair) for pair in raw_pairs]

    @staticmethod
    def _describe_http_error(error: requests.exceptions.HTTPError) -> str:
        """Build a human-readable description of a failed HTTP response.

        Uses the ``detail`` field when the body is a JSON object; otherwise
        falls back to the first 500 characters of the raw body.
        """
        response = error.response
        description = f"Server responded with status {response.status_code}."
        try:
            payload = response.json()
        except ValueError:
            # ValueError is the common base of the JSON decode errors raised
            # by both old and new versions of requests.
            return f"{description} Response: {response.text[:500]}"
        if isinstance(payload, dict):
            return f"{description} Detail: {payload.get('detail', response.text)}"
        # Valid JSON but not an object (e.g. a list or string): no 'detail' key.
        return f"{description} Response: {response.text[:500]}"

    def submit_answers(self, username: str) -> str:
        """Submit the saved answers to the scoring endpoint.

        Args:
            username: User whose local answer file is read. Note that the
                username sent to the API comes from ``self.settings``.

        Returns:
            str: A status message describing the result; failures are
            reported in the message rather than raised.
        """
        try:
            pairs_to_submit = self._read_answer_file(username)
            answers_payload = [pair.get_answer() for pair in pairs_to_submit]
        except FileNotFoundError as e:
            logger.error(e)
            return "Click 'Get One Answer' or 'Get All Answers' to run agent before trying to submit."
        except Exception as e:
            logger.error(f"Error reading local answer file: {e}")
            return f"Error preparing answers for submission: {e}"

        agent_code = f"https://huggingface.co/spaces/{self.settings.space_id}/tree/main"
        submission_data = {
            "username": self.settings.username,  # Use username from settings for submission
            "code_link": agent_code,  # Changed from agent_code to code_link as per GAIA PDF
            "answers": answers_payload,
        }
        submit_url = self._endpoint("submit")
        logger.info(f"Submitting {len(answers_payload)} answers to: {submit_url}")

        try:
            response = requests.post(submit_url, json=submission_data, timeout=_SUBMIT_TIMEOUT)
            response.raise_for_status()
            results = Results.model_validate(response.json())
            logger.info(
                f"Submission successful.\n"
                f"User: {results.username}.\n"
                f"Overall Score: {results.score}%.\n"
                f"Correct Count: {results.correct_count}.\n"
                f"Total Attempted: {results.total_attempted}.\n"
                f"Message: {results.message}.\n"
                f"Timestamp: {results.timestamp}.\n"
            )
            return (
                f"Submission Successful!\n"
                f"User: {results.username}\n"
                f"Overall Score: {results.score}% "
                f"({results.correct_count}/{results.total_attempted} correct)\n"
                f"Message: {results.message}"
            )
        except requests.exceptions.HTTPError as e:
            status_message = f"Submission Failed: {self._describe_http_error(e)}"
        except requests.exceptions.Timeout:
            status_message = "Submission Failed: The request timed out."
        except requests.exceptions.RequestException as e:
            status_message = f"Submission Failed: Network error - {e}"
        except Exception as e:
            # Top-level boundary: callers get a message instead of a crash.
            status_message = f"An unexpected error occurred during submission: {e}"

        logger.error(status_message)
        return status_message