Spaces:
Runtime error
Runtime error
"""LangGraph Agent""" | |
import os | |
from dotenv import load_dotenv | |
from langgraph.graph import START, StateGraph, MessagesState | |
from langgraph.prebuilt import tools_condition | |
from langgraph.prebuilt import ToolNode | |
from langchain_google_genai import ChatGoogleGenerativeAI | |
from langchain_groq import ChatGroq | |
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings | |
from langchain_tavily import TavilySearch | |
from langchain_community.document_loaders import WikipediaLoader | |
from langchain_community.document_loaders import ArxivLoader | |
from langchain_community.vectorstores import SupabaseVectorStore | |
from langchain_core.messages import SystemMessage, HumanMessage | |
from langchain_core.tools import tool | |
from langchain.tools.retriever import create_retriever_tool | |
from langchain_openai import ChatOpenAI | |
from langchain_anthropic import ChatAnthropic | |
from supabase.client import Client, create_client | |
import re | |
from langchain_community.document_loaders import WikipediaLoader | |
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound | |
import sympy | |
import wolframalpha | |
import sys | |
import requests | |
load_dotenv() | |
def multiply(a: int, b: int) -> int: | |
"""Multiply two numbers. | |
Args: | |
a: first int | |
b: second int | |
""" | |
return a * b | |
def add(a: int, b: int) -> int: | |
"""Add two numbers. | |
Args: | |
a: first int | |
b: second int | |
""" | |
return a + b | |
def subtract(a: int, b: int) -> int: | |
"""Subtract two numbers. | |
Args: | |
a: first int | |
b: second int | |
""" | |
return a - b | |
def divide(a: int, b: int) -> int: | |
"""Divide two numbers. | |
Args: | |
a: first int | |
b: second int | |
""" | |
if b == 0: | |
raise ValueError("Cannot divide by zero.") | |
return a / b | |
def modulus(a: int, b: int) -> int: | |
"""Get the modulus of two numbers. | |
Args: | |
a: first int | |
b: second int | |
""" | |
return a % b | |
def wiki_search(query: str) -> str: | |
"""Search Wikipedia for a query and return maximum 2 results. | |
Args: | |
query: The search query.""" | |
search_docs = WikipediaLoader(query=query, load_max_docs=2).load() | |
formatted_search_docs = "\n\n---\n\n".join( | |
[ | |
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>' | |
for doc in search_docs | |
]) | |
#return {"wiki_results": formatted_search_docs} | |
return formatted_search_docs | |
def web_search(query: str) -> str: | |
"""Search Tavily for a query and return maximum 3 results. | |
Args: | |
query: The search query.""" | |
search_docs = TavilySearch(max_results=3).invoke(query=query) | |
formatted_search_docs = "\n\n---\n\n".join( | |
[ | |
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>' | |
for doc in search_docs | |
]) | |
return {"web_results": formatted_search_docs} | |
def arvix_search(query: str) -> str: | |
"""Search Arxiv for a query and return maximum 3 result. | |
Args: | |
query: The search query.""" | |
search_docs = ArxivLoader(query=query, load_max_docs=3).load() | |
formatted_search_docs = "\n\n---\n\n".join( | |
[ | |
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>' | |
for doc in search_docs | |
]) | |
return {"arvix_results": formatted_search_docs} | |
def filtered_wiki_search(query: str, start_year: int = None, end_year: int = None) -> dict: | |
"""Search Wikipedia for a query and filter results by year if provided.""" | |
search_docs = WikipediaLoader(query=query, load_max_docs=5).load() | |
def contains_year(text, start, end): | |
years = re.findall(r'\b(19\d{2}|20\d{2})\b', text) | |
for y in years: | |
y_int = int(y) | |
if start <= y_int <= end: | |
return True | |
return False | |
filtered_docs = [] | |
for doc in search_docs: | |
if start_year and end_year: | |
if contains_year(doc.page_content, start_year, end_year): | |
filtered_docs.append(doc) | |
else: | |
filtered_docs.append(doc) | |
formatted_search_docs = "\n\n---\n\n".join( | |
[ | |
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>' | |
for doc in filtered_docs | |
]) | |
return {"wiki_results": formatted_search_docs} | |
def wolfram_alpha_query(query: str) -> str: | |
"""Query Wolfram Alpha with the given question and return the result.""" | |
client = wolframalpha.Client(os.environ['WOLFRAM_APP_ID']) | |
res = client.query(query) | |
try: | |
return next(res.results).text | |
except StopIteration: | |
return "No result found." | |
def youtube_transcript(url: str) -> str: | |
"""Fetch YouTube transcript text from a video URL.""" | |
try: | |
video_id = url.split("v=")[-1].split("&")[0] | |
transcript_list = YouTubeTranscriptApi.get_transcript(video_id) | |
transcript = " ".join([segment['text'] for segment in transcript_list]) | |
return transcript | |
except (TranscriptsDisabled, NoTranscriptFound): | |
return "Transcript not available for this video." | |
except Exception as e: | |
return f"Error fetching transcript: {str(e)}" | |
def solve_algebraic_expression(expression: str) -> str: | |
"""Solve or simplify the given algebraic expression.""" | |
try: | |
expr = sympy.sympify(expression) | |
simplified = sympy.simplify(expr) | |
return str(simplified) | |
except Exception as e: | |
return f"Error solving expression: {str(e)}" | |
def run_python_code(code: str) -> str: | |
"""Execute python code and return the result of variable 'result' if defined.""" | |
try: | |
local_vars = {} | |
exec(code, {}, local_vars) | |
if 'result' in local_vars: | |
return str(local_vars['result']) | |
else: | |
return "Code executed successfully but no 'result' variable found." | |
except Exception as e: | |
return f"Error executing code: {str(e)}" | |
def wikidata_query(sparql_query: str) -> str: | |
"""Run a SPARQL query against Wikidata and return the JSON results.""" | |
endpoint = "https://query.wikidata.org/sparql" | |
headers = {"Accept": "application/sparql-results+json"} | |
try: | |
response = requests.get(endpoint, params={"query": sparql_query}, headers=headers) | |
response.raise_for_status() | |
data = response.json() | |
return str(data) # Or format as needed | |
except Exception as e: | |
return f"Error querying Wikidata: {str(e)}" | |
# load the system prompt from the file | |
with open("system_prompt.txt", "r", encoding="utf-8") as f: | |
system_prompt = f.read() | |
# System message | |
sys_msg = SystemMessage(content=system_prompt) | |
# build a retriever | |
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") # dim=768 | |
supabase: Client = create_client( | |
os.environ.get("SUPABASE_URL"), | |
os.environ.get("SUPABASE_SERVICE_KEY")) | |
vector_store = SupabaseVectorStore( | |
client=supabase, | |
embedding= embeddings, | |
table_name="documents", | |
query_name="match_documents_langchain", | |
) | |
retriever_tool = create_retriever_tool( | |
retriever=vector_store.as_retriever(), | |
name="Question Search", | |
description="A tool to retrieve similar questions from a vector store.", | |
) | |
tools = [ | |
multiply, | |
add, | |
subtract, | |
divide, | |
modulus, | |
wiki_search, | |
filtered_wiki_search, | |
web_search, | |
arvix_search, | |
wolfram_alpha_query, | |
retriever_tool, | |
youtube_transcript, | |
solve_algebraic_expression, | |
run_python_code, | |
wikidata_query | |
] | |
# Build graph function | |
def build_graph(provider: str = "huggingface"): | |
"""Build the graph""" | |
# Load environment variables from .env file | |
if provider == "openai": | |
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0) | |
elif provider == "anthropic": | |
llm = ChatAnthropic(model="claude-v1", temperature=0) | |
elif provider == "google": | |
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0) | |
elif provider == "groq": | |
llm = ChatGroq(model="qwen-qwq-32b", temperature=0) # optional : qwen-qwq-32b gemma2-9b-it | |
elif provider == "huggingface": | |
llm = ChatHuggingFace( | |
llm = HuggingFaceEndpoint( | |
endpoint_url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf", | |
temperature=0, | |
), | |
) | |
else: | |
raise ValueError("Invalid provider. Choose 'google', 'groq' or 'huggingface'.") | |
# Bind tools to LLM | |
llm_with_tools = llm.bind_tools(tools) | |
# Node | |
def assistant(state: MessagesState): | |
messages_with_sys = [sys_msg] + state["messages"] | |
return {"messages": [llm_with_tools.invoke(messages_with_sys)]} | |
def retriever(state: MessagesState): | |
"""Retriever node""" | |
similar_question = vector_store.similarity_search(state["messages"][0].content) | |
if not similar_question: | |
# No similar documents found, fallback message | |
example_msg = HumanMessage( | |
content="Sorry, I could not find any similar questions in the vector store." | |
) | |
else: | |
example_msg = HumanMessage( | |
content=f"Here I provide a similar question and answer for reference: \n\n{similar_question[0].page_content}", | |
) | |
return {"messages": [sys_msg] + state["messages"] + [example_msg]} | |
builder = StateGraph(MessagesState) | |
builder.add_node("retriever", retriever) | |
builder.add_node("assistant", assistant) | |
builder.add_node("tools", ToolNode(tools)) | |
builder.add_edge(START, "retriever") | |
builder.add_edge("retriever", "assistant") | |
builder.add_conditional_edges( | |
"assistant", | |
tools_condition, | |
) | |
builder.add_edge("tools", "assistant") | |
# Compile graph | |
return builder.compile() | |
# test | |
if __name__ == "__main__": | |
question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?" | |
# Build the graph | |
graph = build_graph(provider="groq") | |
# Run the graph | |
messages = [HumanMessage(content=question)] | |
messages = graph.invoke({"messages": messages}) | |
for m in messages["messages"]: | |
m.pretty_print() | |