Spaces:
Runtime error
Runtime error
File size: 7,630 Bytes
09f04d8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# evaluator.py
from settings import Settings
from typing import List
from models import Question, QuestionAnswerPair, Results
import requests
import random
import json
import logging
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class Evaluator:
    """Fetch questions from the scoring API and submit saved answers to it.

    Questions fetched from the API are cached in ``questions.json`` (relative
    to the current working directory) so later calls can fall back to that
    file when the API request fails. Answers are read from a per-user file
    named ``answers_<username>.json``.
    """

    # Local cache of the most recently fetched question list.
    _QUESTIONS_CACHE_FILE = "questions.json"

    def __init__(self, settings: Settings):
        """Keep a reference to the settings object.

        Args:
            settings: Configuration object. This class reads its
                ``scoring_api_base_url``, ``space_id`` and ``username``
                attributes.
        """
        self.settings = settings

    def _endpoint(self, path: str) -> str:
        """Join the scoring API base URL and *path* with exactly one slash."""
        # Plain concatenation would yield "//path" when the configured base
        # URL already ends in "/", so normalise both sides first.
        base = str(self.settings.scoring_api_base_url).rstrip("/")
        return f"{base}/{path.lstrip('/')}"

    def _cache_questions(self, questions: list[Question]) -> None:
        """Write *questions* to the local cache file; failure is not fatal."""
        try:
            with open(self._QUESTIONS_CACHE_FILE, "w", encoding="utf-8") as f:
                json.dump(
                    [question.model_dump() for question in questions],
                    f,
                    indent=4,
                )
        except OSError as e:
            # The cache is only a fallback/debugging aid: a failed write must
            # not throw away questions that were fetched successfully.
            logger.warning(f"Could not write local 'questions.json' cache: {e}")

    def _load_cached_questions(self) -> list[Question]:
        """Read questions from the local cache; return ``[]`` if unusable."""
        try:
            with open(self._QUESTIONS_CACHE_FILE, "r", encoding="utf-8") as f:
                questions = [Question(**question) for question in json.load(f)]
        except FileNotFoundError:
            logger.error("Local 'questions.json' not found. Cannot retrieve questions.")
            return []
        except json.JSONDecodeError as json_e:
            logger.error(f"Error decoding local 'questions.json': {json_e}")
            return []
        logger.info(f"Successfully loaded {len(questions)} questions from local file.")
        return questions

    def get_questions(self) -> list[Question]:
        """Get all questions from the scoring API, falling back to the cache.

        On a successful fetch the questions are also saved to
        ``questions.json``. If the request fails, the questions are read from
        that file instead.

        Returns:
            list[Question]: The questions, or an empty list when the request
            fails and the local file is missing or is not valid JSON.
        """
        url = self._endpoint("questions")
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            questions = [Question(**question) for question in response.json()]
        except requests.exceptions.RequestException as e:
            logger.warning(f"Error fetching questions from API ({e}). Attempting to read from local file 'questions.json'.")
            # Read local file instead, dealing with API rate limits, etc.
            return self._load_cached_questions()

        self._cache_questions(questions)
        logger.info(f"Successfully fetched {len(questions)} questions from API.")
        return questions

    def get_one_question(self, task_id=None) -> Question:
        """Get the requested question, or a random one.

        When *task_id* is given, the full question list is searched for it
        first. If it is not given, or no question matches, a random question
        is requested from the API; if that request fails, a random entry of
        ``get_questions()`` is returned instead.

        Args:
            task_id: Identifier of the wanted question, or ``None`` for a
                random question.

        Returns:
            Question: The matching or randomly chosen question.

        Raises:
            ValueError: If the random-question request fails and
                ``get_questions()`` returns no questions.
        """
        if task_id:
            # Get all questions to find the specific task_id.
            for question in self.get_questions():
                if question.task_id == task_id:
                    logger.info(f"Found specific question for task_id: {task_id}")
                    return question
            logger.warning(f"Task ID {task_id} not found in fetched questions. Attempting random.")

        # Try fetching a random question from the API.
        try:
            response = requests.get(self._endpoint("random-question"), timeout=10)
            response.raise_for_status()
            question = Question(**response.json())
        except requests.exceptions.RequestException as e:
            logger.warning(f"Error fetching random question from API ({e}). Attempting to get random from local file.")
        else:
            logger.info(f"Successfully fetched random question from API: {question.task_id}")
            return question

        # Fallback: pick one from the full list (API retry, then local file).
        questions = self.get_questions()
        if not questions:
            logger.error("No questions available from API or local file.")
            raise ValueError("No questions available to retrieve.")
        random_question = random.choice(questions)
        logger.info(f"Loaded random question from local file: {random_question.task_id}")
        return random_question

    def _read_answer_file(self, username: str) -> List[QuestionAnswerPair]:
        """Read the question/answer pairs from a user-specific answer file.

        Args:
            username: Used to build the file name ``answers_<username>.json``.

        Returns:
            List[QuestionAnswerPair]: One pair per entry in the JSON file.

        Raises:
            FileNotFoundError: If the answer file does not exist.
        """
        file_name = f"answers_{username}.json"
        # Open directly instead of pre-checking with os.path.exists: the
        # module never imports ``os``, so the pre-check raised NameError.
        try:
            with open(file_name, "r") as f:
                raw_pairs = json.load(f)
        except FileNotFoundError:
            raise FileNotFoundError(
                f"Answer file '{file_name}' not found for user '{username}'."
            ) from None
        return [QuestionAnswerPair(**pair) for pair in raw_pairs]

    @staticmethod
    def _describe_http_error(response) -> str:
        """Build a readable description of an HTTP error response."""
        error_detail = f"Server responded with status {response.status_code}."
        try:
            error_json = response.json()
        except ValueError:
            # JSON decode errors raised by requests derive from ValueError.
            return error_detail + f" Response: {response.text[:500]}"
        if isinstance(error_json, dict):
            return error_detail + f" Detail: {error_json.get('detail', response.text)}"
        # Valid JSON that is not an object (list, string, ...) has no
        # 'detail' key to look up, so report the raw body instead.
        return error_detail + f" Detail: {response.text}"

    def submit_answers(self, username: str) -> str:
        """Submit saved answers to the scoring endpoint and return the result.

        Args:
            username: Selects the local answer file to read. The username
                sent to the API comes from ``self.settings.username``.

        Returns:
            str: A status message describing success or the kind of failure.
            Errors are reported through this message rather than raised.
        """
        try:
            pairs_to_submit = self._read_answer_file(username)
            answers_payload = [pair.get_answer() for pair in pairs_to_submit]
        except FileNotFoundError as e:
            logger.error(e)
            return "Click 'Get One Answer' or 'Get All Answers' to run agent before trying to submit."
        except Exception as e:
            logger.error(f"Error reading local answer file: {e}")
            return f"Error preparing answers for submission: {e}"

        agent_code = f"https://huggingface.co/spaces/{self.settings.space_id}/tree/main"
        submission_data = {
            "username": self.settings.username,  # Username from settings, not the argument
            "code_link": agent_code,
            "answers": answers_payload,
        }
        submit_url = self._endpoint("submit")
        logger.info(f"Submitting {len(answers_payload)} answers to: {submit_url}")

        try:
            response = requests.post(submit_url, json=submission_data, timeout=60)
            response.raise_for_status()
            results = Results.model_validate(response.json())
            logger.info(
                f"Submission successful.\n"
                f"User: {results.username}.\n"
                f"Overall Score: {results.score}%.\n"
                f"Correct Count: {results.correct_count}.\n"
                f"Total Attempted: {results.total_attempted}.\n"
                f"Message: {results.message}.\n"
                f"Timestamp: {results.timestamp}.\n"
            )
            status_message = (
                f"Submission Successful!\n"
                f"User: {results.username}\n"
                f"Overall Score: {results.score}% "
                f"({results.correct_count}/{results.total_attempted} correct)\n"
                f"Message: {results.message}"
            )
            return status_message
        except requests.exceptions.HTTPError as e:
            status_message = f"Submission Failed: {self._describe_http_error(e.response)}"
            logger.error(status_message)
            return status_message
        except requests.exceptions.Timeout:
            status_message = "Submission Failed: The request timed out."
            logger.error(status_message)
            return status_message
        except requests.exceptions.RequestException as e:
            status_message = f"Submission Failed: Network error - {e}"
            logger.error(status_message)
            return status_message
        except Exception as e:
            status_message = f"An unexpected error occurred during submission: {e}"
            logger.error(status_message)
            return status_message
|