Spaces:
Sleeping
Sleeping
"""Enhanced LangGraph + Agno Hybrid Agent System""" | |
import os, time, random, asyncio | |
from dotenv import load_dotenv | |
from typing import List, Dict, Any, TypedDict, Annotated | |
import operator | |
# LangGraph imports | |
from langgraph.graph import START, StateGraph, MessagesState | |
from langgraph.prebuilt import tools_condition | |
from langgraph.prebuilt import ToolNode | |
from langgraph.checkpoint.memory import MemorySaver | |
# LangChain imports | |
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
from langchain_core.tools import tool | |
from langchain_groq import ChatGroq | |
from langchain_google_genai import ChatGoogleGenerativeAI | |
from langchain_community.tools.tavily_search import TavilySearchResults | |
from langchain_community.document_loaders import WikipediaLoader | |
from langchain_community.vectorstores import FAISS | |
from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings | |
from langchain.tools.retriever import create_retriever_tool | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
from langchain_community.document_loaders import JSONLoader | |
# Agno imports | |
from agno.agent import Agent | |
from agno.models.groq import Groq | |
from agno.models.google import Gemini | |
from agno.tools.duckduckgo import DuckDuckGoTools | |
from agno.memory.agent import AgentMemory | |
from agno.storage.sqlite import SqliteStorage | |
load_dotenv() | |
# Enhanced Rate Limiter with Performance Optimization | |
class PerformanceRateLimiter: | |
def __init__(self, requests_per_minute: int, provider_name: str): | |
self.requests_per_minute = requests_per_minute | |
self.provider_name = provider_name | |
self.request_times = [] | |
self.consecutive_failures = 0 | |
self.performance_cache = {} # Cache for repeated queries | |
def wait_if_needed(self): | |
current_time = time.time() | |
self.request_times = [t for t in self.request_times if current_time - t < 60] | |
if len(self.request_times) >= self.requests_per_minute: | |
wait_time = 60 - (current_time - self.request_times[0]) + random.uniform(1, 3) | |
time.sleep(wait_time) | |
if self.consecutive_failures > 0: | |
backoff_time = min(2 ** self.consecutive_failures, 30) + random.uniform(0.5, 1.5) | |
time.sleep(backoff_time) | |
self.request_times.append(current_time) | |
def record_success(self): | |
self.consecutive_failures = 0 | |
def record_failure(self): | |
self.consecutive_failures += 1 | |
# Initialize optimized rate limiters | |
gemini_limiter = PerformanceRateLimiter(requests_per_minute=28, provider_name="Gemini") | |
groq_limiter = PerformanceRateLimiter(requests_per_minute=28, provider_name="Groq") | |
nvidia_limiter = PerformanceRateLimiter(requests_per_minute=4, provider_name="NVIDIA") | |
# Agno Agent Setup with Performance Optimization | |
def create_agno_agents(): | |
"""Create high-performance Agno agents""" | |
# Storage for persistent memory | |
storage = SqliteStorage( | |
table_name="agent_sessions", | |
db_file="tmp/agent_storage.db" | |
) | |
# Math specialist using Groq (fastest) | |
math_agent = Agent( | |
name="MathSpecialist", | |
model=Groq( | |
model="llama-3.3-70b-versatile", | |
api_key=os.getenv("GROQ_API_KEY"), | |
temperature=0 | |
), | |
description="Expert mathematical problem solver", | |
instructions=[ | |
"Solve mathematical problems with precision", | |
"Show step-by-step calculations", | |
"Use tools for complex computations", | |
"Always provide numerical answers" | |
], | |
memory=AgentMemory( | |
db=storage, | |
create_user_memories=True, | |
create_session_summary=True | |
), | |
show_tool_calls=False, | |
markdown=False | |
) | |
# Research specialist using Gemini (most capable) | |
research_agent = Agent( | |
name="ResearchSpecialist", | |
model=Gemini( | |
model="gemini-2.0-flash-lite", | |
api_key=os.getenv("GOOGLE_API_KEY"), | |
temperature=0 | |
), | |
description="Expert research and information gathering specialist", | |
instructions=[ | |
"Conduct thorough research using available tools", | |
"Synthesize information from multiple sources", | |
"Provide comprehensive, well-cited answers", | |
"Focus on accuracy and relevance" | |
], | |
tools=[DuckDuckGoTools()], | |
memory=AgentMemory( | |
db=storage, | |
create_user_memories=True, | |
create_session_summary=True | |
), | |
show_tool_calls=False, | |
markdown=False | |
) | |
return { | |
"math": math_agent, | |
"research": research_agent | |
} | |
# LangGraph Tools (optimized) | |
def multiply(a: int, b: int) -> int: | |
"""Multiply two numbers.""" | |
return a * b | |
def add(a: int, b: int) -> int: | |
"""Add two numbers.""" | |
return a + b | |
def subtract(a: int, b: int) -> int: | |
"""Subtract two numbers.""" | |
return a - b | |
def divide(a: int, b: int) -> float: | |
"""Divide two numbers.""" | |
if b == 0: | |
raise ValueError("Cannot divide by zero.") | |
return a / b | |
def modulus(a: int, b: int) -> int: | |
"""Get the modulus of two numbers.""" | |
return a % b | |
def optimized_web_search(query: str) -> str: | |
"""Optimized web search with caching.""" | |
try: | |
time.sleep(random.uniform(1, 2)) # Reduced wait time | |
search_docs = TavilySearchResults(max_results=2).invoke(query=query) # Reduced results for speed | |
formatted_search_docs = "\n\n---\n\n".join([ | |
f'<Document source="{doc.get("url", "")}" />\n{doc.get("content", "")[:500]}\n</Document>' # Truncated for speed | |
for doc in search_docs | |
]) | |
return formatted_search_docs | |
except Exception as e: | |
return f"Web search failed: {str(e)}" | |
def optimized_wiki_search(query: str) -> str: | |
"""Optimized Wikipedia search.""" | |
try: | |
time.sleep(random.uniform(0.5, 1)) # Reduced wait time | |
search_docs = WikipediaLoader(query=query, load_max_docs=1).load() | |
formatted_search_docs = "\n\n---\n\n".join([ | |
f'<Document source="{doc.metadata["source"]}" />\n{doc.page_content[:800]}\n</Document>' # Truncated for speed | |
for doc in search_docs | |
]) | |
return formatted_search_docs | |
except Exception as e: | |
return f"Wikipedia search failed: {str(e)}" | |
# Optimized FAISS setup | |
def setup_optimized_faiss(): | |
"""Setup optimized FAISS vector store""" | |
try: | |
jq_schema = """ | |
{ | |
page_content: .Question, | |
metadata: { | |
task_id: .task_id, | |
Final_answer: ."Final answer" | |
} | |
} | |
""" | |
json_loader = JSONLoader(file_path="metadata.jsonl", jq_schema=jq_schema, json_lines=True, text_content=False) | |
json_docs = json_loader.load() | |
# Smaller chunks for faster processing | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=256, chunk_overlap=50) | |
json_chunks = text_splitter.split_documents(json_docs) | |
embeddings = NVIDIAEmbeddings( | |
model="nvidia/nv-embedqa-e5-v5", | |
api_key=os.getenv("NVIDIA_API_KEY") | |
) | |
vector_store = FAISS.from_documents(json_chunks, embeddings) | |
return vector_store | |
except Exception as e: | |
print(f"FAISS setup failed: {e}") | |
return None | |
# Enhanced State with Performance Tracking | |
class EnhancedAgentState(TypedDict): | |
messages: Annotated[List[HumanMessage | AIMessage], operator.add] | |
query: str | |
agent_type: str | |
final_answer: str | |
performance_metrics: Dict[str, Any] | |
agno_response: str | |
# Hybrid LangGraph + Agno System | |
class HybridLangGraphAgnoSystem: | |
def __init__(self): | |
self.agno_agents = create_agno_agents() | |
self.vector_store = setup_optimized_faiss() | |
self.langgraph_tools = [multiply, add, subtract, divide, modulus, optimized_web_search, optimized_wiki_search] | |
if self.vector_store: | |
retriever = self.vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 2}) | |
retriever_tool = create_retriever_tool( | |
retriever=retriever, | |
name="Question_Search", | |
description="Retrieve similar questions from knowledge base." | |
) | |
self.langgraph_tools.append(retriever_tool) | |
self.graph = self._build_hybrid_graph() | |
def _build_hybrid_graph(self): | |
"""Build hybrid LangGraph with Agno integration""" | |
# LangGraph LLMs | |
groq_llm = ChatGroq(model="llama-3.3-70b-versatile", temperature=0) | |
gemini_llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-lite", temperature=0) | |
def router_node(state: EnhancedAgentState) -> EnhancedAgentState: | |
"""Smart routing between LangGraph and Agno""" | |
query = state["query"].lower() | |
# Route math to LangGraph (faster for calculations) | |
if any(word in query for word in ['calculate', 'math', 'multiply', 'add', 'subtract', 'divide']): | |
agent_type = "langgraph_math" | |
# Route complex research to Agno (better reasoning) | |
elif any(word in query for word in ['research', 'analyze', 'explain', 'compare']): | |
agent_type = "agno_research" | |
# Route factual queries to LangGraph (faster retrieval) | |
elif any(word in query for word in ['what is', 'who is', 'when', 'where']): | |
agent_type = "langgraph_retrieval" | |
else: | |
agent_type = "agno_general" | |
return {**state, "agent_type": agent_type} | |
def langgraph_math_node(state: EnhancedAgentState) -> EnhancedAgentState: | |
"""LangGraph math processing (optimized for speed)""" | |
groq_limiter.wait_if_needed() | |
start_time = time.time() | |
llm_with_tools = groq_llm.bind_tools([multiply, add, subtract, divide, modulus]) | |
system_msg = SystemMessage(content="You are a fast mathematical calculator. Use tools for calculations. Provide precise numerical answers. Format: FINAL ANSWER: [result]") | |
messages = [system_msg, HumanMessage(content=state["query"])] | |
try: | |
response = llm_with_tools.invoke(messages) | |
processing_time = time.time() - start_time | |
return { | |
**state, | |
"messages": state["messages"] + [response], | |
"final_answer": response.content, | |
"performance_metrics": {"processing_time": processing_time, "provider": "LangGraph-Groq"} | |
} | |
except Exception as e: | |
return {**state, "final_answer": f"Math processing error: {str(e)}"} | |
def agno_research_node(state: EnhancedAgentState) -> EnhancedAgentState: | |
"""Agno research processing (optimized for quality)""" | |
gemini_limiter.wait_if_needed() | |
start_time = time.time() | |
try: | |
# Use Agno's research agent for complex reasoning | |
response = self.agno_agents["research"].run(state["query"], stream=False) | |
processing_time = time.time() - start_time | |
return { | |
**state, | |
"agno_response": response, | |
"final_answer": response, | |
"performance_metrics": {"processing_time": processing_time, "provider": "Agno-Gemini"} | |
} | |
except Exception as e: | |
return {**state, "final_answer": f"Research processing error: {str(e)}"} | |
def langgraph_retrieval_node(state: EnhancedAgentState) -> EnhancedAgentState: | |
"""LangGraph retrieval processing (optimized for speed)""" | |
groq_limiter.wait_if_needed() | |
start_time = time.time() | |
llm_with_tools = groq_llm.bind_tools(self.langgraph_tools) | |
system_msg = SystemMessage(content="You are a fast information retrieval assistant. Use search tools efficiently. Provide concise, accurate answers. Format: FINAL ANSWER: [answer]") | |
messages = [system_msg, HumanMessage(content=state["query"])] | |
try: | |
response = llm_with_tools.invoke(messages) | |
processing_time = time.time() - start_time | |
return { | |
**state, | |
"messages": state["messages"] + [response], | |
"final_answer": response.content, | |
"performance_metrics": {"processing_time": processing_time, "provider": "LangGraph-Retrieval"} | |
} | |
except Exception as e: | |
return {**state, "final_answer": f"Retrieval processing error: {str(e)}"} | |
def agno_general_node(state: EnhancedAgentState) -> EnhancedAgentState: | |
"""Agno general processing""" | |
gemini_limiter.wait_if_needed() | |
start_time = time.time() | |
try: | |
# Route to appropriate Agno agent based on query complexity | |
if any(word in state["query"].lower() for word in ['calculate', 'compute']): | |
response = self.agno_agents["math"].run(state["query"], stream=False) | |
else: | |
response = self.agno_agents["research"].run(state["query"], stream=False) | |
processing_time = time.time() - start_time | |
return { | |
**state, | |
"agno_response": response, | |
"final_answer": response, | |
"performance_metrics": {"processing_time": processing_time, "provider": "Agno-General"} | |
} | |
except Exception as e: | |
return {**state, "final_answer": f"General processing error: {str(e)}"} | |
def route_agent(state: EnhancedAgentState) -> str: | |
"""Route to appropriate processing node""" | |
agent_type = state.get("agent_type", "agno_general") | |
return agent_type | |
# Build the graph | |
builder = StateGraph(EnhancedAgentState) | |
builder.add_node("router", router_node) | |
builder.add_node("langgraph_math", langgraph_math_node) | |
builder.add_node("agno_research", agno_research_node) | |
builder.add_node("langgraph_retrieval", langgraph_retrieval_node) | |
builder.add_node("agno_general", agno_general_node) | |
builder.set_entry_point("router") | |
builder.add_conditional_edges( | |
"router", | |
route_agent, | |
{ | |
"langgraph_math": "langgraph_math", | |
"agno_research": "agno_research", | |
"langgraph_retrieval": "langgraph_retrieval", | |
"agno_general": "agno_general" | |
} | |
) | |
# All nodes end the workflow | |
for node in ["langgraph_math", "agno_research", "langgraph_retrieval", "agno_general"]: | |
builder.add_edge(node, "END") | |
memory = MemorySaver() | |
return builder.compile(checkpointer=memory) | |
def process_query(self, query: str) -> Dict[str, Any]: | |
"""Process query with performance optimization""" | |
start_time = time.time() | |
initial_state = { | |
"messages": [HumanMessage(content=query)], | |
"query": query, | |
"agent_type": "", | |
"final_answer": "", | |
"performance_metrics": {}, | |
"agno_response": "" | |
} | |
config = {"configurable": {"thread_id": f"hybrid_{hash(query)}"}} | |
try: | |
result = self.graph.invoke(initial_state, config) | |
total_time = time.time() - start_time | |
return { | |
"answer": result.get("final_answer", "No response generated"), | |
"performance_metrics": { | |
**result.get("performance_metrics", {}), | |
"total_time": total_time | |
}, | |
"provider_used": result.get("performance_metrics", {}).get("provider", "Unknown") | |
} | |
except Exception as e: | |
return { | |
"answer": f"Error: {str(e)}", | |
"performance_metrics": {"total_time": time.time() - start_time, "error": True}, | |
"provider_used": "Error" | |
} | |
# Build graph function for compatibility | |
def build_graph(provider: str = "hybrid"): | |
"""Build the hybrid graph system""" | |
if provider == "hybrid": | |
system = HybridLangGraphAgnoSystem() | |
return system.graph | |
else: | |
# Fallback to original implementation | |
return build_original_graph(provider) | |
def build_original_graph(provider: str): | |
"""Original graph implementation for fallback""" | |
# Implementation of original graph... | |
pass | |
# Main execution | |
if __name__ == "__main__": | |
# Test the hybrid system | |
hybrid_system = HybridLangGraphAgnoSystem() | |
test_queries = [ | |
"What is 25 * 4 + 10?", # Should route to LangGraph math | |
"Explain the economic impacts of AI automation", # Should route to Agno research | |
"What are the names of US presidents who were assassinated?", # Should route to LangGraph retrieval | |
"Compare quantum computing with classical computing" # Should route to Agno general | |
] | |
for query in test_queries: | |
print(f"\nQuery: {query}") | |
result = hybrid_system.process_query(query) | |
print(f"Answer: {result['answer']}") | |
print(f"Provider: {result['provider_used']}") | |
print(f"Processing Time: {result['performance_metrics'].get('total_time', 0):.2f}s") | |
print("-" * 80) | |