# Spaces: Runtime error (page status text captured with this file; not part of the program)
import json
import os
import sys  # Added for command-line arguments

import requests

from agent import BasicAgent
# --- Constants ---
# Base URL used to build task-file download URLs (``{DEFAULT_API_URL}/files/{task_id}``).
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Directory to save downloaded files.
DOWNLOAD_DIR = "downloaded_task_files"
# JSON file the questions are loaded from.
QUESTIONS_FILE = "data/questions.json"
def _load_question(question_index: int):
    """Return the entry at ``question_index`` from QUESTIONS_FILE, or None after printing why not."""
    try:
        with open(QUESTIONS_FILE, "r", encoding="utf-8") as f:
            questions_data = json.load(f)

        is_list = isinstance(questions_data, list)
        # The explicit lower bound stops a negative index from silently
        # wrapping around to the end of the list.
        if not is_list or not 0 <= question_index < len(questions_data):
            total = len(questions_data) if is_list else "N/A"
            print(f"Error: Could not load question at index {question_index} (0-indexed) from {QUESTIONS_FILE}. Total questions: {total}")
            return None

        item = questions_data[question_index]
        # item.get also acts as the "is this a JSON object?" check: anything
        # else raises AttributeError and is reported by the handler below.
        print(f"Loaded question {question_index + 1} (from 1-indexed input): {item.get('question')}")
        return item
    except FileNotFoundError:
        print(f"Error: Questions file not found at {QUESTIONS_FILE}")
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {QUESTIONS_FILE}")
    except Exception as e:
        print(f"Error loading or parsing {QUESTIONS_FILE}: {e}")
    return None


def _download_task_file(task_id, file_name):
    """Download the file for ``task_id`` into DOWNLOAD_DIR; return its local path, or None on failure."""
    # file_name comes from the questions file, so keep only its final path
    # component: the download must not be able to land outside DOWNLOAD_DIR.
    safe_name = os.path.basename(str(file_name))
    if safe_name in ("", ".", ".."):
        print(f"Failed to download {file_name}: not a usable file name")
        return None

    download_url = f"{DEFAULT_API_URL}/files/{task_id}"
    local_file_path = os.path.join(DOWNLOAD_DIR, safe_name)
    # Stream into a temporary sibling and rename on success, so an
    # interrupted transfer never leaves a truncated file under the real name.
    partial_path = f"{local_file_path}.part"
    print(f"Attempting to download file for task {task_id} (using task_id in URL): {file_name} from {download_url}")
    try:
        # The context manager releases the streamed connection even if the
        # body is not fully consumed.
        with requests.get(download_url, stream=True, timeout=30) as file_response:
            file_response.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in file_response.iter_content(chunk_size=8192):
                    f.write(chunk)
        os.replace(partial_path, local_file_path)
        print(f"Successfully downloaded to {local_file_path}")
        return local_file_path
    except requests.exceptions.RequestException as e:
        print(f"Failed to download {file_name}: {e}")
    except Exception as e:
        print(f"An unexpected error occurred downloading {file_name}: {e}")

    # Only reached on failure: best-effort removal of the partial file.
    try:
        os.remove(partial_path)
    except OSError:
        pass
    return None


def run_single_question_test(question_index: int):  # Added question_index parameter
    """
    Fetches a single question by index, downloads its associated file (if any),
    runs the BasicAgent on it, and prints the answer.

    Args:
        question_index: 0-based position of the question in QUESTIONS_FILE.
            Negative or out-of-range values are reported and skipped.

    Returns:
        None. Every outcome, including failures, is reported via ``print``;
        errors from loading, downloading or running the agent are caught here
        rather than raised.
    """
    # Create download directory if it doesn't exist
    try:
        os.makedirs(DOWNLOAD_DIR, exist_ok=True)
        print(f"Ensured download directory exists: {DOWNLOAD_DIR}")
    except OSError as e:
        print(f"Error creating download directory {DOWNLOAD_DIR}: {e}")
        return

    # 1. Load the question
    item = _load_question(question_index)
    if item is None:
        return

    task_id = item.get("task_id")
    question_text = item.get("question")
    file_name = item.get("file_name")

    # Validate before building the agent so a malformed entry is skipped
    # without paying for agent construction.
    if not task_id or question_text is None:
        print(f"Skipping item with missing task_id or question: {item}")
        return

    # 2. Instantiate Agent
    try:
        agent = BasicAgent()
        print("Agent instantiated successfully.")
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return

    # 3. Fetch the attached file, if any. A failed download yields None so
    #    the agent is never handed a path to a missing or incomplete file.
    local_file_path = None
    if file_name:
        local_file_path = _download_task_file(task_id, file_name)
    else:
        print("No file associated with this question.")

    # 4. Run Agent on the single question
    try:
        print(f"Running agent on question: {question_text}")
        if local_file_path:
            print(f"Providing file: {local_file_path}")
        submitted_answer = agent(question_text, file_path=local_file_path)
        print(f"\n--- Agent's Answer for Task ID: {task_id} ---")
        print(submitted_answer)
        print("--- End of Answer ---")
    except Exception as e:
        print(f"Error running agent on task {task_id}: {e}")
def run_all_questions_test():
    """
    Runs the test for all questions in the QUESTIONS_FILE.

    The file is read here only to find out how many questions there are; each
    question is then handed, by 0-based index, to run_single_question_test,
    which loads the file again itself.

    Returns:
        None. Progress and errors are reported via ``print``; a missing file,
        invalid JSON, or any other exception raised while iterating is caught
        and reported rather than propagated.
    """
    print("Attempting to run tests for all questions...")
    try:
        with open(QUESTIONS_FILE, "r", encoding="utf-8") as f:
            questions_data = json.load(f)

        # Check the type first: testing truthiness first would report an
        # empty list as a load error and make the "no questions" branch
        # unreachable.
        if not isinstance(questions_data, list):
            print(f"Error: Could not load questions from {QUESTIONS_FILE} or it's not a list.")
            return
        if not questions_data:
            print(f"No questions found in {QUESTIONS_FILE}.")
            return

        num_questions = len(questions_data)
        print(f"Found {num_questions} questions to test.")
        for number in range(1, num_questions + 1):
            print(f"\n--- Running Test for Question {number}/{num_questions} ---")
            run_single_question_test(number - 1)  # the runner takes a 0-based index
            print(f"--- Finished Test for Question {number}/{num_questions} ---\n")
        print("All question tests completed.")
    except FileNotFoundError:
        print(f"Error: Questions file not found at {QUESTIONS_FILE}")
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {QUESTIONS_FILE}")
    except Exception as e:
        print(f"An unexpected error occurred while running all questions: {e}")
def main(argv=None):
    """
    Command-line entry point: run one question or all of them.

    Args:
        argv: Argument vector in ``sys.argv`` layout (program name first).
            Defaults to ``sys.argv``. ``argv[1]`` is either ``all``
            (case-insensitive) or a 1-based question number; when it is
            absent the usage text is printed and all questions are run.

    Returns:
        The process exit status: 1 when the argument is not a positive
        integer or ``all``, otherwise 0.
    """
    if argv is None:
        argv = sys.argv

    if len(argv) < 2:
        print("Usage: python test_single_q.py <question_number | 'all'>")
        print("Example (single question): python test_single_q.py 4")
        print("Example (all questions): python test_single_q.py all")
        run_all_questions_test()  # Default to running all if no argument is provided
        return 0

    argument = argv[1]
    if argument.lower() == "all":
        run_all_questions_test()
        return 0

    # Only the conversion sits inside the try: a ValueError raised later,
    # while the test itself runs, must not be reported as a bad argument.
    try:
        question_number_arg = int(argument)
    except ValueError:
        print("Error: Invalid argument. Please enter an integer for a specific question or 'all'.")
        return 1

    if question_number_arg <= 0:
        print("Error: Question number must be a positive integer.")
        return 1

    # Convert 1-indexed input to 0-indexed for list access
    run_single_question_test(question_number_arg - 1)
    return 0


if __name__ == "__main__":
    sys.exit(main())