# Spaces: Sleeping  (page-status text captured along with the source; not part of the program)
import operator
import os
import time
from typing import Annotated, Sequence, TypedDict

from langchain.agents import Tool
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.messages import AIMessage, BaseMessage, SystemMessage
from langchain_core.tools import StructuredTool, tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import END, StateGraph
from langgraph.prebuilt import ToolNode
# Load environment variables.
# The key is read once at import time; a missing or empty value aborts the
# import so a misconfigured deployment fails immediately.
# (os.getenv is the same lookup as os.environ.get, so one call is enough.)
google_api_key = os.getenv("GOOGLE_API_KEY")
if not google_api_key:
    raise ValueError("Missing GOOGLE_API_KEY environment variable")

# Load system prompt.
# The path is relative, so it is resolved against the current working
# directory of the process.
with open("System_Prompt.txt", "r", encoding="utf-8") as f:
    system_prompt = f.read()
sys_msg = SystemMessage(content=system_prompt)
# Tool Definitions
def multiply(a: int, b: int) -> int:
    """Return the product of ``a`` and ``b``.

    Args:
        a: First factor.
        b: Second factor.

    Returns:
        The value of ``a * b``.
    """
    product = a * b
    return product
def add(a: int, b: int) -> int:
    """Return the sum of ``a`` and ``b``.

    Args:
        a: First addend.
        b: Second addend.

    Returns:
        The value of ``a + b``.
    """
    total = a + b
    return total
def subtract(a: int, b: int) -> int:
    """Return the difference ``a - b``.

    Args:
        a: Value to subtract from.
        b: Value to subtract.

    Returns:
        The value of ``a - b``.
    """
    difference = a - b
    return difference
def divide(a: int, b: int) -> float:
    """Return the quotient ``a / b`` using true division.

    Args:
        a: Dividend.
        b: Divisor.

    Returns:
        The value of ``a / b``.

    Raises:
        ValueError: If ``b`` equals zero.
    """
    if b != 0:
        return a / b
    raise ValueError("Cannot divide by zero.")
def wiki_search(query: str) -> str:
    """Look up ``query`` on Wikipedia and return excerpts of the hits.

    At most two documents are requested from ``WikipediaLoader``. Each
    document contributes the first 1000 characters of its ``page_content``,
    and the excerpts are separated by a blank line.

    Args:
        query: Search text handed to ``WikipediaLoader``.

    Returns:
        The joined excerpts, or a fixed "no results" message when the loader
        returns nothing.
    """
    loader = WikipediaLoader(query=query, load_max_docs=2)
    pages = loader.load()
    if pages:
        excerpts = (page.page_content[:1000] for page in pages)
        return "\n\n".join(excerpts)
    return "No Wikipedia results found."
# Tool inventory with valid names.
#
# The arithmetic helpers take two arguments. The plain `Tool` wrapper is a
# single-input tool (one string argument), so it cannot call them as
# `(a, b)`. `StructuredTool.from_function` infers a two-field argument schema
# from each function's signature instead. Names and descriptions are kept as
# they were.
#
# The two search tools take a single query string, so `Tool` remains
# appropriate for them.
tools = [
    StructuredTool.from_function(
        func=multiply,
        name="Math_Multiply",
        description="Multiplies two integers",
    ),
    StructuredTool.from_function(
        func=add,
        name="Math_Add",
        description="Adds two integers",
    ),
    StructuredTool.from_function(
        func=subtract,
        name="Math_Subtract",
        description="Subtracts two integers",
    ),
    StructuredTool.from_function(
        func=divide,
        name="Math_Divide",
        description="Divides two numbers",
    ),
    Tool(name="Search_Wikipedia", func=wiki_search, description="Searches Wikipedia"),
    Tool(
        name="Search_Web",
        func=DuckDuckGoSearchRun().run,
        description="Searches the web using DuckDuckGo",
    ),
]
# Graph Definition
class AgentState(TypedDict):
    """Shared state carried between the nodes of the agent graph."""

    # Conversation history. `operator.add` is attached as the reducer
    # annotation, so a node's returned `messages` list is concatenated onto
    # the existing history rather than replacing it.
    messages: Annotated[Sequence[BaseMessage], operator.add]
def build_graph():
    """Construct and compile the LangGraph agent workflow.

    The graph has two nodes:

    * ``agent`` calls the Gemini model with the module-level ``tools`` bound.
    * ``tools`` is a ``ToolNode`` that executes the tool calls requested by
      the model, after which control returns to ``agent``.

    After each ``agent`` step the workflow either runs the requested tools or
    ends. It ends whenever the last message carries no tool calls, which
    covers a final answer, an error report produced by ``agent``, or any
    other plain reply.

    Returns:
        The compiled graph returned by ``StateGraph.compile()``.
    """
    # Initialize the Gemini 2.0 Flash model with retries and a timeout.
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash-exp",
        temperature=0.3,
        google_api_key=google_api_key,
        max_retries=3,  # add built-in retries
        request_timeout=30,  # set a request timeout
    )
    llm_with_tools = llm.bind_tools(tools)

    def custom_tools_condition(state: AgentState):
        """Return "use_tools" if the last message requests tools, else "end"."""
        last_message = state["messages"][-1]
        if getattr(last_message, "tool_calls", None):
            return "use_tools"
        # No tool calls: the message is an error report, a final answer, or a
        # plain reply. Routing such a message to the tool node would run no
        # tools and bounce straight back to the agent, repeating the LLM call
        # until the graph's recursion limit is hit, so the workflow ends here.
        return "end"

    def agent_node(state: AgentState):
        """Call the model on the current history and return its reply.

        Any exception from the model call is converted into an ``AIMessage``
        whose text starts with ``AGENT ERROR`` so the graph can terminate
        instead of crashing.
        """
        messages = list(state["messages"])
        # `sys_msg` is loaded at module level, but nothing visible in this
        # module attaches it to the conversation. Prepend it for the model
        # call unless the caller already supplied a system message. The
        # stored state is left untouched.
        if not any(isinstance(msg, SystemMessage) for msg in messages):
            messages = [sys_msg, *messages]
        try:
            # Crude rate limiting: pause one second before every request.
            time.sleep(1)
            response = llm_with_tools.invoke(messages)
        except Exception as e:
            # Best-effort classification based on the error text.
            error_text = str(e)
            error_type = "UNKNOWN"
            if "429" in error_text:
                error_type = "QUOTA_EXCEEDED"
            elif "400" in error_text:
                error_type = "INVALID_REQUEST"
            error_msg = f"AGENT ERROR ({error_type}): {error_text[:200]}"
            return {"messages": [AIMessage(content=error_msg)]}
        else:
            return {"messages": [response]}

    # Graph construction
    workflow = StateGraph(AgentState)

    # Add nodes to the workflow
    workflow.add_node("agent", agent_node)
    workflow.add_node("tools", ToolNode(tools))

    # The agent node is the entry point.
    workflow.set_entry_point("agent")

    # After the agent runs, either execute tools or finish.
    workflow.add_conditional_edges(
        "agent",
        custom_tools_condition,
        {
            "use_tools": "tools",
            "end": END,  # directly to END
        },
    )

    # Tool results always go back to the agent.
    workflow.add_edge("tools", "agent")

    return workflow.compile()
# Build the compiled agent graph once, when this module is imported.
agent_graph = build_graph()