Spaces:
Sleeping
Sleeping
| import os, json, time, random | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
| # Imports | |
| from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings | |
| from langchain_groq import ChatGroq | |
| from langchain_nvidia_ai_endpoints import ChatNVIDIA | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from langchain_community.document_loaders import ArxivLoader | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| from langchain_core.tools import tool | |
| from langchain.tools.retriever import create_retriever_tool | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.document_loaders import JSONLoader | |
| from langgraph.prebuilt import create_react_agent | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from langchain_core.rate_limiters import InMemoryRateLimiter | |
# Client-side request throttles, one per LLM provider.
# Each limiter is polled every 0.1 s and allows a burst of up to 10 requests.
groq_rate_limiter = InMemoryRateLimiter(
    max_bucket_size=10,
    check_every_n_seconds=0.1,
    requests_per_second=0.5,  # 30 requests per minute
)

# NOTE(review): no Google model is configured in the visible part of this
# file, so this limiter looks unused here - confirm before removing.
google_rate_limiter = InMemoryRateLimiter(
    max_bucket_size=10,
    check_every_n_seconds=0.1,
    requests_per_second=0.33,  # ~20 requests per minute
)

nvidia_rate_limiter = InMemoryRateLimiter(
    max_bucket_size=10,
    check_every_n_seconds=0.1,
    requests_per_second=0.25,  # 15 requests per minute
)
# Worker LLM clients wrapped by the LLM tools defined below.
# Both run at temperature 0 and share the per-provider rate limiters.
groq_llm = ChatGroq(
    api_key=os.getenv("GROQ_API_KEY"),
    model="llama-3.3-70b-versatile",
    temperature=0,
    max_retries=2,
    request_timeout=60,
    rate_limiter=groq_rate_limiter,
)

nvidia_llm = ChatNVIDIA(
    api_key=os.getenv("NVIDIA_API_KEY"),
    model="meta/llama-3.1-405b-instruct",
    temperature=0,
    max_retries=2,
    rate_limiter=nvidia_rate_limiter,
)
| # Create LLM tools that can be selected by the agent | |
def groq_reasoning_tool(query: str) -> str:
    """Use Groq's Llama model for fast reasoning, mathematical calculations, and logical problems.

    Best for: Math problems, logical reasoning, quick calculations, code generation.

    Args:
        query: The question or problem to solve
    """
    try:
        # Random 1-2 s pause before every call to space out requests.
        pause_seconds = random.uniform(1, 2)
        time.sleep(pause_seconds)

        prompt = [HumanMessage(content=query)]
        reply = groq_llm.invoke(prompt)
        return f"Groq Response: {reply.content}"
    except Exception as exc:
        # Failures are reported as text so the calling agent can keep going.
        return f"Groq tool failed: {exc!s}"
def nvidia_specialist_tool(query: str) -> str:
    """Use NVIDIA's large model for specialized tasks, technical questions, and domain expertise.

    Best for: Technical questions, specialized domains, scientific problems, detailed analysis.

    Args:
        query: The specialized question or technical problem
    """
    try:
        # Random 2-4 s pause before every call to space out requests.
        time.sleep(random.uniform(2, 4))
        answer = nvidia_llm.invoke([HumanMessage(content=query)]).content
        return f"NVIDIA Response: {answer}"
    except Exception as exc:
        # Failures are reported as text so the calling agent can keep going.
        return f"NVIDIA tool failed: {exc!s}"
| # Define calculation tools | |
| def multiply(a: int | float, b: int | float) -> int | float: | |
| """Multiply two numbers. | |
| Args: | |
| a: first int | float | |
| b: second int | float | |
| """ | |
| return a * b | |
| def add(a: int | float, b: int | float) -> int | float: | |
| """Add two numbers. | |
| Args: | |
| a: first int | float | |
| b: second int | float | |
| """ | |
| return a + b | |
| def subtract(a: int | float , b: int | float) -> int | float: | |
| """Subtract two numbers. | |
| Args: | |
| a: first int | float | |
| b: second int | float | |
| """ | |
| return a - b | |
| def divide(a: int | float, b: int | float) -> int | float: | |
| """Divide two numbers. | |
| Args: | |
| a: first int | float | |
| b: second int | float | |
| """ | |
| if b == 0: | |
| raise ValueError("Cannot divide by zero.") | |
| return a / b | |
| def modulus(a: int | float, b: int | float) -> int | float: | |
| """Get the modulus of two numbers. | |
| Args: | |
| a: first int | float | |
| b: second int | float | |
| """ | |
| return a % b | |
| # Define search tools | |
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return the content of at most one matching page.

    Args:
        query: the query to search for
    """
    try:
        pages = WikipediaLoader(query=query, load_max_docs=1).load()
        sections = [f'\n{page.page_content}\n' for page in pages]
        return "\n\n---\n\n".join(sections)
    except Exception as exc:
        # Failures are reported as text so the calling agent can keep going.
        return f"Wikipedia search failed: {exc!s}"
def web_search(query: str) -> str:
    """Search Tavily for a query and return maximum 3 results.

    Args:
        query: The search query.

    Returns:
        The ``content`` field of each result joined by ``---`` separators, or
        a message starting with "Web search failed:" if the search could not
        be completed. Exceptions are never propagated to the caller.
    """
    try:
        # Random 1-3 s pause before every search to space out requests.
        time.sleep(random.uniform(1, 3))

        # ``invoke`` takes the tool input as its first positional argument;
        # calling it as ``invoke(query=query)`` raises TypeError, which made
        # every search fall through to the error branch below.
        search_docs = TavilySearchResults(max_results=3).invoke({"query": query})

        # NOTE(review): the Tavily tool appears to hand back a plain string
        # (rather than a list of result dicts) when the request fails -
        # confirm against the installed langchain_community version.
        if isinstance(search_docs, str):
            return f"Web search failed: {search_docs}"

        return "\n\n---\n\n".join(
            f'\n{doc.get("content", "")}\n' for doc in search_docs
        )
    except Exception as e:
        # Failures are reported as text so the calling agent can keep going.
        return f"Web search failed: {str(e)}"
def arxiv_search(query: str) -> str:
    """Search Arxiv for a query and return a maximum of 3 results.

    Only the first 1000 characters of each result are returned.

    Args:
        query: The search query.
    """
    try:
        papers = ArxivLoader(query=query, load_max_docs=3).load()
        excerpts = []
        for paper in papers:
            excerpts.append(f'\n{paper.page_content[:1000]}\n')
        return "\n\n---\n\n".join(excerpts)
    except Exception as exc:
        # Failures are reported as text so the calling agent can keep going.
        return f"ArXiv search failed: {exc!s}"
# jq program applied to every line of the JSONL file: it selects the question
# text plus the annotator metadata fields of each record.
jq_schema = """
{
    page_content: .Question,
    metadata: {
        task_id: .task_id,
        Level: .Level,
        Final_answer: ."Final answer",
        file_name: .file_name,
        Steps: .["Annotator Metadata"].Steps,
        Number_of_steps: .["Annotator Metadata"]["Number of steps"],
        How_long: .["Annotator Metadata"]["How long did this take?"],
        Tools: .["Annotator Metadata"].Tools,
        Number_of_tools: .["Annotator Metadata"]["Number of tools"]
    }
}
"""

# Read the records from metadata.jsonl (this runs at import time).
json_loader = JSONLoader(
    file_path="metadata.jsonl",
    jq_schema=jq_schema,
    json_lines=True,
    text_content=False,
)
json_docs = json_loader.load()

# Cut the records into 512-character chunks that overlap by 200 characters.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_overlap=200,
    chunk_size=512,
)
json_chunks = text_splitter.split_documents(json_docs)

# Embed the chunks with NVIDIA embeddings and index them in FAISS.
database = FAISS.from_documents(json_chunks, NVIDIAEmbeddings())

# Expose a top-3 similarity search over the index as an agent tool.
retriever = database.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3},
)
retriever_tool = create_retriever_tool(
    retriever=retriever,
    name="question_search",
    description="Search for similar questions and their solutions from the knowledge base.",
)
# Everything the coordinator agent may call, grouped by purpose.
tools = [
    # Arithmetic helpers
    *(multiply, add, subtract, divide, modulus),
    # Search and retrieval
    *(wiki_search, web_search, arxiv_search, retriever_tool),
    # LLM delegates - the agent picks whichever model suits the question
    *(groq_reasoning_tool, nvidia_specialist_tool),
]
# Coordinator model that drives tool selection (Groq, chosen for speed).
# It shares the Groq rate limiter with ``groq_llm``.
coordinator_llm = ChatGroq(
    api_key=os.getenv("GROQ_API_KEY"),
    model="llama-3.3-70b-versatile",
    temperature=0,
    rate_limiter=groq_rate_limiter,
)

# In-memory checkpointer: conversation state is stored per thread id.
memory = MemorySaver()

# ReAct-style agent: coordinator model + tool list + checkpointer.
agent_executor = create_react_agent(
    coordinator_llm,
    tools,
    checkpointer=memory,
)
| # Enhanced robust agent run | |
# System prompt sent with every agent run. It must only name tools that are
# actually registered with the agent; the earlier text advertised a
# ``google_analysis_tool`` that does not exist in the tool list.
AGENT_SYSTEM_PROMPT = '''You are a helpful assistant with access to multiple specialized LLM tools and other utilities.
AVAILABLE LLM TOOLS:
- groq_reasoning_tool: Fast reasoning, math, calculations, code (use for quick logical problems)
- nvidia_specialist_tool: Technical questions, specialized domains, scientific problems (use for expert-level tasks)
TOOL SELECTION STRATEGY:
- For math/calculations: Use basic math tools (add, multiply, etc.) OR groq_reasoning_tool for complex math
- For factual questions: Use web_search, wiki_search, or arxiv_search first
- For analysis/reasoning: Choose the most appropriate LLM tool based on complexity
- For technical/scientific: Use nvidia_specialist_tool
- For quick logical problems: Use groq_reasoning_tool
Always finish with: FINAL ANSWER: [YOUR FINAL ANSWER]
Your answer should be a number OR few words OR comma separated list as appropriate.'''


def robust_agent_run(query, thread_id="robust_conversation", max_retries=3):
    """Run the agent on one query, retrying rate-limit and API errors with backoff.

    Args:
        query: The user question; it is sent to the agent as a HumanMessage.
        thread_id: Base checkpointer thread id. The attempt index is appended,
            so every retry runs under a distinct thread id.
        max_retries: Maximum number of attempts.

    Returns:
        The content of the last message produced by the agent, or a
        human-readable error string. Exceptions are never propagated.
    """
    rate_limit_markers = ('rate limit', 'too many requests', '429', 'quota exceeded')
    api_error_markers = ('api', 'connection', 'timeout', 'service unavailable')

    for attempt in range(max_retries):
        try:
            # NOTE(review): with the default thread_id, separate calls reuse
            # the same checkpointer thread, so earlier messages are carried
            # over - confirm this is intended.
            config = {"configurable": {"thread_id": f"{thread_id}_{attempt}"}}
            system_msg = SystemMessage(content=AGENT_SYSTEM_PROMPT)
            user_msg = HumanMessage(content=query)

            result = []
            print(f"Attempt {attempt + 1}: Processing query with multi-LLM agent...")
            # In "values" mode each step carries the full message list so far;
            # only the last one is kept.
            for step in agent_executor.stream(
                {"messages": [system_msg, user_msg]},
                config,
                stream_mode="values",
            ):
                result = step["messages"]

            final_response = result[-1].content if result else "No response generated"
            print(f"Query processed successfully on attempt {attempt + 1}")
            return final_response
        except Exception as e:
            # The exception text is the only signal available here, so errors
            # are classified by keyword.
            error_msg = str(e).lower()
            is_last_attempt = attempt == max_retries - 1

            if any(keyword in error_msg for keyword in rate_limit_markers):
                # No point in backing off when there is no attempt left.
                if is_last_attempt:
                    return f"Rate limit exceeded after {max_retries} attempts: {str(e)}"
                wait_time = (2 ** attempt) + random.uniform(1, 3)
                print(f"Rate limit hit on attempt {attempt + 1}. Waiting {wait_time:.2f} seconds...")
                time.sleep(wait_time)
            elif any(keyword in error_msg for keyword in api_error_markers):
                if is_last_attempt:
                    return f"API error after {max_retries} attempts: {str(e)}"
                wait_time = (2 ** attempt) + random.uniform(0.5, 1.5)
                print(f"API error on attempt {attempt + 1}. Retrying in {wait_time:.2f} seconds...")
                time.sleep(wait_time)
            else:
                # Unrecognised errors are not retried.
                return f"Error occurred: {str(e)}"

    # Only reached when the loop body never ran (max_retries <= 0).
    return "Maximum retries exceeded"
| # Main function with request tracking | |
# Module-level bookkeeping: number of requests seen in the current window and
# the time at which that window started.
request_count = 0
last_request_time = time.time()


def main(query: str) -> str:
    """Answer one query with the multi-LLM agent, pacing consecutive requests.

    The request counter restarts once more than 60 seconds have passed since
    the current window began. Every request after the first in a window is
    preceded by a random 2-5 second pause.

    Args:
        query: The question to pass to the agent.

    Returns:
        Whatever ``robust_agent_run`` returns for the query.
    """
    global request_count, last_request_time

    now = time.time()
    window_expired = (now - last_request_time) > 60
    if window_expired:
        # Start a new counting window.
        request_count, last_request_time = 0, now

    request_count += 1
    print(f"Processing request #{request_count} with multi-LLM agent")

    is_first_in_window = request_count <= 1
    if not is_first_in_window:
        time.sleep(random.uniform(2, 5))

    return robust_agent_run(query)
if __name__ == "__main__":
    # Smoke test: send one sample question through the agent and print the answer.
    sample_question = "What are the names of the US presidents who were assassinated?"
    result = main(sample_question)
    print(result)