|
|
|
|
|
import os |
|
import gradio as gr |
|
import requests |
|
import pandas as pd |
|
import base64 |
|
import json |
|
import operator |
|
from typing import Annotated, List, TypedDict |
|
|
|
from dotenv import load_dotenv |
|
from langchain_community.tools.tavily_search import TavilySearchResults |
|
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage |
|
from langchain_core.prompts import ChatPromptTemplate |
|
from langchain_core.tools import tool |
|
from langchain_google_genai import ChatGoogleGenerativeAI |
|
from langgraph.graph import END, StateGraph |
|
from langgraph.prebuilt import ToolNode |
|
|
|
API_BASE_URL = "https://agents-course-unit4-scoring.hf.space" |
|
|
|
class GaiaLangGraphAgent: |
|
def __init__(self): |
|
print("Initializing GaiaLangGraphAgent...") |
|
load_dotenv() |
|
|
|
class AgentState(TypedDict): |
|
question: str |
|
intermediate_steps: Annotated[List[BaseMessage], operator.add] |
|
self.AgentState = AgentState |
|
|
|
web_search_tool = TavilySearchResults(max_results=4) |
|
|
|
@tool |
|
def calculator(expression: str) -> str: |
|
"""Evaluates a simple mathematical expression.""" |
|
try: |
|
import numexpr |
|
return str(numexpr.evaluate(expression).item()) |
|
except Exception as e: return f"Error: {e}" |
|
|
|
llm_vision = ChatGoogleGenerativeAI(model="gemini-1.5-pro-latest") |
|
|
|
def get_file_path(file_name: str) -> str: |
|
if not os.path.exists("task_files"): os.makedirs("task_files") |
|
return os.path.join("task_files", file_name) |
|
|
|
@tool |
|
def file_reader(file_name: str) -> str: |
|
"""Reads a file, downloading if necessary. Handles text and images.""" |
|
local_path = get_file_path(file_name) |
|
if not os.path.exists(local_path): |
|
download_url = f"{API_BASE_URL}/files/{file_name}" |
|
print(f"Downloading: {download_url}") |
|
try: |
|
response = requests.get(download_url); response.raise_for_status() |
|
with open(local_path, "wb") as f: f.write(response.content) |
|
except Exception as e: return f"Error downloading {file_name}: {e}" |
|
try: |
|
if any(file_name.lower().endswith(ext) for ext in ['.png', '.jpg', '.jpeg', '.webp']): |
|
with open(local_path, "rb") as image_file: b64_image = base64.b64encode(image_file.read()).decode('utf-8') |
|
vision_prompt = HumanMessage(content=[ |
|
{"type": "text", "text": "Describe this image in detail, focusing on text or identifiable objects."}, |
|
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64_image}"}} |
|
]) |
|
return llm_vision.invoke([vision_prompt]).content |
|
else: |
|
with open(local_path, 'r', encoding='utf-8') as f: return f.read() |
|
except Exception as e: return f"Error processing {file_name}: {e}" |
|
|
|
tools = [web_search_tool, file_reader, calculator] |
|
|
|
llm = ChatGoogleGenerativeAI(model="gemini-1.5-flash-latest", temperature=0, convert_system_message_to_human=True) |
|
llm_with_tools = llm.bind_tools(tools) |
|
|
|
planner_prompt = ChatPromptTemplate.from_messages([ |
|
("system", """You are a world-class AI assistant. |
|
**Principles:** 1. Analyze the question for nuances. 2. Create multi-step plans. 3. Use tools intelligently (search, file read, calculator) or solve logic puzzles directly. 4. Provide exact-match answers. |
|
**Execution:** Loop through plan->act cycles until you have the final answer."""), |
|
("human", "{question}\n\n{intermediate_steps}"), |
|
]) |
|
|
|
def planner_node(state: AgentState): |
|
print("\n---PLANNER---") |
|
chain = planner_prompt | llm_with_tools |
|
response = chain.invoke(state) |
|
print(f"Planner decision: {'Tool call' if response.tool_calls else 'Final Answer'}") |
|
return {'intermediate_steps': [response]} |
|
|
|
tool_node = ToolNode(tools) |
|
|
|
def should_continue(state: AgentState): |
|
last_message = state['intermediate_steps'][-1] |
|
if isinstance(last_message, AIMessage): |
|
if len(getattr(last_message, "tool_calls", [])) > 0: return "action" |
|
return END |
|
|
|
workflow = StateGraph(AgentState) |
|
workflow.add_node("planner", planner_node) |
|
workflow.add_node("action", tool_node) |
|
workflow.set_entry_point("planner") |
|
workflow.add_conditional_edges("planner", should_continue) |
|
workflow.add_edge("action", "planner") |
|
self.app = workflow.compile() |
|
print("GaiaLangGraphAgent initialized successfully.") |
|
|
|
def __call__(self, question: str) -> str: |
|
print(f"\n>>>>>> AGENT EXECUTING FOR QUESTION: {question[:70]}...") |
|
initial_state = {"question": question, "intermediate_steps": []} |
|
final_state = self.app.invoke(initial_state, config={"recursion_limit": 15}) |
|
final_answer = final_state["intermediate_steps"][-1].content |
|
print(f"<<<<<< AGENT FINISHED. FINAL ANSWER: {final_answer}") |
|
return final_answer |
|
|
|
def run_and_submit_all(profile: gr.OAuthProfile | None): |
|
if not profile: return "Please Login to Hugging Face with the button first.", None |
|
space_id = os.getenv("SPACE_ID") |
|
if not space_id: return "CRITICAL ERROR: SPACE_ID not found. Run this from a deployed Hugging Face Space.", None |
|
username = profile.username |
|
print(f"User logged in: {username}") |
|
questions_url = f"{API_BASE_URL}/questions" |
|
submit_url = f"{API_BASE_URL}/submit" |
|
try: |
|
agent = GaiaLangGraphAgent() |
|
except Exception as e: return f"Error initializing agent: {e}", None |
|
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" |
|
print(f"Fetching questions from: {questions_url}") |
|
try: |
|
response = requests.get(questions_url, timeout=20); response.raise_for_status() |
|
questions_data = response.json() |
|
except Exception as e: return f"Error fetching questions: {e}", None |
|
results_log, answers_payload = [], [] |
|
print(f"Running agent on {len(questions_data)} questions. This may take several minutes...") |
|
for item in questions_data: |
|
task_id, question_text = item.get("task_id"), item.get("question") |
|
try: |
|
submitted_answer = agent(question_text) |
|
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer}) |
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer}) |
|
except Exception as e: |
|
results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"}) |
|
submission_data = {"username": username, "agent_code": agent_code, "answers": answers_payload} |
|
print(f"Submitting {len(answers_payload)} answers...") |
|
try: |
|
response = requests.post(submit_url, json=submission_data, timeout=60); response.raise_for_status() |
|
result_data = response.json() |
|
final_status = (f"Submission Successful!\nUser: {result_data.get('username')}\n" |
|
f"Score: {result_data.get('score', 'N/A')}% " |
|
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)") |
|
return final_status, pd.DataFrame(results_log) |
|
except Exception as e: return f"Submission Failed: {e}", pd.DataFrame(results_log) |
|
|
|
with gr.Blocks() as demo: |
|
gr.Markdown("# GAIA - Advanced Agent Runner") |
|
gr.Markdown("Log in and click 'Run' to evaluate the agent.") |
|
gr.LoginButton() |
|
run_button = gr.Button("Run Evaluation & Submit All Answers") |
|
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False) |
|
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True) |
|
run_button.click(fn=run_and_submit_all, outputs=[status_output, results_table]) |
|
|
|
if __name__ == "__main__": |
|
print("Launching Gradio Interface...") |
|
demo.launch() |