"""Tool definitions and utility functions for the agent""" | |
from typing import List, Dict, Any, Optional | |
import os | |
import json | |
import requests | |
from dotenv import load_dotenv | |
from bs4 import BeautifulSoup | |
try: | |
from duckduckgo_search import DDGS | |
except ImportError: | |
print("Warning: duckduckgo_search package not found. DuckDuckGo search will not work.") | |
DDGS = None | |
from llama_index.core.tools import BaseTool, FunctionTool | |
from llama_index.readers.wikipedia import WikipediaReader | |
from llama_index.readers.web import SimpleWebPageReader | |
from llama_index.core.schema import Document | |
# Import direct wikipedia package as fallback | |
import wikipedia | |
# Load environment variables | |
load_dotenv() | |
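
# The .env file is expected to provide the API keys read below via os.getenv,
# e.g. (illustrative value, not a real key):
#   SERPER_API_KEY=your-serper-api-key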

# --- Text Processing Tools ---

def text_reverser(text: str) -> str:
    """
    Reverse the given text. Useful for answering questions that are written backwards.

    Args:
        text: The text to reverse

    Returns:
        The reversed text
    """
    return text[::-1]

# --- Math Tools ---

def simple_calculator(operation: str, a: float, b: float) -> float:
    """
    Perform a simple calculation.

    Args:
        operation: One of 'add', 'subtract', 'multiply', 'divide'
        a: First number
        b: Second number

    Returns:
        The result of the calculation
    """
    if operation == "add":
        return a + b
    elif operation == "subtract":
        return a - b
    elif operation == "multiply":
        return a * b
    elif operation == "divide":
        if b == 0:
            raise ValueError("Cannot divide by zero")
        return a / b
    else:
        raise ValueError(f"Unknown operation: {operation}")

# --- File Processing Tools ---

def open_file_as_text(file_name: str, filetype: Optional[str] = "txt") -> str:
    """
    Opens a file and returns its content as readable text.
    Supports 'txt', 'json', 'csv', and 'xlsx'.

    Args:
        file_name (str): The path or name of the file.
        filetype (Optional[str]): Type of file ('txt', 'json', 'csv', 'xlsx'). Defaults to 'txt'.

    Returns:
        str: The content of the file as text.
    """
    try:
        if filetype == "txt":
            with open(file_name, "r", encoding="utf-8") as f:
                return f.read()
        elif filetype == "json":
            with open(file_name, "r", encoding="utf-8") as f:
                data = json.load(f)
                return json.dumps(data, indent=2)
        elif filetype == "csv":
            with open(file_name, "r", encoding="utf-8") as f:
                reader = csv.reader(f)
                rows = list(reader)
                return "\n".join([", ".join(row) for row in rows])
        elif filetype == "xlsx":
            try:
                import openpyxl
                wb = openpyxl.load_workbook(file_name, data_only=True)
                sheet = wb.active
                content = []
                for row in sheet.iter_rows(values_only=True):
                    content.append(", ".join(str(cell) if cell is not None else "" for cell in row))
                return "\n".join(content)
            except ImportError:
                return "Error: openpyxl package not installed. Cannot read Excel files."
        else:
            return f"Unsupported filetype '{filetype}'. Supported types are 'txt', 'json', 'csv', and 'xlsx'."
    except FileNotFoundError:
        return f"File '{file_name}' not found."
    except Exception as e:
        return f"Error opening file '{file_name}': {str(e)}"

# --- Information Retrieval Tools ---

def duckduckgo_search(query: str, num_results: int = 3) -> str:
    """
    Searches the web using DuckDuckGo and returns top search snippets.

    Args:
        query (str): The search query string.
        num_results (int): Number of results to return (default: 3).

    Returns:
        str: The top search results formatted with title, snippet, and URL.
    """
    if DDGS is None:
        return "Error: duckduckgo_search package not installed. Cannot perform DuckDuckGo search."
    try:
        with DDGS() as ddgs:
            results = ddgs.text(query, max_results=num_results)
            if not results:
                return "No results found."
            return "\n\n".join([f"Title: {r['title']}\nSnippet: {r['body']}\nURL: {r['href']}" for r in results])
    except Exception as e:
        return f"Error during DuckDuckGo search: {str(e)}"

def google_search(query: str, num_results: int = 5) -> str:
    """
    Search Google using the Serper API.

    Args:
        query: The search query
        num_results: Number of results to return (default: 5)

    Returns:
        A formatted string with the search results
    """
    api_key = os.getenv("SERPER_API_KEY")
    if not api_key:
        return "SERPER_API_KEY not found in environment variables. Cannot perform Google search."

    headers = {
        "X-API-KEY": api_key,
        "Content-Type": "application/json"
    }
    payload = {
        "q": query,
        "num": num_results
    }

    try:
        response = requests.post(
            "https://google.serper.dev/search",
            headers=headers,
            json=payload
        )
        response.raise_for_status()
        search_results = response.json()

        # Format the results
        formatted_results = []

        # Add answer box if available
        if "answerBox" in search_results:
            answer_box = search_results["answerBox"]
            answer_title = answer_box.get("title", "")
            answer_snippet = answer_box.get("snippet", "")
            answer_answer = answer_box.get("answer", "")

            answer_text = "Featured Answer:\n"
            if answer_title:
                answer_text += f"Title: {answer_title}\n"
            if answer_answer:
                answer_text += f"Answer: {answer_answer}\n"
            if answer_snippet:
                answer_text += f"Snippet: {answer_snippet}\n"
            formatted_results.insert(0, answer_text)  # Put answer box at the top

        # Add organic results
        if "organic" in search_results:
            for i, result in enumerate(search_results["organic"][:num_results], 1):
                title = result.get("title", "No Title")
                snippet = result.get("snippet", "No snippet available")
                link = result.get("link", "No link available")
                formatted_results.append(f"Result {i}:\nTitle: {title}\nSnippet: {snippet}\nURL: {link}\n")

        # Return formatted results
        if formatted_results:
            return "\n".join(formatted_results)
        else:
            return f"No results found for query: '{query}'"
    except Exception as e:
        return f"Error performing Google search: {str(e)}"

def parse_wikipedia_table(table) -> str:
    """
    Parses a Wikipedia table into a clean, readable text format.

    Args:
        table (Tag): BeautifulSoup Tag for the table.

    Returns:
        str: Formatted table as readable text.
    """
    rows = []
    headers = []

    # Try to get headers
    thead = table.find('thead')
    if thead:
        for th in thead.find_all('th'):
            header_text = th.get_text(separator=" ", strip=True)
            headers.append(header_text)
        if headers:
            rows.append(" | ".join(headers))

    # Parse table body rows
    tbody = table.find('tbody')
    if not tbody:
        tbody = table  # fallback: some tables have no tbody explicitly

    for tr in tbody.find_all('tr'):
        cells = tr.find_all(['th', 'td'])
        cell_texts = []
        for cell in cells:
            # Clean references like [7], [note 1], etc.
            for sup in cell.find_all('sup', class_='reference'):
                sup.decompose()
            text = cell.get_text(separator=" ", strip=True)
            cell_texts.append(text)
        if cell_texts:
            row_text = " | ".join(cell_texts)
            rows.append(row_text)

    return "\n".join(rows)

def read_wikipedia_page(url: str) -> str:
    """
    Fetches a Wikipedia article and extracts clean sectioned text.

    Args:
        url (str): The Wikipedia page URL.

    Returns:
        str: Sectioned and readable content from the Wikipedia page.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
    }
    try:
        resp = requests.get(url, headers=headers, timeout=10)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.text, "html.parser")

        content_div = soup.find('div', id='mw-content-text')
        if not content_div:
            return "Content not found."

        parts = []
        for elem in content_div.find_all(['h2', 'h3', 'p', 'ul', 'ol', 'table']):
            if elem.name in ['h2', 'h3']:
                parts.append("\n\n" + elem.get_text(strip=True) + "\n")
            elif elem.name in ['p', 'ul', 'ol']:
                parts.append(elem.get_text(strip=True))
            elif elem.name == 'table':
                parts.append(parse_wikipedia_table(elem))

        full_text = "\n".join(parts)
        return full_text
    except Exception as e:
        return f"Error reading Wikipedia page: {str(e)}"

def smart_paginate_around_query(full_text: str, query: str, depth: int = 0) -> list:
    """
    Splits text into windows around each occurrence of the query.

    Args:
        full_text (str): The full text to search within.
        query (str): The search query.
        depth (int): Recursion depth counter, used internally.

    Returns:
        list: List of relevant text windows (pages).
    """
    # Prevent excessive recursion
    if depth > 2:  # Limit recursion depth
        return []

    before_chars = 1000
    after_chars = 3000

    full_text_lower = full_text.lower()
    query_lower = query.lower()
    query_len = len(query_lower)

    pages = []
    search_pos = 0
    text_len = len(full_text)

    while True:
        match_pos = full_text_lower.find(query_lower, search_pos)
        if match_pos == -1:
            break  # no more matches

        # Define window around match
        start = max(0, match_pos - before_chars)
        end = min(text_len, match_pos + query_len + after_chars)
        page = full_text[start:end]
        pages.append(page)

        # Move search pointer to AFTER current window
        search_pos = end

    if not pages and len(query_lower) > 3 and depth < 2:
        # If no exact matches for longer queries, try with partial matches
        words = query_lower.split()
        for word in words:
            if len(word) > 3:  # only use meaningful words
                partial_pages = smart_paginate_around_query(full_text, word, depth + 1)
                pages.extend(partial_pages)
                if len(pages) >= 2:  # limit to reasonable number
                    break

    return pages
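
# Example (illustrative): paginate a long article around a phrase, then scan
# the returned windows for the detail you need.
#   article = read_wikipedia_page("https://en.wikipedia.org/wiki/Python_(programming_language)")
#   windows = smart_paginate_around_query(article, "garbage collection")
#   # -> a list of ~4000-character snippets (1000 chars before each match,
#   #    3000 after), one per occurrence of the query.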

def wikipedia_search(query: str, num_results: int = 2) -> str:
    """
    Search Wikipedia for information.

    Args:
        query: The search query
        num_results: Number of results to return (default: 2)

    Returns:
        A formatted string with the search results
    """
    # Try using direct wikipedia package - it's more reliable
    try:
        # Search for pages
        search_results = wikipedia.search(query, results=num_results)
        if not search_results:
            return f"No Wikipedia results found for '{query}'."

        results = []
        for i, page_title in enumerate(search_results, 1):
            try:
                # Get page content
                page = wikipedia.page(title=page_title, auto_suggest=False)
                title = page.title
                page_url = page.url
                # Get a summary instead of full content
                content = page.summary[:1000] + "..." if len(page.summary) > 1000 else page.summary
                results.append(f"Result {i}: {title}\nURL: {page_url}\n{content}\n")
            except wikipedia.exceptions.DisambiguationError as e:
                # Handle disambiguation pages
                options = e.options[:5]  # Get first 5 options
                results.append(f"Result {i}: Multiple matches for '{page_title}'. Options include: {', '.join(options)}")
            except Exception as e:
                # Handle other errors with specific pages
                results.append(f"Result {i}: Error retrieving '{page_title}': {str(e)}")

        return "\n".join(results)
    except Exception as e:
        print(f"Direct wikipedia package failed: {e}, trying LlamaIndex WikipediaReader...")
        # Fallback to LlamaIndex WikipediaReader
        try:
            # Note: The WikipediaReader API has changed; we now need to get pages first
            reader = WikipediaReader()
            # First get the search results
            pages = wikipedia.search(query, results=num_results)
            # Then load the data using the pages parameter
            docs = reader.load_data(pages=pages)
            if not docs:
                return f"No Wikipedia results found for '{query}'."

            results = []
            for i, doc in enumerate(docs, 1):
                title = doc.metadata.get("title", "Unknown Title")
                content = doc.text[:1000] + "..." if len(doc.text) > 1000 else doc.text
                results.append(f"Result {i}: {title}\n{content}\n")

            return "\n".join(results)
        except Exception as e:
            return f"Error searching Wikipedia: {str(e)}"

def web_search(url: str) -> str:
    """
    Fetch and extract content from a specific web page.

    Args:
        url: The URL of the web page to fetch

    Returns:
        The extracted content from the web page
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36"
        }
        # First try using BeautifulSoup for better extraction
        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Get text
            text = soup.get_text(separator="\n", strip=True)
            # Break into lines and remove leading and trailing space on each
            lines = (line.strip() for line in text.splitlines())
            # Break multi-headlines (separated by runs of spaces) into a line each
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            # Remove blank lines
            text = '\n'.join(chunk for chunk in chunks if chunk)

            return text[:15000] + "..." if len(text) > 15000 else text
        except Exception as e:
            print(f"BeautifulSoup extraction failed: {e}, trying LlamaIndex SimpleWebPageReader...")
            # Fallback to LlamaIndex SimpleWebPageReader
            reader = SimpleWebPageReader()
            docs = reader.load_data(urls=[url])
            if not docs:
                return f"No content found for URL: {url}"
            # Just return the content of the first document
            return docs[0].text
    except Exception as e:
        return f"Error retrieving web page: {str(e)}"

# --- Tool Selection and Routing ---

def get_tools() -> List[BaseTool]:
    """Create and return a list of tools for the agent."""
    text_reverser_tool = FunctionTool.from_defaults(
        fn=text_reverser,
        name="text_reverser",
        description="Reverses the given text. Useful for processing reversed questions or text.",
    )
    calculator_tool = FunctionTool.from_defaults(
        fn=simple_calculator,
        name="calculator",
        description="Performs simple calculations: add, subtract, multiply, divide.",
    )
    file_reader_tool = FunctionTool.from_defaults(
        fn=open_file_as_text,
        name="open_file_as_text",
        description="Opens a file and returns its content as text. Supports txt, json, csv, and xlsx files.",
    )
    wikipedia_tool = FunctionTool.from_defaults(
        fn=wikipedia_search,
        name="wikipedia_search",
        description="Searches Wikipedia for information on a topic and returns summaries of matching pages.",
    )
    wikipedia_page_tool = FunctionTool.from_defaults(
        fn=read_wikipedia_page,
        name="read_wikipedia_page",
        description="Reads a specific Wikipedia page URL and returns its full content with sections and tables.",
    )
    paginate_tool = FunctionTool.from_defaults(
        fn=smart_paginate_around_query,
        name="smart_paginate_around_query",
        description="Given a large text and a query, returns sections of text around occurrences of the query.",
    )
    web_tool = FunctionTool.from_defaults(
        fn=web_search,
        name="web_search",
        description="Fetches and extracts content from a specific web page. Requires a full URL.",
    )
    google_tool = FunctionTool.from_defaults(
        fn=google_search,
        name="google_search",
        description="Searches Google for information. Use this for recent events, current information, or when Wikipedia doesn't have enough information.",
    )
    duckduckgo_tool = FunctionTool.from_defaults(
        fn=duckduckgo_search,
        name="duckduckgo_search",
        description="Searches the web using DuckDuckGo and returns top search results. Good for privacy-focused searches.",
    )

    tools = [
        text_reverser_tool,
        calculator_tool,
        file_reader_tool,
        wikipedia_tool,
        wikipedia_page_tool,
        paginate_tool,
        web_tool,
        google_tool,
        duckduckgo_tool,
    ]
    return tools
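
# Usage sketch (illustrative): these tools are meant to be handed to a
# LlamaIndex agent elsewhere in this project (e.g. app.py). The agent class
# and LLM below are assumptions, not part of this module:
#
#   from llama_index.core.agent import ReActAgent
#   from llama_index.llms.openai import OpenAI
#
#   agent = ReActAgent.from_tools(get_tools(), llm=OpenAI(model="gpt-4o-mini"), verbose=True)
#   answer = agent.chat("Reverse the text 'hello' and then add 2 and 3.")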

if __name__ == "__main__":
    print("This module defines tools for the agent. Run app.py or standalone_debug.py to test the agent.")