# Agent_Final_Assignment / evaluator.py
# hassenhamdi's picture
# Create evaluator.py
# 09f04d8 verified
# evaluator.py
from settings import Settings
from typing import List
from models import Question, QuestionAnswerPair, Results
import requests
import random
import json
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class Evaluator:
    """Client for the scoring API: fetches questions and submits saved answers.

    Questions fetched from the API are mirrored into a local
    ``questions.json`` file, which is read back as a fallback when the API
    request fails.
    """

    def __init__(self, settings: Settings):
        """Keep a reference to the application settings.

        Args:
            settings: Settings object; this class reads its
                ``scoring_api_base_url``, ``space_id`` and ``username``
                attributes.
        """
        self.settings = settings

    def _endpoint(self, path: str) -> str:
        """Return the full URL for the API endpoint *path*.

        Slashes at the joint are normalised so that a base URL ending in "/"
        does not yield a double slash such as ``https://host//questions``.
        """
        base_url = str(self.settings.scoring_api_base_url).rstrip("/")
        return f"{base_url}/{path.lstrip('/')}"

    def _cache_questions(self, questions: list[Question]) -> None:
        """Write *questions* to ``questions.json`` (best effort).

        The file is only a fallback/debugging aid, so a failure to write it
        is logged instead of discarding questions that were fetched fine.
        """
        try:
            with open("questions.json", "w") as f:
                json.dump([question.model_dump() for question in questions],
                          f, indent=4)
        except OSError as e:
            logger.warning(
                f"Could not write questions to local file 'questions.json': {e}")

    def _load_cached_questions(self) -> list[Question]:
        """Read questions from ``questions.json``.

        Returns:
            The cached questions, or an empty list when the file is missing
            or does not contain valid JSON.
        """
        try:
            with open("questions.json", "r") as f:
                questions = [Question(**question) for question in json.load(f)]
        except FileNotFoundError:
            logger.error("Local 'questions.json' not found. Cannot retrieve questions.")
            return []
        except json.JSONDecodeError as json_e:
            logger.error(f"Error decoding local 'questions.json': {json_e}")
            return []
        logger.info(f"Successfully loaded {len(questions)} questions from local file.")
        return questions

    def get_questions(self) -> list[Question]:
        """
        Get the questions from the HuggingFace endpoint.

        On success the questions are also saved to ``questions.json``. If the
        request fails, the questions are read from that file instead (useful
        when hitting API rate limits, etc.).

        Returns:
            list[Question]: A list of Question objects; empty when neither
            the API nor the local file could provide any.
        """
        url = self._endpoint("questions")
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            questions = [Question(**question) for question in response.json()]
        except requests.exceptions.RequestException as e:
            logger.warning(f"Error fetching questions from API ({e}). Attempting to read from local file 'questions.json'.")
            return self._load_cached_questions()

        # Save questions to a local file for fallback/debugging.
        self._cache_questions(questions)
        logger.info(f"Successfully fetched {len(questions)} questions from API.")
        return questions

    def get_one_question(self, task_id=None) -> Question:
        """
        Get a random, or requested question from the HuggingFace endpoint.

        When ``task_id`` is given, the full question list is searched for it.
        If it is not found (or no ``task_id`` was given), a random question is
        requested from the API; if that request fails, a random question is
        picked from the list returned by :meth:`get_questions`.

        Args:
            task_id: Identifier of the wanted question, or a falsy value to
                get a random one.

        Returns:
            Question: A Question object

        Raises:
            ValueError: If no question could be obtained from the API or the
                local file.
        """
        if task_id:
            # Get all questions to find the specific task_id.
            for question in self.get_questions():
                if question.task_id == task_id:
                    logger.info(f"Found specific question for task_id: {task_id}")
                    return question
            logger.warning(f"Task ID {task_id} not found in fetched questions. Attempting random.")

        # Try fetching a random question from the API.
        try:
            response = requests.get(self._endpoint("random-question"), timeout=10)
            response.raise_for_status()
            question = Question(**response.json())
        except requests.exceptions.RequestException as e:
            logger.warning(f"Error fetching random question from API ({e}). Attempting to get random from local file.")
        else:
            logger.info(f"Successfully fetched random question from API: {question.task_id}")
            return question

        # Fallback: pick a random entry from whatever get_questions() can provide.
        questions = self.get_questions()
        if not questions:
            logger.error("No questions available from API or local file.")
            raise ValueError("No questions available to retrieve.")
        random_question = random.choice(questions)
        logger.info(f"Loaded random question from local file: {random_question.task_id}")
        return random_question

    def _read_answer_file(self, username: str) -> List[QuestionAnswerPair]:
        """Read the question answer pairs from a user-specific answer file.

        Args:
            username: Used to build the file name ``answers_<username>.json``.

        Returns:
            One QuestionAnswerPair per entry in the file.

        Raises:
            FileNotFoundError: If the answer file does not exist.
        """
        file_name = f"answers_{username}.json"
        # Open directly instead of checking for existence first: no extra
        # module is needed and there is no gap between the check and the open.
        try:
            with open(file_name, "r") as f:
                raw_pairs = json.load(f)
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Answer file '{file_name}' not found for user '{username}'."
            ) from None
        return [QuestionAnswerPair(**pair) for pair in raw_pairs]

    def submit_answers(self, username: str) -> str:
        """Submits saved answers to the scoring endpoint and returns the result.

        Args:
            username: Selects the local answer file to read. The username
                sent in the payload comes from ``settings.username``.

        Returns:
            A human-readable status message describing either the score or
            the reason the submission failed. Errors are reported through the
            returned message rather than raised.
        """
        try:
            pairs_to_submit = self._read_answer_file(username)
            answers_payload = [pair.get_answer() for pair in pairs_to_submit]
        except FileNotFoundError as e:
            logger.error(e)
            return "Click 'Get One Answer' or 'Get All Answers' to run agent before trying to submit."
        except Exception as e:
            logger.error(f"Error reading local answer file: {e}")
            return f"Error preparing answers for submission: {e}"

        agent_code = f"https://huggingface.co/spaces/{self.settings.space_id}/tree/main"
        submission_data = {
            "username": self.settings.username,  # Use username from settings for submission
            # NOTE(review): key was changed from "agent_code" to "code_link" by
            # the author; confirm it matches the scoring API's request schema.
            "code_link": agent_code,
            "answers": answers_payload,
        }
        submit_url = self._endpoint("submit")
        logger.info(f"Submitting {len(answers_payload)} answers to: {submit_url}")

        try:
            response = requests.post(
                submit_url, json=submission_data, timeout=60)
            response.raise_for_status()
            results = Results.model_validate(response.json())
            logger.info(
                f"Submission successful.\n"
                f"User: {results.username}.\n"
                f"Overall Score: {results.score}%.\n"
                f"Correct Count: {results.correct_count}.\n"
                f"Total Attempted: {results.total_attempted}.\n"
                f"Message: {results.message}.\n"
                f"Timestamp: {results.timestamp}.\n"
            )
            status_message = (
                f"Submission Successful!\n"
                f"User: {results.username}\n"
                f"Overall Score: {results.score}% "
                f"({results.correct_count}/{results.total_attempted} correct)\n"
                f"Message: {results.message}"
            )
            return status_message
        except requests.exceptions.HTTPError as e:
            error_detail = f"Server responded with status {e.response.status_code}."
            try:
                error_json = e.response.json()
            except ValueError:
                # JSON decode errors raised by Response.json() are ValueError
                # subclasses, so this also works with older requests releases.
                error_detail += f" Response: {e.response.text[:500]}"
            else:
                if isinstance(error_json, dict):
                    error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
                else:
                    # Valid JSON but not an object: there is no 'detail' key to read.
                    error_detail += f" Response: {e.response.text[:500]}"
            status_message = f"Submission Failed: {error_detail}"
            logger.error(status_message)
            return status_message
        except requests.exceptions.Timeout:
            status_message = "Submission Failed: The request timed out."
            logger.error(status_message)
            return status_message
        except requests.exceptions.RequestException as e:
            status_message = f"Submission Failed: Network error - {e}"
            logger.error(status_message)
            return status_message
        except Exception as e:
            status_message = f"An unexpected error occurred during submission: {e}"
            logger.error(status_message)
            return status_message