import base64
import datetime
import hashlib
import math
import mimetypes
import os
import re
import urllib.parse
from pathlib import Path
from typing import Dict, Optional, Union

# from dotenv import load_dotenv
import pandas
import pytz
import requests
from bs4 import BeautifulSoup
from langchain_community.document_loaders import (
    ArxivLoader,
    AssemblyAIAudioTranscriptLoader,
    WikipediaLoader,
)
from langchain_community.document_loaders.generic import GenericLoader
from langchain_community.document_loaders.parsers import LanguageParser
from langchain_core.messages import HumanMessage
# from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilySearch

# load_dotenv()
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

# NOTE: every function decorated with @tool exposes its docstring to the model
# as the tool description, so docstrings here are user-facing text.

# Browser-like User-Agent shared by the HTTP tools; some sites reject the
# default python-requests agent.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)

# Matches the plain `filename=` parameter of a Content-Disposition header,
# quoted or unquoted, without swallowing any parameters that follow it.
_CD_FILENAME_RE = re.compile(
    r'filename\s*=\s*(?:"([^"]*)"|([^;]+))', re.IGNORECASE
)

# Common abbreviations mapped to the zone names pytz understands.
_TIMEZONE_ALIASES = {
    'EST': 'America/New_York',
    'PST': 'America/Los_Angeles',
    'MST': 'America/Denver',
    'CST': 'America/Chicago',
    'GMT': 'GMT',
    'UTC': 'UTC',
    'CET': 'Europe/Berlin',
    'JST': 'Asia/Tokyo',
}

# Value/symbol pairs in descending order, including the subtractive forms.
_ROMAN_NUMERALS = (
    (1000, "M"), (900, "CM"), (500, "D"), (400, "CD"),
    (100, "C"), (90, "XC"), (50, "L"), (40, "XL"),
    (10, "X"), (9, "IX"), (5, "V"), (4, "IV"), (1, "I"),
)

# Supported Roman-calculator operators and the verb form used in the reply.
_ROMAN_OPERATION_GERUNDS = {
    "add": "adding",
    "subtract": "subtracting",
    "multiply": "multiplying",
    "divide": "dividing",
}


def _safe_filename(name: Optional[str]) -> str:
    """Reduce *name* to a bare file name, or '' if nothing usable remains.

    Directory components are dropped so that a caller- or server-supplied
    name cannot escape the download directory.
    """
    if not name:
        return ""
    # Treat Windows separators like POSIX ones before taking the basename.
    candidate = os.path.basename(name.strip().replace("\\", "/"))
    return "" if candidate in ("", ".", "..") else candidate


@tool
def download_file(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it locally for analysis.
    Analysis can be done using one or more other tools.

    Args:
        url: The URL of the file to download
        filename: Optional custom filename. If not provided, will extract from URL

    Returns:
        The local file path where the file was saved
    """
    try:
        # Clean and validate URL
        url = url.strip()
        if not url.startswith(('http://', 'https://')):
            return f"Error: Invalid URL format: {url}"

        # Create downloads directory if it doesn't exist
        download_dir = Path("downloads")
        download_dir.mkdir(exist_ok=True)

        # An explicit filename from the caller always wins.
        custom_name = _safe_filename(filename)
        if custom_name:
            target_name = custom_name
        else:
            # Extract filename from the URL path.
            parsed_url = urllib.parse.urlparse(url)
            target_name = _safe_filename(os.path.basename(parsed_url.path))
            if not target_name or '.' not in target_name:
                # hash() is salted per process; a SHA-256 prefix gives the
                # same fallback name for the same URL on every run.
                url_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()[:8]
                target_name = f"downloaded_file_{url_hash}"

        print(f"Downloading file from: {url}")
        # The context manager releases the streamed connection even on error.
        with requests.get(
            url,
            headers={'User-Agent': _USER_AGENT},
            timeout=30,
            stream=True,
        ) as response:
            response.raise_for_status()

            # Prefer the server-suggested name only when the caller gave none.
            if not custom_name:
                match = _CD_FILENAME_RE.search(
                    response.headers.get('content-disposition', '')
                )
                if match:
                    suggested = _safe_filename(match.group(1) or match.group(2))
                    if suggested:
                        target_name = suggested

            filepath = download_dir / target_name

            # Write file to disk
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)

        file_size = os.path.getsize(filepath)
        print(f"Successfully downloaded {target_name} ({file_size} bytes)")
        return str(filepath)

    except requests.exceptions.RequestException as e:
        return f"Error downloading file: {str(e)}"
    except Exception as e:
        return f"Error saving file: {str(e)}"


@tool
def multiply(a: Union[int, float], b: Union[int, float]) -> Union[int, float, str]:
    """Multiplies two numbers and returns the product.

    Args:
        a: The first number.
        b: The second number.

    Returns:
        The product of the two input numbers.
    """
    try:
        # int * int is already an int, so no cast is needed.
        return a * b
    except Exception as e:
        return f"Error in multiplication: {str(e)}"


@tool
def add(a: Union[int, float], b: Union[int, float]) -> Union[int, float, str]:
    """Adds two numbers and returns the sum.

    Args:
        a: The first number.
        b: The second number.

    Returns:
        The sum of the two input numbers.
    """
    try:
        return a + b
    except Exception as e:
        return f"Error in addition: {str(e)}"


@tool
def power(a: Union[int, float], b: Union[int, float]) -> Union[int, float, str]:
    """Raises a number to the power of another.

    Args:
        a: The base number.
        b: The exponent.

    Returns:
        The result of raising `a` to the power of `b`.
    """
    try:
        if a == 0 and b < 0:
            return "Error: Cannot raise 0 to a negative power"
        result = a ** b
        # A negative base with a fractional exponent yields a complex number
        # in Python; report it instead of returning a non-real value.
        if isinstance(result, complex):
            return "Error: Result is not a real number (negative base with fractional exponent)"
        return result
    except OverflowError:
        return "Error: Result too large to compute"
    except Exception as e:
        return f"Error in power calculation: {str(e)}"


@tool
def subtract(a: Union[int, float], b: Union[int, float]) -> Union[int, float, str]:
    """Subtracts the second number from the first.

    Args:
        a: The number from which to subtract.
        b: The number to subtract.

    Returns:
        The result of `a` minus `b`.
    """
    try:
        return a - b
    except Exception as e:
        return f"Error in subtraction: {str(e)}"


@tool
def divide(a: Union[int, float], b: Union[int, float]) -> Union[float, str]:
    """Divides one number by another.

    Args:
        a: The numerator.
        b: The denominator.

    Returns:
        The result of `a` divided by `b`.
    """
    try:
        if b == 0:
            return "Error: Division by zero is not allowed"
        return a / b
    except Exception as e:
        return f"Error in division: {str(e)}"


@tool
def modulus(a: int, b: int) -> Union[int, str]:
    """Returns the remainder of the division of two integers.

    Args:
        a: The dividend.
        b: The divisor.

    Returns:
        The remainder when `a` is divided by `b`.
    """
    try:
        if b == 0:
            return "Error: Modulus by zero is not allowed"
        return a % b
    except Exception as e:
        return f"Error in modulus operation: {str(e)}"


@tool
def square_root(x: Union[int, float]) -> Union[float, str]:
    """Returns the square root of a number.

    Args:
        x: The input number. Must be non-negative.

    Returns:
        The square root of `x`.
    """
    try:
        if x < 0:
            return "Error: Square root of negative number is not allowed"
        return math.sqrt(x)
    except Exception as e:
        return f"Error in square root calculation: {str(e)}"


@tool
def floor_divide(a: int, b: int) -> Union[int, str]:
    """Performs integer division (floor division) of two numbers.

    Args:
        a: The dividend.
        b: The divisor.

    Returns:
        The floor of the quotient.
    """
    try:
        if b == 0:
            return "Error: Division by zero is not allowed"
        return a // b
    except Exception as e:
        return f"Error in floor division: {str(e)}"


@tool
def absolute(x: Union[int, float]) -> Union[int, float, str]:
    """Returns the absolute value of a number.

    Args:
        x: The input number.

    Returns:
        The absolute value of `x`.
    """
    try:
        return abs(x)
    except Exception as e:
        return f"Error in absolute value calculation: {str(e)}"


@tool
def logarithm(x: Union[int, float], base: Union[int, float] = math.e) -> Union[float, str]:
    """Returns the logarithm of a number with a given base.

    Args:
        x: The number to take the logarithm of. Must be positive.
        base: The logarithmic base. Must be positive and not equal to 1.

    Returns:
        The logarithm of `x` to the given base.
    """
    try:
        if x <= 0:
            return "Error: Logarithm input must be positive"
        if base <= 0 or base == 1:
            return "Error: Logarithm base must be positive and not equal to 1"
        return math.log(x, base)
    except Exception as e:
        return f"Error in logarithm calculation: {str(e)}"


@tool
def exponential(x: Union[int, float]) -> Union[float, str]:
    """Returns e raised to the power of `x`.

    Args:
        x: The exponent.

    Returns:
        The value of e^x.
    """
    try:
        return math.exp(x)
    except OverflowError:
        # Let math.exp decide what overflows rather than guessing a cutoff,
        # so every exponent that fits in a float is accepted.
        return "Error: Exponent too large, would cause overflow"
    except Exception as e:
        return f"Error in exponential calculation: {str(e)}"


def _to_roman(number: int) -> str:
    """Convert an integer in the range 1..3999 to its Roman numeral."""
    pieces = []
    for value, numeral in _ROMAN_NUMERALS:
        count, number = divmod(number, value)
        pieces.append(numeral * count)
    return "".join(pieces)


@tool
def roman_calculator_converter(value1: int, value2: int, oper: str) -> str:
    """Performs an operation on 2 numbers and returns the result as a Roman numeral.

    Args:
        value1: The first value
        value2: The second value
        oper: Operator for the calculation ("add", "subtract", "multiply", "divide")

    Returns:
        The result as a Roman numeral string.
    """
    try:
        # Input validation
        if not isinstance(value1, int) or not isinstance(value2, int):
            return "Error: Both values must be integers"
        if oper not in _ROMAN_OPERATION_GERUNDS:
            return "Error: Operator must be 'add', 'subtract', 'multiply', or 'divide'"

        # Perform calculation
        if oper == "add":
            result = value1 + value2
        elif oper == "subtract":
            result = value1 - value2
        elif oper == "multiply":
            result = value1 * value2
        else:  # oper == "divide"
            if value2 == 0:
                return "Error: Division by zero is not allowed"
            # Truncate toward zero using exact integer arithmetic; going
            # through float division loses precision for large operands.
            quotient = abs(value1) // abs(value2)
            result = quotient if (value1 < 0) == (value2 < 0) else -quotient

        # Handle invalid results for Roman numerals
        if result <= 0:
            return f"Error: Roman numerals cannot represent zero or negative numbers. Result was: {result}"
        if result > 3999:  # Roman numerals traditionally don't go beyond this
            return f"Error: Result ({result}) is too large for standard Roman numeral representation"

        roman_string = _to_roman(result)
        gerund = _ROMAN_OPERATION_GERUNDS[oper]
        return f"The result of {gerund} {value1} and {value2} is: {roman_string}"

    except Exception as e:
        return f"Error in Roman calculator: {str(e)}"


@tool
def get_current_time_in_timezone(timezone: str) -> str:
    """Fetches the current local time in a specified timezone.

    Args:
        timezone: A string representing a valid timezone (e.g., 'America/New_York', 'Europe/London').

    Returns:
        The current time in the specified timezone.
    """
    try:
        if not timezone or not timezone.strip():
            return "Error: Timezone cannot be empty"

        # Clean the timezone string
        timezone = timezone.strip()

        # Handle common timezone aliases
        timezone = _TIMEZONE_ALIASES.get(timezone.upper(), timezone)

        # Create timezone object
        tz = pytz.timezone(timezone)

        # Get current time in that timezone
        local_time = datetime.datetime.now(tz)
        formatted_time = local_time.strftime("%Y-%m-%d %H:%M:%S %Z")

        return f"The current local time in {timezone} is: {formatted_time}"

    except pytz.exceptions.UnknownTimeZoneError:
        return f"Error: Unknown timezone '{timezone}'. Please use a valid timezone like 'America/New_York' or 'Europe/London'"
    except Exception as e:
        return f"Error fetching time for timezone '{timezone}': {str(e)}"


@tool
def factorial(n: int) -> Union[int, str]:
    """Calculates the factorial of a non-negative integer.

    Args:
        n: A non-negative integer (at most 170).

    Returns:
        The factorial of n.
    """
    try:
        if not isinstance(n, int):
            return "Error: Input must be an integer"
        if n < 0:
            return "Error: Factorial is not defined for negative numbers"
        # Python integers cannot overflow; the cap only keeps results to a
        # manageable size (170! is the largest factorial that fits a float).
        if n > 170:
            return "Error: Number too large for factorial calculation"
        return math.factorial(n)
    except Exception as e:
        return f"Error calculating factorial: {str(e)}"


@tool
def greatest_common_divisor(a: int, b: int) -> Union[int, str]:
    """Finds the greatest common divisor of two integers.

    Args:
        a: First integer.
        b: Second integer.

    Returns:
        The greatest common divisor of a and b.
    """
    try:
        if not isinstance(a, int) or not isinstance(b, int):
            return "Error: Both inputs must be integers"
        return math.gcd(abs(a), abs(b))
    except Exception as e:
        return f"Error calculating GCD: {str(e)}"


@tool
def least_common_multiple(a: int, b: int) -> Union[int, str]:
    """Finds the least common multiple of two integers.

    Args:
        a: First integer.
        b: Second integer.

    Returns:
        The least common multiple of a and b.
    """
    try:
        if not isinstance(a, int) or not isinstance(b, int):
            return "Error: Both inputs must be integers"
        if a == 0 or b == 0:
            return 0
        return abs(a * b) // math.gcd(abs(a), abs(b))
    except Exception as e:
        return f"Error calculating LCM: {str(e)}"


@tool
def is_prime(n: int) -> Union[bool, str]:
    """Checks if a number is prime.

    Args:
        n: The number to check.

    Returns:
        True if n is prime, False otherwise.
    """
    try:
        if not isinstance(n, int):
            return "Error: Input must be an integer"
        if n < 2:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False

        # Check odd divisors up to sqrt(n). math.isqrt is exact for
        # arbitrarily large ints, unlike int(math.sqrt(n)).
        for i in range(3, math.isqrt(n) + 1, 2):
            if n % i == 0:
                return False
        return True
    except Exception as e:
        return f"Error checking if prime: {str(e)}"


@tool
def percentage_calculator(part: Union[int, float], whole: Union[int, float]) -> Union[float, str]:
    """Calculates what percentage 'part' is of 'whole'.

    Args:
        part: The part value.
        whole: The whole value.

    Returns:
        The percentage as a float.
    """
    try:
        if whole == 0:
            return "Error: Cannot calculate percentage when whole is zero"
        percentage = (part / whole) * 100
        return round(percentage, 2)
    except Exception as e:
        return f"Error calculating percentage: {str(e)}"


@tool
def compound_interest(principal: Union[int, float], rate: Union[int, float],
                      time: Union[int, float], compound_frequency: int = 1) -> Union[float, str]:
    """Calculates compound interest.

    Args:
        principal: The initial amount of money.
        rate: The annual interest rate (as a percentage, e.g., 5 for 5%).
        time: The time period in years.
        compound_frequency: How many times per year the interest is compounded (default: 1).

    Returns:
        The final amount after compound interest.
    """
    try:
        if principal <= 0:
            return "Error: Principal must be positive"
        if rate < 0:
            return "Error: Interest rate cannot be negative"
        if time < 0:
            return "Error: Time cannot be negative"
        if compound_frequency <= 0:
            return "Error: Compound frequency must be positive"

        # Convert percentage to decimal
        rate_decimal = rate / 100

        # Compound interest formula: A = P(1 + r/n)^(nt)
        amount = principal * (1 + rate_decimal / compound_frequency) ** (compound_frequency * time)

        return round(amount, 2)
    except Exception as e:
        return f"Error calculating compound interest: {str(e)}"


@tool
def convert_temperature(value: Union[int, float], from_unit: str, to_unit: str) -> Union[float, str]:
    """Converts temperature between Celsius, Fahrenheit, and Kelvin.

    Args:
        value: The temperature value to convert.
        from_unit: The source unit ('C', 'F', or 'K').
        to_unit: The target unit ('C', 'F', or 'K').

    Returns:
        The converted temperature value.
    """
    try:
        from_unit = from_unit.upper().strip()
        to_unit = to_unit.upper().strip()

        # Normalize full unit names to their one-letter form
        unit_map = {
            'CELSIUS': 'C',
            'FAHRENHEIT': 'F',
            'KELVIN': 'K'
        }
        from_unit = unit_map.get(from_unit, from_unit)
        to_unit = unit_map.get(to_unit, to_unit)

        if from_unit not in ['C', 'F', 'K'] or to_unit not in ['C', 'F', 'K']:
            return "Error: Units must be 'C' (Celsius), 'F' (Fahrenheit), or 'K' (Kelvin)"

        if from_unit == to_unit:
            return float(value)

        # Convert to Celsius first
        if from_unit == 'F':
            celsius = (value - 32) * 5/9
        elif from_unit == 'K':
            celsius = value - 273.15
        else:  # from_unit == 'C'
            celsius = value

        # Convert from Celsius to target unit
        if to_unit == 'F':
            result = celsius * 9/5 + 32
        elif to_unit == 'K':
            result = celsius + 273.15
        else:  # to_unit == 'C'
            result = celsius

        return round(result, 2)

    except Exception as e:
        return f"Error converting temperature: {str(e)}"


@tool
def wikipedia_search(query: str) -> Dict[str, str]:
    """
    Search Wikipedia for a query and return maximum 2 results.

    Args:
        query: The search query.

    Returns:
        A dict whose "wiki_results" entry holds the page contents, or an
        error message if the search failed.
    """
    try:
        search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
        formatted_search_docs = "\n\n---\n\n".join(
            f'\n{doc.page_content}\n' for doc in search_docs
        )
        return {"wiki_results": formatted_search_docs}
    except Exception as e:
        return {"wiki_results": f"Error searching Wikipedia: {str(e)}"}


@tool
def arxiv_search(query: str) -> Dict[str, str]:
    """
    Search Arxiv for a query and return maximum 3 results.

    Args:
        query: The search query.

    Returns:
        A dict whose "arxiv_results" entry holds the first 1000 characters
        of each paper, or an error message if the search failed.
    """
    try:
        search_docs = ArxivLoader(query=query, load_max_docs=3).load()
        formatted_search_docs = "\n\n---\n\n".join(
            f'\n{doc.page_content[:1000]}\n' for doc in search_docs
        )
        return {"arxiv_results": formatted_search_docs}
    except Exception as e:
        return {"arxiv_results": f"Error searching Arxiv: {str(e)}"}


def _summarize_dataframe(frame: pandas.DataFrame, kind: str) -> str:
    """Build the size / columns / describe() summary shared by the file tools."""
    # Column labels are not guaranteed to be strings (e.g. integer headers).
    column_names = ', '.join(str(column) for column in frame.columns)
    return (
        f"{kind} file loaded with {len(frame)} rows and {len(frame.columns)} columns.\n"
        f"Columns: {column_names}\n\n"
        "Summary statistics:\n"
        + str(frame.describe())
    )


@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file using pandas and answer a question about it.

    Args:
        file_path (str): the path to the Excel file.
        query (str): Question about the data
    """
    # `query` is part of the tool schema but is not used by the summary.
    try:
        return _summarize_dataframe(pandas.read_excel(file_path), "Excel")
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"


@tool
def python_code_parser(file_path: str) -> Union[Dict[str, str], str]:
    """
    Parse Python code to extract function names and their docstrings.

    Args:
        file_path: The path to the Python file to parse.

    Returns:
        A dict whose "python_results" entry holds the parsed source
        segments, or an error message if the path does not exist.
    """
    if not os.path.exists(file_path):
        return f"Error: File not found: {file_path}"

    try:
        loader = GenericLoader.from_filesystem(
            file_path,
            glob="**/*",
            suffixes=[".py"],
            parser=LanguageParser()
        )
        search_docs = loader.load()
        formatted_search_docs = "\n\n---\n\n".join(
            f'\n{doc.page_content}\n' for doc in search_docs
        )
        return {"python_results": formatted_search_docs}
    except Exception as e:
        return {"python_results": f"Error parsing Python code: {str(e)}"}


@tool
def audio_transcription(file_path: str) -> Dict[str, str]:
    """
    Transcribe an audio file to text using AssemblyAI.

    Args:
        file_path: The path to the audio file.

    Returns:
        A dict whose "audio_results" entry holds the transcribed text, or
        an error message if transcription failed.
    """
    try:
        search_docs = AssemblyAIAudioTranscriptLoader(file_path=file_path).load()
        formatted_search_docs = "\n\n---\n\n".join(
            f'\n{doc.page_content}\n' for doc in search_docs
        )
        return {"audio_results": formatted_search_docs}
    except Exception as e:
        return {"audio_results": f"Error transcribing audio: {str(e)}"}


@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Analyze a CSV file using pandas and answer a question about it.

    Args:
        file_path (str): the path to the CSV file.
        query (str): Question about the data
    """
    # `query` is part of the tool schema but is not used by the summary.
    try:
        return _summarize_dataframe(pandas.read_csv(file_path), "CSV")
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"


@tool
def extract_text(img_path: str) -> str:
    """
    Extract text from an image file using a multimodal model.
    This allows the contents of the image to be analyzed properly.

    Args:
        img_path: The path to the image file.

    Returns:
        The text found in the image.
    """
    try:
        vision_llm = ChatOpenAI(model="gpt-4o")

        # Read image and encode as base64
        with open(img_path, "rb") as image_file:
            image_base64 = base64.b64encode(image_file.read()).decode("utf-8")

        # Label the data URL with the real image type; fall back to PNG when
        # the extension is unknown or not an image.
        mime_type, _ = mimetypes.guess_type(img_path)
        if not mime_type or not mime_type.startswith("image/"):
            mime_type = "image/png"

        # Prepare the prompt including the base64 image data
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": (
                            "Extract all the text from this image. "
                            "Return only the extracted text, no explanations."
                        ),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{mime_type};base64,{image_base64}"
                        },
                    },
                ]
            )
        ]

        # Call the vision-capable model
        response = vision_llm.invoke(message)
        return response.content.strip()
    except Exception as e:
        return f"extract_text tool error: {str(e)}"


@tool
def reverse_sentence(text: str) -> str:
    """
    Reverses the input text. In case a question is written in reversed text, it can be corrected with this tool.

    Args:
        text (str): The input string to be reversed.

    Returns:
        str: The reversed string.
    """
    return text[::-1]


@tool
def web_search(query: str) -> str:
    """
    Searches the web and returns a list of the most relevant URLs.
    Use this FIRST for complex queries, metadata questions, or to find the right sources.
    Then follow up with web_content_extract on the most promising URL.
    """
    try:
        tavily_search = TavilySearch(
            max_results=5,
            topic="general",
            search_depth="advanced",
            include_raw_content=False,  # Just URLs and snippets
        )

        results = tavily_search.invoke(query)

        # Format results to show URLs and brief descriptions
        entries = []
        for i, result in enumerate(results["results"], 1):
            # Tolerate missing fields so one sparse hit does not discard
            # every other result.
            title = result.get('title') or ''
            link = result.get('url') or ''
            snippet = (result.get('content') or '')[:150]
            entries.append(f"{i}. {title}: {link}\n   {snippet}...\n\n")

        return "Search Results:\n" + "".join(entries)
    except Exception as e:
        return f"web_search tool error: {str(e)}"


@tool
def web_content_extract(url: str) -> str:
    """
    Extracts and analyzes specific content from a URL using BeautifulSoup.
    Particularly effective for Wikipedia metadata pages, discussion pages,
    and structured web content.

    Can be used after web_search to get detailed information.
    """
    try:
        response = requests.get(
            url, headers={"User-Agent": _USER_AGENT}, timeout=10
        )
        response.raise_for_status()  # Raise exception for 4XX/5XX responses

        soup = BeautifulSoup(response.text, 'html.parser')
        # Drop page chrome and non-content elements before extracting text.
        for element in soup.select('script, style, footer, nav, header'):
            element.decompose()

        root = soup.body if soup.body else soup
        text = root.get_text(separator='\n', strip=True)

        # Limit content length for response; the header is always included,
        # the ellipsis only when the text was actually cut.
        if len(text) > 10000:
            text = f"{text[:10000]}..."
        return f"Content extracted from {url}:\n\n{text}"
    except Exception as e:
        return f"web_content_extract tool error: {str(e)}"