Spaces:
Sleeping
Sleeping
from langgraph.prebuilt import create_react_agent | |
from langchain_community.tools.tavily_search import TavilySearchResults | |
from langchain_community.document_loaders import WikipediaLoader | |
from langchain_community.document_loaders import ArxivLoader | |
from dotenv import load_dotenv, find_dotenv | |
from langchain_core.tools import tool | |
from langchain_huggingface import HuggingFaceEmbeddings | |
from langchain_community.vectorstores import SupabaseVectorStore | |
from langchain_core.messages import HumanMessage | |
from supabase import create_client, Client | |
import os | |
load_dotenv(find_dotenv()) | |
DEFAULT_PROMPT = """ | |
You are a helpful assistant tasked with answering questions using a set of tools. | |
Now, I will ask you a question. Report your thoughts, and finish your answer with the following template: | |
FINAL ANSWER: [YOUR FINAL ANSWER]. | |
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string. | |
Your answer should only start with "FINAL ANSWER: ", then follows with the answer. | |
""" | |
def wiki_search(query: str) -> str: | |
"""Search Wikipedia for a query and return maximum 2 results. | |
Args: | |
query: The search query.""" | |
search_docs = WikipediaLoader(query=query, load_max_docs=2).load() | |
formatted_search_docs = "\n\n---\n\n".join( | |
[ | |
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>' | |
for doc in search_docs | |
] | |
) | |
return {"wiki_results": formatted_search_docs} | |
def web_search(query: str) -> str: | |
"""Search Tavily for a query and return maximum 3 results. | |
Args: | |
query: The search query.""" | |
search_docs = TavilySearchResults(max_results=3).invoke(query=query) | |
formatted_search_docs = "\n\n---\n\n".join( | |
[ | |
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>' | |
for doc in search_docs | |
] | |
) | |
return {"web_results": formatted_search_docs} | |
def arvix_search(query: str) -> str: | |
"""Search Arxiv for a query and return maximum 3 result. | |
Args: | |
query: The search query.""" | |
search_docs = ArxivLoader(query=query, load_max_docs=3).load() | |
formatted_search_docs = "\n\n---\n\n".join( | |
[ | |
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>' | |
for doc in search_docs | |
] | |
) | |
return {"arvix_results": formatted_search_docs} | |
def multiply(a: int, b: int) -> int: | |
"""Multiply two numbers. | |
Args: | |
a: first int | |
b: second int | |
""" | |
return a * b | |
def add(a: int, b: int) -> int: | |
"""Add two numbers. | |
Args: | |
a: first int | |
b: second int | |
""" | |
return a + b | |
def subtract(a: int, b: int) -> int: | |
"""Subtract two numbers. | |
Args: | |
a: first int | |
b: second int | |
""" | |
return a - b | |
def divide(a: int, b: int) -> int: | |
"""Divide two numbers. | |
Args: | |
a: first int | |
b: second int | |
""" | |
if b == 0: | |
raise ValueError("Cannot divide by zero.") | |
return a / b | |
def modulus(a: int, b: int) -> int: | |
"""Get the modulus of two numbers. | |
Args: | |
a: first int | |
b: second int | |
""" | |
return a % b | |
class CustomAgent: | |
def __init__(self): | |
print("CustomAgent initialized.") | |
# Initialize embeddings and vector store | |
self.embeddings = HuggingFaceEmbeddings( | |
model_name="sentence-transformers/all-mpnet-base-v2" | |
) | |
self.supabase: Client = create_client( | |
os.environ.get("SUPABASE_URL"), os.environ.get("SUPABASE_SERVICE_ROLE_KEY") | |
) | |
self.vector_store = SupabaseVectorStore( | |
client=self.supabase, | |
embedding=self.embeddings, | |
table_name="documents_1", | |
query_name="match_documents_1", | |
) | |
# Create the agent | |
self.agent = create_react_agent( | |
model="openai:gpt-4.1", | |
tools=[ | |
web_search, | |
add, | |
subtract, | |
multiply, | |
divide, | |
modulus, | |
wiki_search, | |
arvix_search, | |
], | |
prompt=DEFAULT_PROMPT, | |
) | |
def retriever(self, query: str): | |
"""Retriever""" | |
similar_question = self.vector_store.similarity_search(query) | |
return HumanMessage( | |
content=f"Here I provide a similar question and answer for reference, you can use it to answer the question: \n\n{similar_question[0].page_content}", | |
) | |
def __call__(self, question: str) -> str: | |
"""Run the agent on a question and return the answer.""" | |
print(f"CustomAgent received question (first 50 chars): {question[:50]}...") | |
try: | |
answer = self.agent.invoke( | |
{ | |
"messages": [ | |
self.retriever(question), | |
HumanMessage(content=question), | |
] | |
} | |
) | |
result = answer["messages"][-1].content | |
if "FINAL ANSWER: " in result: | |
final_answer_start = result.find("FINAL ANSWER: ") + len( | |
"FINAL ANSWER: " | |
) | |
extracted_answer = result[final_answer_start:].strip() | |
print(f"CustomAgent extracted answer: {extracted_answer}") | |
return extracted_answer | |
else: | |
print( | |
f"CustomAgent returning full answer (no FINAL ANSWER found): {result}" | |
) | |
return result | |
except Exception as e: | |
print(f"Error in CustomAgent: {e}") | |
return f"Error: {e}" | |
if __name__ == "__main__": | |
agent = CustomAgent() | |
agent( | |
"How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia." | |
) | |
agent( | |
"How many at bats did the Yankee with the most walks in the 1977 regular season have that same season?" | |
) | |
agent( | |
"In the video https://www.youtube.com/watch?v=L1vXCYZAYYM, what is the highest number of bird species to be on camera simultaneously?" | |
) |