# Agents_Course_Final_Assignment / test_single_q.py
# Page-header residue from the repository file viewer (avatar alt text, commit message, commit hash):
#   sanjeed5's picture
#   Update dependencies in pyproject.toml and requirements.txt, and add test_single_q.py for running single and all question tests. Introduce tools.py for Perplexity API integration with enhanced error handling and logging.
#   555dbe6
import json
import os
import requests
import sys # Added for command-line arguments
from agent import BasicAgent
# --- Constants ---
# Base URL of the remote service; task files are fetched from
# "<DEFAULT_API_URL>/files/<task_id>".
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
DOWNLOAD_DIR = "downloaded_task_files" # Directory to save downloaded files
# Path of the JSON file holding the list of questions; both test runners read it.
QUESTIONS_FILE = "data/questions.json"
# NOTE: the question index is passed to run_single_question_test() as an
# argument; there is no module-level QUESTION_INDEX constant.
def run_single_question_test(question_index: int) -> None:  # Added question_index parameter
    """
    Fetches a single question by index, downloads its associated file (if any),
    runs the BasicAgent on it, and prints the answer.

    Every handled failure (directory creation, loading the questions file,
    agent construction, file download, agent execution) is reported with
    print() and the function returns early; nothing is returned to the caller.
    A failed download is not fatal: the agent is still run, without a file.

    Args:
        question_index: 0-based position of the question inside the JSON list
            stored in QUESTIONS_FILE. Negative and out-of-range values are
            rejected rather than being interpreted as from-the-end indices.
    """
    # Create download directory if it doesn't exist
    try:
        os.makedirs(DOWNLOAD_DIR, exist_ok=True)
        print(f"Ensured download directory exists: {DOWNLOAD_DIR}")
    except OSError as e:
        print(f"Error creating download directory {DOWNLOAD_DIR}: {e}")
        return

    # 1. Load Questions
    try:
        # Read as UTF-8 explicitly instead of relying on the locale default.
        with open(QUESTIONS_FILE, 'r', encoding='utf-8') as f:
            questions_data = json.load(f)
        # The lower bound matters: without it a negative index would silently
        # select a question counted from the end of the list.
        index_is_valid = (
            isinstance(questions_data, list)
            and 0 <= question_index < len(questions_data)
        )
        if not index_is_valid:
            print(f"Error: Could not load question at index {question_index} (0-indexed) from {QUESTIONS_FILE}. Total questions: {len(questions_data) if isinstance(questions_data, list) else 'N/A'}")
            return
        item = questions_data[question_index]
        print(f"Loaded question {question_index + 1} (from 1-indexed input): {item.get('question')}")
    except FileNotFoundError:
        print(f"Error: Questions file not found at {QUESTIONS_FILE}")
        return
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {QUESTIONS_FILE}")
        return
    except Exception as e:
        # Also reached when the selected entry is not a dict (no .get()).
        print(f"Error loading or parsing {QUESTIONS_FILE}: {e}")
        return

    # 2. Instantiate Agent
    try:
        agent = BasicAgent()
        print("Agent instantiated successfully.")
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return

    task_id = item.get("task_id")
    question_text = item.get("question")
    file_name = item.get("file_name")
    local_file_path = None

    if not task_id or question_text is None:
        print(f"Skipping item with missing task_id or question: {item}")
        return

    if file_name:
        download_url = f"{DEFAULT_API_URL}/files/{task_id}"
        # Keep only the final path component so a file_name such as "../x" or
        # an absolute path cannot make the download land outside DOWNLOAD_DIR.
        local_file_path = os.path.join(DOWNLOAD_DIR, os.path.basename(str(file_name)))
        print(f"Attempting to download file for task {task_id} (using task_id in URL): {file_name} from {download_url}")
        try:
            # The context manager closes the streamed response (and releases
            # its connection) even when raise_for_status() or the write fails.
            with requests.get(download_url, stream=True, timeout=30) as file_response:
                file_response.raise_for_status()
                with open(local_file_path, 'wb') as f:
                    for chunk in file_response.iter_content(chunk_size=8192):
                        f.write(chunk)
            print(f"Successfully downloaded to {local_file_path}")
        except requests.exceptions.RequestException as e:
            print(f"Failed to download {file_name}: {e}")
            local_file_path = None  # Ensure agent doesn't get a path to a non-existent/failed file
        except Exception as e:
            print(f"An unexpected error occurred downloading {file_name}: {e}")
            local_file_path = None
    else:
        print("No file associated with this question.")

    # 3. Run Agent on the single question
    try:
        print(f"Running agent on question: {question_text}")
        if local_file_path:
            print(f"Providing file: {local_file_path}")
        submitted_answer = agent(question_text, file_path=local_file_path)
        print(f"\n--- Agent's Answer for Task ID: {task_id} ---")
        print(submitted_answer)
        print("--- End of Answer ---")
    except Exception as e:
        print(f"Error running agent on task {task_id}: {e}")
def run_all_questions_test() -> None:
    """
    Runs the test for all questions in the QUESTIONS_FILE.

    The file is loaded once here only to count the questions; each question is
    then processed by run_single_question_test(), which is given its 0-based
    index. Problems are reported with print(); nothing is returned or raised.
    An unexpected exception from one question is reported and the remaining
    questions are still run.
    """
    print("Attempting to run tests for all questions...")

    # Only the load itself is guarded here, so a failure inside one question
    # is never misreported as a problem with the questions file.
    try:
        # Read as UTF-8 explicitly instead of relying on the locale default.
        with open(QUESTIONS_FILE, 'r', encoding='utf-8') as f:
            questions_data = json.load(f)
    except FileNotFoundError:
        print(f"Error: Questions file not found at {QUESTIONS_FILE}")
        return
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {QUESTIONS_FILE}")
        return
    except Exception as e:
        print(f"An unexpected error occurred while running all questions: {e}")
        return

    if not isinstance(questions_data, list):
        print(f"Error: Could not load questions from {QUESTIONS_FILE} or it's not a list.")
        return

    # Checked separately from the type test so an empty list gets its own
    # message instead of being reported as a load failure.
    num_questions = len(questions_data)
    if num_questions == 0:
        print(f"No questions found in {QUESTIONS_FILE}.")
        return

    print(f"Found {num_questions} questions to test.")
    for i in range(num_questions):
        print(f"\n--- Running Test for Question {i+1}/{num_questions} ---")
        try:
            run_single_question_test(i)
        except Exception as e:
            # Keep going: one broken question should not abort the whole batch.
            print(f"An unexpected error occurred while running question {i+1}/{num_questions}: {e}")
        print(f"--- Finished Test for Question {i+1}/{num_questions} ---\n")
    print("All question tests completed.")
def main(argv=None) -> int:
    """
    Command-line entry point: run one question or all of them.

    Args:
        argv: Argument list in sys.argv layout (program name first). Defaults
            to sys.argv when omitted.

    Returns:
        Process exit status: 0 after a run was started (single question, 'all',
        or the no-argument default), 1 when the argument is not 'all' and is
        not a positive integer.
    """
    if argv is None:
        argv = sys.argv

    if len(argv) < 2:
        print("Usage: python test_single_q.py <question_number | 'all'>")
        print("Example (single question): python test_single_q.py 4")
        print("Example (all questions): python test_single_q.py all")
        run_all_questions_test()  # Default to running all if no argument is provided
        return 0

    argument = argv[1]
    if argument.lower() == "all":
        run_all_questions_test()
        return 0

    # Only the conversion is guarded, so a ValueError raised while running the
    # question is never misreported as a bad command-line argument.
    try:
        question_number_arg = int(argument)
    except ValueError:
        print("Error: Invalid argument. Please enter an integer for a specific question or 'all'.")
        return 1

    if question_number_arg <= 0:
        print("Error: Question number must be a positive integer.")
        return 1

    # Convert 1-indexed input to 0-indexed for list access
    run_single_question_test(question_number_arg - 1)
    return 0


if __name__ == "__main__":
    sys.exit(main())