Spaces:
Running
Running
import os | |
import time | |
import logging | |
import re # Import regex for video ID extraction | |
from typing import List, Optional, Dict # Added Dict | |
from llama_index.core.agent.workflow import ReActAgent | |
from llama_index.core.tools import FunctionTool | |
from llama_index.llms.google_genai import GoogleGenAI | |
from llama_index.tools.google import GoogleSearchToolSpec | |
from llama_index.tools.tavily_research import TavilyToolSpec | |
from llama_index.tools.wikipedia import WikipediaToolSpec | |
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec | |
from llama_index.tools.yahoo_finance import YahooFinanceToolSpec | |
from llama_index.tools.arxiv import ArxivToolSpec | |
# Attempt to import browser tools; handle import errors gracefully | |
try: | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.common.keys import Keys | |
from selenium.common.exceptions import WebDriverException, NoSuchElementException, TimeoutException | |
from helium import start_chrome, go_to, find_all, Text, kill_browser, get_driver, click, write, press | |
SELENIUM_AVAILABLE = True | |
except ImportError: | |
logging.warning("Selenium or Helium not installed. Browser interaction tools will be unavailable.") | |
SELENIUM_AVAILABLE = False | |
# Setup logging: module-level logger named after this module.
logger = logging.getLogger(__name__)
# --- Browser Interaction Tools (Conditional on Selenium/Helium availability) ---
# Global browser state shared by every browser tool function below.
# Both names are assigned in ResearchAgentInitializer._initialize_browser()
# and reset to None in ResearchAgentInitializer.close_browser().
# Value returned by Helium's start_chrome(); None until a browser is started.
_browser_instance = None
# Underlying Selenium driver obtained from Helium's get_driver(); None until a browser is started.
_browser_driver = None
# Helper decorator for browser tool error handling and logging | |
def browser_tool_handler(func):
    """Decorate a browser tool with availability checks, logging and error handling.

    The returned wrapper:

    * returns an error string (instead of raising) when Selenium/Helium are
      unavailable or the browser cannot be initialized;
    * lazily initializes the browser via ``get_research_initializer()`` when a
      tool is called before explicit initialization (or after the browser
      was closed);
    * logs every invocation and converts the tool's result to ``str``
      (``"<name> completed."`` when the tool returns ``None``);
    * converts any exception raised by the tool into an error string.

    Args:
        func: The browser tool function to wrap.

    Returns:
        A wrapper that carries ``func``'s name, docstring and ``__wrapped__``
        reference (so signature introspection still sees the real
        parameters) and that always returns a string.
    """
    # Function-scope import keeps this block self-contained (stdlib only).
    import functools

    func_name = getattr(func, "__name__", repr(func))

    def _first_line(exc):
        # Selenium messages are multi-line ("Message: ...\n  (Session info ...)").
        # Keep the first non-empty line: short, but still meaningful, and safe
        # when the message is empty.
        lines = str(exc).strip().splitlines()
        return lines[0].strip() if lines else "no details"

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not SELENIUM_AVAILABLE:
            return "Error: Browser tools require Selenium and Helium to be installed."
        if _browser_instance is None or _browser_driver is None:
            # Not ideal: initialization should happen via get_research_initializer()
            # before any tool runs, but recover if a tool is called directly.
            logger.warning("Browser accessed before explicit initialization. Attempting to initialize now.")
            try:
                initializer = get_research_initializer()
                if _browser_instance is None or _browser_driver is None:
                    # The singleton already existed (e.g. the browser was closed
                    # earlier), so constructing it did not start a browser.
                    initializer._initialize_browser()
                if _browser_instance is None or _browser_driver is None:
                    return "Error: Browser initialization failed."
            except Exception as init_err:
                return f"Error: Browser initialization failed: {init_err}"

        logger.info(f"Executing browser tool: {func_name} with args: {args}, kwargs: {kwargs}")
        try:
            result = func(*args, **kwargs)
            logger.info(f"Tool {func_name} executed successfully.")
            # Ensure result is a string for consistency
            return str(result) if result is not None else f"{func_name} completed."
        except (NoSuchElementException, WebDriverException, TimeoutException) as e:
            detail = _first_line(e)
            logger.warning(f"Browser error in {func_name}: {e.__class__.__name__} - {detail}")
            return f"Error in {func_name}: {e.__class__.__name__} - {detail}"
        except Exception as e:
            logger.error(f"Unexpected error in {func_name}: {e}", exc_info=True)
            return f"Unexpected error in {func_name}: {e}"

    return wrapper
def visit(url: str, wait_seconds: float = 3.0) -> str:
    """Navigate the browser to the specified URL and wait for the page to load.

    Args:
        url: Address to open in the shared browser.
        wait_seconds: Fixed pause (in seconds) after navigation.

    Returns:
        A confirmation message containing the driver's current URL.
    """
    logger.info(f"Navigating to {url} and waiting {wait_seconds}s...")
    go_to(url)
    # Plain sleep rather than an explicit wait: gives dynamic content time to render.
    time.sleep(wait_seconds)
    landed_on = _browser_driver.current_url
    return "Successfully navigated to: {}".format(landed_on)
def get_text_by_css(selector: str) -> List[str]:
    """Extract text from all elements matching a CSS selector. Use selector="body" for all visible text.

    Args:
        selector: CSS selector; the special value "body" (any letter case)
            returns the page's body text split into stripped, non-empty lines.

    Returns:
        The texts of the displayed, non-blank matching elements.
    """
    logger.info(f"Extracting text using CSS selector: {selector}")

    if selector.lower() != "body":
        # Ordinary selector: query Selenium directly for more control.
        visible_texts = []
        for node in _browser_driver.find_elements(By.CSS_SELECTOR, selector):
            if node.is_displayed() and node.text.strip():
                visible_texts.append(node.text)
        logger.info(f"Extracted {len(visible_texts)} visible text elements for selector {selector}.")
        return visible_texts

    # Helium Text() might be too broad, so try the body tag first.
    try:
        body = _browser_driver.find_element(By.TAG_NAME, "body")
        body_lines = []
        for raw_line in body.text.split("\n"):
            if raw_line.strip():
                body_lines.append(raw_line.strip())
        logger.info(f"Extracted {len(body_lines)} lines of text from body.")
        return body_lines
    except NoSuchElementException:
        logger.warning("Could not find body tag, falling back to Helium Text().")
        fallback_texts = []
        for helium_elem in find_all(Text()):
            if helium_elem.web_element.is_displayed() and helium_elem.web_element.text.strip():
                fallback_texts.append(helium_elem.web_element.text)
        logger.info(f"Extracted {len(fallback_texts)} visible text elements using Helium Text().")
        return fallback_texts
def get_page_html() -> str:
    """Return the full HTML source of the current page.

    Returns:
        The ``page_source`` of the shared Selenium driver.
    """
    logger.info("Retrieving page HTML source...")
    html_source = _browser_driver.page_source
    return html_source
def click_element_by_css(selector: str, index: int = 0) -> str:
    """Click on the Nth (0-based index) element matching the CSS selector.

    Negative indices count from the end of the match list (-1 is the last
    match), as with ordinary Python sequences.

    Args:
        selector: CSS selector used to locate candidate elements.
        index: Position of the element to click among the matches.

    Returns:
        A confirmation message including the driver's current URL after the click.

    Raises:
        NoSuchElementException: If nothing matches ``selector``.
        IndexError: If ``index`` is outside the range of matched elements
            (in either direction).
    """
    logger.info(f"Attempting to click element {index} matching selector: {selector}")
    # Use Selenium directly for finding elements
    elements_selenium = _browser_driver.find_elements(By.CSS_SELECTOR, selector)
    if not elements_selenium:
        raise NoSuchElementException(f"No elements found for selector: {selector}")
    match_count = len(elements_selenium)
    # Check both bounds so a too-negative index gets the same descriptive
    # error as a too-large one instead of a bare "list index out of range".
    if not -match_count <= index < match_count:
        raise IndexError(f"Index {index} out of bounds. Only {match_count} elements found for selector: {selector}")
    target_element = elements_selenium[index]
    if not target_element.is_displayed() or not target_element.is_enabled():
        # Best effort: warn, then still try scrolling and clicking.
        logger.warning(f"Element {index} for selector {selector} is not visible or enabled. Attempting click anyway.")
        # Try scrolling into view first
        try:
            _browser_driver.execute_script("arguments[0].scrollIntoView(true);", target_element)
            time.sleep(0.5)
        except Exception as scroll_err:
            logger.warning(f"Could not scroll element into view: {scroll_err}")
    # Use Helium click which might handle overlays better, passing the Selenium element
    click(target_element)
    time.sleep(1.5)  # Give the page time to react to the click
    return f"Clicked element {index} matching selector {selector}. Current URL: {_browser_driver.current_url}"
def input_text_by_css(selector: str, text: str, index: int = 0, press_enter: bool = False) -> str:
    """Input text into the Nth (0-based index) element matching the CSS selector. Optionally press Enter.

    Negative indices count from the end of the match list (-1 is the last
    match), as with ordinary Python sequences.

    Args:
        selector: CSS selector used to locate candidate input elements.
        text: Text to type into the chosen element.
        index: Position of the element to use among the matches.
        press_enter: When True, press Enter after typing and wait longer.

    Returns:
        A confirmation message; it includes the driver's current URL when
        Enter was pressed.

    Raises:
        NoSuchElementException: If nothing matches ``selector``.
        IndexError: If ``index`` is outside the range of matched elements
            (in either direction).
    """
    logger.info(f"Attempting to input text into element {index} matching selector: {selector}")
    # Use Selenium directly for finding elements
    elements_selenium = _browser_driver.find_elements(By.CSS_SELECTOR, selector)
    if not elements_selenium:
        raise NoSuchElementException(f"No elements found for selector: {selector}")
    match_count = len(elements_selenium)
    # Check both bounds so a too-negative index gets the same descriptive
    # error as a too-large one instead of a bare "list index out of range".
    if not -match_count <= index < match_count:
        raise IndexError(f"Index {index} out of bounds. Only {match_count} elements found for selector: {selector}")
    target_element = elements_selenium[index]
    if not target_element.is_displayed() or not target_element.is_enabled():
        # Best effort: warn, then still try scrolling and typing.
        logger.warning(f"Input element {index} for selector {selector} is not visible or enabled. Attempting input anyway.")
        # Try scrolling into view
        try:
            _browser_driver.execute_script("arguments[0].scrollIntoView(true);", target_element)
            time.sleep(0.5)
        except Exception as scroll_err:
            logger.warning(f"Could not scroll input element into view: {scroll_err}")
    # Use Helium write, passing the Selenium element
    write(text, into=target_element)
    time.sleep(0.5)
    if not press_enter:
        return f"Input text into element {index} ({selector})."
    press(Keys.ENTER)
    time.sleep(1.5)  # Wait longer if Enter was pressed
    return f"Input text into element {index} ({selector}) and pressed Enter. Current URL: {_browser_driver.current_url}"
def scroll_page(direction: str = "down", amount: str = "page") -> str:
    """Scroll the page up or down by a specified amount ('page', 'top', 'bottom', or pixels).

    Args:
        direction: "up" or "down". Letter case and surrounding whitespace
            are ignored. Not used when ``amount`` is 'top' or 'bottom'.
        amount: 'page' (one viewport height), 'top', 'bottom', or a pixel
            count given as a number or numeric string. Keywords are matched
            ignoring letter case and surrounding whitespace.

    Returns:
        A confirmation message echoing the arguments as they were passed.

    Raises:
        ValueError: If ``direction`` is not "up"/"down" or ``amount`` is
            neither a known keyword nor convertible to an integer.
    """
    logger.info(f"Scrolling {direction} by {amount}")

    direction_key = str(direction).strip().lower()
    if direction_key not in ("up", "down"):
        raise ValueError("Direction must be \"up\" or \"down\".")
    sign = 1 if direction_key == "down" else -1

    amount_key = str(amount).strip().lower()
    if amount_key == "page":
        scroll_script = "window.scrollBy(0, window.innerHeight);" if sign > 0 else "window.scrollBy(0, -window.innerHeight);"
    elif amount_key == "top":
        scroll_script = "window.scrollTo(0, 0);"
    elif amount_key == "bottom":
        scroll_script = "window.scrollTo(0, document.body.scrollHeight);"
    else:
        try:
            # Numbers are converted directly (floats truncate, as before);
            # anything else must parse as an integer string.
            pixels = int(amount) if isinstance(amount, (int, float)) else int(amount_key)
        except (TypeError, ValueError, OverflowError):
            # `from None`: the low-level conversion error adds nothing for the caller.
            raise ValueError("Amount must be \"page\", \"top\", \"bottom\", or a number of pixels.") from None
        scroll_script = f"window.scrollBy(0, {sign * pixels});"

    _browser_driver.execute_script(scroll_script)
    time.sleep(1)  # Wait for scroll effects
    return f"Scrolled {direction} by {amount}."
def go_back() -> str:
    """Navigate the browser back one step in its history.

    Returns:
        A confirmation message including the driver's current URL after the pause.
    """
    logger.info("Navigating back...")
    _browser_driver.back()
    # Pause so the previous page has time to load before the URL is read.
    time.sleep(1.5)
    url_after = _browser_driver.current_url
    return "Navigated back. Current URL: {}".format(url_after)
def close_popups() -> str:
    """Send an ESC keypress to attempt to dismiss modals or pop-ups.

    Returns:
        A fixed confirmation message; whether anything was actually
        dismissed is not checked.
    """
    logger.info("Sending ESC key...")
    escape_action = webdriver.ActionChains(_browser_driver)
    escape_action.send_keys(Keys.ESCAPE)
    escape_action.perform()
    time.sleep(0.5)
    return "Sent ESC key press."
def answer_question(question: str) -> str:
    """
    Answer a question with a Gemini LLM using the strict FINAL ANSWER format.

    The model is asked to report its reasoning and to end its reply with the
    exact template:
        FINAL ANSWER: [YOUR FINAL ANSWER]
    where YOUR FINAL ANSWER is a number, as few words as possible, or a
    comma-separated list of numbers and/or strings. Numbers carry no
    thousands separators or units unless requested; strings omit articles
    and abbreviations and spell digits in plain text; list elements follow
    the same rules.

    This tool should be invoked immediately after completing the final
    planning sub-step.

    Configuration is read from the environment: GEMINI_API_KEY (required) and
    ANSWER_TOOL_LLM_MODEL (defaults to "models/gemini-1.5-pro").

    Returns:
        The model's reply text, or an error message string when the API key
        is missing or the LLM call fails (this function does not raise for
        those cases).
    """
    logger.info(f"Answering question: {question[:100]}")

    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        logger.error("GEMINI_API_KEY not set for answer_question tool.")
        return "Error: GEMINI_API_KEY not set."

    answer_model = os.getenv("ANSWER_TOOL_LLM_MODEL", "models/gemini-1.5-pro")

    # Fixed instruction preamble enforcing the required answer format.
    format_rules = "".join([
        "You are a general AI assistant. I will ask you a question. ",
        "Report your thoughts, and finish your answer with the following template: ",
        "FINAL ANSWER: [YOUR FINAL ANSWER]. ",
        "YOUR FINAL ANSWER should be a number OR as few words as possible ",
        "OR a comma separated list of numbers and/or strings. ",
        "If you are asked for a number, don't use commas for thousands or any units like $ or % unless specified. ",
        "If you are asked for a string, omit articles and abbreviations, and write digits in plain text. ",
        "If you are asked for a comma separated list, apply these rules to each element.\n\n",
    ])
    assistant_prompt = f"{format_rules}Question: {question}\nAnswer:"

    try:
        answer_llm = GoogleGenAI(api_key=api_key, model=answer_model)
        logger.info(f"Using answer LLM: {answer_model}")
        completion = answer_llm.complete(assistant_prompt)
        logger.info("Answer generated successfully.")
        return completion.text
    except Exception as err:
        # Report the failure to the caller as text instead of raising.
        logger.error(f"LLM call failed during answer generation: {err}", exc_info=True)
        return f"Error during answer generation: {err}"
# --- Agent Initializer Class --- | |
class ResearchAgentInitializer:
    """Create and own the resources used by the research ReActAgent.

    Construction builds, in order: the agent LLM, a headless Chrome browser
    and its tools (only when Selenium/Helium are available), the search
    engine tools, the data source tools, and the ``answer_question`` tool.
    ``get_agent`` assembles them into a ``ReActAgent`` and ``close_browser``
    releases the browser.
    """

    def __init__(self):
        """Initialize the LLM, the optional browser, and all tool lists.

        Raises:
            ValueError: If GEMINI_API_KEY is not set (from ``_initialize_llm``).
        """
        logger.info("Initializing ResearchAgent resources...")
        self.llm = None
        self.browser_tools = []
        self.search_tools = []
        self.datasource_tools = []
        # Initialize LLM
        self._initialize_llm()
        # Initialize Browser (conditionally)
        if SELENIUM_AVAILABLE:
            self._initialize_browser()
            # _initialize_browser clears SELENIUM_AVAILABLE on failure, in which
            # case _create_browser_tools leaves the list empty.
            self._create_browser_tools()
        else:
            logger.warning("Browser tools are disabled as Selenium/Helium are not available.")
        # Initialize Search/Datasource Tools
        self._create_search_tools()
        self._create_datasource_tools()
        self.answer_question = FunctionTool.from_defaults(
            fn=answer_question,
            name="answer_question",
            description=(
                "Use this tool to answer any question, reporting your reasoning steps and ending with 'FINAL ANSWER: ...'. "
                "Invoke this tool immediately after the final sub-step of planning is complete."
            ),
        )
        logger.info("ResearchAgent resources initialized.")

    def _initialize_llm(self):
        """Create the agent LLM from RESEARCH_AGENT_LLM_MODEL and GEMINI_API_KEY.

        Raises:
            ValueError: If GEMINI_API_KEY is not set.
            Exception: Any error raised while constructing the LLM is logged
                and re-raised.
        """
        agent_llm_model = os.getenv("RESEARCH_AGENT_LLM_MODEL", "models/gemini-1.5-pro")
        gemini_api_key = os.getenv("GEMINI_API_KEY")
        if not gemini_api_key:
            logger.error("GEMINI_API_KEY not found for ResearchAgent LLM.")
            raise ValueError("GEMINI_API_KEY must be set for ResearchAgent")
        try:
            self.llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model)
            logger.info(f"ResearchAgent LLM initialized: {agent_llm_model}")
        except Exception as e:
            logger.error(f"Failed to initialize ResearchAgent LLM: {e}", exc_info=True)
            raise

    def _initialize_browser(self):
        """Start headless Chrome through Helium and publish it in the module globals.

        Does nothing when a browser instance already exists. On failure the
        error is logged (not raised), the globals are reset to None and
        SELENIUM_AVAILABLE is set to False so browser tools are disabled.
        """
        # All module globals this method rebinds are declared up front.
        global _browser_instance, _browser_driver, SELENIUM_AVAILABLE
        if _browser_instance is not None:
            return
        logger.info("Initializing browser (Chrome headless)...")
        try:
            chrome_options = webdriver.ChromeOptions()
            # Configurable options from env vars
            if os.getenv("RESEARCH_AGENT_CHROME_NO_SANDBOX", "true").lower() == "true":
                chrome_options.add_argument("--no-sandbox")
            if os.getenv("RESEARCH_AGENT_CHROME_DISABLE_DEV_SHM", "true").lower() == "true":
                chrome_options.add_argument("--disable-dev-shm-usage")
            # Add prefs for downloads/popups
            chrome_options.add_experimental_option("prefs", {
                "download.prompt_for_download": False,
                "plugins.always_open_pdf_externally": True,
                "profile.default_content_settings.popups": 0,
            })
            # Start Chrome using Helium
            _browser_instance = start_chrome(headless=True, options=chrome_options)
            _browser_driver = get_driver()  # Get the underlying Selenium driver
            logger.info("Browser initialized successfully.")
        except Exception as e:
            logger.error(f"Failed to initialize browser: {e}", exc_info=True)
            # Set flags to prevent tool usage
            SELENIUM_AVAILABLE = False
            _browser_instance = None
            _browser_driver = None

    def _create_browser_tools(self):
        """Wrap the module-level browser functions as FunctionTools.

        Leaves ``self.browser_tools`` empty when SELENIUM_AVAILABLE is False.
        Every tool description is prefixed with "(Browser) ".
        """
        if not SELENIUM_AVAILABLE:
            self.browser_tools = []
            return
        self.browser_tools = [
            FunctionTool.from_defaults(fn=visit, name="visit_url"),  # Renamed for clarity
            FunctionTool.from_defaults(fn=get_text_by_css, name="get_text_by_css"),
            # get_page_html is intentionally not registered as a tool.
            FunctionTool.from_defaults(fn=click_element_by_css, name="click_element_by_css"),
            FunctionTool.from_defaults(fn=input_text_by_css, name="input_text_by_css"),
            FunctionTool.from_defaults(fn=scroll_page, name="scroll_page"),
            FunctionTool.from_defaults(fn=go_back, name="navigate_back"),  # Renamed
            FunctionTool.from_defaults(fn=close_popups, name="close_popups"),
        ]
        for tool in self.browser_tools:
            tool.metadata.description = f"(Browser) {tool.metadata.description}"
        logger.info(f"Created {len(self.browser_tools)} browser interaction tools.")

    def _create_search_tools(self):
        """Create the search engine tools whose credentials are available.

        Google needs GOOGLE_API_KEY and GOOGLE_CSE_ID, Tavily needs
        TAVILY_API_KEY, DuckDuckGo needs no key. A provider with missing
        credentials, or whose construction fails, is skipped with a warning
        so that an optional provider cannot abort initialization.
        """
        self.search_tools = []

        def register(tool_name, description, make_fn):
            # Build one tool; construction errors are downgraded to a warning.
            try:
                tool = FunctionTool.from_defaults(fn=make_fn(), name=tool_name)
            except Exception as e:
                logger.warning(f"Search tool {tool_name} is unavailable: {e}")
                return
            tool.metadata.description = description
            self.search_tools.append(tool)

        # Google Search
        google_api_key = os.getenv("GOOGLE_API_KEY")
        google_cse_id = os.getenv("GOOGLE_CSE_ID")
        if google_api_key and google_cse_id:
            register(
                "google_search",
                "(Search) Execute a Google Custom Search query. Returns structured results.",
                lambda: GoogleSearchToolSpec(key=google_api_key, engine=google_cse_id).google_search,
            )
        else:
            logger.warning("GOOGLE_API_KEY and/or GOOGLE_CSE_ID not set; google_search tool disabled.")

        # Tavily Search
        # NOTE(review): the Tavily client is expected to reject a missing API
        # key at construction time - confirm against the installed version.
        tavily_api_key = os.getenv("TAVILY_API_KEY")
        if tavily_api_key:
            register(
                "tavily_search",
                "(Search) Perform a deep research search using Tavily API. Good for finding documents/articles.",
                # Use search method which is more general
                lambda: TavilyToolSpec(api_key=tavily_api_key).search,
            )
        else:
            logger.warning("TAVILY_API_KEY not set; tavily_search tool disabled.")

        # DuckDuckGo Search (no credentials required)
        register(
            "duckduckgo_search",
            "(Search) Execute a DuckDuckGo search. Returns structured results.",
            lambda: DuckDuckGoSearchToolSpec().duckduckgo_full_search,
        )

        logger.info(f"Created {len(self.search_tools)} search engine tools.")

    def _create_datasource_tools(self):
        """Create the Wikipedia, Yahoo Finance and ArXiv tools."""
        self.datasource_tools = []

        # Wikipedia
        wiki_spec = WikipediaToolSpec()
        wiki_search_tool = FunctionTool.from_defaults(fn=wiki_spec.search_data, name="wikipedia_search_pages")
        wiki_search_tool.metadata.description = "(Wikipedia) Search for Wikipedia page titles matching a query."
        wiki_load_tool = FunctionTool.from_defaults(fn=wiki_spec.load_data, name="wikipedia_load_page")
        wiki_load_tool.metadata.description = "(Wikipedia) Load the full content of a specific Wikipedia page title."
        self.datasource_tools.extend([wiki_search_tool, wiki_load_tool])

        # Yahoo Finance: spec method name -> tool description
        yf_spec = YahooFinanceToolSpec()
        yf_tools_map = {
            "balance_sheet": "Get the latest balance sheet for a stock ticker.",
            "income_statement": "Get the latest income statement for a stock ticker.",
            "cash_flow": "Get the latest cash flow statement for a stock ticker.",
            "stock_basic_info": "Get basic info (price, market cap, summary) for a stock ticker.",
            "stock_analyst_recommendations": "Get analyst recommendations for a stock ticker.",
            "stock_news": "Get recent news headlines for a stock ticker.",
        }
        for func_name, desc in yf_tools_map.items():
            # Skip (with a warning) any method the installed spec does not provide.
            if not hasattr(yf_spec, func_name):
                logger.warning(f"YahooFinance function {func_name} not found in spec.")
                continue
            tool = FunctionTool.from_defaults(fn=getattr(yf_spec, func_name), name=f"yahoo_finance_{func_name}")
            tool.metadata.description = f"(YahooFinance) {desc}"
            self.datasource_tools.append(tool)

        # ArXiv
        arxiv_spec = ArxivToolSpec()
        arxiv_tool = FunctionTool.from_defaults(fn=arxiv_spec.arxiv_query, name="arxiv_search")
        arxiv_tool.metadata.description = "(ArXiv) Search ArXiv for academic papers matching a query."
        self.datasource_tools.append(arxiv_tool)

        logger.info(f"Created {len(self.datasource_tools)} specific data source tools.")

    def get_agent(self) -> ReActAgent:
        """Creates and returns the configured ReActAgent for research."""
        logger.info("Creating ResearchAgent ReActAgent instance...")
        research_tools = self.browser_tools + self.search_tools + self.datasource_tools
        # Check before adding answer_question; afterwards the list is never empty.
        if not research_tools:
            logger.warning("No tools available for ResearchAgent. It will likely be unable to function.")
        all_tools = research_tools + [self.answer_question]
        # System prompt (consider loading from file)
        system_prompt = """\
You are ResearchAgent, an autonomous web research assistant. Your goal is to gather information accurately and efficiently using the available tools.
Available Tool Categories:
- (Browser): Tools for direct web page interaction (visiting URLs, clicking, scrolling, extracting text/HTML, inputting text).
- (Search): Tools for querying search engines (Google, DuckDuckGo, Tavily).
- (Wikipedia): Tools for searching and loading Wikipedia pages.
- (YahooFinance): Tools for retrieving financial data (balance sheets, income statements, stock info, news).
- (ArXiv): Tool for searching academic papers on ArXiv.
- (Answer): `answer_question` — use this when your research has yielded a definitive result and you need to reply in the strict “FINAL ANSWER” format.
**Answer Tool Usage**
When you know the final answer and no further data is required, invoke `answer_question` with the user’s query. It will return text ending with:
FINAL ANSWER: [YOUR FINAL ANSWER]
Formatting rules for **YOUR FINAL ANSWER**:
- A single number, or
- As few words as possible, or
- A comma-separated list of numbers and/or strings.
- If numeric: no thousands separators or units (%, $, etc.) unless explicitly requested.
- If string: omit articles and abbreviations; write digits in plain text.
- If a list: apply the above rules to each element.
**Workflow:**
1. **Thought**: Analyze the research goal. Break it down if necessary. Choose the *single best tool* for the *next immediate step*. Explain your choice.
2. **Action**: Call the chosen tool with the correct arguments. Ensure inputs match the tool's requirements.
3. **Observation**: Examine the tool's output. Extract the relevant information. Check for errors.
4. **Reflect & Iterate**: Does the observation satisfy the immediate goal? If not, return to step 1. If a tool failed, try an alternative approach.
5. **Advanced Validation**: Before delivering any final response, invoke `advanced_validation_agent` with the combined insights from the reasoning and planning phases. If validation fails, pass the feedback back into **planner_agent** to refine the approach and repeat validation.
6. **Synthesize**: Once validation is approved, synthesize all gathered information into a coherent answer.
7. **Respond**: Invoke `answer_question` to emit the **FINAL ANSWER** according to the strict template rules.
**Constraints:**
- Use only one tool per Action step.
- Think step-by-step.
- If using browser tools, start with `visit_url`.
- Synthesize results *before* handing off or responding.
- Do not skip any workflow step (reason → action → observation → reflect → validate → synthesize → respond).
"""
        agent = ReActAgent(
            name="research_agent",
            # The description lists only capabilities backed by a registered
            # tool; no YouTube transcript tool is created by this class.
            description=(
                "Performs web research using browser interaction, search engines (Google, DDG, Tavily), "
                "and specific data sources (Wikipedia, YahooFinance, ArXiv). Follows Thought-Action-Observation loop."
            ),
            tools=all_tools,
            llm=self.llm,
            system_prompt=system_prompt,
            can_handoff_to=[
                "code_agent",
                "math_agent",
                "text_analyzer_agent",  # Added based on original prompt
                "advanced_validation_agent",
                # The comma after the next entry matters: without it Python
                # concatenates the two adjacent literals into one bogus name.
                "long_context_management_agent",
                "planner_agent",
                "reasoning_agent",
            ],
        )
        logger.info("ResearchAgent ReActAgent instance created.")
        return agent

    def close_browser(self):
        """Closes the browser instance if it was initialized.

        Errors from Helium's ``kill_browser`` are logged, not raised; the
        module-level browser globals are reset to None either way.
        """
        global _browser_instance, _browser_driver
        if not _browser_instance:
            logger.info("No active browser instance to close.")
            return
        logger.info("Closing browser instance...")
        try:
            kill_browser()  # Use Helium's function
            logger.info("Browser closed successfully.")
        except Exception as e:
            logger.error(f"Error closing browser: {e}", exc_info=True)
        finally:
            _browser_instance = None
            _browser_driver = None
# --- Singleton Initializer Instance --- | |
# Cached ResearchAgentInitializer; stays None until first requested.
_research_agent_initializer_instance = None


def get_research_initializer():
    """Gets the singleton instance of ResearchAgentInitializer.

    The instance is constructed on the first call and the same object is
    returned on every later call.
    """
    global _research_agent_initializer_instance
    if _research_agent_initializer_instance is not None:
        return _research_agent_initializer_instance
    logger.info("Instantiating ResearchAgentInitializer for the first time.")
    _research_agent_initializer_instance = ResearchAgentInitializer()
    return _research_agent_initializer_instance
# --- Public Initialization Function --- | |
def initialize_research_agent() -> ReActAgent:
    """Initializes and returns the Research Agent using a singleton initializer.

    Returns:
        The agent produced by the shared initializer's ``get_agent()``.
    """
    logger.info("initialize_research_agent called.")
    return get_research_initializer().get_agent()
# --- Cleanup Function (Optional but recommended) --- | |
def cleanup_research_agent_resources():
    """Cleans up resources used by the research agent, like the browser.

    Only an initializer that already exists is cleaned up. Cleanup never
    constructs one: doing so would create the LLM and launch a browser just
    to close it, and would raise when GEMINI_API_KEY is missing.
    """
    logger.info("Cleaning up research agent resources...")
    initializer = _research_agent_initializer_instance
    if initializer is None:
        logger.info("Research agent was never initialized; nothing to clean up.")
        return
    initializer.close_browser()
# Example usage (for testing if run directly) | |
if __name__ == "__main__": | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger.info("Running research_agent.py directly for testing...") | |
# Check required keys | |
required_keys = ["GEMINI_API_KEY"] # Others are optional depending on tools needed | |
missing_keys = [key for key in required_keys if not os.getenv(key)] | |
if missing_keys: | |
print(f"Error: Required environment variable(s) not set: {', '.join(missing_keys)}. Cannot run test.") | |
else: | |
# Warn about optional keys | |
optional_keys = ["GOOGLE_API_KEY", "GOOGLE_CSE_ID", "TAVILY_API_KEY", "WOLFRAM_ALPHA_APP_ID"] | |
missing_optional = [key for key in optional_keys if not os.getenv(key)] | |
if missing_optional: | |
print(f"Warning: Optional environment variable(s) not set: {', '.join(missing_optional)}. Some tools may be unavailable.") | |