Spaces:
Sleeping
Sleeping
import base64 | |
import datetime | |
import math | |
import os | |
import urllib.parse | |
from pathlib import Path | |
from typing import Dict, Union | |
# from dotenv import load_dotenv | |
import pandas | |
import pytz | |
import requests | |
from bs4 import BeautifulSoup | |
from langchain_community.document_loaders import ( | |
ArxivLoader, AssemblyAIAudioTranscriptLoader, WikipediaLoader) | |
from langchain_community.document_loaders.generic import GenericLoader | |
from langchain_community.document_loaders.parsers import LanguageParser | |
from langchain_core.messages import HumanMessage | |
# from langchain_community.tools import DuckDuckGoSearchRun | |
# from langchain_community.tools import DuckDuckGoSearchRun | |
from langchain_core.tools import tool | |
from langchain_google_genai import ChatGoogleGenerativeAI | |
from langchain_openai import ChatOpenAI | |
from langchain_tavily import TavilySearch | |
# load_dotenv() | |
# OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
# os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY | |
def _sanitize_filename(candidate) -> str:
    """Reduce a caller- or server-supplied name to a bare file name without directories."""
    if not candidate:
        return ""
    name = os.path.basename(str(candidate).strip().replace("\\", "/"))
    # "." and ".." are directory references, not usable file names.
    return "" if name in (".", "..") else name


def download_file(url: str, filename: Union[str, None] = None) -> str:
    """
    Download a file from a URL and save it locally for analysis.
    Analysis can be done using one or more other tools.

    Args:
        url: The URL of the file to download. Must start with http:// or https://.
        filename: Optional custom filename. If not provided, the name is taken
            from the URL path, or generated from a digest of the URL when the
            path has no usable name. A filename sent by the server in a
            Content-Disposition header takes precedence over both.

    Returns:
        The local file path (inside the "downloads" directory) where the file
        was saved, or a string starting with "Error" when the URL is invalid
        or the download/save fails.
    """
    import hashlib
    import re

    try:
        # Clean and validate URL
        url = url.strip()
        if not url.startswith(('http://', 'https://')):
            return f"Error: Invalid URL format: {url}"

        # Create downloads directory if it doesn't exist
        download_dir = Path("downloads")
        download_dir.mkdir(exist_ok=True)

        # Determine filename; directory components are stripped so the file
        # can never be written outside the downloads directory.
        filename = _sanitize_filename(filename)
        if not filename:
            filename = os.path.basename(urllib.parse.urlparse(url).path)
            if not filename or '.' not in filename:
                # hashlib gives a name that is stable across interpreter runs
                # (the built-in hash() of a str is salted per process).
                url_hash = hashlib.sha256(url.encode("utf-8")).hexdigest()[:8]
                filename = f"downloaded_file_{url_hash}"

        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        print(f"Downloading file from: {url}")
        # The context manager releases the streamed connection even on errors.
        with requests.get(url, headers=headers, timeout=30, stream=True) as response:
            response.raise_for_status()

            # Prefer the filename suggested by the server, if any. The pattern
            # stops at a quote or semicolon so trailing header parameters are
            # not swallowed into the name.
            disposition = response.headers.get('content-disposition', '')
            match = re.search(r'filename=\s*"?([^";]+)', disposition)
            if match:
                filename = _sanitize_filename(match.group(1)) or filename

            filepath = download_dir / filename

            # Write file to disk
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip keep-alive chunks
                        f.write(chunk)

        file_size = os.path.getsize(filepath)
        print(f"Successfully downloaded {filepath.name} ({file_size} bytes)")
        return str(filepath)
    except requests.exceptions.RequestException as e:
        return f"Error downloading file: {str(e)}"
    except Exception as e:
        return f"Error saving file: {str(e)}"
def multiply(a: Union[int, float], b: Union[int, float]) -> Union[int, float]:
    """Compute the product of two numbers.

    Args:
        a: The first factor.
        b: The second factor.

    Returns:
        ``a * b``, returned as an int when both inputs are ints. If the
        multiplication raises, a string starting with
        "Error in multiplication:" is returned instead.
    """
    try:
        product = a * b
        if isinstance(a, int) and isinstance(b, int):
            return int(product)
        return product
    except Exception as exc:
        return f"Error in multiplication: {str(exc)}"
def add(a: Union[int, float], b: Union[int, float]) -> Union[int, float]:
    """Compute the sum of two numbers.

    Args:
        a: The first addend.
        b: The second addend.

    Returns:
        ``a + b``, returned as an int when both inputs are ints. If the
        addition raises, a string starting with "Error in addition:" is
        returned instead.
    """
    try:
        total = a + b
        both_ints = isinstance(a, int) and isinstance(b, int)
        return int(total) if both_ints else total
    except Exception as exc:
        return f"Error in addition: {str(exc)}"
def power(a: Union[int, float], b: Union[int, float]) -> float:
    """Raise ``a`` to the power ``b``.

    Args:
        a: The base.
        b: The exponent.

    Returns:
        ``a ** b``. An error string is returned instead when the base is 0
        and the exponent is negative, when the computation overflows, or
        when it raises any other exception.
    """
    try:
        zero_to_negative = a == 0 and b < 0
        if zero_to_negative:
            return "Error: Cannot raise 0 to a negative power"
        return a ** b
    except OverflowError:
        return "Error: Result too large to compute"
    except Exception as exc:
        return f"Error in power calculation: {str(exc)}"
def subtract(a: Union[int, float], b: Union[int, float]) -> Union[int, float]:
    """Compute the difference of two numbers.

    Args:
        a: The minuend (number to subtract from).
        b: The subtrahend (number to subtract).

    Returns:
        ``a - b``, returned as an int when both inputs are ints. If the
        subtraction raises, a string starting with "Error in subtraction:"
        is returned instead.
    """
    try:
        difference = a - b
        if isinstance(a, int) and isinstance(b, int):
            return int(difference)
        return difference
    except Exception as exc:
        return f"Error in subtraction: {str(exc)}"
def divide(a: Union[int, float], b: Union[int, float]) -> float:
    """Compute the true quotient of two numbers.

    Args:
        a: The numerator.
        b: The denominator.

    Returns:
        ``a / b``. An error string is returned instead when ``b`` is zero or
        when the division raises.
    """
    try:
        denominator_is_zero = b == 0
        if denominator_is_zero:
            return "Error: Division by zero is not allowed"
        quotient = a / b
        return quotient
    except Exception as exc:
        return f"Error in division: {str(exc)}"
def modulus(a: int, b: int) -> Union[int, str]:
    """Compute the remainder of dividing ``a`` by ``b``.

    Args:
        a: The dividend.
        b: The divisor.

    Returns:
        ``a % b``, or an error string when ``b`` is zero or the operation
        raises.
    """
    try:
        divisor_is_zero = b == 0
        if divisor_is_zero:
            return "Error: Modulus by zero is not allowed"
        remainder = a % b
        return remainder
    except Exception as exc:
        return f"Error in modulus operation: {str(exc)}"
def square_root(x: Union[int, float]) -> Union[float, str]:
    """Compute the square root of a non-negative number.

    Args:
        x: The input number. Must be non-negative.

    Returns:
        ``math.sqrt(x)``, or an error string when ``x`` is negative or the
        computation raises.
    """
    try:
        is_negative = x < 0
        if is_negative:
            return "Error: Square root of negative number is not allowed"
        root = math.sqrt(x)
        return root
    except Exception as exc:
        return f"Error in square root calculation: {str(exc)}"
def floor_divide(a: int, b: int) -> Union[int, str]:
    """Compute the floor of the quotient of two numbers.

    Args:
        a: The dividend.
        b: The divisor.

    Returns:
        ``a // b``, or an error string when ``b`` is zero or the operation
        raises.
    """
    try:
        divisor_is_zero = b == 0
        if divisor_is_zero:
            return "Error: Division by zero is not allowed"
        quotient = a // b
        return quotient
    except Exception as exc:
        return f"Error in floor division: {str(exc)}"
def absolute(x: Union[int, float]) -> Union[int, float]:
    """Compute the absolute value of a number.

    Args:
        x: The input number.

    Returns:
        ``abs(x)``, returned as an int when ``x`` is an int. If ``abs``
        raises, a string starting with "Error in absolute value calculation:"
        is returned instead.
    """
    try:
        magnitude = abs(x)
        if isinstance(x, int):
            return int(magnitude)
        return magnitude
    except Exception as exc:
        return f"Error in absolute value calculation: {str(exc)}"
def logarithm(x: Union[int, float], base: Union[int, float] = math.e) -> Union[float, str]:
    """Returns the logarithm of a number with a given base.

    Args:
        x: The number to take the logarithm of. Must be positive.
        base: The logarithmic base. Must be positive and not equal to 1.
            Defaults to e (natural logarithm).

    Returns:
        The logarithm of `x` to the given base, or an error string when an
        argument is out of range or the computation raises.
    """
    try:
        if x <= 0:
            return "Error: Logarithm input must be positive"
        if base <= 0 or base == 1:
            return "Error: Logarithm base must be positive and not equal to 1"
        # math.log(x, base) is computed as log(x) / log(base), which is inexact
        # for common bases (math.log(1000, 10) == 2.9999999999999996). The
        # dedicated functions are more accurate.
        if base == 10:
            return math.log10(x)
        if base == 2:
            return math.log2(x)
        return math.log(x, base)
    except Exception as e:
        return f"Error in logarithm calculation: {str(e)}"
def exponential(x: Union[int, float]) -> Union[float, str]:
    """Compute e raised to the power ``x``.

    Args:
        x: The exponent. Values above 700 are rejected up front.

    Returns:
        ``math.exp(x)``, or an error string when ``x`` exceeds 700, the
        computation overflows, or it raises any other exception.
    """
    try:
        exceeds_limit = x > 700  # guard against overflow before calling exp
        if exceeds_limit:
            return "Error: Exponent too large, would cause overflow"
        value = math.exp(x)
        return value
    except OverflowError:
        return "Error: Result too large to compute"
    except Exception as exc:
        return f"Error in exponential calculation: {str(exc)}"
def roman_calculator_converter(value1: int, value2: int, oper: str) -> str:
    """Performs an operation on 2 numbers and returns the result as a Roman numeral.

    Args:
        value1: The first value (integer).
        value2: The second value (integer).
        oper: Operator for the calculation ("add", "subtract", "multiply", "divide").
            Division truncates toward zero.

    Returns:
        A sentence containing the result as a Roman numeral, or an error
        string when the inputs are invalid or the result is outside 1..3999.
    """
    try:
        # Input validation
        if not isinstance(value1, int) or not isinstance(value2, int):
            return "Error: Both values must be integers"
        if oper not in ("add", "subtract", "multiply", "divide"):
            return "Error: Operator must be 'add', 'subtract', 'multiply', or 'divide'"

        # Gerund used in the result sentence; naive "<oper>ing" would produce
        # the misspelling "divideing".
        gerunds = {
            "add": "adding",
            "subtract": "subtracting",
            "multiply": "multiplying",
            "divide": "dividing",
        }

        # Roman numeral mapping, largest value first
        roman_numerals = [
            (1000, "M"), (900, "CM"), (500, "D"), (400, "CD"),
            (100, "C"), (90, "XC"), (50, "L"), (40, "XL"),
            (10, "X"), (9, "IX"), (5, "V"), (4, "IV"), (1, "I")
        ]

        # Perform calculation
        if oper == "add":
            result = value1 + value2
        elif oper == "subtract":
            result = value1 - value2
        elif oper == "multiply":
            result = value1 * value2
        else:  # oper == "divide"
            if value2 == 0:
                return "Error: Division by zero is not allowed"
            # Exact integer division truncated toward zero; avoids the float
            # round trip of int(value1 / value2), which loses precision for
            # large operands.
            result = abs(value1) // abs(value2)
            if (value1 < 0) != (value2 < 0):
                result = -result

        # Handle invalid results for Roman numerals
        if result <= 0:
            return f"Error: Roman numerals cannot represent zero or negative numbers. Result was: {result}"
        if result > 3999:  # Roman numerals traditionally don't go beyond this
            return f"Error: Result ({result}) is too large for standard Roman numeral representation"

        # Convert to Roman numeral by greedily consuming the largest values
        remaining = result
        pieces = []
        for value, numeral in roman_numerals:
            count, remaining = divmod(remaining, value)
            pieces.append(numeral * count)
        roman_string = "".join(pieces)
        return f"The result of {gerunds[oper]} {value1} and {value2} is: {roman_string}"
    except Exception as e:
        return f"Error in Roman calculator: {str(e)}"
def get_current_time_in_timezone(timezone: str) -> str:
    """Report the current local time for a named timezone.

    Args:
        timezone: A timezone name such as 'America/New_York' or
            'Europe/London'. A few abbreviations (EST, PST, MST, CST, GMT,
            UTC, CET, JST) are mapped to a timezone name, case-insensitively.

    Returns:
        A sentence containing the time formatted as
        "%Y-%m-%d %H:%M:%S %Z", or an error string when the timezone is
        empty, unknown, or the lookup raises.
    """
    try:
        if not timezone or not timezone.strip():
            return "Error: Timezone cannot be empty"

        # Abbreviation -> timezone name used for the lookup.
        timezone_aliases = {
            'EST': 'America/New_York',
            'PST': 'America/Los_Angeles',
            'MST': 'America/Denver',
            'CST': 'America/Chicago',
            'GMT': 'GMT',
            'UTC': 'UTC',
            'CET': 'Europe/Berlin',
            'JST': 'Asia/Tokyo',
        }

        # Reassign the parameter so the error handlers below report the
        # cleaned / resolved name.
        timezone = timezone.strip()
        timezone = timezone_aliases.get(timezone.upper(), timezone)

        zone = pytz.timezone(timezone)
        stamp = datetime.datetime.now(zone).strftime("%Y-%m-%d %H:%M:%S %Z")
        return f"The current local time in {timezone} is: {stamp}"
    except pytz.exceptions.UnknownTimeZoneError:
        return f"Error: Unknown timezone '{timezone}'. Please use a valid timezone like 'America/New_York' or 'Europe/London'"
    except Exception as exc:
        return f"Error fetching time for timezone '{timezone}': {str(exc)}"
def factorial(n: int) -> Union[int, str]:
    """Compute n! for an integer in the range 0..170.

    Args:
        n: A non-negative integer no greater than 170.

    Returns:
        ``math.factorial(n)``, or an error string when ``n`` is not an int,
        is negative, exceeds 170, or the computation raises.
    """
    try:
        if not isinstance(n, int):
            return "Error: Input must be an integer"
        if n < 0:
            return "Error: Factorial is not defined for negative numbers"
        if n > 170:  # upper bound enforced by this tool
            return "Error: Number too large for factorial calculation"
        return math.factorial(n)
    except Exception as exc:
        return f"Error calculating factorial: {str(exc)}"
def greatest_common_divisor(a: int, b: int) -> Union[int, str]:
    """Compute the greatest common divisor of two integers.

    Args:
        a: First integer.
        b: Second integer.

    Returns:
        The non-negative GCD of ``a`` and ``b``, or an error string when
        either input is not an int or the computation raises.
    """
    try:
        if isinstance(a, int) and isinstance(b, int):
            return math.gcd(abs(a), abs(b))
        return "Error: Both inputs must be integers"
    except Exception as exc:
        return f"Error calculating GCD: {str(exc)}"
def least_common_multiple(a: int, b: int) -> Union[int, str]:
    """Compute the least common multiple of two integers.

    Args:
        a: First integer.
        b: Second integer.

    Returns:
        The non-negative LCM of ``a`` and ``b`` (0 when either is 0), or an
        error string when either input is not an int or the computation
        raises.
    """
    try:
        if not (isinstance(a, int) and isinstance(b, int)):
            return "Error: Both inputs must be integers"
        if a == 0 or b == 0:
            return 0
        product = abs(a * b)
        divisor = math.gcd(abs(a), abs(b))
        return product // divisor
    except Exception as exc:
        return f"Error calculating LCM: {str(exc)}"
def is_prime(n: int) -> Union[bool, str]:
    """Test an integer for primality by trial division.

    Args:
        n: The number to check.

    Returns:
        True if ``n`` is prime, False otherwise, or an error string when
        ``n`` is not an int or the check raises.
    """
    try:
        if not isinstance(n, int):
            return "Error: Input must be an integer"
        if n < 2:
            return False
        if n == 2:
            return True
        if n % 2 == 0:
            return False
        # Only odd candidates up to sqrt(n) need to be tried.
        limit = int(math.sqrt(n)) + 1
        return all(n % candidate != 0 for candidate in range(3, limit, 2))
    except Exception as exc:
        return f"Error checking if prime: {str(exc)}"
def percentage_calculator(part: Union[int, float], whole: Union[int, float]) -> Union[float, str]:
    """Compute what percentage ``part`` is of ``whole``.

    Args:
        part: The part value.
        whole: The whole value. Must be non-zero.

    Returns:
        ``part / whole * 100`` rounded to 2 decimal places, or an error
        string when ``whole`` is zero or the computation raises.
    """
    try:
        whole_is_zero = whole == 0
        if whole_is_zero:
            return "Error: Cannot calculate percentage when whole is zero"
        return round((part / whole) * 100, 2)
    except Exception as exc:
        return f"Error calculating percentage: {str(exc)}"
def compound_interest(principal: Union[int, float], rate: Union[int, float],
                      time: Union[int, float], compound_frequency: int = 1) -> Union[float, str]:
    """Compute the final amount under compound interest, A = P(1 + r/n)^(nt).

    Args:
        principal: The initial amount of money. Must be positive.
        rate: The annual interest rate as a percentage (e.g., 5 for 5%).
            Must not be negative.
        time: The time period in years. Must not be negative.
        compound_frequency: How many times per year the interest is
            compounded (default: 1). Must be positive.

    Returns:
        The final amount rounded to 2 decimal places, or an error string
        when an argument is out of range or the computation raises.
    """
    try:
        if principal <= 0:
            return "Error: Principal must be positive"
        if rate < 0:
            return "Error: Interest rate cannot be negative"
        if time < 0:
            return "Error: Time cannot be negative"
        if compound_frequency <= 0:
            return "Error: Compound frequency must be positive"

        annual_rate = rate / 100  # percentage -> fraction
        growth_per_period = 1 + annual_rate / compound_frequency
        periods = compound_frequency * time
        return round(principal * growth_per_period ** periods, 2)
    except Exception as exc:
        return f"Error calculating compound interest: {str(exc)}"
def convert_temperature(value: Union[int, float], from_unit: str, to_unit: str) -> Union[float, str]:
    """Convert a temperature between Celsius, Fahrenheit, and Kelvin.

    Args:
        value: The temperature value to convert.
        from_unit: The source unit: 'C', 'F', 'K' or the full name
            (case-insensitive, surrounding whitespace ignored).
        to_unit: The target unit, in the same forms as ``from_unit``.

    Returns:
        The converted value rounded to 2 decimal places (or ``float(value)``
        unrounded when both units are the same), or an error string when a
        unit is not recognised or the conversion raises.
    """
    try:
        full_names = {'CELSIUS': 'C', 'FAHRENHEIT': 'F', 'KELVIN': 'K'}
        source = from_unit.upper().strip()
        target = to_unit.upper().strip()
        source = full_names.get(source, source)
        target = full_names.get(target, target)

        # Conversions are routed through Celsius as the common intermediate.
        to_celsius = {
            'C': lambda v: v,
            'F': lambda v: (v - 32) * 5/9,
            'K': lambda v: v - 273.15,
        }
        from_celsius = {
            'C': lambda c: c,
            'F': lambda c: c * 9/5 + 32,
            'K': lambda c: c + 273.15,
        }

        if source not in to_celsius or target not in from_celsius:
            return "Error: Units must be 'C' (Celsius), 'F' (Fahrenheit), or 'K' (Kelvin)"
        if source == target:
            return float(value)

        celsius = to_celsius[source](value)
        return round(from_celsius[target](celsius), 2)
    except Exception as exc:
        return f"Error converting temperature: {str(exc)}"
def wikipedia_search(query: str) -> str:
    """
    Look up a query on Wikipedia, loading at most 2 documents.

    Args:
        query: The search query.

    Returns:
        A dict with the single key "wiki_results" whose value is the loaded
        documents, each wrapped in a <Document> marker carrying its source
        and page metadata, joined by "---" separators. (The annotation says
        ``str`` but a dict is what is returned.)
    """
    loader = WikipediaLoader(query=query, load_max_docs=2)
    rendered = []
    for doc in loader.load():
        source = doc.metadata["source"]
        page = doc.metadata.get("page", "")
        rendered.append(
            f'<Document source="{source}" page="{page}"/>\n{doc.page_content}\n</Document>'
        )
    return {"wiki_results": "\n\n---\n\n".join(rendered)}
def arxiv_search(query: str) -> str:
    """
    Look up a query on Arxiv, loading at most 3 documents.

    Args:
        query: The search query.

    Returns:
        A dict with the single key "arxiv_results" whose value is the loaded
        documents, each truncated to its first 1000 characters and wrapped
        in a <Document> marker carrying its source and page metadata, joined
        by "---" separators. (The annotation says ``str`` but a dict is what
        is returned.)
    """
    loader = ArxivLoader(query=query, load_max_docs=3)
    rendered = []
    for doc in loader.load():
        source = doc.metadata["source"]
        page = doc.metadata.get("page", "")
        excerpt = doc.page_content[:1000]
        rendered.append(
            f'<Document source="{source}" page="{page}"/>\n{excerpt}\n</Document>'
        )
    return {"arxiv_results": "\n\n---\n\n".join(rendered)}
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Load an Excel file with pandas and summarise its contents.

    Args:
        file_path (str): the path to the Excel file.
        query (str): Question about the data. Accepted for the tool
            interface; it is not used when building the summary.

    Returns:
        A text summary with the row/column counts, the column names, and the
        output of ``DataFrame.describe()``, or an error string starting with
        "Error analyzing Excel file:" when loading or summarising fails.
    """
    try:
        frame = pandas.read_excel(file_path)
        # Excel headers may be numbers or dates; convert each to str so the
        # join cannot fail on non-string column labels.
        column_names = ', '.join(str(column) for column in frame.columns)
        result = (
            f"Excel file loaded with {len(frame)} rows and {len(frame.columns)} columns.\n"
        )
        result += f"Columns: {column_names}\n\n"
        result += "Summary statistics:\n"
        result += str(frame.describe())
        return result
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
def python_code_parser(file_path: str) -> str:
    """
    Load Python source found under a path using the language parser.

    Args:
        file_path: The path to load ".py" files from.

    Returns:
        The string "0" when ``file_path`` does not exist. Otherwise a dict
        with the single key "python_results" whose value is the parsed
        documents, each wrapped in a <Document> marker carrying its source
        and page metadata, joined by "---" separators. (The annotation says
        ``str`` but a dict is returned on success.)
    """
    path_missing = not os.path.exists(file_path)
    if path_missing:
        return "0"

    loader = GenericLoader.from_filesystem(
        file_path,
        glob="**/*",
        suffixes=[".py"],
        parser=LanguageParser()
    )
    rendered = []
    for doc in loader.load():
        source = doc.metadata["source"]
        page = doc.metadata.get("page", "")
        rendered.append(
            f'<Document source="{source}" page="{page}"/>\n{doc.page_content}\n</Document>'
        )
    return {"python_results": "\n\n---\n\n".join(rendered)}
def audio_transcription(file_path: str) -> str:
    """
    Transcribe an audio file to text using the AssemblyAI transcript loader.

    Args:
        file_path: The path to the audio file.

    Returns:
        A dict with the single key "audio_results" whose value is the loaded
        transcript documents, each wrapped in a <Document> marker carrying
        its source and page metadata, joined by "---" separators. (The
        annotation says ``str`` but a dict is what is returned.)
    """
    loader = AssemblyAIAudioTranscriptLoader(file_path=file_path)
    rendered = []
    for doc in loader.load():
        source = doc.metadata["source"]
        page = doc.metadata.get("page", "")
        rendered.append(
            f'<Document source="{source}" page="{page}"/>\n{doc.page_content}\n</Document>'
        )
    return {"audio_results": "\n\n---\n\n".join(rendered)}
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Load a CSV file with pandas and summarise its contents.

    Args:
        file_path (str): the path to the CSV file.
        query (str): Question about the data. Accepted for the tool
            interface; it is not used when building the summary.

    Returns:
        A text summary with the row/column counts, the column names, and the
        output of ``DataFrame.describe()``, or an error string starting with
        "Error analyzing CSV file:" when loading or summarising fails.
    """
    try:
        frame = pandas.read_csv(file_path)
        sections = [
            f"CSV file loaded with {len(frame)} rows and {len(frame.columns)} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
        ]
        return "".join(sections)
    except Exception as exc:
        return f"Error analyzing CSV file: {str(exc)}"
def extract_text(img_path: str) -> str:
    """
    Extract text from an image file using a multimodal model.
    This allows the contents of the image to be analyzed properly.

    Args:
        img_path: Path to the image file on disk.

    Returns:
        The model's response text with surrounding whitespace stripped.
    """
    import mimetypes

    vision_llm = ChatOpenAI(model="gpt-4o")

    # Read image and encode as base64
    with open(img_path, "rb") as image_file:
        image_base64 = base64.b64encode(image_file.read()).decode("utf-8")

    # Label the data URL with the file's real image type instead of always
    # claiming PNG; fall back to PNG when the type cannot be determined.
    mime_type, _ = mimetypes.guess_type(img_path)
    if not mime_type or not mime_type.startswith("image/"):
        mime_type = "image/png"

    # Prepare the prompt including the base64 image data
    message = [
        HumanMessage(
            content=[
                {
                    "type": "text",
                    "text": (
                        "Extract all the text from this image. "
                        "Return only the extracted text, no explanations."
                    ),
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:{mime_type};base64,{image_base64}"
                    },
                },
            ]
        )
    ]

    # Call the vision-capable model
    response = vision_llm.invoke(message)
    # NOTE(review): assumes response.content is a plain string — confirm.
    all_text = response.content + "\n\n"
    return all_text.strip()
def reverse_sentence(text: str) -> str:
    """
    Return the input text with its characters in reverse order.
    Useful when a question arrives written backwards and must be restored.

    Args:
        text (str): The input string to be reversed.

    Returns:
        str: The reversed string.
    """
    backwards = slice(None, None, -1)
    return text[backwards]
def web_search(query: str) -> str:
    """
    Searches the web and returns a list of the most relevant URLs.
    Use this FIRST for complex queries, metadata questions, or to find the right sources.
    Then follow up with web_content_extract on the most promising URL.

    Args:
        query: The search query.

    Returns:
        A numbered listing of up to 5 results (title, URL and the first 150
        characters of the snippet), or a string starting with
        "web_search tool error:" when the search raises.
    """
    try:
        searcher = TavilySearch(
            max_results=5,
            topic="general",
            search_depth="advanced",
            include_raw_content=False,  # Just URLs and snippets
        )
        response = searcher.invoke(query)
        # One entry per hit: rank, title, URL and a short snippet preview.
        entries = "".join(
            f"{rank}. {hit['title']}: {hit['url']}\n {hit['content'][:150]}...\n\n"
            for rank, hit in enumerate(response["results"], 1)
        )
        return "Search Results:\n" + entries
    except Exception as exc:
        return f"web_search tool error: {str(exc)}"
def web_content_extract(url: str) -> str:
    """
    Extracts the visible text content from a URL using BeautifulSoup.
    Particularly effective for Wikipedia metadata pages, discussion pages,
    and structured web content.
    Can be used after web_search to get detailed information.

    Args:
        url: The address of the page to fetch.

    Returns:
        The page text prefixed with "Content extracted from <url>:". Text
        longer than 10000 characters is cut to 10000 and followed by "...".
        A string starting with "web_content_extract tool error:" is returned
        when fetching or parsing raises.
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()  # Raise exception for 4XX/5XX responses
        soup = BeautifulSoup(response.text, 'html.parser')

        # Drop non-content elements before extracting text.
        for element in soup.select('script, style, footer, nav, header'):
            element.decompose()

        root = soup.body if soup.body else soup
        text = root.get_text(separator='\n', strip=True)

        # Limit content length for response. The header is always included;
        # previously the conditional expression covered the whole f-string,
        # so short pages were returned without it.
        if len(text) > 10000:
            text = f"{text[:10000]}..."
        return f"Content extracted from {url}:\n\n{text}"
    except Exception as e:
        return f"web_content_extract tool error: {str(e)}"