Spaces:
Runtime error
Runtime error
# evaluator.py | |
from settings import Settings | |
from typing import List | |
from models import Question, QuestionAnswerPair, Results | |
import requests | |
import random | |
import json | |
import logging | |
logger = logging.getLogger(__name__) | |
class Evaluator: | |
def __init__(self, settings: Settings):
    """
    Keep a reference to the application settings.

    The other methods read the scoring API base URL, the space id and the
    username from this object.
    """
    self.settings = settings
def get_questions(self) -> list[Question]:
    """
    Get the questions from the scoring API, falling back to a local cache.

    After a successful fetch the questions are also written to
    'questions.json' so that a later call can read them back when the API
    request fails. A failure to write that cache is logged and otherwise
    ignored, because the fetched questions are still valid.

    Returns:
        list[Question]: The fetched (or cached) questions. An empty list is
        returned when the API request fails and the local file is missing
        or is not valid JSON.
    """
    # Strip a trailing slash from the base URL so the joined URL never
    # contains "//", whatever form the setting is stored in.
    url = str(self.settings.scoring_api_base_url).rstrip("/") + "/questions"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        questions = [Question(**question) for question in response.json()]
    except requests.exceptions.RequestException as e:
        logger.warning(f"Error fetching questions from API ({e}). Attempting to read from local file 'questions.json'.")
        try:
            # Read local file instead, dealing with API rate limits, etc.
            with open("questions.json", "r", encoding="utf-8") as f:
                questions = [Question(**question) for question in json.load(f)]
            logger.info(f"Successfully loaded {len(questions)} questions from local file.")
        except FileNotFoundError:
            logger.error("Local 'questions.json' not found. Cannot retrieve questions.")
            questions = []
        except json.JSONDecodeError as json_e:
            logger.error(f"Error decoding local 'questions.json': {json_e}")
            questions = []
    else:
        # Save questions to a local file for fallback/debugging. This is
        # best-effort: an unwritable working directory must not discard
        # questions that were fetched successfully.
        try:
            with open("questions.json", "w", encoding="utf-8") as f:
                json.dump([question.model_dump() for question in questions], f, indent=4)
        except OSError as write_e:
            logger.warning(f"Could not write local 'questions.json' cache ({write_e}).")
        logger.info(f"Successfully fetched {len(questions)} questions from API.")
    return questions
def get_one_question(self, task_id=None) -> Question:
    """
    Get the requested question, or a random one.

    When ``task_id`` is truthy, all questions are fetched via
    ``get_questions()`` and the one with a matching ``task_id`` is
    returned. If there is no match (or no ``task_id`` was given), a random
    question is requested from the API. If that request fails, a random
    question is picked from whatever ``get_questions()`` returns, which
    includes its local-file fallback.

    Returns:
        Question: A Question object

    Raises:
        ValueError: If the random-question request fails and
            ``get_questions()`` returns no questions.
    """
    if task_id:
        # Get all questions to find the specific task_id.
        for question in self.get_questions():
            if question.task_id == task_id:
                logger.info(f"Found specific question for task_id: {task_id}")
                return question
        logger.warning(f"Task ID {task_id} not found in fetched questions. Attempting random.")

    # Strip a trailing slash from the base URL so the joined URL never
    # contains "//", whatever form the setting is stored in.
    url = str(self.settings.scoring_api_base_url).rstrip("/") + "/random-question"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        question = Question(**response.json())
        logger.info(f"Successfully fetched random question from API: {question.task_id}")
        return question
    except requests.exceptions.RequestException as e:
        logger.warning(f"Error fetching random question from API ({e}). Attempting to get random from local file.")
        # Fallback: get_questions() retries the API and then the local file.
        questions = self.get_questions()
        if questions:
            random_question = random.choice(questions)
            logger.info(f"Loaded random question from local file: {random_question.task_id}")
            return random_question
        logger.error("No questions available from API or local file.")
        raise ValueError("No questions available to retrieve.") from e
def _read_answer_file(self, username: str) -> List[QuestionAnswerPair]:
    """
    Read the question answer pairs from a user-specific answer file.

    Args:
        username: Used to build the file name 'answers_{username}.json',
            which is opened relative to the current working directory.

    Returns:
        List[QuestionAnswerPair]: One pair per entry of the JSON file.

    Raises:
        FileNotFoundError: If the answer file does not exist.
    """
    file_name = f"answers_{username}.json"
    try:
        # Open the file directly rather than pre-checking with
        # os.path.exists: the module's visible imports do not include `os`,
        # so that check raised NameError. It also avoids a check-then-open
        # race.
        with open(file_name, "r", encoding="utf-8") as f:
            raw_pairs = json.load(f)
    except FileNotFoundError:
        raise FileNotFoundError(
            f"Answer file '{file_name}' not found for user '{username}'."
        ) from None
    return [QuestionAnswerPair(**pair) for pair in raw_pairs]
def submit_answers(self, username: str) -> str:
    """
    Submits saved answers to the scoring endpoint and returns the result.

    Args:
        username: Selects the local answer file to read (passed to
            ``_read_answer_file``). The username sent in the payload comes
            from the settings, not from this argument.

    Returns:
        str: A human-readable status message. Failures to read the answer
        file, HTTP errors, timeouts, network errors and any other
        exception during submission are reported through this message
        rather than raised.
    """
    try:
        pairs_to_submit = self._read_answer_file(username)
        answers_payload = [pair.get_answer() for pair in pairs_to_submit]
    except FileNotFoundError as e:
        logger.error(e)
        return "Click 'Get One Answer' or 'Get All Answers' to run agent before trying to submit."
    except Exception as e:
        logger.error(f"Error reading local answer file: {e}")
        return f"Error preparing answers for submission: {e}"

    agent_code = f"https://huggingface.co/spaces/{self.settings.space_id}/tree/main"
    submission_data = {
        "username": self.settings.username,  # Use username from settings for submission
        "code_link": agent_code,
        "answers": answers_payload,
    }
    # Strip a trailing slash from the base URL so the joined URL never
    # contains "//", whatever form the setting is stored in.
    submit_url = str(self.settings.scoring_api_base_url).rstrip("/") + "/submit"
    logger.info(f"Submitting {len(answers_payload)} answers to: {submit_url}")

    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        results = Results.model_validate(response.json())
        logger.info(
            f"Submission successful.\n"
            f"User: {results.username}.\n"
            f"Overall Score: {results.score}%.\n"
            f"Correct Count: {results.correct_count}.\n"
            f"Total Attempted: {results.total_attempted}.\n"
            f"Message: {results.message}.\n"
            f"Timestamp: {results.timestamp}.\n"
        )
        return (
            f"Submission Successful!\n"
            f"User: {results.username}\n"
            f"Overall Score: {results.score}% "
            f"({results.correct_count}/{results.total_attempted} correct)\n"
            f"Message: {results.message}"
        )
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
        except ValueError:
            # The JSON decode errors raised by Response.json() are
            # ValueError subclasses, so this also works on Requests
            # releases that lack requests.exceptions.JSONDecodeError.
            error_json = None
        if isinstance(error_json, dict):
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        else:
            # Body is not JSON, or is JSON but not an object (no .get()).
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Every failure path above ends here: log the message and hand it back.
    logger.error(status_message)
    return status_message