import os
import tempfile
from typing import TypedDict, Annotated, Optional
from urllib.parse import urlparse

import requests
from dotenv import load_dotenv
from langchain_core.messages import AnyMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.graph import START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_google_genai import ChatGoogleGenerativeAI

# Kept for the commented-out model configurations below
from langchain_openai import ChatOpenAI
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint

load_dotenv()
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a temporary file and return the path.
    Useful for processing files from the GAIA API.

    Args:
        content: The content to save to the file
        filename: Optional filename, will generate a random name if not provided

    Returns:
        Path to the saved file
    """
    temp_dir = tempfile.gettempdir()
    if filename is None:
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        temp_file.close()  # release the handle so the reopen below works on all platforms
        filepath = temp_file.name
    else:
        filepath = os.path.join(temp_dir, filename)
    # Write content to the file
    with open(filepath, "w") as f:
        f.write(content)
    return f"File saved to {filepath}. You can read this file to process its contents."
@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return a maximum of 2 results.

    Args:
        query: The search query."""
    search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ]
    )
    return formatted_search_docs
@tool
def web_search(query: str) -> str:
    """Search Tavily for a query and return a maximum of 3 results.

    Args:
        query: The search query."""
    # TavilySearchResults returns a list of dicts with "url" and "content" keys
    search_results = TavilySearchResults(max_results=3).invoke(query)
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{result["url"]}">\n{result["content"]}\n</Document>'
            for result in search_results
        ]
    )
    return formatted_search_docs
@tool
def arxiv_search(query: str) -> str:
    """Search arXiv for a query and return a maximum of 3 results.

    Args:
        query: The search query."""
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content[:1000]}\n</Document>'
            for doc in search_docs
        ]
    )
    return formatted_search_docs
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.

    Args:
        url: The URL to download from
        filename: Optional filename, will generate one based on URL if not provided

    Returns:
        Path to the downloaded file
    """
    try:
        # Parse URL to get filename if not provided
        if not filename:
            path = urlparse(url).path
            filename = os.path.basename(path)
            if not filename:
                # Generate a random name if we couldn't extract one
                import uuid

                filename = f"downloaded_{uuid.uuid4().hex[:8]}"
        # Create temporary file
        temp_dir = tempfile.gettempdir()
        filepath = os.path.join(temp_dir, filename)
        # Download the file (timeout prevents the agent from hanging on dead hosts)
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()
        # Save the file
        with open(filepath, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return f"File downloaded to {filepath}. You can now process this file."
    except Exception as e:
        return f"Error downloading file: {str(e)}"
@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using pytesseract (if available).

    Args:
        image_path: Path to the image file

    Returns:
        Extracted text or error message
    """
    try:
        # Try to import pytesseract
        import pytesseract
        from PIL import Image

        # Open the image
        image = Image.open(image_path)
        # Extract text
        text = pytesseract.image_to_string(image)
        return f"Extracted text from image:\n\n{text}"
    except ImportError:
        return "Error: pytesseract is not installed. Please install it with 'pip install pytesseract' and ensure Tesseract OCR is installed on your system."
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Analyze a CSV file using pandas and answer a question about it.

    Args:
        file_path: Path to the CSV file
        query: Question about the data

    Returns:
        Analysis result or error message
    """
    try:
        import pandas as pd

        # Read the CSV file
        df = pd.read_csv(file_path)
        # Report general information; the query guides the agent's follow-up reasoning
        result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {', '.join(df.columns)}\n\n"
        # Add summary statistics
        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except ImportError:
        return "Error: pandas is not installed. Please install it with 'pip install pandas'."
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file using pandas and answer a question about it.

    Args:
        file_path: Path to the Excel file
        query: Question about the data

    Returns:
        Analysis result or error message
    """
    try:
        import pandas as pd

        # Read the Excel file
        df = pd.read_excel(file_path)
        # Report general information; the query guides the agent's follow-up reasoning
        result = (
            f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        )
        result += f"Columns: {', '.join(df.columns)}\n\n"
        # Add summary statistics
        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except ImportError:
        return "Error: pandas and openpyxl are not installed. Please install them with 'pip install pandas openpyxl'."
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
# Initialize the DuckDuckGo search tool (keyless fallback to Tavily)
search_tool = DuckDuckGoSearchResults()

# # Load LLM model
# llm = ChatOpenAI(
#     model="gpt-4o",
#     base_url="https://models.inference.ai.azure.com",
#     api_key=os.environ["GITHUB_TOKEN"],
#     temperature=0.2,
#     max_tokens=4096,
# )
# llm = ChatHuggingFace(
#     llm=HuggingFaceEndpoint(
#         repo_id="Qwen/Qwen3-4B",
#         # repo_id="meta-llama/Llama-3-70B-Instruct",
#         temperature=0,
#         huggingfacehub_api_token=os.environ["HUGGINGFACEHUB_API_TOKEN"],
#     ),
#     verbose=True,
# )
llm = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-exp", google_api_key=os.environ["GOOGLE_API_KEY"]
)

tools = [
    analyze_csv_file,
    analyze_excel_file,
    extract_text_from_image,
    download_file_from_url,
    save_and_read_file,
    web_search,
    wiki_search,
    arxiv_search,
    search_tool,
]
# Bind the tools to the LLM so it can emit structured tool calls
model_with_tools = llm.bind_tools(tools)
# ToolNode executes whichever tool the model requests
tool_node = ToolNode(tools)
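# Illustrative round-trip (assumes a valid GOOGLE_API_KEY): the bound model can
# respond with tool calls, e.g.
#   ai_msg = model_with_tools.invoke("Find recent arXiv papers on RLHF")
#   ai_msg.tool_calls  # e.g. [{"name": "arxiv_search", "args": {"query": "RLHF"}, ...}]
# tool_node then executes those calls and appends the resulting ToolMessages.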
class AgentState(TypedDict):
    """State of the agent."""

    input_file: Optional[str]
    messages: Annotated[list[AnyMessage], add_messages]
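# add_messages is a reducer: node updates of the form {"messages": [new_msg]} are
# appended to the existing list instead of replacing it, so the conversation
# history accumulates across agent/tool turns.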
def build_agent_workflow():
    """Build the agent workflow."""

    def call_model(state: AgentState):
        print("State:", state["messages"])
        question = state["messages"][-1].content
        context = """
        You are a helpful assistant tasked with answering questions using a set of tools.
        """
        # If the question ships with a file, inline its content into the prompt
        if state.get("input_file"):
            try:
                with open(state.get("input_file"), "r") as f:
                    file_content = f.read()
                print("File content:", file_content)
                # Determine file type from extension
                file_ext = os.path.splitext(state.get("input_file"))[1].lower()
                context = f"""
                Question: {question}
                This question has an associated file. Here is the file content:
                ```{file_ext}
                {file_content}
                ```
                Analyze the file content above to answer the question."""
            except Exception as file_e:
                context = f"""Question: {question}
                This question has an associated file at path: {state.get("input_file")}
                However, there was an error reading the file: {file_e}
                You can still try to answer the question based on the information provided.
                """
        if question.startswith(".") or ".rewsna eht sa" in question:
            print("Reversed text detected.")
            print(state.get("messages")[::-1])
            context = f"""
            This question appears to be written in reversed text. Your task is to reverse it before answering. For example, the reversed sentence:
            .rewsna eht sa "tfel" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI
            reads, once reversed:
            "If you understand this sentence, write the opposite of the word "left" as the answer."
            Now reverse the following question and answer it: {question}. Remember to format your answer exactly as requested.
            """
        # System message
        system_prompt = SystemMessage(
            f"""{context}
            When answering, provide ONLY the precise answer requested.
            Do not include explanations, steps, reasoning, or additional text.
            Be direct and specific. The GAIA benchmark requires exact-match answers.
            For example, if asked "What is the capital of France?", respond simply with "Paris".
            """
        )
        return {
            "messages": [model_with_tools.invoke([system_prompt] + state["messages"])],
        }
    # Define the state graph over AgentState so input_file survives invocation
    workflow = StateGraph(AgentState)
    workflow.add_node("agent", call_model)
    workflow.add_node("tools", tool_node)
    workflow.add_edge(START, "agent")
    # Route to the tool node when the model emits tool calls, otherwise finish
    workflow.add_conditional_edges("agent", tools_condition)
    workflow.add_edge("tools", "agent")
    app = workflow.compile()
    return app
if __name__ == "__main__":
    question = '.rewsna eht sa "tfel" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI'
    # Build the graph
    graph = build_agent_workflow()
    # Run the graph
    messages = [HumanMessage(content=question)]
    result = graph.invoke({"messages": messages, "input_file": None})
    for m in result["messages"]:
        m.pretty_print()
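    # Illustrative variant (hypothetical path) exercising the input_file branch:
    #   result = graph.invoke(
    #       {"messages": [HumanMessage(content="Sum the second column.")],
    #        "input_file": "/tmp/data.csv"}
    #   )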