# Yago Bolivar
# feat: add GAIA Agent and local testing scripts, including setup and requirements for development
# 2abc50d
# raw
# history blame
# 24.9 kB
# /Users/yagoairm2/Desktop/agents/final project/HF_Agents_Final_Project/app2.py
import os
import gradio as gr
import requests
import pandas as pd
import json
from typing import Dict, List, Optional, Union, Any
import re
from dataclasses import dataclass
from abc import ABC, abstractmethod
import time
import logging
from dotenv import load_dotenv
import tempfile
import io
import sys
import contextlib
from urllib.parse import urlparse
from pathlib import Path
# Logging setup: INFO and above, timestamped records, sent to a stream handler.
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# --- Constants ---
# Base URL of the scoring service; the /questions, /submit and /files/<task_id>
# endpoints used in this file are all built from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Default local directory in which FileReaderTool looks for (and saves) task files.
DEFAULT_FILES_DIR = "dataset"
# System prompt passed to the LLM when generating an answer. It fixes the
# "FINAL ANSWER: ..." template that LLMModule.extract_final_answer searches for.
# The wording is runtime text sent to the model, so it is left untouched.
SYSTEM_PROMPT = """
You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER]. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.
"""
# --- Tool Interface ---
class Tool(ABC):
    """Abstract interface implemented by every tool the agent can call.

    Concrete tools declare a ``name`` and a ``description`` and override
    :meth:`run`.
    """

    name: str
    description: str

    @abstractmethod
    def run(self, **kwargs) -> Dict[str, Any]:
        """Run the tool with the supplied keyword arguments.

        Returns:
            A dictionary describing the outcome of the call.
        """
        ...
# --- Tools Implementation ---
class WebSearchTool(Tool):
    """Tool for performing web searches.

    The current implementation is a mock: it waits for a configurable delay
    and returns one placeholder result built from the query. It is meant to
    be replaced by a real search API (SerpAPI, Google Custom Search, ...).
    """

    name = "web_search"
    description = "Search the web for information about a topic."

    def __init__(self, simulated_delay: float = 1.0):
        """Create the tool.

        Args:
            simulated_delay: Seconds to sleep on each search to imitate
                network latency. The default keeps the previous fixed
                1 second wait; pass 0 to skip it (tests, batch runs).
        """
        # A real implementation would also create its API client/session here.
        self.simulated_delay = simulated_delay

    def run(self, query: str) -> Dict[str, Any]:
        """Perform a (mock) web search for ``query``.

        Args:
            query: The search query.

        Returns:
            Dict with ``status`` set to ``"success"`` and a ``results`` list
            holding a single placeholder entry (``title``, ``snippet``, ``url``).
        """
        logger.info("WebSearchTool: Searching for '%s'", query)
        # Simulate network delay; skipped entirely when the delay is disabled.
        if self.simulated_delay and self.simulated_delay > 0:
            time.sleep(self.simulated_delay)
        placeholder = {
            "title": f"Mock result for {query}",
            "snippet": "This is a placeholder for search results.",
            "url": "https://example.com",
        }
        return {"status": "success", "results": [placeholder]}
class FileReaderTool(Tool):
    """Tool for reading and processing different types of files.

    Files live under ``<files_dir>/<task_id>/<file_name>``. When a file is
    not present locally it is downloaded from the scoring API first.
    """

    name = "file_reader"
    description = "Read and process files of various formats."

    # Extensions whose content is read and returned as UTF-8 text.
    _TEXT_EXTENSIONS = frozenset({".txt", ".md", ".py", ".json", ".csv"})
    # Extensions that are only acknowledged for now: ext -> (label, file_type).
    # A full implementation would hand these to a vision model, a
    # speech-to-text model or pandas respectively.
    _PLACEHOLDER_KINDS = {
        ".png": ("Image", "image"),
        ".jpg": ("Image", "image"),
        ".jpeg": ("Image", "image"),
        ".mp3": ("Audio", "audio"),
        ".wav": ("Audio", "audio"),
        ".ogg": ("Audio", "audio"),
        ".xlsx": ("Excel", "spreadsheet"),
        ".xls": ("Excel", "spreadsheet"),
    }

    def __init__(self, files_dir: str = DEFAULT_FILES_DIR):
        """Create the tool.

        Args:
            files_dir: Root directory holding one sub-directory per task.
        """
        self.files_dir = files_dir

    @staticmethod
    def _is_safe_component(value: Any) -> bool:
        """Return True when ``value`` is a single, non-traversing path component."""
        if not isinstance(value, str) or not value or value in (".", ".."):
            return False
        return "/" not in value and "\\" not in value

    def _local_path(self, task_id: str, file_name: str) -> str:
        """Build the local path for a task file, rejecting unsafe components.

        Raises:
            ValueError: If ``task_id`` or ``file_name`` is empty, is ``.``/``..``
                or contains a path separator (both may come from untrusted
                question text, so they must not escape ``files_dir``).
        """
        for label, value in (("task_id", task_id), ("file_name", file_name)):
            if not self._is_safe_component(value):
                raise ValueError(f"Invalid {label}: {value!r}")
        return os.path.join(self.files_dir, task_id, file_name)

    def run(self, task_id: str, file_name: str) -> Dict[str, Any]:
        """Read and process a file associated with a task.

        Args:
            task_id: The task identifier.
            file_name: Name of the file to process.

        Returns:
            On success, a dict with ``status="success"``, ``content`` and
            ``file_type``. On any failure (invalid name, unsupported
            extension, download or read error), a dict with
            ``status="error"`` and an ``error`` message. Never raises.
        """
        try:
            file_path = self._local_path(task_id, file_name)
            file_ext = os.path.splitext(file_name)[1].lower()
            is_text = file_ext in self._TEXT_EXTENSIONS

            # Reject unsupported types before touching disk or network.
            if not is_text and file_ext not in self._PLACEHOLDER_KINDS:
                return {"status": "error", "error": f"Unsupported file type: {file_ext}"}

            # Fall back to downloading when the file is not available locally.
            if not os.path.exists(file_path):
                file_path = self._download_file(task_id, file_name)

            if is_text:
                with open(file_path, "r", encoding="utf-8") as f:
                    content = f.read()
                return {"status": "success", "content": content, "file_type": "text"}

            label, file_type = self._PLACEHOLDER_KINDS[file_ext]
            return {
                "status": "success",
                "content": f"{label} file: {file_path}",
                "file_type": file_type,
            }
        except Exception as e:
            # Tool boundary: report the failure to the agent instead of raising.
            logger.error(f"Error processing file {file_name}: {e}")
            return {"status": "error", "error": str(e)}

    def _download_file(self, task_id: str, file_name: str) -> str:
        """Download a task's file from the API and save it locally.

        Args:
            task_id: The task identifier; used in the ``/files/<task_id>`` URL.
            file_name: Name under which the downloaded bytes are stored.

        Returns:
            Path of the saved file.

        Raises:
            ValueError: If ``task_id`` or ``file_name`` is not a safe path component.
            RuntimeError: If the server answers with a non-200 status code.
        """
        # Validate before any network traffic or directory creation.
        file_path = self._local_path(task_id, file_name)

        api_url = f"{DEFAULT_API_URL}/files/{task_id}"
        logger.info(f"Downloading file for task {task_id}")
        response = requests.get(api_url, timeout=30)
        if response.status_code != 200:
            raise RuntimeError(f"Failed to download file: {response.status_code}")

        # Create the task directory if it doesn't exist, then save the payload.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(response.content)
        logger.info(f"File saved to {file_path}")
        return file_path
class CodeInterpreterTool(Tool):
    """Tool for executing Python code with a restricted set of globals.

    SECURITY NOTE: ``exec`` with a trimmed ``__builtins__`` is NOT a sandbox;
    determined code can still reach arbitrary objects through introspection.
    Only run code from a trusted source, or move execution into an isolated
    process/container before exposing this to untrusted input.
    """

    name = "code_interpreter"
    description = "Execute Python code and return the result."

    # Modules pre-imported into the executed code's namespace. ``import``
    # statements inside the code do not work because ``__import__`` is withheld.
    _SAFE_MODULES = ("math", "random", "datetime", "re")

    # Builtins made available to executed code. An empty ``__builtins__``
    # removed even ``print``, so code could never produce the stdout this
    # tool is meant to capture.
    _SAFE_BUILTINS = (
        "abs", "all", "any", "bool", "dict", "divmod", "enumerate", "filter",
        "float", "format", "frozenset", "int", "isinstance", "len", "list",
        "map", "max", "min", "pow", "print", "range", "repr", "reversed",
        "round", "set", "sorted", "str", "sum", "tuple", "zip",
        "ArithmeticError", "Exception", "IndexError", "KeyError",
        "StopIteration", "TypeError", "ValueError", "ZeroDivisionError",
    )

    def run(self, code: str) -> Dict[str, Any]:
        """Execute Python code and capture its output.

        Args:
            code: The Python code to execute.

        Returns:
            Dict with ``status`` (``"success"`` or ``"error"``), the captured
            ``stdout`` and ``stderr`` text and, on failure, an ``error``
            message. Exceptions raised by the code are reported, not raised.
        """
        import builtins  # local import: only needed to build the whitelist

        logger.info("Running code interpreter")
        output = io.StringIO()
        error = io.StringIO()

        # Restricted namespace: whitelisted builtins plus a few safe modules.
        exec_globals: Dict[str, Any] = {
            "__builtins__": {
                builtin_name: getattr(builtins, builtin_name)
                for builtin_name in self._SAFE_BUILTINS
            }
        }
        for safe_module in self._SAFE_MODULES:
            try:
                exec_globals[safe_module] = __import__(safe_module)
            except ImportError:
                # Best effort: a missing optional module just isn't exposed.
                pass

        try:
            # Capture everything the code writes to stdout and stderr.
            with contextlib.redirect_stdout(output), contextlib.redirect_stderr(error):
                exec(code, exec_globals)
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "stdout": output.getvalue(),
                "stderr": error.getvalue(),
            }
        return {
            "status": "success",
            "stdout": output.getvalue(),
            "stderr": error.getvalue(),
        }
# --- LLM Interaction Module ---
class LLMModule:
    """Module for interacting with an LLM.

    Wraps a local GPT4All model. When the model cannot be initialized the
    module switches to canned mock responses so the rest of the pipeline
    keeps working.
    """

    # Captures the text that follows a "FINAL ANSWER:" marker up to the end
    # of its line (case-insensitive).
    _FINAL_ANSWER_RE = re.compile(r"FINAL ANSWER:\s*(.*?)(?:\n|$)", re.IGNORECASE)

    def __init__(self, model_name: str = "Meta-Llama-3-8B-Instruct.Q4_0.gguf"):
        """Initialize the LLM module with a specified model.

        Args:
            model_name: GPT4All model file name to load (downloaded if missing).
        """
        self.model_name = model_name
        # Always define the attribute so the mock path never hits AttributeError.
        self.model = None
        try:
            from gpt4all import GPT4All
            logger.info(f"Initializing GPT4All model: {model_name}")
            self.model = GPT4All(model_name, allow_download=True)
            logger.info("GPT4All model initialized successfully")
            self.use_mock = False
        except Exception as e:
            # Deliberate best effort: a missing package or model must not
            # prevent the app from starting.
            logger.warning(f"Failed to initialize GPT4All model: {e}")
            logger.warning("Using mock responses instead")
            self.use_mock = True

    def generate(self, prompt: str, system_prompt: Optional[str] = None) -> str:
        """Generate text using the LLM.

        Args:
            prompt: The user prompt.
            system_prompt: Optional system prompt, prepended to ``prompt``.

        Returns:
            The generated text. A canned response containing a
            ``FINAL ANSWER:`` line is returned when running in mock mode or
            when generation fails.
        """
        logger.info(f"LLM: Generating response for prompt (first 50 chars): {prompt[:50]}...")
        if self.use_mock:
            # Fall back to mock response if model initialization failed.
            logger.warning("Using mock response")
            return (
                f"This is a mock LLM response. I'm simulating thinking about: {prompt[:30]}..."
                "\n\nFINAL ANSWER: Mock answer"
            )
        try:
            # Combine system prompt and user prompt if a system prompt is provided.
            full_prompt = f"{system_prompt}\n\n{prompt}" if system_prompt else prompt
            with self.model.chat_session():
                response = self.model.generate(full_prompt, max_tokens=1024, temp=0.7)
            logger.info(f"LLM response (first 50 chars): {response[:50]}...")
            return response
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            # Fall back to a fixed response if generation fails.
            return "Error generating LLM response. Falling back to mock response.\n\nFINAL ANSWER: Error occurred"

    def extract_final_answer(self, text: str) -> str:
        """Extract the final answer from LLM output.

        The LAST ``FINAL ANSWER:`` marker wins: models often restate the
        answer template (or an earlier draft) before giving the real answer,
        and the first occurrence would then be wrong.

        Args:
            text: Raw LLM output.

        Returns:
            The stripped text after the last marker, or the whole stripped
            text when no marker is present. Empty input yields ``""``.
        """
        if not text:
            return ""
        answers = self._FINAL_ANSWER_RE.findall(text)
        if answers:
            return answers[-1].strip()
        return text.strip()
# --- GAIA Agent Implementation ---
class GAIAAgent:
    """
    Agent designed to answer questions from the GAIA benchmark.

    For each question the agent builds a keyword-based plan, runs the tools
    the plan names, and asks the LLM for an answer based on the tool results.
    """

    # Matches "file: name" / "file name". Periods are allowed inside the
    # captured token so the extension survives ("data.csv", not "data").
    _FILE_MENTION_RE = re.compile(r"file[:\s]+([^\s,?!]+)", re.IGNORECASE)
    # Keywords (substring match on the lower-cased question) that select a tool.
    _WEB_TERMS = ("website", "url", "search", "internet", "online", "web", "wikipedia")
    _CODE_TERMS = ("code", "python", "execute", "run", "script", "program")
    # Task id used when the caller does not supply one.
    _DEFAULT_TASK_ID = "sample_task_id"

    def __init__(self):
        """Initialize the GAIA agent with its tools and LLM."""
        logger.info("Initializing GAIA Agent")
        # Initialize LLM
        self.llm = LLMModule()
        # Initialize tools, keyed by the names used in plans.
        self.tools = {
            "web_search": WebSearchTool(),
            "file_reader": FileReaderTool(),
            "code_interpreter": CodeInterpreterTool(),
        }

    def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """
        Answer a question using the agent's tools and reasoning capabilities.

        Args:
            question: The question to answer.
            task_id: Optional identifier of the task the question belongs to;
                used to locate the task's files. Defaults to a placeholder id.

        Returns:
            The agent's answer.
        """
        logger.info(f"Agent received question: {question[:100]}...")
        # Step 1: Analyze the question to determine the approach
        plan = self._plan_approach(question)
        # Step 2: Execute the plan using tools if needed
        tool_results = self._execute_plan(plan, question, task_id=task_id)
        # Step 3: Generate the final answer
        answer = self._generate_answer(question, plan, tool_results)
        logger.info(f"Agent returning answer: {answer}")
        return answer

    def _plan_approach(self, question: str) -> Dict[str, Any]:
        """
        Analyze the question and plan how to answer it.

        Uses simple keyword matching; a full implementation would ask the LLM.

        Args:
            question: The question to analyze.

        Returns:
            Dict with ``tools_needed`` (list of tool names), ``reasoning`` and,
            when a file is mentioned, ``file_name``.
        """
        plan: Dict[str, Any] = {
            "tools_needed": [],
            "reasoning": "Determining how to approach this question...",
        }

        # Check for mentions of files; take the first usable file name.
        for mention in self._FILE_MENTION_RE.finditer(question):
            # Drop sentence-ending periods and surrounding quotes, but keep
            # the periods inside the name so the extension is preserved.
            candidate = mention.group(1).rstrip(".\"'`").lstrip("\"'`")
            if candidate:
                plan["tools_needed"].append("file_reader")
                plan["file_name"] = candidate
                break

        lowered = question.lower()
        # Check for mentions of websites, URLs, or internet searches
        if any(term in lowered for term in self._WEB_TERMS):
            plan["tools_needed"].append("web_search")
        # Check for code execution needs
        if any(term in lowered for term in self._CODE_TERMS):
            plan["tools_needed"].append("code_interpreter")
        return plan

    def _execute_plan(
        self,
        plan: Dict[str, Any],
        question: str,
        task_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Execute the plan using the appropriate tools.

        Args:
            plan: The plan created by _plan_approach.
            question: The original question.
            task_id: Task whose files should be read; falls back to the
                placeholder id when omitted.

        Returns:
            Dict mapping each executed tool's name to its result dict.
        """
        results: Dict[str, Any] = {}
        for tool_name in plan.get("tools_needed", []):
            tool = self.tools.get(tool_name)
            if tool is None:
                continue
            if tool_name == "web_search":
                # The whole question is the query; a full implementation
                # would extract key terms first.
                results[tool_name] = tool.run(query=question)
            elif tool_name == "file_reader" and "file_name" in plan:
                results[tool_name] = tool.run(
                    task_id=task_id or self._DEFAULT_TASK_ID,
                    file_name=plan["file_name"],
                )
            elif tool_name == "code_interpreter" and "code" in plan:
                # Only runs when the plan carries code; _plan_approach does
                # not currently produce any.
                results[tool_name] = tool.run(code=plan["code"])
        return results

    def _generate_answer(self, question: str, plan: Dict[str, Any], tool_results: Dict[str, Any]) -> str:
        """
        Generate the final answer based on the question, plan, and tool results.

        Args:
            question: The original question.
            plan: The plan that was executed.
            tool_results: Results from tool executions.

        Returns:
            The final answer extracted from the LLM response.
        """
        # Build a prompt holding the question, every tool result, and the
        # answer-format instructions.
        prompt_parts = [
            f"Question: {question}\n\n",
            "I need to answer this question. Here's what I know:\n\n",
        ]
        for tool_name, result in tool_results.items():
            prompt_parts.append(f"Results from {tool_name}:\n{json.dumps(result, indent=2)}\n\n")
        prompt_parts.append(
            "Based on the above information, answer the question. "
            "Remember to provide your reasoning first, then clearly state your final answer "
            "in the format: FINAL ANSWER: [your concise answer]"
        )
        prompt = "".join(prompt_parts)
        # Get response from LLM, then keep only the final answer.
        llm_response = self.llm.generate(prompt, system_prompt=SYSTEM_PROMPT)
        return self.llm.extract_final_answer(llm_response)
# --- Runner Function for Gradio Interface ---
def run_and_submit_all(profile: gr.OAuthProfile | None, test_username: str = ""):
    """
    Fetches all questions, runs the GAIA Agent on them, submits all answers,
    and displays the results.

    Args:
        profile: Hugging Face OAuth profile of the logged-in user, or None.
        test_username: Username to submit under when no login is available
            (local development). Takes precedence over ``profile``.

    Returns:
        Tuple ``(status_message, results)`` where ``results`` is a pandas
        DataFrame of questions and answers, or None when the run stopped
        before any question was processed.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code

    # A whitespace-only (or missing) test username counts as not provided.
    test_username = (test_username or "").strip()
    if test_username:
        username = test_username
        print(f"Using test username: {username}")
    elif profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button or provide a test username.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent
    try:
        agent = GAIAAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # In the case of an app running as a Hugging Face space, this link points
    # toward your codebase. When SPACE_ID is unset the link contains "None".
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
    except requests.exceptions.JSONDecodeError as e:
        # Must precede RequestException: requests' JSONDecodeError derives
        # from it, so the broader handler would otherwise shadow this one.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # Anything other than a non-empty list cannot be iterated as questions.
    if not questions_data or not isinstance(questions_data, list):
        print("Fetched questions list is empty.")
        return "Fetched questions list is empty or invalid format.", None
    print(f"Fetched {len(questions_data)} questions.")

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        if not isinstance(item, dict):
            print(f"Skipping item with missing task_id or Question: {item}")
            continue
        task_id = item.get("task_id")
        question_text = item.get("Question")  # Note: Capital 'Q' in the JSON file
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or Question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # One failing question must not abort the whole run.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except (requests.exceptions.JSONDecodeError, AttributeError):
            # Body is not JSON, or is JSON but not an object: show raw text.
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Shared failure path: report the message together with what was run.
    print(status_message)
    return status_message, pd.DataFrame(results_log)
# --- Build Gradio Interface using Blocks ---
# Gradio UI: a login button, an optional test-username box, a run button,
# and two outputs (status text and the table of questions/answers).
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown(
        """
**Instructions:**
1. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
2. Click 'Run Evaluation & Submit All Answers' to fetch questions, run the GAIA agent, submit answers, and see the score.
This agent is capable of:
- Performing web searches for information
- Processing various file types (text, code, images, audio, etc.)
- Executing code safely for computational questions
- Reasoning through complex multi-step problems
The agent will automatically select the appropriate tools based on the question.
"""
    )
    with gr.Row():
        login_button = gr.LoginButton()
        test_username = gr.Textbox(label="Or enter test username for local development", placeholder="test_user")
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
    run_button.click(
        fn=run_and_submit_all,
        # Only the textbox is a real input. Gradio injects the
        # `gr.OAuthProfile | None` parameter itself from the session, so
        # listing the login button here would shift the arguments (its value
        # would land in `test_username`) and add a surplus one.
        inputs=[test_username],
        outputs=[status_output, results_table],
    )
if __name__ == "__main__":
    # Startup banner, environment report, then launch of the Gradio app.
    print("\n" + "-" * 30 + " GAIA Agent Starting " + "-" * 30)

    # Pick up variables from a .env file when one exists.
    load_dotenv()
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if not space_host_startup:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    else:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")

    if not space_id_startup:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    else:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")

    print("-" * (60 + len(" GAIA Agent Starting ")) + "\n")
    print("Launching Gradio Interface for GAIA Agent Evaluation...")

    # "Local" means neither Space variable is set.
    is_local = not space_host_startup and not space_id_startup
    launch_kwargs = {"debug": True, "share": False}
    if is_local:
        print("⚠️ Running in local mode - OAuth features will be disabled")
        # NOTE(review): auth=None looks like Gradio's default, so this launch
        # is presumably identical to the non-local one — confirm whether
        # anything is actually meant to be disabled here.
        launch_kwargs["auth"] = None
    demo.launch(**launch_kwargs)