# NOTE: "Spaces: Sleeping / Sleeping" was hosting-page banner text captured along with this file; it is not program code.
"""LangGraph Agent with FAISS Vector Store and Custom Tools""" | |
import os, time, random | |
from dotenv import load_dotenv | |
from typing import List, Dict, Any, TypedDict, Annotated | |
import operator | |
# LangGraph imports | |
from langgraph.graph import START, StateGraph, MessagesState | |
from langgraph.prebuilt import tools_condition | |
from langgraph.prebuilt import ToolNode | |
from langgraph.checkpoint.memory import MemorySaver | |
# LangChain imports | |
from langchain_core.messages import SystemMessage, HumanMessage | |
from langchain_core.tools import tool | |
from langchain_groq import ChatGroq | |
from langchain_google_genai import ChatGoogleGenerativeAI | |
from langchain_nvidia_ai_endpoints import ChatNVIDIA | |
from langchain_community.tools.tavily_search import TavilySearchResults | |
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader | |
from langchain_community.vectorstores import FAISS | |
from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings | |
from langchain.tools.retriever import create_retriever_tool | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
from langchain_community.document_loaders import JSONLoader | |
load_dotenv() | |
# Advanced Rate Limiter (SILENT)
class AdvancedRateLimiter:
    """Sliding-window rate limiter that blocks the caller when the budget is used up.

    Timestamps of recent requests are kept in ``request_times``. When the last
    60 seconds already contain ``requests_per_minute`` entries,
    ``wait_if_needed`` sleeps until the oldest entry leaves the window, plus a
    random 2-8 second cushion. Nothing is printed or logged.

    Args:
        requests_per_minute: Maximum number of requests allowed in any rolling
            60-second window. Must be at least 1.
        clock: Zero-argument callable returning the current time in seconds.
            Defaults to ``time.time``.
        sleep: Callable that blocks for the given number of seconds.
            Defaults to ``time.sleep``.

    Raises:
        ValueError: If ``requests_per_minute`` is smaller than 1.
    """

    WINDOW_SECONDS = 60

    def __init__(self, requests_per_minute: int, *, clock=time.time, sleep=time.sleep):
        # A non-positive budget used to surface later as an IndexError on the
        # first wait_if_needed() call; reject it up front instead.
        if requests_per_minute < 1:
            raise ValueError("requests_per_minute must be at least 1.")
        self.requests_per_minute = requests_per_minute
        self.request_times = []
        self._clock = clock
        self._sleep = sleep

    def _drop_expired(self, now: float) -> None:
        """Forget requests that are no longer inside the rolling window."""
        self.request_times = [
            t for t in self.request_times if now - t < self.WINDOW_SECONDS
        ]

    def wait_if_needed(self) -> None:
        """Block until a request is allowed, then record it."""
        now = self._clock()
        self._drop_expired(now)

        if len(self.request_times) >= self.requests_per_minute:
            oldest = self.request_times[0]
            delay = self.WINDOW_SECONDS - (now - oldest) + random.uniform(2, 8)
            self._sleep(delay)
            # Time has moved on while sleeping: re-read the clock so the
            # request is stamped with when it actually happens. Recording the
            # pre-sleep time made the entry expire too early and allowed
            # bursts above the configured budget.
            now = self._clock()
            self._drop_expired(now)

        self.request_times.append(now)
# Initialize rate limiters
# Requests-per-minute budget for each provider; build_graph() picks the
# limiter that matches the provider it was asked for.
_GROQ_RPM, _GEMINI_RPM, _NVIDIA_RPM = 30, 2, 5
groq_limiter = AdvancedRateLimiter(requests_per_minute=_GROQ_RPM)
gemini_limiter = AdvancedRateLimiter(requests_per_minute=_GEMINI_RPM)
nvidia_limiter = AdvancedRateLimiter(requests_per_minute=_NVIDIA_RPM)
# Custom Tools
def multiply(a: int, b: int) -> int:
    """Multiply two numbers.
    Args:
        a: first int
        b: second int
    """
    # NOTE(review): docstring wording is left untouched; it presumably doubles
    # as the tool description once this function is bound to the LLM.
    product = a * b
    return product
def add(a: int, b: int) -> int:
    """Add two numbers.
    Args:
        a: first int
        b: second int
    """
    # NOTE(review): docstring wording is left untouched; it presumably doubles
    # as the tool description once this function is bound to the LLM.
    total = a + b
    return total
def subtract(a: int, b: int) -> int:
    """Subtract two numbers.
    Args:
        a: first int
        b: second int
    """
    # NOTE(review): docstring wording is left untouched; it presumably doubles
    # as the tool description once this function is bound to the LLM.
    difference = a - b
    return difference
def divide(a: int, b: int) -> float:
    """Divide two numbers.
    Args:
        a: first int
        b: second int
    """
    # NOTE(review): docstring wording is left untouched; it presumably doubles
    # as the tool description once this function is bound to the LLM.
    if b != 0:
        return a / b
    # A zero divisor is reported as ValueError rather than ZeroDivisionError.
    raise ValueError("Cannot divide by zero.")
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.
    Args:
        a: first int
        b: second int
    """
    # NOTE(review): docstring wording is left untouched; it presumably doubles
    # as the tool description once this function is bound to the LLM.
    # Uses Python's % operator, so the result takes the sign of b.
    remainder = a % b
    return remainder
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return maximum 2 results.
    Args:
        query: The search query."""
    # Any failure (including a missing "source" key) is turned into a
    # "Wikipedia search failed: ..." string instead of propagating.
    try:
        # Random 1-3 second pause before each lookup.
        pause = random.uniform(1, 3)
        time.sleep(pause)

        loader = WikipediaLoader(query=query, load_max_docs=2)
        rendered = []
        for doc in loader.load():
            source = doc.metadata["source"]
            page = doc.metadata.get("page", "")
            rendered.append(
                f'<Document source="{source}" page="{page}"/>\n{doc.page_content}\n</Document>'
            )
        # Documents are separated by a horizontal-rule style divider.
        return "\n\n---\n\n".join(rendered)
    except Exception as e:
        return f"Wikipedia search failed: {str(e)}"
def web_search(query: str) -> str:
    """Search Tavily for a query and return maximum 3 results.
    Args:
        query: The search query."""
    # Returns the hits rendered as <Document> blocks joined by "---" dividers,
    # or a "Web search failed: ..." string when anything goes wrong.
    try:
        # Random 2-5 second pause before each Tavily call.
        time.sleep(random.uniform(2, 5))

        # The tool input has to be supplied as the positional `input` argument.
        # The earlier `invoke(query=query)` form raised TypeError on every
        # call, so this tool could only ever report a failure.
        search_docs = TavilySearchResults(max_results=3).invoke({"query": query})

        # Only a list of result dicts can be formatted below; anything else is
        # presumably an error description from the tool, so surface it as-is.
        if not isinstance(search_docs, list):
            return f"Web search failed: {search_docs}"

        formatted_search_docs = "\n\n---\n\n".join(
            f'<Document source="{doc.get("url", "")}" />\n{doc.get("content", "")}\n</Document>'
            for doc in search_docs
        )
        return formatted_search_docs
    except Exception as e:
        return f"Web search failed: {str(e)}"
def arvix_search(query: str) -> str:
    """Search Arxiv for a query and return maximum 3 results.
    Args:
        query: The search query."""
    # Returns up to three papers rendered as <Document> blocks (content cut to
    # the first 1000 characters), or an "ArXiv search failed: ..." string.
    try:
        # Random 1-4 second pause before each lookup.
        time.sleep(random.uniform(1, 4))
        search_docs = ArxivLoader(query=query, load_max_docs=3).load()

        rendered = []
        for doc in search_docs:
            meta = doc.metadata
            # NOTE(review): ArxivLoader's default metadata appears to expose
            # "Title" (and "entry_id" with full metadata) rather than "source".
            # Indexing meta["source"] raised KeyError and threw away every
            # result, so fall back through the keys that may be present.
            source = meta.get("source") or meta.get("entry_id") or meta.get("Title", "")
            page = meta.get("page", "")
            rendered.append(
                f'<Document source="{source}" page="{page}"/>\n{doc.page_content[:1000]}\n</Document>'
            )
        return "\n\n---\n\n".join(rendered)
    except Exception as e:
        return f"ArXiv search failed: {str(e)}"
# Load and process JSONL data for FAISS vector store
def setup_faiss_vector_store():
    """Build a FAISS vector store from the records in ``metadata.jsonl``.

    Each JSON line is reshaped by a jq filter into question text plus task
    metadata, split with ``chunk_size=512`` / ``chunk_overlap=200`` and
    embedded with the NVIDIA ``nvidia/nv-embedqa-e5-v5`` model (API key read
    from the ``NVIDIA_API_KEY`` environment variable).

    Returns:
        The populated FAISS store, or ``None`` if any step raises; in that
        case the error is printed and not re-raised.
    """
    # jq filter applied to every line of the JSONL file.
    jq_schema = """
    {
        page_content: .Question,
        metadata: {
            task_id: .task_id,
            Level: .Level,
            Final_answer: ."Final answer",
            file_name: .file_name,
            Steps: .["Annotator Metadata"].Steps,
            Number_of_steps: .["Annotator Metadata"]["Number of steps"],
            How_long: .["Annotator Metadata"]["How long did this take?"],
            Tools: .["Annotator Metadata"].Tools,
            Number_of_tools: .["Annotator Metadata"]["Number of tools"]
        }
    }
    """
    try:
        # Load documents
        documents = JSONLoader(
            file_path="metadata.jsonl",
            jq_schema=jq_schema,
            json_lines=True,
            text_content=False,
        ).load()

        # Split documents
        splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=200)
        chunks = splitter.split_documents(documents)

        # Create FAISS vector store
        embedder = NVIDIAEmbeddings(
            model="nvidia/nv-embedqa-e5-v5",
            api_key=os.getenv("NVIDIA_API_KEY"),
        )
        return FAISS.from_documents(chunks, embedder)
    except Exception as e:
        # Best effort: callers treat None as "no retriever available".
        print(f"FAISS vector store setup failed: {e}")
        return None
# Load system prompt
# Start from the built-in prompt; it is replaced below when system_prompt.txt
# exists next to the working directory.
system_prompt = """You are a helpful assistant tasked with answering questions using a set of tools.
Now, I will ask you a question. Report your thoughts, and finish your answer with the following template:
FINAL ANSWER: [YOUR FINAL ANSWER].
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.
Your answer should only start with "FINAL ANSWER: ", then follows with the answer."""
try:
    with open("system_prompt.txt", "r", encoding="utf-8") as f:
        system_prompt = f.read()
except FileNotFoundError:
    # No prompt file on disk: keep the built-in prompt defined above.
    pass

# System message
sys_msg = SystemMessage(content=system_prompt)
# Setup FAISS vector store and retriever
vector_store = setup_faiss_vector_store()

# Stays None when the vector store could not be built.
retriever_tool = None
if vector_store:
    # Similarity retriever returning the three closest entries per query.
    retriever = vector_store.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 3},
    )
    retriever_tool = create_retriever_tool(
        retriever=retriever,
        name="Question_Search",
        description="A tool to retrieve similar questions from a vector store.",
    )
# All tools
all_tools = [
    # Arithmetic helpers
    multiply, add, subtract, divide, modulus,
    # External search helpers
    wiki_search, web_search, arvix_search,
]
# The retriever tool is only registered when it was actually created.
if retriever_tool:
    all_tools += [retriever_tool]
# Build graph function
def build_graph(provider: str = "groq"):
    """Build the LangGraph with rate limiting.

    The graph runs ``retriever -> assistant``, loops ``assistant <-> tools``
    while the model keeps requesting tool calls, and is compiled with an
    in-memory checkpointer (so callers pass a ``thread_id`` in the config, as
    the ``__main__`` example in this file does).

    Args:
        provider: Which chat model to use: ``"google"``, ``"groq"`` or
            ``"nvidia"``.

    Returns:
        The compiled graph.

    Raises:
        ValueError: If ``provider`` is not one of the supported names.
    """
    # Initialize LLMs with best free models
    if provider == "google":
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-thinking-exp", temperature=0)
    elif provider == "groq":
        llm = ChatGroq(model="llama-3.3-70b-versatile", temperature=0)
    elif provider == "nvidia":
        llm = ChatNVIDIA(model="meta/llama-3.1-70b-instruct", temperature=0)
    else:
        raise ValueError("Invalid provider. Choose 'google', 'groq' or 'nvidia'.")

    # provider was validated above, so this lookup cannot miss.
    limiter = {
        "groq": groq_limiter,
        "google": gemini_limiter,
        "nvidia": nvidia_limiter,
    }[provider]

    # Bind tools to LLM
    llm_with_tools = llm.bind_tools(all_tools)

    # Node functions
    def assistant(state: MessagesState):
        """Assistant node with rate limiting."""
        limiter.wait_if_needed()
        # The system prompt is prepended at call time so it is always the
        # first message the model sees. Writing it into the state from the
        # retriever node let the message reducer append it after the user's
        # question, since the re-sent messages already existed in the state.
        reply = llm_with_tools.invoke([sys_msg] + state["messages"])
        return {"messages": [reply]}

    def retriever_node(state: MessagesState):
        """Retriever node: add the closest stored question as a reference message."""
        history = state["messages"]
        if vector_store and history:
            try:
                similar_questions = vector_store.similarity_search(history[-1].content, k=1)
            except Exception as e:
                # Retrieval is best effort; continue without a reference.
                print(f"Retriever error: {e}")
            else:
                if similar_questions:
                    example_msg = HumanMessage(
                        content=f"Here I provide a similar question and answer for reference: \n\n{similar_questions[0].page_content}",
                    )
                    # Return only the new message; the existing ones are
                    # already held in the state.
                    return {"messages": [example_msg]}
        return {"messages": []}

    # Build graph
    builder = StateGraph(MessagesState)
    builder.add_node("retriever", retriever_node)
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(all_tools))
    builder.add_edge(START, "retriever")
    builder.add_edge("retriever", "assistant")
    builder.add_conditional_edges("assistant", tools_condition)
    builder.add_edge("tools", "assistant")

    # Compile graph with memory
    memory = MemorySaver()
    return builder.compile(checkpointer=memory)
# Test
if __name__ == "__main__":
    # Manual smoke test: ask one question through the Groq-backed graph and
    # print every message of the resulting conversation.
    question = "What are the names of the US presidents who were assassinated?"

    # Build the graph
    graph = build_graph(provider="groq")

    # Run the graph; the thread_id identifies this conversation to the checkpointer.
    config = {"configurable": {"thread_id": "test_thread"}}
    result = graph.invoke({"messages": [HumanMessage(content=question)]}, config)

    for m in result["messages"]:
        m.pretty_print()