"""Tool-using research agent built on LangGraph and Gemini.

The module defines a set of LangChain tools (arithmetic, Wikipedia / arXiv /
web search, and a few text helpers), then wires them into a two-node graph:
an ``agent`` node that calls the model and a ``tools`` node that executes the
tool calls the model requested. Importing the module loads the environment,
reads ``system_prompt.txt`` and compiles the graph into ``agent_graph``.
"""
import calendar
import json
import operator
import os
import re
import time
from datetime import datetime
from typing import Annotated, Dict, List, Sequence, TypedDict, Union

from dotenv import load_dotenv
from langchain_community.document_loaders import ArxivLoader, WikipediaLoader
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.messages import (
    AIMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import END, StateGraph
from tenacity import retry, stop_after_attempt, wait_exponential

# Load environment variables
load_dotenv()
google_api_key = os.getenv("GOOGLE_API_KEY")
if not google_api_key:
    raise ValueError("Missing GOOGLE_API_KEY environment variable")

# Upper bounds (in characters) on text handed back to the model.
MAX_DOC_CHARS = 2000
MAX_TOOL_RESULT_CHARS = 1000

# NOTE: the docstring of every @tool function is used as the tool description
# shown to the model, so those docstrings are kept short and unchanged;
# maintainer-facing notes live in comments instead.


# --- Math Tools ---
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two integers."""
    return a * b


@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b


@tool
def subtract(a: int, b: int) -> int:
    """Subtract b from a."""
    return a - b


@tool
def divide(a: int, b: int) -> float:
    """Divide a by b, error on zero."""
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b


@tool
def modulus(a: int, b: int) -> int:
    """Compute a mod b."""
    # Mirror divide(): report a zero divisor as ValueError with a clear
    # message instead of leaking ZeroDivisionError.
    if b == 0:
        raise ValueError("Cannot take modulus by zero.")
    return a % b


# --- Browser Tools ---
@tool
def wiki_search(query: str) -> str:
    """Search Wikipedia and return up to 3 relevant documents."""
    # Errors are returned as text (not raised) so the model can react to them.
    try:
        docs = WikipediaLoader(query=query, load_max_docs=3).load()
        if not docs:
            return "No Wikipedia results found."
        results = []
        for doc in docs:
            title = doc.metadata.get('title', 'Unknown Title')
            content = doc.page_content[:MAX_DOC_CHARS]  # Limit content length
            results.append(f"Title: {title}\nContent: {content}")
        return "\n\n---\n\n".join(results)
    except Exception as e:
        return f"Wikipedia search error: {str(e)}"


@tool
def arxiv_search(query: str) -> str:
    """Search Arxiv and return up to 3 relevant papers."""
    try:
        docs = ArxivLoader(query=query, load_max_docs=3).load()
        if not docs:
            return "No arXiv papers found."
        results = []
        for doc in docs:
            title = doc.metadata.get('Title', 'Unknown Title')
            authors = doc.metadata.get('Authors', [])
            # The loader may supply authors as an already-joined string;
            # joining a string would insert ", " between every character.
            if not isinstance(authors, str):
                authors = ", ".join(str(author) for author in authors)
            content = doc.page_content[:MAX_DOC_CHARS]  # Limit content length
            results.append(
                f"Title: {title}\nAuthors: {authors}\nContent: {content}"
            )
        return "\n\n---\n\n".join(results)
    except Exception as e:
        return f"arXiv search error: {str(e)}"


@tool
def web_search(query: str) -> str:
    """Search the web using DuckDuckGo and return top results."""
    try:
        search = DuckDuckGoSearchRun()
        result = search.run(query)
        # Limit content length
        return f"Web search results for '{query}':\n{result[:MAX_DOC_CHARS]}"
    except Exception as e:
        return f"Web search error: {str(e)}"


# --- Enhanced Tools ---
_YEAR_IN_TEXT = re.compile(r"(?<!\d)(\d{4})(?!\d)")
_RANGE_SEPARATOR = re.compile(r"\s*[-\u2013\u2014]\s*")


def _coerce_year(value):
    """Return *value* as an int year, or None when no year can be read.

    Accepts ints, all-digit strings, and strings that contain a standalone
    four-digit number (for example a date such as "2005-03-01").
    """
    if isinstance(value, int):
        return value
    if isinstance(value, str):
        text = value.strip()
        if text.isdigit():
            return int(text)
        found = _YEAR_IN_TEXT.search(text)
        if found:
            return int(found.group(1))
    return None


@tool
def filter_by_year(items: List[Dict], year_range: str) -> Union[List[Dict], str]:
    """Filter items containing year information, returning only those within specified range"""
    # year_range is "START-END" (hyphen or dash); a single year "2005" is
    # treated as "2005-2005". Each item's year is read from the first truthy
    # value among the keys 'year', 'release_year' and 'date'.
    # On failure a "Filter error: ..." string is returned instead of a list.
    try:
        bounds = [int(part) for part in _RANGE_SEPARATOR.split(year_range.strip())]
        if len(bounds) == 1:
            bounds = bounds * 2
        if len(bounds) != 2:
            raise ValueError(f"expected 'START-END', got {year_range!r}")
        start_year, end_year = sorted(bounds)

        filtered = []
        for item in items:
            # Extract year from different possible keys
            raw_year = (
                item.get('year') or item.get('release_year') or item.get('date')
            )
            if not raw_year:
                continue
            year = _coerce_year(raw_year)
            if year is not None and start_year <= year <= end_year:
                filtered.append(item)
        return filtered
    except Exception as e:
        return f"Filter error: {str(e)}"


# Each entry is (compiled pattern, name_comes_first). Group order is fixed per
# pattern, so an all-digit album title (e.g. "1989") is never mistaken for the
# year.
_ALBUM_PATTERNS = (
    # Pattern 1: Album Name (Year)
    (re.compile(r'\"?(.+?)\"?\s*[\(\[](\d{4})[\)\]]'), True),
    # Pattern 2: Year: Album Name  (terminated by newline, comma or end of text)
    (re.compile(r'(\d{4}):\s*\"?(.+?)\"?(?:[\n\,]|$)'), False),
)


@tool
def extract_albums(text: str) -> List[Dict]:
    """Extract album information from text, automatically detecting names and years"""
    # Returns dicts of the form {"name": str, "year": int}, in order of first
    # appearance per pattern, without duplicates or empty names.
    albums = []
    seen = set()
    for pattern, name_first in _ALBUM_PATTERNS:
        for first, second in pattern.findall(text):
            name, year_text = (first, second) if name_first else (second, first)
            name = name.strip()
            if not name:
                continue
            entry = (name, int(year_text))
            if entry in seen:
                continue
            seen.add(entry)
            albums.append({"name": entry[0], "year": entry[1]})
    return albums


_DATE_FORMATS = (
    "%Y-%m-%d",
    "%d %B %Y",
    "%B %d, %Y",
    "%m/%d/%Y",
    "%Y",
    "%B %Y",
    "%b %d, %Y",
    "%d/%m/%Y",
)


def _ordering(left, right) -> str:
    """Return "equal", "greater" or "less" for *left* relative to *right*."""
    if left == right:
        return "equal"
    return "greater" if left > right else "less"


@tool
def compare_values(a: Union[str, int, float], b: Union[str, int, float]) -> str:
    """Compare two values with automatic type detection (number/date/string)"""
    # Strategies are tried in order: numeric, then each date format (both
    # values must parse with the same format), then case-insensitive strings.
    try:
        # Attempt numeric comparison
        a_num = float(a) if isinstance(a, str) else a
        b_num = float(b) if isinstance(b, str) else b
        return _ordering(a_num, b_num)
    except (ValueError, TypeError):
        pass

    # Attempt date comparison
    for fmt in _DATE_FORMATS:
        try:
            a_date = datetime.strptime(str(a), fmt)
            b_date = datetime.strptime(str(b), fmt)
        except ValueError:
            continue
        return _ordering(a_date, b_date)

    # String comparison as fallback
    return _ordering(str(a).lower().strip(), str(b).lower().strip())


@tool
def count_items(items: List) -> int:
    """Count the number of items in a list"""
    return len(items)


# --- Load system prompt ---
# The path is relative to the current working directory.
with open("system_prompt.txt", "r", encoding="utf-8") as f:
    system_prompt = f.read()

# --- Tool Setup ---
tools = [
    multiply,
    add,
    subtract,
    divide,
    modulus,
    wiki_search,
    arxiv_search,
    web_search,
    filter_by_year,  # Enhanced tool
    extract_albums,  # Enhanced tool
    compare_values,  # Enhanced tool
    count_items,  # Enhanced tool
]

# Tools whose raw (untruncated) result is also kept in state["structured_data"].
_STRUCTURED_TOOLS = ("extract_albums", "filter_by_year")


def _message_text(content) -> str:
    """Flatten message content (a string or a list of parts) into plain text."""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces = []
        for part in content:
            if isinstance(part, str):
                pieces.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                pieces.append(part["text"])
        return "".join(pieces)
    return str(content)


def _normalize_tool_calls(message):
    """Return the tool calls on *message* as ``(name, args, call_id)`` tuples.

    The standard ``message.tool_calls`` attribute is preferred. The
    OpenAI-style ``additional_kwargs["tool_calls"]`` layout, whose arguments
    may still be a JSON string, is accepted as a fallback.
    """
    standard = getattr(message, "tool_calls", None)
    if standard:
        return [
            (call["name"], call.get("args") or {}, call.get("id") or call["name"])
            for call in standard
        ]
    legacy = (getattr(message, "additional_kwargs", None) or {}).get(
        "tool_calls", []
    )
    return [
        (
            call["function"]["name"],
            call["function"].get("arguments", {}),
            call.get("id") or call["function"]["name"],
        )
        for call in legacy
    ]


# --- Graph Builder ---
def build_graph():
    """Build and compile the agent graph.

    The graph has two nodes: ``agent`` (invokes the tool-bound Gemini model on
    the message history) and ``tools`` (runs every tool call found on the last
    message). After ``agent`` runs, routing ends the run on an agent error or
    a "FINAL ANSWER", goes to ``tools`` when tool calls are present, and
    otherwise calls ``agent`` again. ``tools`` always hands back to ``agent``.

    Returns:
        The compiled LangGraph workflow.
    """
    # Initialize model with Gemini 2.5 Flash
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash",
        temperature=0.3,
        google_api_key=google_api_key,
        max_retries=3,
    )

    # Bind tools to LLM
    llm_with_tools = llm.bind_tools(tools)
    tools_by_name = {t.name: t for t in tools}

    # 1. Define state structure
    class AgentState(TypedDict):
        # Message history; node outputs are appended via operator.add.
        messages: Annotated[Sequence, operator.add]
        # Raw results of the tools listed in _STRUCTURED_TOOLS, keyed by name.
        structured_data: dict

    # 2. Create graph
    workflow = StateGraph(AgentState)

    # reraise=True makes tenacity surface the last underlying exception
    # instead of wrapping it in RetryError, so the error classification in
    # agent_node inspects the real API error text.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True,
    )
    def invoke_with_retry(messages):
        return llm_with_tools.invoke(messages)

    # 3. Define node functions
    def agent_node(state: AgentState):
        """Main agent node"""
        try:
            response = invoke_with_retry(state["messages"])
            return {"messages": [response]}
        except Exception as e:
            # Failures become an "AGENT ERROR" message, which should_continue
            # uses to end the run.
            error_text = str(e)
            error_type = "UNKNOWN"
            if "429" in error_text:
                error_type = "QUOTA_EXCEEDED"
            elif "400" in error_text:
                error_type = "INVALID_REQUEST"
            error_msg = f"AGENT ERROR ({error_type}): {error_text[:200]}"
            return {"messages": [AIMessage(content=error_msg)]}

    def tool_node(state: AgentState):
        """Tool execution node"""
        last_msg = state["messages"][-1]
        # Work on a copy and return it as a state update; the key may be
        # absent if the caller did not seed it.
        structured = dict(state.get("structured_data") or {})
        tool_messages = []

        for tool_name, tool_args, call_id in _normalize_tool_calls(last_msg):
            # Find the tool
            tool_func = tools_by_name.get(tool_name)
            if tool_func is None:
                content = f"Tool {tool_name} not available"
            else:
                try:
                    # Parse arguments
                    if isinstance(tool_args, str):
                        tool_args = json.loads(tool_args) if tool_args else {}
                    # Execute tool
                    result = tool_func.invoke(tool_args)
                    # Store structured results
                    if tool_name in _STRUCTURED_TOOLS:
                        structured[tool_name] = result
                    # Limit result length
                    content = (
                        f"{tool_name} result: "
                        f"{str(result)[:MAX_TOOL_RESULT_CHARS]}"
                    )
                except Exception as e:
                    content = f"{tool_name} error: {str(e)}"

            # One ToolMessage per call, tied to the request by its call id.
            tool_messages.append(
                ToolMessage(content=content, tool_call_id=call_id, name=tool_name)
            )

        return {"messages": tool_messages, "structured_data": structured}

    # 4. Add nodes to workflow
    workflow.add_node("agent", agent_node)
    workflow.add_node("tools", tool_node)

    # 5. Set entry point
    workflow.set_entry_point("agent")

    # 6. Define conditional edges
    def should_continue(state: AgentState):
        last_msg = state["messages"][-1]
        text = _message_text(last_msg.content)
        # End on error
        if "AGENT ERROR" in text:
            return "end"
        # Go to tools if there are tool calls
        if getattr(last_msg, "tool_calls", None):
            return "tools"
        # End if final answer is present
        if "FINAL ANSWER" in text:
            return "end"
        # Otherwise continue with agent
        # NOTE(review): this re-invokes the model with no new input; the loop
        # is presumably bounded only by the graph's recursion limit - confirm
        # this is intended.
        return "agent"

    workflow.add_conditional_edges(
        "agent",
        should_continue,
        {
            "agent": "agent",
            "tools": "tools",
            "end": END,
        },
    )

    # 7. Define flow after tool node
    workflow.add_edge("tools", "agent")

    # 8. Compile graph
    return workflow.compile()


# Initialize agent graph
agent_graph = build_graph()