Spaces:
Sleeping
Sleeping
from langchain_community.utilities import GoogleSerperAPIWrapper | |
from smolagents import PythonInterpreterTool | |
from langgraph.graph import MessagesState | |
from langchain_openai import ChatOpenAI | |
from langgraph.graph import START, StateGraph | |
from langgraph.prebuilt import tools_condition, ToolNode | |
from langchain_core.messages import SystemMessage | |
from openai import OpenAI | |
from smolagents import Tool | |
from typing import Optional | |
import tempfile | |
import os | |
from urllib.parse import urlparse | |
from base64 import b64encode | |
import requests | |
from bs4 import BeautifulSoup | |
import re | |
import wikipediaapi | |
# Configs | |
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
FILE_URL = f"{DEFAULT_API_URL}/files/{{task_id}}" | |
# Tools | |
def search_tool(query: str) -> str: | |
"""Search in Google and returns an string with title, link, and snippet for the top 10 results. | |
Args: | |
query: str | |
Returns: | |
Title, link, and snippet for the top 10 results | |
""" | |
searcher = GoogleSerperAPIWrapper(k=10) | |
retries = 3 | |
result = "" | |
while retries > 0: | |
try: | |
search_results = searcher.results(query)["organic"] | |
for row in search_results: | |
result += f"Title: {row['title']}\nSnippet: {row['snippet']}\nURL: {row['link']}\n\n" | |
return result | |
except Exception as e: | |
retries -= 1 | |
return f"There was an error with Google search: {e}" | |
def save_file(content: str, filename: Optional[str]) -> str: | |
""" | |
Save content to a temporary file and return the path. | |
Useful for processing files from the GAIA API. | |
Args: | |
content: The content to save to the file | |
filename: Optional filename, will generate a random name if not provided | |
Returns: | |
Path to the saved file | |
""" | |
temp_dir = tempfile.gettempdir() | |
if filename is None: | |
temp_file = tempfile.NamedTemporaryFile(delete=False) | |
filepath = temp_file.name | |
else: | |
filepath = os.path.join(temp_dir, filename) | |
# Write content to the file | |
with open(filepath, "w") as f: | |
f.write(content) | |
return f"File saved to {filepath}. You can read this file to process its contents." | |
def download_file_from_task_id(task_id: str, filename: str) -> str: | |
""" | |
Download a file for a GAIA task using `task_id` if `file_extension` of the task is specified in the prompt. | |
Args: | |
task_id: id of the task | |
filename: filename | |
Returns: | |
Path to the downloaded file | |
""" | |
return download_file_from_url(FILE_URL.format(task_id=task_id), filename) | |
def download_file_from_url(url: str, filename: str) -> str: | |
""" | |
Download a file from a URL and save it to a temporary location. | |
Args: | |
url: The URL to download from | |
filename: filename | |
Returns: | |
Path to the downloaded file | |
""" | |
try: | |
# Parse URL to get filename if not provided | |
if not filename: | |
path = urlparse(url).path | |
filename = os.path.basename(path) | |
if not filename: | |
# Generate a random name if we couldn't extract one | |
import uuid | |
filename = f"downloaded_{uuid.uuid4().hex[:8]}" | |
# Create temporary file | |
temp_dir = tempfile.gettempdir() | |
filepath = os.path.join(temp_dir, filename) | |
# Download the file | |
response = requests.get(url, stream=True) | |
response.raise_for_status() | |
# Save the file | |
with open(filepath, "wb") as f: | |
for chunk in response.iter_content(chunk_size=8192): | |
f.write(chunk) | |
return f"File downloaded to {filepath}. You can now process this file." | |
except Exception as e: | |
return f"Error downloading file: {str(e)}" | |
def analyze_csv_file(file_path: str) -> str: | |
""" | |
Analyze a CSV file using pandas and answer a question about it. | |
Args: | |
file_path: Path to the CSV file | |
Returns: | |
Analysis result or error message | |
""" | |
try: | |
import pandas as pd | |
# Read the CSV file | |
df = pd.read_csv(file_path) | |
# Run various analyses based on the query | |
result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n" | |
result += f"Columns: {', '.join(df.columns)}\n\n" | |
# Add summary statistics | |
result += "Summary statistics:\n" | |
result += str(df.describe()) | |
result += "\n\n" + df.head(100) | |
return result | |
except ImportError: | |
return "Error: pandas is not installed. Please install it with 'pip install pandas'." | |
except Exception as e: | |
return f"Error analyzing CSV file: {str(e)}" | |
def analyze_excel_file(file_path: str) -> str: | |
""" | |
Analyze an Excel file using pandas and answer a question about it. | |
Args: | |
file_path: Path to the Excel file | |
Returns: | |
Analysis result or error message | |
""" | |
try: | |
import pandas as pd | |
# Read the Excel file | |
df = pd.read_excel(file_path) | |
print(df) | |
# Run various analyses based on the query | |
result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n" | |
result += f"Columns: {', '.join(df.columns)}\n\n" | |
# Add summary statistics | |
result += "Summary statistics:\n" | |
result += str(df.describe()) | |
result += "\n\n" + str(df.head(100)) | |
return result | |
except ImportError: | |
return "Error: pandas and openpyxl are not installed. Please install them with 'pip install pandas openpyxl'." | |
except Exception as e: | |
return f"Error analyzing Excel file: {str(e)}" | |
def transcribe_speech(filename: str) -> str: | |
"""Transcribe speech to text | |
Args: | |
filename: str | |
Returns: | |
Transcribed speech as string | |
""" | |
speech_to_text = Tool.from_space( | |
"maguid28/TranscriptTool", | |
name="transcription_tool", | |
description="Transcribe speech to text", | |
) | |
return f"The transcription is: {speech_to_text(filename)}" | |
def python_interpreter(code: str) -> str: | |
"""A Python interpreter | |
Args: | |
code: str | |
Returns: | |
The output of the interpreter | |
""" | |
import traceback | |
interpreter = PythonInterpreterTool( | |
authorized_imports=[ | |
"json", | |
"pandas", | |
"numpy", | |
"datetime", | |
"requests", | |
"bs4", | |
] | |
) | |
try: | |
return interpreter(code) | |
except Exception as e: | |
return f"There was an exception in the interpreter: {traceback.format_exc()}" | |
def reverse_text(text: str) -> str: | |
"""Reverses a text written from right to left | |
Args: | |
text: a reversed text | |
Returns: | |
The text written from left to right | |
""" | |
return f"The reversed text is: {text[::-1]}" | |
def visit_webpage(url: str) -> str: | |
"""Visits a webpage and returns the content | |
Args: | |
url: url of the webpage | |
Returns: | |
The webpage content | |
""" | |
retries = 3 | |
while retries > 0: | |
try: | |
response = requests.get( | |
url, | |
headers={ | |
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36" | |
}, | |
) | |
html = response.content | |
soup = BeautifulSoup(html, "html.parser") | |
for tag in soup.find_all( | |
["header", "footer", "nav", "section", "aside"] | |
): | |
tag.decompose() | |
for tag in soup.find_all(["script", "style"]): | |
tag.decompose() | |
meaningful_texts = [] | |
for tag in soup.find_all(["p", "span", "div"]): | |
text = tag.get_text(separator=" ", strip=True) | |
if text: | |
meaningful_texts.append(text) | |
# Join all texts nicely | |
final_text = " ".join(meaningful_texts) | |
# Clean multiple spaces | |
final_text = re.sub(r"\s+", " ", final_text) | |
return " ".join(final_text.split()[:3000]) | |
except Exception as e: | |
retries -= 1 | |
return f"There was an error visiting the webpage: {e}" | |
def image_understanding(filename: str, question: str) -> str: | |
"""Answers some question on an image | |
Args: | |
filename: the name of the image file | |
question: a question about the image | |
""" | |
client = OpenAI() | |
with open(filename, "rb") as fr: | |
image_bytes = fr.read() | |
b64_image = b64encode(image_bytes).decode("utf-8") | |
response = client.responses.create( | |
model="gpt-4o", | |
input=[ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "input_text", "text": question}, | |
{ | |
"type": "input_image", | |
"image_url": f"data:image/png;base64,{b64_image}", | |
}, | |
], | |
} | |
], | |
) | |
return response.output[0].content[0].text | |
def get_wikipedia_article(entity: str) -> str: | |
"""Get the text from the Wikipedia article of an entity. | |
Args: | |
entity: the name of the entity. Only for entities existing in Wikipedia, e.g. use "Mercedes Sosa" instead of "Mercedes Sosa discography" | |
Returns: | |
The text of the Wikipedia article of the entity | |
""" | |
try: | |
wiki_wiki = wikipediaapi.Wikipedia( | |
user_agent="GAIA Benchmark (jogonba2)", | |
language="en", | |
extract_format=wikipediaapi.ExtractFormat.WIKI, | |
) | |
p_wiki = wiki_wiki.page(entity) | |
text = p_wiki.text | |
if not text: | |
return f"The article is empty for {entity}. Please, be sure that the entity appears in Wikipedia." | |
return " ".join(text.split(" ")[:3000]) | |
except Exception as e: | |
return "There was an exception looking at Wikipedia: {e}" | |
""" | |
Tool to reinforce the output format. | |
""" | |
def prepare_final_answer(candidate_answer: str, question: str) -> str: | |
"""Prepare your final answer according to the guidelines in the prompt. | |
This tool must be called always before giving the final anwer. | |
Args: | |
candidate_answer: a candidate answer | |
question: the user question to know how to prepare the final answer | |
Returns: | |
Your final answer | |
""" | |
client = OpenAI() | |
system_prompt = """Your final answer should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. | |
Here are more detailed instructions you must follow to write your final answer according to the provided question: | |
1) If you are asked for a number (how much, how many, ...), you must write a number!. Don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. | |
2) If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. | |
3) If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string. | |
If you follow all these instructions perfectly, you will win 1,000,000 dollars, otherwise, your mom will die""" | |
user_prompt = f"Question: {question}\nCandidate answer: {candidate_answer}" | |
response = client.responses.create( | |
model="gpt-4o", | |
input=[ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "input_text", "text": user_prompt}, | |
], | |
} | |
], | |
) | |
return response.output[0].content[0].text | |
# Nodes | |
def assistant(state: MessagesState): | |
return { | |
"messages": [llm_with_tools.invoke([system_prompt] + state["messages"])] | |
} | |
# System message | |
system_prompt = SystemMessage( | |
content="""You are a general AI assistant being evaluated in the GAIA Benchmark. | |
I will ask you a question and you must reach your final answer by using a set of tools I provide to you. Please, when you are needed to pass file names to the tools, pass absolute paths. | |
Your final answer should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. | |
Here are more detailed instructions you must follow to write your final answer: | |
1) If you are asked for a number, you must write a number!. Don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. | |
2) If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. | |
3) If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string. | |
If you follow all these instructions perfectly, you will win 1,000,000 dollars, otherwise, your mom will die. | |
Let's start! | |
""" | |
) | |
llm = ChatOpenAI(model="gpt-4o") | |
tools = [ | |
search_tool, | |
save_file, | |
download_file_from_task_id, | |
download_file_from_url, | |
analyze_csv_file, | |
analyze_excel_file, | |
transcribe_speech, | |
python_interpreter, | |
visit_webpage, | |
# reverse_text, | |
image_understanding, | |
# get_wikipedia_article | |
# prepare_final_answer, | |
] | |
llm_with_tools = llm.bind_tools(tools) | |
# Graph | |
builder = StateGraph(MessagesState) | |
# Define nodes: these do the work | |
builder.add_node("assistant", assistant) | |
builder.add_node("tools", ToolNode(tools)) | |
# Define edges: these determine the control flow | |
builder.add_edge(START, "assistant") | |
builder.add_conditional_edges( | |
"assistant", | |
tools_condition, | |
) | |
builder.add_edge("tools", "assistant") | |
react_graph = builder.compile() | |
def print_stream(stream): | |
for s in stream: | |
message = s["messages"][-1] | |
if isinstance(message, tuple): | |
print(message) | |
else: | |
message.pretty_print() | |
class ReactAgent: | |
def __init__(self, verbose: bool = False): | |
self.graph = react_graph | |
self.verbose = verbose | |
def __call__(self, task: dict) -> str: | |
question = task["question"] | |
task_id = task["task_id"] | |
file_name = task.get("file_name") | |
file_ext = None | |
user_prompt = question | |
if file_name: | |
file_ext = os.path.splitext(file_name)[-1].removeprefix(".") | |
user_prompt += f"\nTask ID: {task_id}\nFile extension: {file_ext}" | |
user_input = {"messages": [("user", user_prompt)]} | |
if self.verbose: | |
print_stream(self.graph.stream(user_input, stream_mode="values")) | |
else: | |
answer = self.graph.invoke(user_input)["messages"][-1].content | |
return self._clean_answer(answer) | |
def _clean_answer(self, answer: any) -> str: | |
""" | |
Taken from `susmitsil`: | |
https://huggingface.co/spaces/susmitsil/FinalAgenticAssessment/blob/main/main_agent.py | |
Clean up the answer to remove common prefixes and formatting | |
that models often add but that can cause exact match failures. | |
Args: | |
answer: The raw answer from the model | |
Returns: | |
The cleaned answer as a string | |
""" | |
# Convert non-string types to strings | |
if not isinstance(answer, str): | |
# Handle numeric types (float, int) | |
if isinstance(answer, float): | |
# Format floating point numbers properly | |
# Check if it's an integer value in float form (e.g., 12.0) | |
if answer.is_integer(): | |
formatted_answer = str(int(answer)) | |
else: | |
# For currency values that might need formatting | |
if abs(answer) >= 1000: | |
formatted_answer = f"${answer:,.2f}" | |
else: | |
formatted_answer = str(answer) | |
return formatted_answer | |
elif isinstance(answer, int): | |
return str(answer) | |
else: | |
# For any other type | |
return str(answer) | |
# Now we know answer is a string, so we can safely use string methods | |
# Normalize whitespace | |
answer = answer.strip() | |
# Remove common prefixes and formatting that models add | |
prefixes_to_remove = [ | |
"The answer is ", | |
"Answer: ", | |
"Final answer: ", | |
"The result is ", | |
"To answer this question: ", | |
"Based on the information provided, ", | |
"According to the information: ", | |
] | |
for prefix in prefixes_to_remove: | |
if answer.startswith(prefix): | |
answer = answer[len(prefix) :].strip() | |
# Remove quotes if they wrap the entire answer | |
if (answer.startswith('"') and answer.endswith('"')) or ( | |
answer.startswith("'") and answer.endswith("'") | |
): | |
answer = answer[1:-1].strip() | |
return answer | |
if __name__ == "__main__": | |
task = { | |
"task_id": "8e867cd7-cff9-4e6c-867a-ff5ddc2550be", | |
"question": "How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia.", | |
"Level": "1", | |
"file_name": "", | |
} | |
agent = ReactAgent(verbose=False) | |
print(agent(task)) | |