EtienneB
Update tools.py
eaad534
import base64
import datetime
import math
import os
import urllib.parse
from pathlib import Path
from typing import Dict, Union
# from dotenv import load_dotenv
import pandas
import pytz
import requests
from bs4 import BeautifulSoup
from langchain_community.document_loaders import (
ArxivLoader, AssemblyAIAudioTranscriptLoader, WikipediaLoader)
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers import LanguageParser
from langchain_core.messages import HumanMessage
# from langchain_community.tools import DuckDuckGoSearchRun
# from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilySearch
# load_dotenv()
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
@tool
def download_file(url: str, filename: Union[str, None] = None) -> str:
    """
    Download a file from a URL and save it locally for analysis.

    Analysis can be done using one or more other tools.

    Args:
        url: The http(s) URL of the file to download.
        filename: Optional custom filename. If not provided, the name is
            extracted from the URL path, or derived from a stable hash of the
            URL when the path has no usable name. A filename announced by the
            server in a Content-Disposition header takes precedence over both.

    Returns:
        The local file path where the file was saved, or a string starting
        with "Error" when the URL is invalid or the download/save fails.
    """
    import hashlib
    import re

    def _safe_name(candidate) -> str:
        # Keep only the last path component so that neither the caller nor the
        # remote server can write outside the downloads directory.
        name = os.path.basename(str(candidate).replace("\\", "/")).strip()
        return "" if name in ("", ".", "..") else name

    try:
        # Clean and validate URL
        url = url.strip()
        if not url.startswith(('http://', 'https://')):
            return f"Error: Invalid URL format: {url}"

        # Create downloads directory if it doesn't exist
        download_dir = Path("downloads")
        download_dir.mkdir(exist_ok=True)

        # Determine filename: explicit argument first, then the URL path
        filename = _safe_name(filename) if filename else ""
        if not filename:
            filename = _safe_name(urllib.parse.urlparse(url).path)
            if not filename or '.' not in filename:
                # hash() is salted per process, so use a real digest to get a
                # name that is stable across runs.
                url_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()[:8]
                filename = f"downloaded_file_{url_hash}"

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        print(f"Downloading file from: {url}")

        # The context manager releases the streamed connection even on errors
        with requests.get(url, headers=headers, timeout=30, stream=True) as response:
            response.raise_for_status()

            # Prefer the server-suggested name; stop at the closing quote or
            # the next ';' so trailing header parameters are not swallowed.
            disposition = response.headers.get('content-disposition', '')
            match = re.search(r'filename\s*=\s*"?([^";]+)"?', disposition, re.IGNORECASE)
            suggested_filename = _safe_name(match.group(1)) if match else ""
            if suggested_filename:
                filename = suggested_filename

            filepath = download_dir / filename

            # Write file to disk
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)

        file_size = os.path.getsize(filepath)
        print(f"Successfully downloaded {filepath.name} ({file_size} bytes)")
        return str(filepath)
    except requests.exceptions.RequestException as e:
        return f"Error downloading file: {str(e)}"
    except Exception as e:
        return f"Error saving file: {str(e)}"
@tool
def multiply(a: Union[int, float], b: Union[int, float]) -> Union[int, float]:
    """Compute the product of two numbers.

    Args:
        a: The first factor.
        b: The second factor.

    Returns:
        The product ``a * b``, passed through ``int`` when both inputs are
        integers. If the multiplication raises, a string starting with
        "Error in multiplication:" is returned instead.
    """
    try:
        product = a * b
        both_integers = isinstance(a, int) and isinstance(b, int)
        if both_integers:
            return int(product)
        return product
    except Exception as exc:
        return f"Error in multiplication: {exc}"
@tool
def add(a: Union[int, float], b: Union[int, float]) -> Union[int, float]:
    """Compute the sum of two numbers.

    Args:
        a: The first addend.
        b: The second addend.

    Returns:
        The sum ``a + b``, passed through ``int`` when both inputs are
        integers. If the addition raises, a string starting with
        "Error in addition:" is returned instead.
    """
    try:
        total = a + b
        both_integers = isinstance(a, int) and isinstance(b, int)
        if both_integers:
            return int(total)
        return total
    except Exception as exc:
        return f"Error in addition: {exc}"
@tool
def power(a: Union[int, float], b: Union[int, float]) -> float:
    """Raise a base to an exponent.

    Args:
        a: The base.
        b: The exponent.

    Returns:
        ``a ** b``. Zero raised to a negative exponent, an overflowing
        result, or any other failure yields a descriptive "Error..." string
        instead of a number.
    """
    try:
        zero_to_negative = a == 0 and b < 0
        if zero_to_negative:
            return "Error: Cannot raise 0 to a negative power"
        return a ** b
    except OverflowError:
        return "Error: Result too large to compute"
    except Exception as exc:
        return f"Error in power calculation: {exc}"
@tool
def subtract(a: Union[int, float], b: Union[int, float]) -> Union[int, float]:
    """Compute the difference of two numbers.

    Args:
        a: The minuend (value to subtract from).
        b: The subtrahend (value to subtract).

    Returns:
        ``a - b``, passed through ``int`` when both inputs are integers. If
        the subtraction raises, a string starting with
        "Error in subtraction:" is returned instead.
    """
    try:
        difference = a - b
        both_integers = isinstance(a, int) and isinstance(b, int)
        if both_integers:
            return int(difference)
        return difference
    except Exception as exc:
        return f"Error in subtraction: {exc}"
@tool
def divide(a: Union[int, float], b: Union[int, float]) -> float:
    """Compute the true quotient of two numbers.

    Args:
        a: The numerator.
        b: The denominator.

    Returns:
        ``a / b``. A zero denominator or any other failure yields a
        descriptive "Error..." string instead of a number.
    """
    try:
        return "Error: Division by zero is not allowed" if b == 0 else a / b
    except Exception as exc:
        return f"Error in division: {exc}"
@tool
def modulus(a: int, b: int) -> Union[int, str]:
    """Compute the remainder of dividing one integer by another.

    Args:
        a: The dividend.
        b: The divisor.

    Returns:
        ``a % b``, or an "Error..." string when the divisor is zero or the
        operation fails.
    """
    try:
        return "Error: Modulus by zero is not allowed" if b == 0 else a % b
    except Exception as exc:
        return f"Error in modulus operation: {exc}"
@tool
def square_root(x: Union[int, float]) -> Union[float, str]:
    """Compute the square root of a non-negative number.

    Args:
        x: The input number. Must be non-negative.

    Returns:
        ``math.sqrt(x)``, or an "Error..." string when ``x`` is negative or
        the computation fails.
    """
    try:
        is_negative = x < 0
        if is_negative:
            return "Error: Square root of negative number is not allowed"
        return math.sqrt(x)
    except Exception as exc:
        return f"Error in square root calculation: {exc}"
@tool
def floor_divide(a: int, b: int) -> Union[int, str]:
    """Compute the floor of the quotient of two numbers.

    Args:
        a: The dividend.
        b: The divisor.

    Returns:
        ``a // b``, or an "Error..." string when the divisor is zero or the
        operation fails.
    """
    try:
        return "Error: Division by zero is not allowed" if b == 0 else a // b
    except Exception as exc:
        return f"Error in floor division: {exc}"
@tool
def absolute(x: Union[int, float]) -> Union[int, float]:
    """Compute the absolute value of a number.

    Args:
        x: The input number.

    Returns:
        ``abs(x)``, passed through ``int`` when the input is an integer. If
        the operation raises, a string starting with
        "Error in absolute value calculation:" is returned instead.
    """
    try:
        magnitude = abs(x)
        if isinstance(x, int):
            return int(magnitude)
        return magnitude
    except Exception as exc:
        return f"Error in absolute value calculation: {exc}"
@tool
def logarithm(x: Union[int, float], base: Union[int, float] = math.e) -> Union[float, str]:
    """Compute the logarithm of a number in a given base.

    Args:
        x: The number to take the logarithm of. Must be positive.
        base: The logarithmic base (natural log by default). Must be positive
            and not equal to 1.

    Returns:
        ``math.log(x, base)``, or an "Error..." string when an argument is
        out of range or the computation fails.
    """
    try:
        if x <= 0:
            return "Error: Logarithm input must be positive"
        invalid_base = base <= 0 or base == 1
        if invalid_base:
            return "Error: Logarithm base must be positive and not equal to 1"
        return math.log(x, base)
    except Exception as exc:
        return f"Error in logarithm calculation: {exc}"
@tool
def exponential(x: Union[int, float]) -> Union[float, str]:
    """Returns e raised to the power of `x`.

    Args:
        x: The exponent.

    Returns:
        The value of e^x, or an "Error..." string when the result does not
        fit in a float or the computation fails.
    """
    try:
        # Let math.exp decide what overflows instead of a hard-coded cut-off:
        # exponents up to ~709.78 are representable and must not be rejected.
        result = math.exp(x)
        if math.isinf(result):
            # math.exp(inf) returns inf without raising; report it like any
            # other overflow.
            return "Error: Exponent too large, would cause overflow"
        return result
    except OverflowError:
        return "Error: Exponent too large, would cause overflow"
    except Exception as e:
        return f"Error in exponential calculation: {str(e)}"
@tool
def roman_calculator_converter(value1: int, value2: int, oper: str) -> str:
    """Performs an operation on 2 numbers and returns the result as a Roman numeral.

    Args:
        value1: The first value
        value2: The second value
        oper: Operator for the calculation ("add", "subtract", "multiply", "divide")

    Returns:
        A sentence containing the result as a Roman numeral, or an "Error..."
        string when the inputs are invalid or the result is outside 1..3999.
    """
    try:
        # Input validation
        if not isinstance(value1, int) or not isinstance(value2, int):
            return "Error: Both values must be integers"
        if oper not in ("add", "subtract", "multiply", "divide"):
            return "Error: Operator must be 'add', 'subtract', 'multiply', or 'divide'"

        # Explicit gerunds: naive f"{oper}ing" would produce "divideing".
        gerunds = {
            "add": "adding",
            "subtract": "subtracting",
            "multiply": "multiplying",
            "divide": "dividing",
        }

        # Roman numeral mapping, largest value first
        roman_numerals = [
            (1000, "M"), (900, "CM"), (500, "D"), (400, "CD"),
            (100, "C"), (90, "XC"), (50, "L"), (40, "XL"),
            (10, "X"), (9, "IX"), (5, "V"), (4, "IV"), (1, "I")
        ]

        # Perform calculation
        if oper == "add":
            result = value1 + value2
        elif oper == "subtract":
            result = value1 - value2
        elif oper == "multiply":
            result = value1 * value2
        else:  # oper == "divide"
            if value2 == 0:
                return "Error: Division by zero is not allowed"
            # Truncate toward zero using integer arithmetic only; going
            # through float division can be off by one for large operands.
            quotient = abs(value1) // abs(value2)
            result = quotient if (value1 < 0) == (value2 < 0) else -quotient

        # Handle invalid results for Roman numerals
        if result <= 0:
            return f"Error: Roman numerals cannot represent zero or negative numbers. Result was: {result}"
        if result > 3999:  # Roman numerals traditionally don't go beyond this
            return f"Error: Result ({result}) is too large for standard Roman numeral representation"

        # Convert to Roman numeral by greedily consuming the largest values
        remaining = result
        parts = []
        for value, numeral in roman_numerals:
            count, remaining = divmod(remaining, value)
            parts.append(numeral * count)
        roman_string = "".join(parts)

        return f"The result of {gerunds[oper]} {value1} and {value2} is: {roman_string}"
    except Exception as e:
        return f"Error in Roman calculator: {str(e)}"
@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """Report the current local time for a timezone.

    Args:
        timezone: A timezone name such as 'America/New_York' or
            'Europe/London'. A few common abbreviations (EST, PST, MST, CST,
            GMT, UTC, CET, JST) are accepted case-insensitively and mapped to
            a full zone name.

    Returns:
        A sentence containing the current time formatted as
        "YYYY-MM-DD HH:MM:SS TZ", or an "Error..." string when the timezone
        is empty, unknown, or the lookup fails.
    """
    try:
        if not timezone or not timezone.strip():
            return "Error: Timezone cannot be empty"

        # Normalise, then translate a known abbreviation into a zone name
        timezone = timezone.strip()
        timezone_aliases = {
            'EST': 'America/New_York',
            'PST': 'America/Los_Angeles',
            'MST': 'America/Denver',
            'CST': 'America/Chicago',
            'GMT': 'GMT',
            'UTC': 'UTC',
            'CET': 'Europe/Berlin',
            'JST': 'Asia/Tokyo',
        }
        timezone = timezone_aliases.get(timezone.upper(), timezone)

        zone = pytz.timezone(timezone)
        now_in_zone = datetime.datetime.now(zone)
        formatted_time = now_in_zone.strftime("%Y-%m-%d %H:%M:%S %Z")
        return f"The current local time in {timezone} is: {formatted_time}"
    except pytz.exceptions.UnknownTimeZoneError:
        return f"Error: Unknown timezone '{timezone}'. Please use a valid timezone like 'America/New_York' or 'Europe/London'"
    except Exception as exc:
        return f"Error fetching time for timezone '{timezone}': {exc}"
@tool
def factorial(n: int) -> Union[int, str]:
    """Compute n! for a non-negative integer.

    Args:
        n: A non-negative integer no greater than 170.

    Returns:
        The factorial of ``n``, or an "Error..." string when ``n`` is not an
        integer, is negative, exceeds 170, or the computation fails.
    """
    try:
        if not isinstance(n, int):
            return "Error: Input must be an integer"
        elif n < 0:
            return "Error: Factorial is not defined for negative numbers"
        elif n > 170:  # upper bound accepted by this tool
            return "Error: Number too large for factorial calculation"
        return math.factorial(n)
    except Exception as exc:
        return f"Error calculating factorial: {exc}"
@tool
def greatest_common_divisor(a: int, b: int) -> Union[int, str]:
    """Compute the greatest common divisor of two integers.

    Args:
        a: First integer.
        b: Second integer.

    Returns:
        The non-negative GCD of ``a`` and ``b``, or an "Error..." string when
        either input is not an integer or the computation fails.
    """
    try:
        both_integers = isinstance(a, int) and isinstance(b, int)
        if not both_integers:
            return "Error: Both inputs must be integers"
        # math.gcd already ignores the sign of its arguments
        return math.gcd(a, b)
    except Exception as exc:
        return f"Error calculating GCD: {exc}"
@tool
def least_common_multiple(a: int, b: int) -> Union[int, str]:
    """Compute the least common multiple of two integers.

    Args:
        a: First integer.
        b: Second integer.

    Returns:
        The non-negative LCM of ``a`` and ``b`` (0 when either is 0), or an
        "Error..." string when either input is not an integer or the
        computation fails.
    """
    try:
        both_integers = isinstance(a, int) and isinstance(b, int)
        if not both_integers:
            return "Error: Both inputs must be integers"
        if a == 0 or b == 0:
            return 0
        # |a * b| = gcd(a, b) * lcm(a, b); math.gcd ignores argument signs
        divisor = math.gcd(a, b)
        return abs(a * b) // divisor
    except Exception as exc:
        return f"Error calculating LCM: {exc}"
@tool
def is_prime(n: int) -> Union[bool, str]:
    """Checks if a number is prime.

    Uses trial division by odd candidates up to the integer square root.

    Args:
        n: The number to check.

    Returns:
        True if n is prime, False otherwise, or an "Error..." string when the
        input is not an integer or the check fails.
    """
    try:
        if not isinstance(n, int):
            return "Error: Input must be an integer"
        if n < 2:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False
        # math.isqrt is exact for arbitrarily large ints, whereas
        # int(math.sqrt(n)) loses precision above 2**53 and overflows for
        # values beyond the float range.
        limit = math.isqrt(n)
        return all(n % divisor for divisor in range(3, limit + 1, 2))
    except Exception as e:
        return f"Error checking if prime: {str(e)}"
@tool
def percentage_calculator(part: Union[int, float], whole: Union[int, float]) -> Union[float, str]:
    """Express one value as a percentage of another.

    Args:
        part: The part value.
        whole: The whole value; must not be zero.

    Returns:
        ``part / whole * 100`` rounded to two decimal places, or an
        "Error..." string when ``whole`` is zero or the computation fails.
    """
    try:
        if whole == 0:
            return "Error: Cannot calculate percentage when whole is zero"
        ratio = part / whole
        return round(ratio * 100, 2)
    except Exception as exc:
        return f"Error calculating percentage: {exc}"
@tool
def compound_interest(principal: Union[int, float], rate: Union[int, float],
                      time: Union[int, float], compound_frequency: int = 1) -> Union[float, str]:
    """Compute the final balance under compound interest.

    Applies A = P * (1 + r/n) ** (n * t).

    Args:
        principal: The initial amount of money; must be positive.
        rate: The annual interest rate as a percentage (e.g. 5 for 5%); must
            not be negative.
        time: The time period in years; must not be negative.
        compound_frequency: Number of compounding periods per year
            (default: 1); must be positive.

    Returns:
        The final amount rounded to two decimal places, or an "Error..."
        string when an argument is out of range or the computation fails.
    """
    try:
        if principal <= 0:
            return "Error: Principal must be positive"
        elif rate < 0:
            return "Error: Interest rate cannot be negative"
        elif time < 0:
            return "Error: Time cannot be negative"
        elif compound_frequency <= 0:
            return "Error: Compound frequency must be positive"

        rate_decimal = rate / 100  # percentage -> fraction
        growth_per_period = 1 + rate_decimal / compound_frequency
        period_count = compound_frequency * time
        amount = principal * growth_per_period ** period_count
        return round(amount, 2)
    except Exception as exc:
        return f"Error calculating compound interest: {exc}"
@tool
def convert_temperature(value: Union[int, float], from_unit: str, to_unit: str) -> Union[float, str]:
    """Converts temperature between Celsius, Fahrenheit, and Kelvin.

    Args:
        value: The temperature value to convert.
        from_unit: The source unit ('C', 'F', or 'K'; full names such as
            'celsius' are accepted, case-insensitively).
        to_unit: The target unit, in the same notation as `from_unit`.

    Returns:
        The converted temperature rounded to two decimal places. When both
        units are the same, `value` is returned as a float without rounding.
        An "Error..." string is returned for unknown units or other failures.
    """
    try:
        # Map full unit names onto their one-letter symbols
        unit_map = {'CELSIUS': 'C', 'FAHRENHEIT': 'F', 'KELVIN': 'K'}
        symbols = ('C', 'F', 'K')

        from_unit = from_unit.upper().strip()
        to_unit = to_unit.upper().strip()
        from_unit = unit_map.get(from_unit, from_unit)
        to_unit = unit_map.get(to_unit, to_unit)

        if from_unit not in symbols or to_unit not in symbols:
            return "Error: Units must be 'C' (Celsius), 'F' (Fahrenheit), or 'K' (Kelvin)"
        if from_unit == to_unit:
            return float(value)

        # Convert to Celsius first
        if from_unit == 'F':
            celsius = (value - 32) * 5/9
        elif from_unit == 'K':
            celsius = value - 273.15
        else:  # from_unit == 'C'
            celsius = value

        # Convert from Celsius to target unit
        if to_unit == 'F':
            result = celsius * 9/5 + 32
        elif to_unit == 'K':
            result = celsius + 273.15
        else:  # to_unit == 'C'
            result = celsius
        return round(result, 2)
    except Exception as e:
        return f"Error converting temperature: {str(e)}"
@tool
def wikipedia_search(query: str) -> Dict[str, str]:
    """
    Search Wikipedia for a query and return maximum 2 results.

    Args:
        query: The search query.

    Returns:
        A dict with the single key "wiki_results" whose value is the text of
        all loaded documents, each wrapped in a <Document> header and
        separated by "---" lines.
    """
    search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
    formatted_search_docs = "\n\n---\n\n".join(
        # .get keeps a document without a "source" entry from aborting the
        # whole search with a KeyError.
        f'<Document source="{doc.metadata.get("source", "")}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        for doc in search_docs
    )
    return {"wiki_results": formatted_search_docs}
@tool
def arxiv_search(query: str) -> Dict[str, str]:
    """
    Search Arxiv for a query and return maximum 3 results.

    Args:
        query: The search query.

    Returns:
        A dict with the single key "arxiv_results" whose value is the first
        1000 characters of each loaded paper, each wrapped in a <Document>
        header and separated by "---" lines.
    """
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()

    def _source(doc) -> str:
        # NOTE(review): ArxivLoader's default metadata appears to carry
        # Published/Title/Authors/Summary but no "source" key, so a hard
        # index would raise KeyError. Fall back to the entry id or title.
        meta = doc.metadata
        return meta.get("source") or meta.get("entry_id") or meta.get("Title", "")

    formatted_search_docs = "\n\n---\n\n".join(
        f'<Document source="{_source(doc)}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
        for doc in search_docs
    )
    return {"arxiv_results": formatted_search_docs}
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Load an Excel file with pandas and summarise its contents.

    Args:
        file_path (str): the path to the Excel file.
        query (str): Question about the data. It is accepted for the tool
            interface but is not used to filter or compute the summary.

    Returns:
        A text report with the row/column counts, the column names and the
        output of ``DataFrame.describe()``, or a string starting with
        "Error analyzing Excel file:" when loading or summarising fails.
    """
    try:
        file = pandas.read_excel(file_path)
        # Headers may be numbers or dates; str.join only accepts strings.
        column_names = ', '.join(str(column) for column in file.columns)
        result = (
            f"Excel file loaded with {len(file)} rows and {len(file.columns)} columns.\n"
        )
        result += f"Columns: {column_names}\n\n"
        result += "Summary statistics:\n"
        result += str(file.describe())
        return result
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
@tool
def python_code_parser(file_path: str) -> Union[Dict[str, str], str]:
    """
    Load Python source files from a path and return their parsed contents.

    Args:
        file_path: The path handed to the loader; only files with a ".py"
            suffix are loaded.

    Returns:
        The string "0" when `file_path` does not exist. Otherwise a dict with
        the single key "python_results" whose value is the text of every
        document produced by the language parser, each wrapped in a
        <Document> header and separated by "---" lines.
    """
    if not os.path.exists(file_path):
        return "0"
    loader = GenericLoader.from_filesystem(
        file_path,
        glob="**/*",
        suffixes=[".py"],
        parser=LanguageParser()
    )
    search_docs = loader.load()
    formatted_search_docs = "\n\n---\n\n".join(
        # .get keeps a document without a "source" entry from aborting the
        # whole call with a KeyError.
        f'<Document source="{doc.metadata.get("source", "")}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        for doc in search_docs
    )
    return {"python_results": formatted_search_docs}
@tool
def audio_transcription(file_path: str) -> Dict[str, str]:
    """
    Transcribe an audio file to text using AssemblyAI.

    Args:
        file_path: The path to the audio file.

    Returns:
        A dict with the single key "audio_results" whose value is the
        transcript text of each loaded document, each wrapped in a <Document>
        header and separated by "---" lines.
    """
    search_docs = AssemblyAIAudioTranscriptLoader(file_path=file_path).load()
    formatted_search_docs = "\n\n---\n\n".join(
        # NOTE(review): the AssemblyAI loader appears to attach the raw
        # transcript JSON as metadata, which has no "source" key; fall back to
        # the input path rather than raising KeyError.
        f'<Document source="{doc.metadata.get("source", file_path)}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
        for doc in search_docs
    )
    return {"audio_results": formatted_search_docs}
@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Load a CSV file with pandas and summarise its contents.

    Args:
        file_path (str): the path to the CSV file.
        query (str): Question about the data. It is accepted for the tool
            interface but is not used to filter or compute the summary.

    Returns:
        A text report with the row/column counts, the column names and the
        output of ``DataFrame.describe()``, or a string starting with
        "Error analyzing CSV file:" when loading or summarising fails.
    """
    try:
        frame = pandas.read_csv(file_path)
        row_count = len(frame)
        column_count = len(frame.columns)
        report_parts = [
            f"CSV file loaded with {row_count} rows and {column_count} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
        ]
        return "".join(report_parts)
    except Exception as exc:
        return f"Error analyzing CSV file: {exc}"
@tool
def extract_text(img_path: str) -> str:
    """
    Extract text from an image file using a multimodal model.

    This allows the contents of the image to be analyzed properly.

    Args:
        img_path: The path to the image file.

    Returns:
        The text the model extracted from the image, with surrounding
        whitespace removed.
    """
    import mimetypes

    vision_llm = ChatOpenAI(model="gpt-4o")

    # Read image and encode as base64
    with open(img_path, "rb") as image_file:
        image_base64 = base64.b64encode(image_file.read()).decode("utf-8")

    # Label the data URL with the image's real type (JPEG, GIF, WebP, ...)
    # instead of always claiming PNG; keep PNG when the type is unknown.
    guessed_type, _ = mimetypes.guess_type(str(img_path))
    if guessed_type and guessed_type.startswith("image/"):
        mime_type = guessed_type
    else:
        mime_type = "image/png"

    # Prepare the prompt including the base64 image data
    message = [
        HumanMessage(
            content=[
                {
                    "type": "text",
                    "text": (
                        "Extract all the text from this image. "
                        "Return only the extracted text, no explanations."
                    ),
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:{mime_type};base64,{image_base64}"
                    },
                },
            ]
        )
    ]

    # Call the vision-capable model
    response = vision_llm.invoke(message)
    return response.content.strip()
@tool
def reverse_sentence(text: str) -> str:
    """
    Return the input text with its characters in reverse order.

    Useful when a question was written backwards and has to be restored
    before it can be answered.

    Args:
        text (str): The input string to be reversed.

    Returns:
        str: The reversed string.
    """
    return "".join(reversed(text))
@tool
def web_search(query: str) -> str:
    """
    Search the web with Tavily and list the most relevant hits.

    Use this FIRST for complex queries, metadata questions, or to find the
    right sources, then follow up with web_content_extract on the most
    promising URL.

    Args:
        query: The search query.

    Returns:
        A numbered list of up to 5 results (title, URL and the first 150
        characters of the snippet), or a string starting with
        "web_search tool error:" when the search fails.
    """
    try:
        searcher = TavilySearch(
            max_results=5,
            topic="general",
            search_depth="advanced",
            include_raw_content=False,  # Just URLs and snippets
        )
        response = searcher.invoke(query)

        # One entry per hit: rank, title, URL and a short snippet
        entries = [
            f"{rank}. {hit['title']}: {hit['url']}\n {hit['content'][:150]}...\n\n"
            for rank, hit in enumerate(response["results"], 1)
        ]
        return "Search Results:\n" + "".join(entries)
    except Exception as exc:
        return f"web_search tool error: {exc}"
@tool
def web_content_extract(url: str) -> str:
    """
    Extracts the readable text content from a URL using BeautifulSoup.

    Particularly effective for Wikipedia metadata pages, discussion pages,
    and structured web content.
    Can be used after web_search to get detailed information.

    Args:
        url: The URL of the page to fetch.

    Returns:
        The page text prefixed with "Content extracted from <url>:", cut to
        10000 characters (followed by "...") when longer, or a string
        starting with "web_content_extract tool error:" when fetching or
        parsing fails.
    """
    max_chars = 10000  # Limit content length for response
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise exception for 4XX/5XX responses

        soup = BeautifulSoup(response.text, 'html.parser')
        # Drop non-content elements before extracting text
        for element in soup.select('script, style, footer, nav, header'):
            element.decompose()

        root = soup.body if soup.body else soup
        text = root.get_text(separator='\n', strip=True)

        # Only the body is conditional; the header is always included so that
        # short pages are labelled with their source too.
        body = f"{text[:max_chars]}..." if len(text) > max_chars else text
        return f"Content extracted from {url}:\n\n{body}"
    except Exception as e:
        return f"web_content_extract tool error: {str(e)}"