# /Users/yagoairm2/Desktop/agents/final project/HF_Agents_Final_Project/app2.py
import os
import gradio as gr
import requests
import pandas as pd
import json
from typing import Dict, List, Optional, Union, Any
import re
from dataclasses import dataclass
from abc import ABC, abstractmethod
import time
import logging
from dotenv import load_dotenv
import tempfile
import io
import sys
import contextlib
from urllib.parse import urlparse
from pathlib import Path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
DEFAULT_FILES_DIR = "dataset"
SYSTEM_PROMPT = """
You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER]. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use commas in the number and don't use units such as $ or a percent sign unless specified otherwise. If you are asked for a string, don't use articles or abbreviations (e.g. for cities), and write digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending on whether each element of the list is a number or a string.
"""

# --- Tool Interface ---
class Tool(ABC):
    """Base class for all tools the agent can use."""
    name: str
    description: str

    @abstractmethod
    def run(self, **kwargs) -> Dict[str, Any]:
        """Execute the tool with the provided arguments."""
        ...
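
# A minimal sketch of how a new capability plugs into this interface; the
# EchoTool name and behavior are made up for illustration and are not used
# by the agent below.
class EchoTool(Tool):
    """Toy tool that returns its input, illustrating the Tool contract."""
    name = "echo"
    description = "Return the provided text unchanged."

    def run(self, text: str) -> Dict[str, Any]:
        # Tools report a status plus their payload, mirroring the real tools.
        return {"status": "success", "content": text}
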
# --- Tools Implementation ---
class WebSearchTool(Tool):
    """Tool for performing web searches."""
    name = "web_search"
    description = "Search the web for information about a topic."

    def __init__(self):
        # Initialize any search API clients or session objects here
        pass

    def run(self, query: str) -> Dict[str, Any]:
        """
        Perform a web search with the given query.

        Args:
            query: The search query

        Returns:
            Dict with search results
        """
        # In a real implementation, this would use a proper search API
        # such as SerpAPI or Google Custom Search.
        logger.info(f"WebSearchTool: Searching for '{query}'")
        # Mock implementation - would be replaced with a real search API
        time.sleep(1)  # Simulate network delay
        return {
            "status": "success",
            "results": [
                {"title": f"Mock result for {query}", "snippet": "This is a placeholder for search results.", "url": "https://example.com"}
            ]
        }
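
# A hedged sketch of a real backend for WebSearchTool.run, assuming the
# third-party `duckduckgo_search` package is installed and that its
# DDGS().text(query, max_results=n) API is available; illustrative only,
# not wired into the agent.
def example_real_web_search(query: str, max_results: int = 5) -> Dict[str, Any]:
    try:
        from duckduckgo_search import DDGS  # pip install duckduckgo-search
        hits = DDGS().text(query, max_results=max_results)
        return {
            "status": "success",
            # Normalize to the same shape the mock tool returns.
            "results": [
                {"title": h.get("title", ""), "snippet": h.get("body", ""), "url": h.get("href", "")}
                for h in hits
            ],
        }
    except Exception as e:  # Network or import failure: degrade gracefully.
        return {"status": "error", "error": str(e)}
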
class FileReaderTool(Tool):
    """Tool for reading and processing different types of files."""
    name = "file_reader"
    description = "Read and process files of various formats."

    def __init__(self, files_dir: str = DEFAULT_FILES_DIR):
        self.files_dir = files_dir

    def run(self, task_id: str, file_name: str) -> Dict[str, Any]:
        """
        Read and process a file associated with a task.

        Args:
            task_id: The task identifier
            file_name: Name of the file to process

        Returns:
            Dict with file content or error message
        """
        try:
            # First, try to find the file locally
            file_path = os.path.join(self.files_dir, task_id, file_name)
            if not os.path.exists(file_path):
                # If the file doesn't exist locally, try to download it
                file_path = self._download_file(task_id, file_name)
            # Process the file based on its extension
            file_ext = os.path.splitext(file_name)[1].lower()
            if file_ext in ['.txt', '.md', '.py', '.json', '.csv']:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                return {"status": "success", "content": content, "file_type": "text"}
            elif file_ext in ['.png', '.jpg', '.jpeg']:
                # For images, we'd use a vision model in the full implementation
                return {"status": "success", "content": f"Image file: {file_path}", "file_type": "image"}
            elif file_ext in ['.mp3', '.wav', '.ogg']:
                # For audio, we'd use a speech-to-text model in the full implementation
                return {"status": "success", "content": f"Audio file: {file_path}", "file_type": "audio"}
            elif file_ext in ['.xlsx', '.xls']:
                # For Excel files, we'd use pandas in the full implementation
                return {"status": "success", "content": f"Excel file: {file_path}", "file_type": "spreadsheet"}
            else:
                return {"status": "error", "error": f"Unsupported file type: {file_ext}"}
        except Exception as e:
            logger.error(f"Error processing file {file_name}: {e}")
            return {"status": "error", "error": str(e)}

    def _download_file(self, task_id: str, file_name: str) -> str:
        """Download a file from the API and save it locally."""
        api_url = f"{DEFAULT_API_URL}/files/{task_id}"
        logger.info(f"Downloading file for task {task_id}")
        response = requests.get(api_url, timeout=30)
        if response.status_code != 200:
            raise Exception(f"Failed to download file: {response.status_code}")
        # Create the directory if it doesn't exist
        os.makedirs(os.path.join(self.files_dir, task_id), exist_ok=True)
        # Save the file
        file_path = os.path.join(self.files_dir, task_id, file_name)
        with open(file_path, 'wb') as f:
            f.write(response.content)
        logger.info(f"File saved to {file_path}")
        return file_path
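
# A hedged sketch of what the spreadsheet branch above could do, using the
# already-imported pandas; the CSV preview format is an illustrative choice,
# not something the grader requires. Not wired into FileReaderTool by default.
def example_read_spreadsheet(file_path: str) -> Dict[str, Any]:
    try:
        df = pd.read_excel(file_path)  # Requires openpyxl for .xlsx files
        return {
            "status": "success",
            # Serialize a small preview so an LLM prompt stays short.
            "content": df.head(20).to_csv(index=False),
            "file_type": "spreadsheet",
        }
    except Exception as e:
        return {"status": "error", "error": str(e)}
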
class CodeInterpreterTool(Tool):
    """Tool for executing Python code safely."""
    name = "code_interpreter"
    description = "Execute Python code and return the result."

    def run(self, code: str) -> Dict[str, Any]:
        """
        Execute Python code and capture output.

        Args:
            code: The Python code to execute

        Returns:
            Dict with execution results
        """
        logger.info("Running code interpreter")
        output = io.StringIO()
        error = io.StringIO()
        try:
            # Capture stdout and stderr
            with contextlib.redirect_stdout(output), contextlib.redirect_stderr(error):
                # Execute the code in a restricted environment. An empty
                # "__builtins__" dict would also block print(), making the
                # captured stdout useless, so expose a small allowlist of
                # harmless builtins while still hiding open, __import__, etc.
                exec_globals = {
                    "__builtins__": {
                        "print": print, "range": range, "len": len,
                        "sum": sum, "min": min, "max": max, "abs": abs,
                        "round": round, "enumerate": enumerate, "sorted": sorted,
                    }
                }
                # Add safe modules to globals
                for safe_module in ["math", "random", "datetime", "re"]:
                    try:
                        exec_globals[safe_module] = __import__(safe_module)
                    except ImportError:
                        pass
                # Execute the code
                exec(code, exec_globals)
            return {
                "status": "success",
                "stdout": output.getvalue(),
                "stderr": error.getvalue()
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e),
                "stdout": output.getvalue(),
                "stderr": error.getvalue()
            }
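
# Quick usage illustration (a standalone function, not executed at import
# time): because results come back through captured stdout, snippets must
# print() what they compute.
def example_code_interpreter_usage() -> None:
    tool = CodeInterpreterTool()
    result = tool.run(code="print(sum(range(10)))")
    assert result["status"] == "success"
    assert result["stdout"].strip() == "45"
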
# --- LLM Interaction Module ---
class LLMModule:
    """Module for interacting with an LLM."""

    def __init__(self, model_name: str = "Meta-Llama-3-8B-Instruct.Q4_0.gguf"):
        """Initialize the LLM module with a specified model."""
        self.model_name = model_name
        try:
            from gpt4all import GPT4All
            logger.info(f"Initializing GPT4All model: {model_name}")
            self.model = GPT4All(model_name, allow_download=True)
            logger.info("GPT4All model initialized successfully")
            self.use_mock = False
        except Exception as e:
            logger.warning(f"Failed to initialize GPT4All model: {e}")
            logger.warning("Using mock responses instead")
            self.use_mock = True

    def generate(self, prompt: str, system_prompt: Optional[str] = None) -> str:
        """
        Generate text using the LLM.

        Args:
            prompt: The user prompt
            system_prompt: Optional system prompt

        Returns:
            Generated text
        """
        logger.info(f"LLM: Generating response for prompt (first 50 chars): {prompt[:50]}...")
        if self.use_mock:
            # Fall back to a mock response if model initialization failed
            logger.warning("Using mock response")
            return f"This is a mock LLM response. I'm simulating thinking about: {prompt[:30]}...\n\nFINAL ANSWER: Mock answer"
        try:
            # Prepend the system prompt if one is provided
            full_prompt = prompt
            if system_prompt:
                full_prompt = f"{system_prompt}\n\n{prompt}"
            # Generate a response using GPT4All
            with self.model.chat_session():
                response = self.model.generate(full_prompt, max_tokens=1024, temp=0.7)
            logger.info(f"LLM response (first 50 chars): {response[:50]}...")
            return response
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            # Fall back to a mock response if generation fails
            return "Error generating LLM response. Falling back to mock response.\n\nFINAL ANSWER: Error occurred"

    @staticmethod
    def extract_final_answer(text: str) -> str:
        """Extract the final answer from LLM output using regex."""
        match = re.search(r"FINAL ANSWER:\s*(.*?)(?:\n|$)", text, re.IGNORECASE)
        if match:
            return match.group(1).strip()
        return text.strip()
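
# Tiny illustration of the answer-extraction regex (the sample text is made
# up; extract_final_answer is a pure function of its input, so no model is
# loaded here):
def example_extract_final_answer() -> None:
    sample = "The capital of France is Paris.\nFINAL ANSWER: Paris"
    assert LLMModule.extract_final_answer(sample) == "Paris"
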
# --- GAIA Agent Implementation ---
class GAIAAgent:
    """
    Agent designed to answer questions from the GAIA benchmark.

    This agent analyzes questions, selects appropriate tools, and generates answers.
    """

    def __init__(self):
        """Initialize the GAIA agent with its tools and LLM."""
        logger.info("Initializing GAIA Agent")
        # Initialize the LLM
        self.llm = LLMModule()
        # Initialize the tools
        self.tools = {
            "web_search": WebSearchTool(),
            "file_reader": FileReaderTool(),
            "code_interpreter": CodeInterpreterTool()
        }

    def __call__(self, question: str) -> str:
        """
        Answer a question using the agent's tools and reasoning capabilities.

        Args:
            question: The question to answer

        Returns:
            The agent's answer
        """
        logger.info(f"Agent received question: {question[:100]}...")
        # Step 1: Analyze the question to determine the approach
        plan = self._plan_approach(question)
        # Step 2: Execute the plan using tools if needed
        tool_results = self._execute_plan(plan, question)
        # Step 3: Generate the final answer
        answer = self._generate_answer(question, plan, tool_results)
        logger.info(f"Agent returning answer: {answer}")
        return answer
    def _plan_approach(self, question: str) -> Dict[str, Any]:
        """
        Analyze the question and plan how to answer it.

        Args:
            question: The question to analyze

        Returns:
            Dict with the plan details
        """
        # In a full implementation, this would use the LLM to analyze the
        # question and determine which tools are needed.
        # For now, use a simple keyword-based approach.
        plan = {
            "tools_needed": [],
            "reasoning": "Determining how to approach this question..."
        }
        # Check for mentions of files. The pattern allows dots and hyphens so
        # that names like "data.csv" keep their extension (which file_reader
        # needs for dispatch), while a trailing sentence period is dropped.
        file_pattern = r"file[:\s]+([\w.-]*\w)"
        file_match = re.search(file_pattern, question, re.IGNORECASE)
        if file_match:
            plan["tools_needed"].append("file_reader")
            plan["file_name"] = file_match.group(1)
        # Check for mentions of websites, URLs, or internet searches
        if any(term in question.lower() for term in ["website", "url", "search", "internet", "online", "web", "wikipedia"]):
            plan["tools_needed"].append("web_search")
        # Check for code execution needs
        if any(term in question.lower() for term in ["code", "python", "execute", "run", "script", "program"]):
            plan["tools_needed"].append("code_interpreter")
            # _execute_plan only runs the interpreter when the plan carries a
            # "code" entry, so pull out a fenced code block if one is present.
            code_match = re.search(r"```(?:python)?\s*\n(.*?)```", question, re.DOTALL)
            if code_match:
                plan["code"] = code_match.group(1)
        return plan
    def _execute_plan(self, plan: Dict[str, Any], question: str) -> Dict[str, Any]:
        """
        Execute the plan using the appropriate tools.

        Args:
            plan: The plan created by _plan_approach
            question: The original question

        Returns:
            Dict with results from tool executions
        """
        results = {}
        for tool_name in plan.get("tools_needed", []):
            if tool_name in self.tools:
                tool = self.tools[tool_name]
                if tool_name == "web_search":
                    # Use the whole question as the query; a full
                    # implementation would extract key terms first.
                    results[tool_name] = tool.run(query=question)
                elif tool_name == "file_reader" and "file_name" in plan:
                    # In a full implementation, the task_id would come from context
                    task_id = "sample_task_id"
                    file_name = plan["file_name"]
                    results[tool_name] = tool.run(task_id=task_id, file_name=file_name)
                elif tool_name == "code_interpreter" and "code" in plan:
                    code = plan["code"]
                    results[tool_name] = tool.run(code=code)
        return results
    def _generate_answer(self, question: str, plan: Dict[str, Any], tool_results: Dict[str, Any]) -> str:
        """
        Generate the final answer based on the question, plan, and tool results.

        Args:
            question: The original question
            plan: The plan that was executed
            tool_results: Results from tool executions

        Returns:
            The final answer
        """
        # Construct a prompt for the LLM that includes the question, the tool
        # results, and instructions to format the answer properly
        prompt_parts = [
            f"Question: {question}\n\n",
            "I need to answer this question. Here's what I know:\n\n"
        ]
        # Add tool results to the prompt
        for tool_name, result in tool_results.items():
            prompt_parts.append(f"Results from {tool_name}:\n{json.dumps(result, indent=2)}\n\n")
        prompt_parts.append(
            "Based on the above information, answer the question. "
            "Remember to provide your reasoning first, then clearly state your final answer "
            "in the format: FINAL ANSWER: [your concise answer]"
        )
        prompt = "".join(prompt_parts)
        # Get a response from the LLM
        llm_response = self.llm.generate(prompt, system_prompt=SYSTEM_PROMPT)
        # Extract the final answer
        final_answer = self.llm.extract_final_answer(llm_response)
        return final_answer
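
# A hedged local smoke test for the agent (not called at import time; note
# that constructing GAIAAgent may download a GPT4All model on first use, and
# the sample question is made up):
def example_run_agent_locally() -> None:
    agent = GAIAAgent()
    print(agent("Run this Python code: ```python\nprint(2 + 2)\n```"))
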
# --- Runner Function for Gradio Interface ---
def run_and_submit_all(profile: gr.OAuthProfile | None, test_username: str = ""):
    """
    Fetches all questions, runs the GAIA Agent on them, submits all answers,
    and displays the results.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending a link to the code

    # Check if we're using a test username (for local development)
    if test_username:
        username = test_username
        print(f"Using test username: {username}")
    elif profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please log in to Hugging Face with the button or provide a test username.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate the Agent
    try:
        agent = GAIAAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # When the app runs as a Hugging Face Space, this link points to your codebase
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        # Caught before RequestException: in recent versions of requests,
        # JSONDecodeError is a RequestException subclass, so the broader
        # handler would otherwise shadow this one.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run the Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("Question")  # Note: capital 'Q' in the JSON file
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or Question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare the Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# GAIA Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**

        1. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        2. Click 'Run Evaluation & Submit All Answers' to fetch questions, run the GAIA agent, submit answers, and see the score.

        This agent is capable of:
        - Performing web searches for information
        - Processing various file types (text, code, images, audio, etc.)
        - Executing code safely for computational questions
        - Reasoning through complex multi-step problems

        The agent will automatically select the appropriate tools based on the question.
        """
    )

    with gr.Row():
        login_button = gr.LoginButton()
        test_username = gr.Textbox(label="Or enter a test username for local development", placeholder="test_user")

    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        # Gradio injects the gr.OAuthProfile argument automatically from the
        # type annotation when a LoginButton is present, so only the textbox
        # is passed explicitly here.
        inputs=[test_username],
        outputs=[status_output, results_table]
    )
if __name__ == "__main__":
    print("\n" + "-" * 30 + " GAIA Agent Starting " + "-" * 30)
    # Check for environment variables
    load_dotenv()  # Load environment variables from a .env file if it exists
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-" * (60 + len(" GAIA Agent Starting ")) + "\n")
    print("Launching Gradio Interface for GAIA Agent Evaluation...")
    # When running locally, disable OAuth to avoid login issues
    is_local = not (space_host_startup or space_id_startup)
    if is_local:
        print("⚠️ Running in local mode - OAuth features will be disabled")
        demo.launch(debug=True, share=False, auth=None)
    else:
        demo.launch(debug=True, share=False)