"""Tool definitions and utility functions for the agent""" from typing import List, Dict, Any, Optional import os import json import requests from dotenv import load_dotenv from bs4 import BeautifulSoup try: from duckduckgo_search import DDGS except ImportError: print("Warning: duckduckgo_search package not found. DuckDuckGo search will not work.") DDGS = None from llama_index.core.tools import BaseTool, FunctionTool from llama_index.readers.wikipedia import WikipediaReader from llama_index.readers.web import SimpleWebPageReader from llama_index.core.schema import Document # Import direct wikipedia package as fallback import wikipedia # Load environment variables load_dotenv() # --- Text Processing Tools --- def text_reverser(text: str) -> str: """ Reverse the given text. Useful for answering questions that are written backwards. Args: text: The text to reverse Returns: The reversed text """ return text[::-1] # --- Math Tools --- def simple_calculator(operation: str, a: float, b: float) -> float: """ Perform a simple calculation. Args: operation: One of 'add', 'subtract', 'multiply', 'divide' a: First number b: Second number Returns: The result of the calculation """ if operation == "add": return a + b elif operation == "subtract": return a - b elif operation == "multiply": return a * b elif operation == "divide": if b == 0: raise ValueError("Cannot divide by zero") return a / b else: raise ValueError(f"Unknown operation: {operation}") # --- File Processing Tools --- def open_file_as_text(file_name: str, filetype: Optional[str] = "txt") -> str: """ Opens a file and returns its content as readable text. Supports 'txt', 'json', 'csv', and 'xlsx'. Args: file_name (str): The path or name of the file. filetype (Optional[str]): Type of file ('txt', 'json', 'csv', 'xlsx'). Defaults to 'txt'. Returns: str: The content of the file as text """ try: if filetype == "txt": with open(file_name, "r", encoding="utf-8") as f: return f.read() elif filetype == "json": with open(file_name, "r", encoding="utf-8") as f: data = json.load(f) return json.dumps(data, indent=2) elif filetype == "csv": with open(file_name, "r", encoding="utf-8") as f: reader = csv.reader(f) rows = list(reader) return "\n".join([", ".join(row) for row in rows]) elif filetype == "xlsx": try: import openpyxl wb = openpyxl.load_workbook(file_name, data_only=True) sheet = wb.active content = [] for row in sheet.iter_rows(values_only=True): content.append(", ".join(str(cell) if cell is not None else "" for cell in row)) return "\n".join(content) except ImportError: return "Error: openpyxl package not installed. Cannot read Excel files." else: return f"Unsupported filetype '{filetype}'. Supported types are 'txt', 'json', 'csv', and 'xlsx'." except FileNotFoundError: return f"File '{file_name}' not found." except Exception as e: return f"Error opening file '{file_name}': {str(e)}" # --- Information Retrieval Tools --- def duckduckgo_search(query: str, num_results: int = 3) -> str: """ Searches the web using DuckDuckGo and returns top search snippets. Args: query (str): The search query string. num_results (int): Number of results to return (default: 3) Returns: str: A list of top search results with title, snippet, and URL. """ if DDGS is None: return "Error: duckduckgo_search package not installed. Cannot perform DuckDuckGo search." try: with DDGS() as ddgs: results = ddgs.text(query, max_results=num_results) if not results: return "No results found." 
return "\n\n".join([f"Title: {r['title']}\nSnippet: {r['body']}\nURL: {r['href']}" for r in results]) except Exception as e: return f"Error during DuckDuckGo search: {str(e)}" def google_search(query: str, num_results: int = 5) -> str: """ Search Google using the Serper API. Args: query: The search query num_results: Number of results to return (default: 5) Returns: A formatted string with the search results """ api_key = os.getenv("SERPER_API_KEY") if not api_key: return "SERPER_API_KEY not found in environment variables. Cannot perform Google search." headers = { "X-API-KEY": api_key, "Content-Type": "application/json" } payload = { "q": query, "num": num_results } try: response = requests.post( "https://google.serper.dev/search", headers=headers, json=payload ) response.raise_for_status() search_results = response.json() # Format the results formatted_results = [] # Add answer box if available if "answerBox" in search_results: answer_box = search_results["answerBox"] answer_title = answer_box.get("title", "") answer_snippet = answer_box.get("snippet", "") answer_answer = answer_box.get("answer", "") answer_text = "Featured Answer:\n" if answer_title: answer_text += f"Title: {answer_title}\n" if answer_answer: answer_text += f"Answer: {answer_answer}\n" if answer_snippet: answer_text += f"Snippet: {answer_snippet}\n" formatted_results.insert(0, answer_text) # Put answer box at the top # Add organic results if "organic" in search_results: for i, result in enumerate(search_results["organic"][:num_results], 1): title = result.get("title", "No Title") snippet = result.get("snippet", "No snippet available") link = result.get("link", "No link available") formatted_results.append(f"Result {i}:\nTitle: {title}\nSnippet: {snippet}\nURL: {link}\n") # Return formatted results if formatted_results: return "\n".join(formatted_results) else: return f"No results found for query: '{query}'" except Exception as e: return f"Error performing Google search: {str(e)}" def parse_wikipedia_table(table) -> str: """ Parses a Wikipedia table into a clean, readable text format. Args: table (Tag): BeautifulSoup Tag for the table. Returns: str: Formatted table as readable text. """ rows = [] headers = [] # Try to get headers thead = table.find('thead') if thead: for th in thead.find_all('th'): header_text = th.get_text(separator=" ", strip=True) headers.append(header_text) if headers: rows.append(" | ".join(headers)) # Parse table body rows tbody = table.find('tbody') if not tbody: tbody = table # fallback: some tables have no tbody explicitly for tr in tbody.find_all('tr'): cells = tr.find_all(['th', 'td']) cell_texts = [] for cell in cells: # Clean references like [7], [note 1], etc. for sup in cell.find_all('sup', class_='reference'): sup.decompose() text = cell.get_text(separator=" ", strip=True) cell_texts.append(text) if cell_texts: row_text = " | ".join(cell_texts) rows.append(row_text) return "\n".join(rows) def read_wikipedia_page(url: str) -> str: """ Fetches a Wikipedia article and extracts clean sectioned text. Args: url (str): The Wikipedia page URL. Returns: str: Sectioned and readable content from the Wikipedia page. """ headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36" } try: resp = requests.get(url, headers=headers, timeout=10) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") content_div = soup.find('div', id='mw-content-text') if not content_div: return "Content not found." 
def smart_paginate_around_query(full_text: str, query: str, depth: int = 0) -> list:
    """
    Splits text into windows around each occurrence of the query.

    Args:
        full_text (str): The full text to search within.
        query (str): The search query.
        depth (int): Recursion depth counter, used internally.

    Returns:
        list: List of relevant text windows (pages).
    """
    # Prevent excessive recursion
    if depth > 2:  # Limit recursion depth
        return []

    before_chars = 1000
    after_chars = 3000

    full_text_lower = full_text.lower()
    query_lower = query.lower()
    query_len = len(query_lower)

    pages = []
    search_pos = 0
    text_len = len(full_text)

    while True:
        match_pos = full_text_lower.find(query_lower, search_pos)
        if match_pos == -1:
            break  # no more matches

        # Define window around match
        start = max(0, match_pos - before_chars)
        end = min(text_len, match_pos + query_len + after_chars)

        page = full_text[start:end]
        pages.append(page)

        # Move search pointer to AFTER current window
        search_pos = end

    if not pages and len(query_lower) > 3 and depth < 2:
        # If no exact matches for longer queries, try with partial matches
        words = query_lower.split()
        for word in words:
            if len(word) > 3:  # only use meaningful words
                partial_pages = smart_paginate_around_query(full_text, word, depth + 1)
                pages.extend(partial_pages)
                if len(pages) >= 2:  # limit to reasonable number
                    break

    return pages


def wikipedia_search(query: str, num_results: int = 2) -> str:
    """
    Search Wikipedia for information.

    Args:
        query: The search query
        num_results: Number of results to return (default: 2)

    Returns:
        A formatted string with the search results
    """
    # Try using direct wikipedia package - it's more reliable
    try:
        # Search for pages
        search_results = wikipedia.search(query, results=num_results)
        if not search_results:
            return f"No Wikipedia results found for '{query}'."

        results = []
        for i, page_title in enumerate(search_results, 1):
            try:
                # Get page content
                page = wikipedia.page(title=page_title, auto_suggest=False)
                title = page.title
                page_url = page.url

                # Get a summary instead of full content
                content = page.summary[:1000] + "..." if len(page.summary) > 1000 else page.summary
                results.append(f"Result {i}: {title}\nURL: {page_url}\n{content}\n")
            except wikipedia.exceptions.DisambiguationError as e:
                # Handle disambiguation pages
                options = e.options[:5]  # Get first 5 options
                results.append(f"Result {i}: Multiple matches for '{page_title}'. Options include: {', '.join(options)}")
            except Exception as e:
                # Handle other errors with specific pages
                results.append(f"Result {i}: Error retrieving '{page_title}': {str(e)}")

        return "\n".join(results)
    except Exception as e:
        print(f"Direct wikipedia package failed: {e}, trying LlamaIndex WikipediaReader...")

        # Fallback to LlamaIndex WikipediaReader
        try:
            # Note: The WikipediaReader API has changed; we now need to get pages first
            reader = WikipediaReader()

            # First get the search results
            pages = wikipedia.search(query, results=num_results)

            # Then load the data using the pages parameter
            docs = reader.load_data(pages=pages)
            if not docs:
                return f"No Wikipedia results found for '{query}'."

            results = []
            for i, doc in enumerate(docs, 1):
                title = doc.metadata.get("title", "Unknown Title")
                content = doc.text[:1000] + "..." if len(doc.text) > 1000 else doc.text
                results.append(f"Result {i}: {title}\n{content}\n")

            return "\n".join(results)
        except Exception as e:
            return f"Error searching Wikipedia: {str(e)}"
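
# Typical drill-down pattern for long articles (a sketch; the query and URL are
# illustrative and network access is required, so results will vary):
#   hits = wikipedia_search("Mercedes Sosa")
#   page_text = read_wikipedia_page("https://en.wikipedia.org/wiki/Mercedes_Sosa")
#   windows = smart_paginate_around_query(page_text, "studio albums")
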
def web_search(url: str) -> str:
    """
    Fetch and extract content from a specific web page.

    Args:
        url: The URL of the web page to search

    Returns:
        The extracted content from the web page
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
        }

        # First try using BeautifulSoup for better extraction
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Get text
            text = soup.get_text(separator="\n", strip=True)

            # Break into lines and remove leading and trailing space on each
            lines = (line.strip() for line in text.splitlines())
            # Break multi-headlines into a line each
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            # Remove blank lines
            text = '\n'.join(chunk for chunk in chunks if chunk)

            return text[:15000] + "..." if len(text) > 15000 else text
        except Exception as e:
            print(f"BeautifulSoup extraction failed: {e}, trying LlamaIndex SimpleWebPageReader...")

            # Fallback to LlamaIndex SimpleWebPageReader
            reader = SimpleWebPageReader()
            docs = reader.load_data(urls=[url])
            if not docs:
                return f"No content found for URL: {url}"

            # Just return the content of the first document
            return docs[0].text
    except Exception as e:
        return f"Error retrieving web page: {str(e)}"


# --- Tool Selection and Routing ---

def get_tools() -> List[BaseTool]:
    """Create and return a list of tools for the agent."""
    text_reverser_tool = FunctionTool.from_defaults(
        fn=text_reverser,
        name="text_reverser",
        description="Reverses the given text. Useful for processing reversed questions or text.",
    )

    calculator_tool = FunctionTool.from_defaults(
        fn=simple_calculator,
        name="calculator",
        description="Performs simple calculations: add, subtract, multiply, divide.",
    )

    file_reader_tool = FunctionTool.from_defaults(
        fn=open_file_as_text,
        name="open_file_as_text",
        description="Opens a file and returns its content as text. Supports txt, json, csv, and xlsx files.",
    )

    wikipedia_tool = FunctionTool.from_defaults(
        fn=wikipedia_search,
        name="wikipedia_search",
        description="Searches Wikipedia for information on a topic and returns summaries of matching pages.",
    )

    wikipedia_page_tool = FunctionTool.from_defaults(
        fn=read_wikipedia_page,
        name="read_wikipedia_page",
        description="Reads a specific Wikipedia page URL and returns its full content with sections and tables.",
    )

    paginate_tool = FunctionTool.from_defaults(
        fn=smart_paginate_around_query,
        name="smart_paginate_around_query",
        description="Given a large text and a query, returns sections of text around occurrences of the query.",
    )

    web_tool = FunctionTool.from_defaults(
        fn=web_search,
        name="web_search",
        description="Fetches and extracts content from a specific web page. Requires a full URL.",
    )

    google_tool = FunctionTool.from_defaults(
        fn=google_search,
        name="google_search",
        description="Searches Google for information. Use this for recent events, current information, or when Wikipedia doesn't have enough information.",
    )

    duckduckgo_tool = FunctionTool.from_defaults(
        fn=duckduckgo_search,
        name="duckduckgo_search",
        description="Searches the web using DuckDuckGo and returns top search results. Good for privacy-focused searches.",
    )

    tools = [
        text_reverser_tool,
        calculator_tool,
        file_reader_tool,
        wikipedia_tool,
        wikipedia_page_tool,
        paginate_tool,
        web_tool,
        google_tool,
        duckduckgo_tool,
    ]

    return tools
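
# Typical wiring into an agent (a sketch; assumes llama-index's ReActAgent and an
# `llm` object configured elsewhere in this project, e.g. in app.py):
#   from llama_index.core.agent import ReActAgent
#   agent = ReActAgent.from_tools(get_tools(), llm=llm, verbose=True)
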
if __name__ == "__main__":
    print("This module defines tools for the agent. Run app.py or standalone_debug.py to test the agent.")
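    # Quick local smoke test of the offline tools (a minimal sketch; the web,
    # Wikipedia, and search tools are skipped here because they need network
    # access and/or API keys):
    print(text_reverser("hello"))                 # -> "olleh"
    print(simple_calculator("multiply", 6, 7))    # -> 42
    print(f"{len(get_tools())} tools registered")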