"""
Final Working Multi-LLM Agent System
Robust fallback system that works even when Agno fails
"""
import os | |
import time | |
import random | |
import operator | |
from typing import List, Dict, Any, TypedDict, Annotated, Optional | |
from dotenv import load_dotenv | |
# Core LangChain imports | |
from langchain_core.tools import tool | |
from langchain_community.tools.tavily_search import TavilySearchResults | |
from langchain_community.document_loaders import WikipediaLoader | |
from langgraph.graph import StateGraph, END | |
from langgraph.checkpoint.memory import MemorySaver | |
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
from langchain_groq import ChatGroq | |
# Load environment variables (GROQ_API_KEY, TAVILY_API_KEY) from a local .env file.
load_dotenv()
# System prompt shared by every handler node. The 'FINAL ANSWER:' marker in
# guideline 5 is what the downstream answer-extraction logic keys on.
SYSTEM_PROMPT = """You are a helpful assistant tasked with answering questions using available tools.
Guidelines:
1. Use available tools to gather information when needed
2. Provide precise, factual answers
3. For numbers: don't use commas or units unless specified
4. For strings: don't use articles or abbreviations, write digits in plain text
5. Always end with 'FINAL ANSWER: [YOUR ANSWER]'
6. Be concise but thorough
7. If you cannot find the answer, state that clearly"""
# ---- Tool Definitions ---- | |
def multiply(a: int, b: int) -> int:
    """Return the product of *a* and *b*."""
    return operator.mul(a, b)
def add(a: int, b: int) -> int:
    """Return the sum of *a* and *b*."""
    return operator.add(a, b)
def subtract(a: int, b: int) -> int:
    """Return *a* minus *b*."""
    return operator.sub(a, b)
def divide(a: int, b: int) -> float:
    """Return *a* divided by *b* as a float.

    Raises:
        ValueError: if *b* is zero.
    """
    if not b:
        raise ValueError("Cannot divide by zero.")
    return operator.truediv(a, b)
def modulus(a: int, b: int) -> int:
    """Return the remainder of *a* divided by *b*."""
    return operator.mod(a, b)
def web_search(query: str) -> str:
    """Query the Tavily web-search API and return formatted result snippets.

    Returns a notice string when no TAVILY_API_KEY is configured, and an
    error string (never raises) when the search itself fails.
    """
    try:
        if not os.getenv("TAVILY_API_KEY"):
            return "Web search not available - no API key"
        # Brief random pause to stay friendly to the rate-limited API.
        time.sleep(random.uniform(0.5, 1.0))
        results = TavilySearchResults(max_results=3).invoke({"query": query})
        snippets = [
            f"<Doc url='{item.get('url','')}'>{item.get('content','')[:600]}</Doc>"
            for item in results
        ]
        return "\n\n---\n\n".join(snippets)
    except Exception as e:
        return f"Web search failed: {e}"
def wiki_search(query: str) -> str:
    """Fetch up to two Wikipedia documents matching *query*.

    Never raises: any failure is reported as an error string.
    """
    try:
        # Brief random pause to avoid hammering Wikipedia.
        time.sleep(random.uniform(0.3, 0.8))
        pages = WikipediaLoader(query=query, load_max_docs=2).load()
        formatted = (
            f"<Doc src='Wikipedia'>{page.page_content[:800]}</Doc>"
            for page in pages
        )
        return "\n\n---\n\n".join(formatted)
    except Exception as e:
        return f"Wikipedia search failed: {e}"
# ---- Enhanced Agent State ---- | |
class EnhancedAgentState(TypedDict):
    """Shared LangGraph state passed between the router and handler nodes."""
    # Conversation history; the operator.add reducer makes LangGraph
    # concatenate message lists from node updates instead of replacing them.
    messages: Annotated[List[HumanMessage | AIMessage], operator.add]
    # Raw user query being answered.
    query: str
    # Router decision: "math", "search", "wiki" or "general".
    agent_type: str
    # Answer extracted from the LLM response (text after 'FINAL ANSWER:').
    final_answer: str
    # Timing/provider metadata, or {"error": ...} on failure.
    perf: Dict[str, Any]
    # Names of tools used while answering (e.g. ["web_search"]).
    tools_used: List[str]
# ---- Working Multi-LLM System ---- | |
class WorkingMultiLLMSystem:
    """Keyword-routing agent system backed by Groq LLMs.

    A LangGraph state machine routes each query to one of four handler
    nodes (math / search / wiki / general); the search and wiki nodes
    inject tool output into the prompt before asking the LLM.
    """

    def __init__(self):
        # Tool inventory kept for compatibility/introspection; the graph
        # nodes call web_search / wiki_search directly.
        self.tools = [multiply, add, subtract, divide, modulus, web_search, wiki_search]
        self.graph = self._build_graph()
        print("✅ Working Multi-LLM System initialized")

    def _get_llm(self, model_name: str = "llama3-70b-8192"):
        """Return a deterministic (temperature=0) Groq chat model."""
        return ChatGroq(
            model=model_name,
            temperature=0,
            api_key=os.getenv("GROQ_API_KEY"),
        )

    @staticmethod
    def _extract_answer(text: str) -> str:
        """Return the text after the last 'FINAL ANSWER:' marker, if any."""
        answer = text.strip()
        if "FINAL ANSWER:" in answer:
            answer = answer.split("FINAL ANSWER:")[-1].strip()
        return answer

    def _ask(self, prompt: str) -> str:
        """Send one system+user exchange to the LLM and extract the answer."""
        response = self._get_llm().invoke(
            [SystemMessage(content=SYSTEM_PROMPT), HumanMessage(content=prompt)]
        )
        return self._extract_answer(response.content)

    def _build_graph(self) -> StateGraph:
        """Build and compile the router -> handler -> END graph."""

        def router(st: EnhancedAgentState) -> Dict[str, Any]:
            """Choose a handler from keywords in the query.

            Wiki keywords are tested BEFORE the generic search keywords:
            the previous order made the wiki branch unreachable whenever
            a query also contained 'about'/'find'/'information'/'search'.
            """
            q = st["query"].lower()
            if any(k in q for k in ("calculate", "multiply", "add", "subtract", "divide", "math")):
                agent_type = "math"
            elif any(k in q for k in ("wikipedia", "wiki")):
                agent_type = "wiki"
            elif any(k in q for k in ("search", "find", "information", "about")):
                agent_type = "search"
            else:
                agent_type = "general"
            # Return only the updated keys: 'messages' has an operator.add
            # reducer, so echoing the whole state back (as the old code did
            # with {**st, ...}) would append a duplicate message history.
            return {"agent_type": agent_type, "tools_used": []}

        def make_node(provider: str, build_prompt, tool_name: str = ""):
            """Create a handler node for *provider*.

            ``build_prompt(query)`` may call a tool (and therefore raise);
            it runs inside the try block so failures become error answers
            instead of propagating.
            """
            def node(st: EnhancedAgentState) -> Dict[str, Any]:
                t0 = time.time()
                try:
                    update: Dict[str, Any] = {
                        "final_answer": self._ask(build_prompt(st["query"])),
                        "perf": {"time": time.time() - t0, "provider": provider},
                    }
                    if tool_name:
                        update["tools_used"] = [tool_name]
                    return update
                except Exception as e:
                    return {"final_answer": f"Error: {e}", "perf": {"error": str(e)}}
            return node

        def math_prompt(query: str) -> str:
            return f"""
Question: {query}
This is a mathematical question. Please solve it step by step and provide the exact numerical answer.
"""

        def search_prompt(query: str) -> str:
            # web_search is a plain function, so call it directly. The old
            # code used web_search.invoke({...}), which plain functions do
            # not have, so every search raised AttributeError and silently
            # produced an 'Error: ...' answer.
            results = web_search(query)
            return f"""
Question: {query}
Search Results:
{results}
Based on the search results above, provide a direct answer to the question.
"""

        def wiki_prompt(query: str) -> str:
            results = wiki_search(query)  # plain call; see search_prompt
            return f"""
Question: {query}
Wikipedia Results:
{results}
Based on the Wikipedia information above, provide a direct answer to the question.
"""

        def general_prompt(query: str) -> str:
            return f"""
Question: {query}
Please provide a direct, accurate answer to this question.
"""

        g = StateGraph(EnhancedAgentState)
        g.add_node("router", router)
        g.add_node("math", make_node("Groq-Math", math_prompt))
        g.add_node("search", make_node("Groq-Search", search_prompt, "web_search"))
        g.add_node("wiki", make_node("Groq-Wiki", wiki_prompt, "wiki_search"))
        g.add_node("general", make_node("Groq-General", general_prompt))
        g.set_entry_point("router")
        g.add_conditional_edges("router", lambda s: s["agent_type"], {
            "math": "math",
            "search": "search",
            "wiki": "wiki",
            "general": "general",
        })
        for handler in ("math", "search", "wiki", "general"):
            g.add_edge(handler, END)
        return g.compile(checkpointer=MemorySaver())

    def process_query(self, query: str) -> str:
        """Run *query* through the graph and return the extracted answer.

        Returns:
            The handler's answer, 'Information not available' when nothing
            useful came back, or an 'Error processing query: ...' string
            when the graph itself raised.
        """
        state = {
            "messages": [HumanMessage(content=query)],
            "query": query,
            "agent_type": "",
            "final_answer": "",
            "perf": {},
            "tools_used": [],
        }
        # hash() is only stable within a single interpreter run, which is
        # enough to give the checkpointer a distinct per-query thread id.
        config = {"configurable": {"thread_id": f"working_{hash(query)}"}}
        try:
            result = self.graph.invoke(state, config)
            answer = result.get("final_answer", "").strip()
            if not answer or answer == query:
                return "Information not available"
            return answer
        except Exception as e:
            return f"Error processing query: {e}"
# ---- Compatibility Classes ---- | |
class UnifiedAgnoEnhancedSystem:
    """Backward-compatible facade that delegates to WorkingMultiLLMSystem."""

    def __init__(self):
        print("Initializing working system...")
        self.agno_system = None  # Agno backend intentionally disabled
        self.working_system = WorkingMultiLLMSystem()
        self.graph = self.working_system.graph

    def process_query(self, query: str) -> str:
        """Forward *query* to the underlying working system."""
        return self.working_system.process_query(query)

    def get_system_info(self) -> Dict[str, Any]:
        """Return a static description of the active configuration."""
        return dict(
            system="working_multi_llm",
            agno_available=False,
            total_models=1,
            active_agents=["math", "search", "wiki", "general"],
        )
# For backward compatibility: older call sites import these names, which now
# all resolve to the single working-system implementation.
AgnoEnhancedAgentSystem = WorkingMultiLLMSystem
AgnoEnhancedModelManager = WorkingMultiLLMSystem
def build_graph(provider: str = "working"):
    """Compatibility entry point: build and return a compiled agent graph.

    The *provider* argument is accepted for API compatibility but ignored;
    only the Groq-backed working system is available.
    """
    return WorkingMultiLLMSystem().graph
if __name__ == "__main__":
    # Smoke-test the system with a few representative questions.
    demo = WorkingMultiLLMSystem()
    questions = [
        "How many studio albums were published by Mercedes Sosa between 2000 and 2009?",
        "What is 25 multiplied by 17?",
        "Who nominated the only Featured Article on English Wikipedia about a dinosaur?",
    ]
    print("Testing Working Multi-LLM System:")
    for idx, q in enumerate(questions, 1):
        print(f"\nQuestion {idx}: {q}")
        answer = demo.process_query(q)
        print(f"Answer: {answer}")