Spaces:
Running
Running
File size: 28,276 Bytes
a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c |
|
import os
import time
import logging
import re # Import regex for video ID extraction
from typing import List, Optional, Dict # Added Dict
from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.tools.google import GoogleSearchToolSpec
from llama_index.tools.tavily_research import TavilyToolSpec
from llama_index.tools.wikipedia import WikipediaToolSpec
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec
from llama_index.tools.yahoo_finance import YahooFinanceToolSpec
from llama_index.tools.arxiv import ArxivToolSpec
# Setup logging.
# The module logger is created *before* the optional imports so that the
# import-failure warning below is emitted through it.  Calling the module-level
# ``logging.warning`` instead would implicitly run ``logging.basicConfig()`` when
# the root logger has no handlers, which makes a later explicit
# ``logging.basicConfig(level=..., format=...)`` call a silent no-op.
logger = logging.getLogger(__name__)

# Attempt to import browser tools; handle import errors gracefully
try:
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.common.keys import Keys
    from selenium.common.exceptions import WebDriverException, NoSuchElementException, TimeoutException
    from helium import start_chrome, go_to, find_all, Text, kill_browser, get_driver, click, write, press
    SELENIUM_AVAILABLE = True
except ImportError:
    logger.warning("Selenium or Helium not installed. Browser interaction tools will be unavailable.")
    SELENIUM_AVAILABLE = False

# --- Browser Interaction Tools (Conditional on Selenium/Helium availability) ---
# Global browser instance (managed by initializer).
# Both stay None until ResearchAgentInitializer._initialize_browser() succeeds and
# are reset to None again by ResearchAgentInitializer.close_browser().
_browser_instance = None
_browser_driver = None
# Helper decorator for browser tool error handling and logging
def browser_tool_handler(func):
    """Decorate a browser tool with availability checks, logging and error handling.

    The returned wrapper never raises: every outcome is reported as a string so
    the calling agent can read it as a tool observation.

    Behaviour of the wrapper, in order:
      1. If Selenium/Helium are unavailable, return an error string.
      2. If the shared browser has not been started, try to start it through
         ``get_research_initializer()``; return an error string on failure.
      3. Call ``func`` and return ``str(result)`` (or ``"<name> completed."``
         when ``func`` returns ``None``).
      4. Convert Selenium errors and any other exception into an error string.

    Args:
        func: The browser tool function to wrap.

    Returns:
        A wrapper with the same name, docstring and signature metadata as
        ``func`` (via ``functools.wraps``) that always returns ``str``.
    """
    # Local import keeps this block self-contained (functools is stdlib).
    import functools

    # functools.wraps preserves __name__, __doc__ and __wrapped__, so code that
    # introspects the decorated tool (e.g. FunctionTool.from_defaults, used in
    # this file to register the tools) sees the real function, not a generic
    # ``wrapper(*args, **kwargs)``.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not SELENIUM_AVAILABLE:
            return "Error: Browser tools require Selenium and Helium to be installed."

        if _browser_instance is None or _browser_driver is None:
            # Attempt to initialize if not already done (e.g., if called directly).
            # This is not ideal, initialization should happen via get_research_initializer()
            logger.warning("Browser accessed before explicit initialization. Attempting to initialize now.")
            try:
                get_research_initializer()  # This will initialize the browser
            except Exception as init_err:
                return f"Error: Browser initialization failed: {init_err}"
            if _browser_instance is None or _browser_driver is None:
                return "Error: Browser initialization failed."

        func_name = func.__name__
        logger.info(f"Executing browser tool: {func_name} with args: {args}, kwargs: {kwargs}")
        try:
            result = func(*args, **kwargs)
            logger.info(f"Tool {func_name} executed successfully.")
            # Ensure result is a string for consistency
            return str(result) if result is not None else f"{func_name} completed."
        except (NoSuchElementException, WebDriverException, TimeoutException) as e:
            # Report only the first line of the exception text: Selenium appends
            # multi-line stack traces that are noise for the agent.  (Splitting
            # on whitespace, as before, kept only the first *word*.)
            message_lines = str(e).strip().splitlines()
            summary = message_lines[0] if message_lines else ""
            logger.warning(f"Browser error in {func_name}: {e.__class__.__name__} - {summary}")
            return f"Error in {func_name}: {e.__class__.__name__} - {summary}"
        except Exception as e:
            logger.error(f"Unexpected error in {func_name}: {e}", exc_info=True)
            return f"Unexpected error in {func_name}: {e}"

    return wrapper
@browser_tool_handler
def visit(url: str, wait_seconds: float = 3.0) -> str:
    """Navigate the browser to the specified URL and wait for the page to load."""
    # NOTE: the docstring above may be surfaced to the LLM as the tool
    # description, so its wording is intentionally left untouched.
    #
    # url:          address handed to Helium's go_to().
    # wait_seconds: fixed pause after navigation (no explicit wait condition).
    # Returns a confirmation string containing the URL the driver reports
    # afterwards, which can differ from `url` after redirects.
    logger.info(f"Navigating to {url} and waiting {wait_seconds}s...")
    go_to(url)
    # Give dynamic content time to render before reading the final URL.
    time.sleep(wait_seconds)
    landed_on = _browser_driver.current_url
    return "Successfully navigated to: {}".format(landed_on)
@browser_tool_handler
def get_text_by_css(selector: str) -> List[str]:
    """Extract text from all elements matching a CSS selector. Use selector=\"body\" for all visible text."""
    # NOTE: the docstring above may be surfaced to the LLM as the tool
    # description, so its wording is intentionally left untouched.
    #
    # Two modes:
    #   * any selector other than "body": text of every displayed, non-blank match;
    #   * "body" (case-insensitive): the <body> text split into stripped,
    #     non-empty lines, falling back to Helium's Text() search if no <body>
    #     element can be found.
    logger.info(f"Extracting text using CSS selector: {selector}")

    if selector.lower() != "body":
        # Use Selenium directly for more control
        matches = _browser_driver.find_elements(By.CSS_SELECTOR, selector)
        visible_texts = []
        for node in matches:
            if node.is_displayed() and node.text.strip():
                visible_texts.append(node.text)
        logger.info(f"Extracted {len(visible_texts)} visible text elements for selector {selector}.")
        return visible_texts

    # Helium Text() might be too broad, so the body tag is tried first.
    try:
        body = _browser_driver.find_element(By.TAG_NAME, "body")
        stripped_lines = (raw.strip() for raw in body.text.split("\n"))
        body_lines = [line for line in stripped_lines if line]
        logger.info(f"Extracted {len(body_lines)} lines of text from body.")
        return body_lines
    except NoSuchElementException:
        logger.warning("Could not find body tag, falling back to Helium Text().")
        helium_hits = find_all(Text())

    # Fallback path: keep only displayed Helium elements that carry text.
    fallback_texts = []
    for hit in helium_hits:
        if hit.web_element.is_displayed() and hit.web_element.text.strip():
            fallback_texts.append(hit.web_element.text)
    logger.info(f"Extracted {len(fallback_texts)} visible text elements using Helium Text().")
    return fallback_texts
@browser_tool_handler
def get_page_html() -> str:
    """Return the full HTML source of the current page."""
    # NOTE: the docstring above may be surfaced to the LLM as the tool
    # description, so its wording is intentionally left untouched.
    logger.info("Retrieving page HTML source...")
    # page_source is read from the shared Selenium driver.
    html_source = _browser_driver.page_source
    return html_source
@browser_tool_handler
def click_element_by_css(selector: str, index: int = 0) -> str:
    """Click on the Nth (0-based index) element matching the CSS selector."""
    # NOTE: the docstring above may be surfaced to the LLM as the tool
    # description, so its wording is intentionally left untouched.
    #
    # selector: CSS selector passed to Selenium's find_elements().
    # index:    position within the matches; negative values count from the
    #           end, as with normal list indexing.
    # Returns a confirmation string including the driver's current URL.
    # Raises NoSuchElementException when nothing matches and IndexError when
    # `index` is outside the matches (both are turned into error strings by
    # the browser_tool_handler decorator).
    logger.info(f"Attempting to click element {index} matching selector: {selector}")

    # Use Selenium directly for finding elements
    elements_selenium = _browser_driver.find_elements(By.CSS_SELECTOR, selector)
    if not elements_selenium:
        raise NoSuchElementException(f"No elements found for selector: {selector}")

    # Validate both ends of the range so a too-negative index gets the same
    # descriptive error as a too-large one instead of "list index out of range".
    count = len(elements_selenium)
    if not -count <= index < count:
        raise IndexError(f"Index {index} out of bounds. Only {count} elements found for selector: {selector}")

    target_element = elements_selenium[index]
    if not target_element.is_displayed() or not target_element.is_enabled():
        logger.warning(f"Element {index} for selector {selector} is not visible or enabled. Attempting click anyway.")
        # Try scrolling into view first; a failure here is logged, not fatal.
        try:
            _browser_driver.execute_script("arguments[0].scrollIntoView(true);", target_element)
            time.sleep(0.5)
        except Exception as scroll_err:
            logger.warning(f"Could not scroll element into view: {scroll_err}")

    # Use Helium click which might handle overlays better, passing the Selenium element
    click(target_element)
    time.sleep(1.5)  # Increased wait after click
    return f"Clicked element {index} matching selector {selector}. Current URL: {_browser_driver.current_url}"
@browser_tool_handler
def input_text_by_css(selector: str, text: str, index: int = 0, press_enter: bool = False) -> str:
    """Input text into the Nth (0-based index) element matching the CSS selector. Optionally press Enter."""
    # NOTE: the docstring above may be surfaced to the LLM as the tool
    # description, so its wording is intentionally left untouched.
    #
    # selector:    CSS selector passed to Selenium's find_elements().
    # text:        text written into the chosen element via Helium's write().
    # index:       position within the matches; negative values count from the
    #              end, as with normal list indexing.
    # press_enter: when True, send ENTER afterwards and wait longer.
    # Returns a confirmation string (with the current URL when Enter was pressed).
    # Raises NoSuchElementException when nothing matches and IndexError when
    # `index` is outside the matches (both are turned into error strings by
    # the browser_tool_handler decorator).
    logger.info(f"Attempting to input text into element {index} matching selector: {selector}")

    # Use Selenium directly for finding elements
    elements_selenium = _browser_driver.find_elements(By.CSS_SELECTOR, selector)
    if not elements_selenium:
        raise NoSuchElementException(f"No elements found for selector: {selector}")

    # Validate both ends of the range so a too-negative index gets the same
    # descriptive error as a too-large one instead of "list index out of range".
    count = len(elements_selenium)
    if not -count <= index < count:
        raise IndexError(f"Index {index} out of bounds. Only {count} elements found for selector: {selector}")

    target_element = elements_selenium[index]
    if not target_element.is_displayed() or not target_element.is_enabled():
        logger.warning(f"Input element {index} for selector {selector} is not visible or enabled. Attempting input anyway.")
        # Try scrolling into view; a failure here is logged, not fatal.
        try:
            _browser_driver.execute_script("arguments[0].scrollIntoView(true);", target_element)
            time.sleep(0.5)
        except Exception as scroll_err:
            logger.warning(f"Could not scroll input element into view: {scroll_err}")

    # Use Helium write, passing the Selenium element
    write(text, into=target_element)
    time.sleep(0.5)

    if press_enter:
        press(Keys.ENTER)
        time.sleep(1.5)  # Wait longer if Enter was pressed
        return f"Input text into element {index} ({selector}) and pressed Enter. Current URL: {_browser_driver.current_url}"
    return f"Input text into element {index} ({selector})."
@browser_tool_handler
def scroll_page(direction: str = "down", amount: str = "page") -> str:
    """Scroll the page up or down by a specified amount ('page', 'top', 'bottom', or pixels)."""
    # NOTE: the docstring above may be surfaced to the LLM as the tool
    # description, so its wording is intentionally left untouched.
    #
    # direction: "up" or "down"; anything else raises ValueError.
    # amount:    "page" (one viewport height), "top", "bottom", or a value that
    #            int() accepts as a pixel count.  "top"/"bottom" are absolute
    #            jumps and ignore `direction`.
    logger.info(f"Scrolling {direction} by {amount}")

    if direction not in ("up", "down"):
        raise ValueError("Direction must be \"up\" or \"down\".")
    scrolling_down = direction == "down"

    if amount == "top":
        script = "window.scrollTo(0, 0);"
    elif amount == "bottom":
        script = "window.scrollTo(0, document.body.scrollHeight);"
    elif amount == "page":
        if scrolling_down:
            script = "window.scrollBy(0, window.innerHeight);"
        else:
            script = "window.scrollBy(0, -window.innerHeight);"
    else:
        try:
            pixel_count = int(amount)
        except ValueError:
            raise ValueError("Amount must be \"page\", \"top\", \"bottom\", or a number of pixels.")
        # Scrolling up is expressed as a negated offset.
        offset = pixel_count if scrolling_down else -pixel_count
        script = f"window.scrollBy(0, {offset});"

    _browser_driver.execute_script(script)
    time.sleep(1)  # Wait for scroll effects
    return f"Scrolled {direction} by {amount}."
@browser_tool_handler
def go_back() -> str:
    """Navigate the browser back one step in its history."""
    # NOTE: the docstring above may be surfaced to the LLM as the tool
    # description, so its wording is intentionally left untouched.
    logger.info("Navigating back...")
    _browser_driver.back()
    # Pause so the previous page has time to load before its URL is read.
    time.sleep(1.5)
    url_after = _browser_driver.current_url
    return "Navigated back. Current URL: {}".format(url_after)
@browser_tool_handler
def close_popups() -> str:
    """Send an ESC keypress to attempt to dismiss modals or pop-ups."""
    # NOTE: the docstring above may be surfaced to the LLM as the tool
    # description, so its wording is intentionally left untouched.
    logger.info("Sending ESC key...")
    # Build the action chain step by step: queue the ESC key, then execute it.
    escape_action = webdriver.ActionChains(_browser_driver)
    escape_action = escape_action.send_keys(Keys.ESCAPE)
    escape_action.perform()
    time.sleep(0.5)
    return "Sent ESC key press."
def answer_question(question: str) -> str:
    """
    Answer any question by following this strict format:
    1. Include your chain of thought (your reasoning steps).
    2. End your reply with the exact template:
    FINAL ANSWER: [YOUR FINAL ANSWER]
    YOUR FINAL ANSWER must be:
    - A number, or
    - As few words as possible, or
    - A comma-separated list of numbers and/or strings.
    Formatting rules:
    * If asked for a number, do not use commas or units (e.g., $, %), unless explicitly requested.
    * If asked for a string, do not include articles or abbreviations (e.g., city names), and write digits in plain text.
    * If asked for a comma-separated list, apply the above rules to each element.
    This tool should be invoked immediately after completing the final planning sub-step.
    """
    # Implementation notes:
    #   * Sends `question`, prefixed with the format instructions below, to a
    #     Gemini model and returns the completion text.
    #   * The model is taken from ANSWER_TOOL_LLM_MODEL (default
    #     "models/gemini-1.5-pro"); the key from GEMINI_API_KEY.
    #   * Never raises: a missing key or a failed LLM call is reported as an
    #     "Error..." string so the agent can read it as an observation.
    logger.info(f"Answering question: {question[:100]}")

    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        logger.error("GEMINI_API_KEY not set for answer_question tool.")
        return "Error: GEMINI_API_KEY not set."

    answer_model = os.getenv("ANSWER_TOOL_LLM_MODEL", "models/gemini-1.5-pro")

    # Fixed instruction preamble enforcing the required answer format.
    format_rules = "".join([
        "You are a general AI assistant. I will ask you a question. ",
        "Report your thoughts, and finish your answer with the following template: ",
        "FINAL ANSWER: [YOUR FINAL ANSWER]. ",
        "YOUR FINAL ANSWER should be a number OR as few words as possible ",
        "OR a comma separated list of numbers and/or strings. ",
        "If you are asked for a number, don't use commas for thousands or any units like $ or % unless specified. ",
        "If you are asked for a string, omit articles and abbreviations, and write digits in plain text. ",
        "If you are asked for a comma separated list, apply these rules to each element.\n\n",
    ])
    full_prompt = f"{format_rules}Question: {question}\nAnswer:"

    try:
        answer_llm = GoogleGenAI(api_key=api_key, model=answer_model)
        logger.info(f"Using answer LLM: {answer_model}")
        completion = answer_llm.complete(full_prompt)
        logger.info("Answer generated successfully.")
        return completion.text
    except Exception as e:
        logger.error(f"LLM call failed during answer generation: {e}", exc_info=True)
        return f"Error during answer generation: {e}"
# --- Agent Initializer Class ---
class ResearchAgentInitializer:
    """Create and hold the LLM, browser session and tools used by the research agent.

    Construction performs all setup eagerly: the Gemini LLM, the headless Chrome
    session (only when Selenium/Helium were imported successfully), the search
    and data-source tools, and the ``answer_question`` tool.  ``get_agent`` then
    assembles a ``ReActAgent`` from those pieces and ``close_browser`` releases
    the browser.
    """

    def __init__(self):
        """Initialize every resource the agent needs.

        Raises:
            ValueError: If ``GEMINI_API_KEY`` is not set (from ``_initialize_llm``).
        """
        logger.info("Initializing ResearchAgent resources...")
        self.llm = None
        self.browser_tools = []
        self.search_tools = []
        self.datasource_tools = []

        # Initialize LLM
        self._initialize_llm()

        # Initialize Browser (conditionally)
        if SELENIUM_AVAILABLE:
            self._initialize_browser()
            # _initialize_browser() may have cleared SELENIUM_AVAILABLE on
            # failure; _create_browser_tools() re-checks it.
            self._create_browser_tools()
        else:
            logger.warning("Browser tools are disabled as Selenium/Helium are not available.")

        # Initialize Search/Datasource Tools
        self._create_search_tools()
        self._create_datasource_tools()

        self.answer_question = FunctionTool.from_defaults(
            fn=answer_question,
            name="answer_question",
            description=(
                "Use this tool to answer any question, reporting your reasoning steps and ending with 'FINAL ANSWER: ...'. "
                "Invoke this tool immediately after the final sub-step of planning is complete."
            ),
        )
        logger.info("ResearchAgent resources initialized.")

    def _initialize_llm(self):
        """Create the agent LLM from RESEARCH_AGENT_LLM_MODEL / GEMINI_API_KEY.

        Raises:
            ValueError: If ``GEMINI_API_KEY`` is not set.
            Exception: Any error raised while constructing ``GoogleGenAI`` is
                logged and re-raised.
        """
        agent_llm_model = os.getenv("RESEARCH_AGENT_LLM_MODEL", "models/gemini-1.5-pro")
        gemini_api_key = os.getenv("GEMINI_API_KEY")
        if not gemini_api_key:
            logger.error("GEMINI_API_KEY not found for ResearchAgent LLM.")
            raise ValueError("GEMINI_API_KEY must be set for ResearchAgent")
        try:
            self.llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model)
            logger.info(f"ResearchAgent LLM initialized: {agent_llm_model}")
        except Exception as e:
            logger.error(f"Failed to initialize ResearchAgent LLM: {e}", exc_info=True)
            raise

    def _initialize_browser(self):
        """Start the shared headless Chrome session if it is not already running.

        On failure the error is logged (not raised), ``SELENIUM_AVAILABLE`` is
        set to False and the browser globals are reset so that browser tools
        report themselves as unavailable.
        """
        # All module-level names this method rebinds are declared up front.
        global _browser_instance, _browser_driver, SELENIUM_AVAILABLE
        if _browser_instance is not None:
            return

        logger.info("Initializing browser (Chrome headless)...")
        try:
            chrome_options = webdriver.ChromeOptions()
            # Configurable options from env vars
            if os.getenv("RESEARCH_AGENT_CHROME_NO_SANDBOX", "true").lower() == "true":
                chrome_options.add_argument("--no-sandbox")
            if os.getenv("RESEARCH_AGENT_CHROME_DISABLE_DEV_SHM", "true").lower() == "true":
                chrome_options.add_argument("--disable-dev-shm-usage")
            # Add prefs for downloads/popups
            chrome_options.add_experimental_option("prefs", {
                "download.prompt_for_download": False,
                "plugins.always_open_pdf_externally": True,
                "profile.default_content_settings.popups": 0,
            })
            # Start Chrome using Helium
            _browser_instance = start_chrome(headless=True, options=chrome_options)
            _browser_driver = get_driver()  # Get the underlying Selenium driver
            logger.info("Browser initialized successfully.")
        except Exception as e:
            logger.error(f"Failed to initialize browser: {e}", exc_info=True)
            # Set flags to prevent tool usage
            SELENIUM_AVAILABLE = False
            _browser_instance = None
            _browser_driver = None

    def _create_browser_tools(self):
        """Wrap the browser functions as FunctionTools, tagged "(Browser)"."""
        if not SELENIUM_AVAILABLE:
            self.browser_tools = []
            return
        self.browser_tools = [
            FunctionTool.from_defaults(fn=visit, name="visit_url"),  # Renamed for clarity
            FunctionTool.from_defaults(fn=get_text_by_css, name="get_text_by_css"),
            # get_page_html is intentionally not registered as a tool.
            FunctionTool.from_defaults(fn=click_element_by_css, name="click_element_by_css"),
            FunctionTool.from_defaults(fn=input_text_by_css, name="input_text_by_css"),
            FunctionTool.from_defaults(fn=scroll_page, name="scroll_page"),
            FunctionTool.from_defaults(fn=go_back, name="navigate_back"),  # Renamed
            FunctionTool.from_defaults(fn=close_popups, name="close_popups"),
        ]
        for tool in self.browser_tools:
            tool.metadata.description = f"(Browser) {tool.metadata.description}"
        logger.info(f"Created {len(self.browser_tools)} browser interaction tools.")

    def _create_search_tools(self):
        """Create the search-engine tools whose credentials are available.

        Google and Tavily need API keys that this module treats as optional, so
        each is skipped with a warning when its key(s) are missing instead of
        being constructed with ``None`` credentials.  DuckDuckGo needs no key.
        """
        self.search_tools = []

        # Google Search (needs both the API key and the custom search engine id)
        google_api_key = os.getenv("GOOGLE_API_KEY")
        google_cse_id = os.getenv("GOOGLE_CSE_ID")
        if google_api_key and google_cse_id:
            google_spec = GoogleSearchToolSpec(key=google_api_key, engine=google_cse_id)
            google_tool = FunctionTool.from_defaults(fn=google_spec.google_search, name="google_search")
            google_tool.metadata.description = "(Search) Execute a Google Custom Search query. Returns structured results."
            self.search_tools.append(google_tool)
        else:
            logger.warning("GOOGLE_API_KEY and/or GOOGLE_CSE_ID not set. Google search tool is unavailable.")

        # Tavily Search
        tavily_api_key = os.getenv("TAVILY_API_KEY")
        if tavily_api_key:
            tavily_spec = TavilyToolSpec(api_key=tavily_api_key)
            # Use search method which is more general
            tavily_tool = FunctionTool.from_defaults(fn=tavily_spec.search, name="tavily_search")
            tavily_tool.metadata.description = "(Search) Perform a deep research search using Tavily API. Good for finding documents/articles."
            self.search_tools.append(tavily_tool)
        else:
            logger.warning("TAVILY_API_KEY not set. Tavily search tool is unavailable.")

        # DuckDuckGo Search
        ddg_spec = DuckDuckGoSearchToolSpec()
        ddg_tool = FunctionTool.from_defaults(fn=ddg_spec.duckduckgo_full_search, name="duckduckgo_search")
        ddg_tool.metadata.description = "(Search) Execute a DuckDuckGo search. Returns structured results."
        self.search_tools.append(ddg_tool)

        logger.info(f"Created {len(self.search_tools)} search engine tools.")

    def _create_datasource_tools(self):
        """Create the Wikipedia, Yahoo Finance and ArXiv tools."""
        self.datasource_tools = []

        # Wikipedia
        wiki_spec = WikipediaToolSpec()
        wiki_search_tool = FunctionTool.from_defaults(fn=wiki_spec.search_data, name="wikipedia_search_pages")
        wiki_search_tool.metadata.description = "(Wikipedia) Search for Wikipedia page titles matching a query."
        wiki_load_tool = FunctionTool.from_defaults(fn=wiki_spec.load_data, name="wikipedia_load_page")
        wiki_load_tool.metadata.description = "(Wikipedia) Load the full content of a specific Wikipedia page title."
        self.datasource_tools.extend([wiki_search_tool, wiki_load_tool])

        # Yahoo Finance: spec method name -> description shown to the LLM.
        yf_spec = YahooFinanceToolSpec()
        yf_tools_map = {
            "balance_sheet": "Get the latest balance sheet for a stock ticker.",
            "income_statement": "Get the latest income statement for a stock ticker.",
            "cash_flow": "Get the latest cash flow statement for a stock ticker.",
            "stock_basic_info": "Get basic info (price, market cap, summary) for a stock ticker.",
            "stock_analyst_recommendations": "Get analyst recommendations for a stock ticker.",
            "stock_news": "Get recent news headlines for a stock ticker.",
        }
        for func_name, desc in yf_tools_map.items():
            # Tolerate spec versions that lack one of the expected methods.
            if hasattr(yf_spec, func_name):
                tool = FunctionTool.from_defaults(fn=getattr(yf_spec, func_name), name=f"yahoo_finance_{func_name}")
                tool.metadata.description = f"(YahooFinance) {desc}"
                self.datasource_tools.append(tool)
            else:
                logger.warning(f"YahooFinance function {func_name} not found in spec.")

        # ArXiv
        arxiv_spec = ArxivToolSpec()
        arxiv_tool = FunctionTool.from_defaults(fn=arxiv_spec.arxiv_query, name="arxiv_search")
        arxiv_tool.metadata.description = "(ArXiv) Search ArXiv for academic papers matching a query."
        self.datasource_tools.append(arxiv_tool)

        logger.info(f"Created {len(self.datasource_tools)} specific data source tools.")

    def get_agent(self) -> ReActAgent:
        """Creates and returns the configured ReActAgent for research."""
        logger.info("Creating ResearchAgent ReActAgent instance...")

        research_tools = self.browser_tools + self.search_tools + self.datasource_tools
        # Checked before the answer tool is added; afterwards the list can
        # never be empty and the warning would be unreachable.
        if not research_tools:
            logger.warning("No tools available for ResearchAgent. It will likely be unable to function.")
        all_tools = research_tools + [self.answer_question]

        # System prompt (consider loading from file)
        system_prompt = """\
You are ResearchAgent, an autonomous web research assistant. Your goal is to gather information accurately and efficiently using the available tools.
Available Tool Categories:
- (Browser): Tools for direct web page interaction (visiting URLs, clicking, scrolling, extracting text/HTML, inputting text).
- (Search): Tools for querying search engines (Google, DuckDuckGo, Tavily).
- (Wikipedia): Tools for searching and loading Wikipedia pages.
- (YahooFinance): Tools for retrieving financial data (balance sheets, income statements, stock info, news).
- (ArXiv): Tool for searching academic papers on ArXiv.
- (Answer): `answer_question` — use this when your research has yielded a definitive result and you need to reply in the strict “FINAL ANSWER” format.
**Answer Tool Usage**
When you know the final answer and no further data is required, invoke `answer_question` with the user’s query. It will return text ending with:
FINAL ANSWER: [YOUR FINAL ANSWER]
Formatting rules for **YOUR FINAL ANSWER**:
- A single number, or
- As few words as possible, or
- A comma-separated list of numbers and/or strings.
- If numeric: no thousands separators or units (%, $, etc.) unless explicitly requested.
- If string: omit articles and abbreviations; write digits in plain text.
- If a list: apply the above rules to each element.
**Workflow:**
1. **Thought**: Analyze the research goal. Break it down if necessary. Choose the *single best tool* for the *next immediate step*. Explain your choice.
2. **Action**: Call the chosen tool with the correct arguments. Ensure inputs match the tool's requirements.
3. **Observation**: Examine the tool's output. Extract the relevant information. Check for errors.
4. **Reflect & Iterate**: Does the observation satisfy the immediate goal? If not, return to step 1. If a tool failed, try an alternative approach.
5. **Advanced Validation**: Before delivering any final response, invoke `advanced_validation_agent` with the combined insights from the reasoning and planning phases. If validation fails, pass the feedback back into **planner_agent** to refine the approach and repeat validation.
6. **Synthesize**: Once validation is approved, synthesize all gathered information into a coherent answer.
7. **Respond**: Invoke `answer_question` to emit the **FINAL ANSWER** according to the strict template rules.
**Constraints:**
- Use only one tool per Action step.
- Think step-by-step.
- If using browser tools, start with `visit_url`.
- Synthesize results *before* handing off or responding.
- Do not skip any workflow step (reason → action → observation → reflect → validate → synthesize → respond).
"""
        agent = ReActAgent(
            name="research_agent",
            # The description lists only capabilities backed by tools that this
            # class actually registers (no YouTube tool is created here).
            description=(
                "Performs web research using browser interaction, search engines (Google, DDG, Tavily), "
                "and specific data sources (Wikipedia, YahooFinance, ArXiv). Follows Thought-Action-Observation loop."
            ),
            tools=all_tools,
            llm=self.llm,
            system_prompt=system_prompt,
            # Every entry needs its own trailing comma: adjacent string literals
            # are silently concatenated into a single (non-existent) agent name.
            can_handoff_to=[
                "code_agent",
                "math_agent",
                "text_analyzer_agent",  # Added based on original prompt
                "advanced_validation_agent",
                "long_context_management_agent",
                "planner_agent",
                "reasoning_agent",
            ],
        )
        logger.info("ResearchAgent ReActAgent instance created.")
        return agent

    def close_browser(self):
        """Closes the browser instance if it was initialized."""
        global _browser_instance, _browser_driver
        if _browser_instance:
            logger.info("Closing browser instance...")
            try:
                kill_browser()  # Use Helium's function
                logger.info("Browser closed successfully.")
            except Exception as e:
                logger.error(f"Error closing browser: {e}", exc_info=True)
            finally:
                # Always drop the references, even if Helium failed to shut
                # the browser down.
                _browser_instance = None
                _browser_driver = None
        else:
            logger.info("No active browser instance to close.")
# --- Singleton Initializer Instance ---
# Holds the one shared ResearchAgentInitializer; None until first requested.
_research_agent_initializer_instance = None


def get_research_initializer():
    """Return the shared ResearchAgentInitializer, building it on first use.

    The first call constructs the initializer (and with it all of its
    resources); later calls hand back the same object.
    """
    global _research_agent_initializer_instance
    if _research_agent_initializer_instance is not None:
        return _research_agent_initializer_instance
    logger.info("Instantiating ResearchAgentInitializer for the first time.")
    _research_agent_initializer_instance = ResearchAgentInitializer()
    return _research_agent_initializer_instance
# --- Public Initialization Function ---
def initialize_research_agent() -> ReActAgent:
    """Build the research ReActAgent from the shared (singleton) initializer.

    Returns:
        The agent produced by ``ResearchAgentInitializer.get_agent()``.
    """
    logger.info("initialize_research_agent called.")
    research_agent = get_research_initializer().get_agent()
    return research_agent
# --- Cleanup Function (Optional but recommended) ---
def cleanup_research_agent_resources():
    """Cleans up resources used by the research agent, like the browser.

    Does nothing beyond logging when the agent was never initialized.  The
    singleton is read directly rather than through ``get_research_initializer()``
    because that accessor would *create* the initializer (LLM, browser and
    tools) just so it could be torn down again.
    """
    logger.info("Cleaning up research agent resources...")
    initializer = _research_agent_initializer_instance
    if initializer is None:
        logger.info("Research agent was never initialized; nothing to clean up.")
        return
    initializer.close_browser()
# Example usage (for testing if run directly)
# Example usage (for testing if run directly)
if __name__ == "__main__":
    # Script entry point: configure logging, then report which environment
    # variables are missing.  Nothing else is started from here.
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger.info("Running research_agent.py directly for testing...")

    # Check required keys (others are optional depending on tools needed)
    absent_required = []
    for env_name in ["GEMINI_API_KEY"]:
        if not os.getenv(env_name):
            absent_required.append(env_name)

    if absent_required:
        print(f"Error: Required environment variable(s) not set: {', '.join(absent_required)}. Cannot run test.")
    else:
        # Warn about optional keys
        absent_optional = []
        for env_name in ["GOOGLE_API_KEY", "GOOGLE_CSE_ID", "TAVILY_API_KEY", "WOLFRAM_ALPHA_APP_ID"]:
            if not os.getenv(env_name):
                absent_optional.append(env_name)
        if absent_optional:
            print(f"Warning: Optional environment variable(s) not set: {', '.join(absent_optional)}. Some tools may be unavailable.")
|