|
""" |
|
Intelligent AI Agent using LlamaIndex with websearch capabilities |
|
This module contains the agent class with advanced tools and reasoning. |
|
""" |
|
|
|
import os |
|
import asyncio |
|
import io |
|
import contextlib |
|
import ast |
|
import traceback |
|
from typing import Any, Dict, Tuple, List |
|
|
|
|
|
try: |
|
from dotenv import load_dotenv |
|
load_dotenv() |
|
print("✅ .env file loaded successfully") |
|
except ImportError: |
|
print("⚠️ python-dotenv not available, .env file not loaded") |
|
except Exception as e: |
|
print(f"⚠️ Error loading .env file: {e}") |
|
|
|
|
|
try: |
|
from llama_index.core.agent.workflow import ( |
|
ToolCall, |
|
ToolCallResult, |
|
FunctionAgent, |
|
AgentStream, |
|
) |
|
|
|
from llama_index.core.tools import FunctionTool |
|
from llama_index.tools.wikipedia import WikipediaToolSpec |
|
from llama_index.tools.tavily_research.base import TavilyToolSpec |
|
|
|
from llama_index.llms.bedrock_converse import BedrockConverse |
|
|
|
LLAMA_INDEX_AVAILABLE = True |
|
except ImportError as e: |
|
print(f"LlamaIndex imports not available: {e}") |
|
LLAMA_INDEX_AVAILABLE = False |
|
|
|
|
|
|
|
class BasicAgent: |
|
""" |
|
Advanced AI Agent using LlamaIndex with CodeAct capabilities and multiple tools. |
|
""" |
|
|
|
def __init__(self): |
|
"""Initialize the agent with LLM, tools, and code executor.""" |
|
print("Initializing Advanced AI Agent with LlamaIndex...") |
|
|
|
|
|
self.hf_token = os.getenv("HUGGINGFACE_TOKEN") |
|
if not self.hf_token: |
|
print("Warning: HUGGINGFACE_TOKEN not found. Using default model.") |
|
|
|
|
|
self._initialize_llm() |
|
|
|
|
|
self._initialize_tools() |
|
|
|
|
|
|
|
|
|
|
|
self._initialize_agent() |
|
|
|
print("Advanced AI Agent initialized successfully.") |
|
|
|
def _initialize_llm(self): |
|
"""Initialize the Hugging Face LLM.""" |
|
if not LLAMA_INDEX_AVAILABLE: |
|
print("LlamaIndex not available, using basic mode") |
|
self.llm = None |
|
return |
|
|
|
try: |
|
|
|
|
|
self.llm = BedrockConverse( |
|
model="amazon.nova-pro-v1:0", |
|
temperature=0.5, |
|
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), |
|
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), |
|
region_name=os.getenv("AWS_REGION"), |
|
|
|
) |
|
|
|
print("✅ LLM initialized successfully") |
|
except Exception as e: |
|
print(f"Error initializing LLM: {e}") |
|
|
|
self.llm = None |
|
|
|
def _initialize_tools(self): |
|
"""Initialize all available tools.""" |
|
self.tools = [] |
|
|
|
|
|
self.math_functions = { |
|
'add': lambda a, b: a + b, |
|
'subtract': lambda a, b: a - b, |
|
'multiply': lambda a, b: a * b, |
|
'divide': lambda a, b: a / b if b != 0 else "Error: Division by zero", |
|
'power': lambda a, b: a ** b, |
|
'percentage': lambda v, p: (v * p) / 100, |
|
} |
|
|
|
if not LLAMA_INDEX_AVAILABLE: |
|
print("Tools initialization skipped - LlamaIndex not available") |
|
return |
|
|
|
|
|
def add_numbers(a: float, b: float) -> float: |
|
"""Add two numbers together.""" |
|
return a + b |
|
|
|
def subtract_numbers(a: float, b: float) -> float: |
|
"""Subtract second number from first number.""" |
|
return a - b |
|
|
|
def multiply_numbers(a: float, b: float) -> float: |
|
"""Multiply two numbers.""" |
|
return a * b |
|
|
|
def divide_numbers(a: float, b: float) -> float: |
|
"""Divide first number by second number.""" |
|
if b == 0: |
|
return "Error: Division by zero" |
|
return a / b |
|
|
|
def power_numbers(a: float, b: float) -> float: |
|
"""Raise first number to the power of second number.""" |
|
return a ** b |
|
|
|
def calculate_percentage(value: float, percentage: float) -> float: |
|
"""Calculate percentage of a value.""" |
|
return (value * percentage) / 100 |
|
|
|
def get_modulus(a: float, b: float) -> float: |
|
"""Get the modulus of two numbers.""" |
|
return a % b |
|
|
|
|
|
try: |
|
math_tools = [ |
|
FunctionTool.from_defaults(fn=add_numbers, name="add_numbers", description="Add two numbers together"), |
|
FunctionTool.from_defaults(fn=subtract_numbers, name="subtract_numbers", description="Subtract second number from first number"), |
|
FunctionTool.from_defaults(fn=multiply_numbers, name="multiply_numbers", description="Multiply two numbers"), |
|
FunctionTool.from_defaults(fn=divide_numbers, name="divide_numbers", description="Divide first number by second number"), |
|
FunctionTool.from_defaults(fn=power_numbers, name="power_numbers", description="Raise first number to the power of second number"), |
|
FunctionTool.from_defaults(fn=calculate_percentage, name="calculate_percentage", description="Calculate percentage of a value"), |
|
FunctionTool.from_defaults(fn=get_modulus, name="get_modulus", description="Get the modulus of two numbers"), |
|
] |
|
self.tools.extend(math_tools) |
|
print("✅ Math tools initialized") |
|
except Exception as e: |
|
print(f"Warning: Could not initialize math tools: {e}") |
|
|
|
|
|
try: |
|
|
|
search_spec = TavilyToolSpec( |
|
api_key=os.getenv("TAVILY_API_KEY"), |
|
) |
|
search_tool = search_spec.to_tool_list() |
|
self.tools.extend(search_tool) |
|
print("✅ DuckDuckGo search tool initialized") |
|
except Exception as e: |
|
print(f"Warning: Could not initialize DuckDuckGo tool: {e}") |
|
|
|
try: |
|
|
|
wiki_spec = WikipediaToolSpec() |
|
wiki_tools = FunctionTool.from_defaults(wiki_spec.wikipedia_search, name="wikipedia_search", description="Search Wikipedia for information") |
|
self.tools.extend(wiki_tools) |
|
print("✅ Wikipedia tool initialized") |
|
except Exception as e: |
|
print(f"Warning: Could not initialize Wikipedia tool: {e}") |
|
|
|
""" try: |
|
# Web requests tool |
|
requests_spec = RequestsToolSpec() |
|
requests_tools = requests_spec.to_tool_list() |
|
self.tools.extend(requests_tools) |
|
print("✅ Web requests tool initialized") |
|
except Exception as e: |
|
print(f"Warning: Could not initialize requests tool: {e}") """ |
|
|
|
print(f"✅ Total {len(self.tools)} tools initialized") |
|
|
|
|
|
def _initialize_agent(self): |
|
"""Initialize the CodeAct Agent (deferred initialization).""" |
|
if not self.llm: |
|
print("Warning: No LLM available, using basic mode") |
|
self.agent = None |
|
self.context = None |
|
return |
|
|
|
|
|
self._agent_params = { |
|
|
|
'llm': self.llm, |
|
'tools': self.tools |
|
} |
|
self.agent = None |
|
self.context = None |
|
print("✅ CodeAct Agent parameters prepared (deferred initialization)") |
|
|
|
def _ensure_agent_initialized(self): |
|
"""Ensure the CodeAct agent is initialized when needed.""" |
|
if self.agent is None and hasattr(self, '_agent_params'): |
|
try: |
|
|
|
if hasattr(self, 'context') and self.context: |
|
try: |
|
|
|
self.context = None |
|
except: |
|
pass |
|
|
|
|
|
|
|
|
|
enhanced_prompt = f""" |
|
You are an intelligent AI assistant equipped with powerful tools to help solve problems. You must think step-by-step and use the available tools when needed. |
|
|
|
## AVAILABLE TOOLS: |
|
You have access to the following tools - use them strategically: |
|
|
|
### Mathematical Tools: |
|
- add_numbers(a, b): Add two numbers together |
|
- subtract_numbers(a, b): Subtract second number from first number |
|
- multiply_numbers(a, b): Multiply two numbers |
|
- divide_numbers(a, b): Divide first number by second number |
|
- power_numbers(a, b): Raise first number to the power of second number |
|
- calculate_percentage(value, percentage): Calculate percentage of a value |
|
- get_modulus(a, b): Get the modulus of two numbers |
|
|
|
### Research Tools: |
|
- tavily_search(query): Search the web for current information |
|
- wikipedia_search(query): Search Wikipedia for factual information |
|
|
|
## INSTRUCTIONS: |
|
1. **Read the question carefully** and identify what type of answer is needed |
|
2. **Think step-by-step** - break down complex problems into smaller parts |
|
3. **Use tools when necessary** - don't try to guess calculations or current information |
|
4. **For mathematical problems**: Use the math tools for accuracy |
|
5. **For factual questions**: Use search tools to get current/accurate information |
|
6. **Show your reasoning** - explain your thought process before the final answer |
|
|
|
## ANSWER FORMAT: |
|
Always end your response with exactly this format: |
|
FINAL ANSWER: [YOUR ANSWER] |
|
|
|
## FORMATTING RULES FOR FINAL ANSWER: |
|
- **Numbers**: Write plain numbers without commas, units, or symbols (unless specifically asked) |
|
- **Text**: Use simple words, no articles (a, an, the), no abbreviations |
|
- **Lists**: Comma-separated, following the above rules for each element |
|
- **Calculations**: Show the result as a plain number |
|
|
|
## EXAMPLES: |
|
Question: "What is 25% of 200?" |
|
Reasoning: I need to calculate 25% of 200 using the percentage tool. |
|
[Use calculate_percentage(200, 25)] |
|
FINAL ANSWER: 50 |
|
|
|
Question: "What is the capital of France?" |
|
Reasoning: This is a factual question about geography. |
|
[Use wikipedia_search("France capital")] |
|
FINAL ANSWER: Paris |
|
|
|
Remember: Think carefully, use tools when helpful, and always provide your final answer in the specified format. |
|
""" |
|
|
|
self.agent = FunctionAgent( |
|
tools=self.tools, |
|
llm=self.llm, |
|
system_prompt=enhanced_prompt, |
|
) |
|
print("✅ CodeAct Agent initialized (deferred)") |
|
|
|
except Exception as e: |
|
print(f"Error in deferred agent initialization: {e}") |
|
print("Continuing with fallback mode...") |
|
return False |
|
return self.agent is not None |
|
|
|
async def __call__(self, question: str) -> str: |
|
""" |
|
Main method that processes a question and returns an answer. |
|
""" |
|
print(f"Agent received question (first 100 chars): {question[:100]}...") |
|
|
|
|
|
self._ensure_agent_initialized() |
|
|
|
|
|
if self.agent: |
|
try: |
|
|
|
response = await self._async_agent_run(question) |
|
return response |
|
except Exception as e: |
|
print(f"Error with agent: {e}") |
|
return f"FINAL ANSWER: Error processing question - {str(e)}" |
|
else: |
|
return "FINAL ANSWER: Agent not properly initialized" |
|
|
|
|
|
async def _async_agent_run(self, question: str) -> str: |
|
"""Run the agent asynchronously.""" |
|
try: |
|
|
|
|
|
print("Agent running...") |
|
print(self.agent) |
|
handler = self.agent.run(question) |
|
|
|
iterationsNumber = 0 |
|
async for event in handler.stream_events(): |
|
iterationsNumber += 1 |
|
|
|
|
|
|
|
|
|
if isinstance(event, ToolCall): |
|
print(f"\n-----------\nevent:\n{event}") |
|
elif isinstance(event, AgentStream): |
|
print(f"{event.delta}", end="", flush=True) |
|
""" if iterationsNumber > 5: |
|
print("Too many iterations, stopping...") |
|
break """ |
|
response = await handler |
|
print(f'response.response: {response.response.content}') |
|
return response.response.content.split("FINAL ANSWER: ")[1] |
|
except Exception as e: |
|
print(f"Async agent error: {e}") |
|
return f"FINAL ANSWER: Error in agent processing - {str(e)}" |
|
|
|
|