|
""" |
|
Intelligent AI Agent using LlamaIndex with websearch capabilities |
|
This module contains the agent class with advanced tools and reasoning. |
|
""" |
|
|
|
import os |
|
import asyncio |
|
import io |
|
import contextlib |
|
import ast |
|
import traceback |
|
from typing import Any, Dict, Tuple, List |
|
|
|
|
|
# Load environment variables from a local .env file when python-dotenv is installed.
try:
    from dotenv import load_dotenv
    load_dotenv()
    print("✅ .env file loaded successfully")
except ImportError:
    print("⚠️ python-dotenv not available, .env file not loaded")
except Exception as e:
    print(f"⚠️ Error loading .env file: {e}")
|
# LlamaIndex is an optional dependency: if these imports fail, the agent
# falls back to a basic mode without tools.
try:
    from llama_index.core.agent.workflow import (
        ToolCall,
        ToolCallResult,
        FunctionAgent,
        AgentStream,
    )
    from llama_index.core.tools import FunctionTool
    from llama_index.tools.wikipedia import WikipediaToolSpec
    from llama_index.tools.tavily_research.base import TavilyToolSpec
    from llama_index.llms.bedrock_converse import BedrockConverse

    LLAMA_INDEX_AVAILABLE = True
except ImportError as e:
    print(f"LlamaIndex imports not available: {e}")
    LLAMA_INDEX_AVAILABLE = False
|
|
class BasicAgent:
    """
    Advanced AI agent built on LlamaIndex, combining an LLM with math and web-search tools.
    """
|
    def __init__(self):
        """Initialize the agent's LLM, tools, and underlying workflow agent."""
        print("Initializing Advanced AI Agent with LlamaIndex...")

        # Optional Hugging Face token; not required for the Bedrock LLM configured below.
        self.hf_token = os.getenv("HUGGINGFACE_TOKEN")
        if not self.hf_token:
            print("Warning: HUGGINGFACE_TOKEN not found. Continuing without it.")

        self._initialize_llm()
        self._initialize_tools()
        self._initialize_agent()

        print("Advanced AI Agent initialized successfully.")
|
    def _initialize_llm(self):
        """Initialize the LLM (Amazon Bedrock Nova Pro via the Converse API)."""
        if not LLAMA_INDEX_AVAILABLE:
            print("LlamaIndex not available, using basic mode")
            self.llm = None
            return

        try:
            # Requires AWS credentials (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
            # AWS_REGION) to be present in the environment.
            self.llm = BedrockConverse(
                model="amazon.nova-pro-v1:0",
                temperature=0.5,
                aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
                aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
                region_name=os.getenv("AWS_REGION"),
            )
            print("✅ LLM initialized successfully")
        except Exception as e:
            print(f"Error initializing LLM: {e}")
            self.llm = None
|
    def _initialize_tools(self):
        """Initialize all available tools."""
        self.tools = []

        # Plain-callable math helpers, kept alongside the FunctionTool wrappers defined below.
        self.math_functions = {
            'add': lambda a, b: a + b,
            'subtract': lambda a, b: a - b,
            'multiply': lambda a, b: a * b,
            'divide': lambda a, b: a / b if b != 0 else "Error: Division by zero",
            'power': lambda a, b: a ** b,
            'percentage': lambda v, p: (v * p) / 100,
        }

        if not LLAMA_INDEX_AVAILABLE:
            print("Tools initialization skipped - LlamaIndex not available")
            return
|
        def add_numbers(a: float, b: float) -> float:
            """Add two numbers together."""
            return a + b

        def subtract_numbers(a: float, b: float) -> float:
            """Subtract second number from first number."""
            return a - b

        def multiply_numbers(a: float, b: float) -> float:
            """Multiply two numbers."""
            return a * b

        def divide_numbers(a: float, b: float) -> float:
            """Divide first number by second number."""
            if b == 0:
                return "Error: Division by zero"
            return a / b

        def power_numbers(a: float, b: float) -> float:
            """Raise first number to the power of second number."""
            return a ** b

        def calculate_percentage(value: float, percentage: float) -> float:
            """Calculate percentage of a value."""
            return (value * percentage) / 100

        def get_modulus(a: float, b: float) -> float:
            """Get the modulus of two numbers."""
            return a % b
|
        # Wrap the math helpers as LlamaIndex FunctionTools so the agent can call them.
        try:
            math_tools = [
                FunctionTool.from_defaults(fn=add_numbers, name="add_numbers", description="Add two numbers together"),
                FunctionTool.from_defaults(fn=subtract_numbers, name="subtract_numbers", description="Subtract second number from first number"),
                FunctionTool.from_defaults(fn=multiply_numbers, name="multiply_numbers", description="Multiply two numbers"),
                FunctionTool.from_defaults(fn=divide_numbers, name="divide_numbers", description="Divide first number by second number"),
                FunctionTool.from_defaults(fn=power_numbers, name="power_numbers", description="Raise first number to the power of second number"),
                FunctionTool.from_defaults(fn=calculate_percentage, name="calculate_percentage", description="Calculate percentage of a value"),
                FunctionTool.from_defaults(fn=get_modulus, name="get_modulus", description="Get the modulus of two numbers"),
            ]
            self.tools.extend(math_tools)
            print("✅ Math tools initialized")
        except Exception as e:
            print(f"Warning: Could not initialize math tools: {e}")
|
        # Tavily web search (requires TAVILY_API_KEY in the environment).
        try:
            search_spec = TavilyToolSpec(
                api_key=os.getenv("TAVILY_API_KEY"),
            )
            search_tools = search_spec.to_tool_list()
            self.tools.extend(search_tools)
            print("✅ Tavily search tool initialized")
        except Exception as e:
            print(f"Warning: Could not initialize Tavily search tool: {e}")
|
""" try: |
|
# Wikipedia search |
|
wiki_spec = WikipediaToolSpec() |
|
wiki_tools = FunctionTool.from_defaults(wiki_spec.wikipedia_search, name="wikipedia_search", description="Search Wikipedia for information") |
|
self.tools.extend(wiki_tools) |
|
print("✅ Wikipedia tool initialized") |
|
except Exception as e: |
|
print(f"Warning: Could not initialize Wikipedia tool: {e}") """ |
|
|
|
""" try: |
|
# Web requests tool |
|
requests_spec = RequestsToolSpec() |
|
requests_tools = requests_spec.to_tool_list() |
|
self.tools.extend(requests_tools) |
|
print("✅ Web requests tool initialized") |
|
except Exception as e: |
|
print(f"Warning: Could not initialize requests tool: {e}") """ |
|
|
|
print(f"✅ Total {len(self.tools)} tools initialized") |
|
|
|
|
|
    def _initialize_agent(self):
        """Prepare the FunctionAgent parameters (actual construction is deferred)."""
        if not self.llm:
            print("Warning: No LLM available, using basic mode")
            self.agent = None
            self.context = None
            return

        self._agent_params = {
            'llm': self.llm,
            'tools': self.tools,
        }
        self.agent = None
        self.context = None
        print("✅ FunctionAgent parameters prepared (deferred initialization)")
|
    def _ensure_agent_initialized(self):
        """Ensure the FunctionAgent is initialized when needed."""
        if self.agent is None and hasattr(self, '_agent_params'):
            try:
                # Drop any stale context from a previous run before rebuilding the agent.
                if hasattr(self, 'context') and self.context:
                    self.context = None

                enhanced_prompt = """
                You are a helpful assistant tasked with answering questions using a set of tools.
                Now, I will ask you a question. Report your thoughts, and finish your answer with the following template:
                FINAL ANSWER: [YOUR FINAL ANSWER].
                YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma-separated list of numbers and/or strings. If you are asked for a number, don't use commas in the number and don't use units such as $ or percent signs unless specified otherwise. If you are asked for a string, don't use articles or abbreviations (e.g. for cities), and write digits in plain text unless specified otherwise. If you are asked for a comma-separated list, apply the above rules depending on whether each element of the list is a number or a string.
                Your answer should start with "FINAL ANSWER: ", followed by the answer.
                """

                self.agent = FunctionAgent(
                    tools=self.tools,
                    llm=self.llm,
                    system_prompt=enhanced_prompt,
                )
                print("✅ FunctionAgent initialized (deferred)")
            except Exception as e:
                print(f"Error in deferred agent initialization: {e}")
                print("Continuing with fallback mode...")
                return False
        return self.agent is not None
|
    async def __call__(self, question: str) -> str:
        """Main method that processes a question and returns an answer."""
        print(f"Agent received question (first 100 chars): {question[:100]}...")

        self._ensure_agent_initialized()

        if self.agent:
            try:
                response = await self._async_agent_run(question)
                return response
            except Exception as e:
                print(f"Error with agent: {e}")
                return f"FINAL ANSWER: Error processing question - {str(e)}"
        else:
            return "FINAL ANSWER: Agent not properly initialized"
|
    async def _async_agent_run(self, question: str) -> str:
        """Run the agent asynchronously and extract the final answer."""
        try:
            print("Agent running...")
            print(self.agent)
            handler = self.agent.run(question)

            iteration_count = 0
            async for event in handler.stream_events():
                iteration_count += 1
                if isinstance(event, ToolCall):
                    print(f"\n-----------\nevent:\n{event}")
                elif isinstance(event, AgentStream):
                    print(f"{event.delta}", end="", flush=True)
                # Optional safeguard against runaway loops (currently disabled):
                # if iteration_count > 5:
                #     print("Too many iterations, stopping...")
                #     break

            response = await handler
            content = response.response.content
            print(f'response.response: {content}')
            # Return only the text after the "FINAL ANSWER: " marker; fall back to the
            # full response if the model did not follow the expected template.
            if "FINAL ANSWER: " in content:
                return content.split("FINAL ANSWER: ", 1)[1]
            return content
        except Exception as e:
            print(f"Async agent error: {e}")
            return f"FINAL ANSWER: Error in agent processing - {str(e)}"
|
|
|
|