Spaces:
Running
Running
from langchain_core.tools import tool | |
import datetime | |
import requests | |
import openai | |
import os | |
import tempfile | |
import pandas as pd | |
from urllib.parse import urlparse, parse_qs | |
from openai import OpenAI | |
from youtube_transcript_api import YouTubeTranscriptApi | |
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound, VideoUnavailable | |
from pytube import extract | |
from openai import OpenAI | |
from bs4 import BeautifulSoup | |
from io import BytesIO | |
from PyPDF2 import PdfReader | |
def add(a: float, b: float) -> float:
    """Return the sum of two numbers.

    Args:
        a (float): first addend
        b (float): second addend
    """
    total = a + b
    return total
def subtract(a: float, b: float) -> float:
    """Subtract the second number from the first.

    Args:
        a (float): number to subtract from
        b (float): number to subtract
    Returns:
        float: the difference a - b
    """
    # The result of float - float is a float, so the annotation is float
    # (not int), matching the other arithmetic functions.
    return a - b
def multiply(a: float, b: float) -> float:
    """Return the product of two numbers.

    Args:
        a (float): first factor
        b (float): second factor
    """
    product = a * b
    return product
def divide(a: float, b: float) -> float:
    """Return the quotient of two numbers.

    Args:
        a (float): dividend
        b (float): divisor
    Raises:
        ValueError: if the divisor is zero
    """
    if b != 0:
        return a / b
    raise ValueError("Cannot divide by zero.")
def power(a: float, b: float) -> float:
    """Raise the first number to the power of the second.

    Args:
        a (float): base
        b (float): exponent
    """
    return pow(a, b)
# List collecting the basic arithmetic functions defined above.
calculator_basic = [
    add,
    subtract,
    multiply,
    divide,
    power,
]
def current_date(_=None) -> str:
    """Return the current date in YYYY-MM-DD format.

    Args:
        _: ignored placeholder argument. It is optional so the function
            works both when called with a dummy input and with no input.
    Returns:
        str: today's date, e.g. "2024-05-17"
    """
    return datetime.datetime.now().strftime("%Y-%m-%d")
def day_of_week(_=None) -> str:
    """Return the current day of the week (e.g., Monday, Tuesday).

    Args:
        _: ignored placeholder argument. It is optional so the function
            works both when called with a dummy input and with no input.
    Returns:
        str: the full weekday name as produced by strftime("%A")
    """
    return datetime.datetime.now().strftime("%A")
def days_until(date_str: str) -> str:
    """Report how many days lie between today and the given date.

    Args:
        date_str (str): target date in YYYY-MM-DD format
    Returns:
        str: "<n> days until <date_str>", or an "Error parsing date: ..."
            message when the input cannot be handled.
    """
    try:
        target = datetime.datetime.strptime(date_str, "%Y-%m-%d").date()
        remaining = target - datetime.date.today()
        return f"{remaining.days} days until {date_str}"
    except Exception as err:
        return f"Error parsing date: {err}"
# List collecting the date/time functions defined above.
datetime_tools = [
    current_date,
    day_of_week,
    days_until,
]
def transcribe_audio(audio_file: str, file_extension: str) -> str:
    """Download an audio file and transcribe it to text with OpenAI Whisper.

    Args:
        audio_file (str): URL of the audio file (.mp3, .m4a, etc.); it is fetched with an HTTP GET
        file_extension (str): file extension of the audio, e.g. mp3 (a leading dot is accepted)
    Returns:
        str: The transcribed text from the audio, or a
            "transcribe_audio failed: ..." message if any step raises.
    """
    try:
        # A timeout keeps a stalled server from blocking the caller forever.
        response = requests.get(audio_file, timeout=60)
        response.raise_for_status()  # surface HTTP errors as exceptions

        # Keep only letters/digits so the extension cannot introduce dots or
        # path separators into the file name written to disk.
        extension = "".join(ch for ch in file_extension if ch.isalnum())

        # A private temporary directory avoids overwriting a shared
        # "tmp.<ext>" in the working directory and removes the download
        # afterwards, even when transcription fails.
        with tempfile.TemporaryDirectory() as tmp_dir:
            filename = os.path.join(tmp_dir, f"tmp.{extension}")
            with open(filename, "wb") as file:
                file.write(response.content)

            # Transcribe the saved audio with OpenAI Whisper.
            client = OpenAI()
            with open(filename, "rb") as audio_content:
                transcription = client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_content,
                )
        return transcription.text
    except Exception as e:
        return f"transcribe_audio failed: {e}"
def transcribe_youtube(youtube_url: str) -> str:
    """Fetch the English transcript of a YouTube video.

    The video id is taken from the "v" query parameter when present
    (https://www.youtube.com/watch?v=<id>). Otherwise it is taken from the
    path of short links (https://youtu.be/<id>) or of /shorts/<id>,
    /embed/<id> and /live/<id> URLs.

    Args:
        youtube_url (str): youtube video's url
    Returns:
        str: The transcript text, one caption per line. On failure one of
            "invalid YouTube URL", "transcript unavailable: ..." or
            "transcribe_youtube failed: ..." is returned instead.
    """
    try:
        parsed = urlparse(youtube_url)
        query_ids = parse_qs(parsed.query).get("v")
        if query_ids:
            # Standard watch URL; checked first so every URL that worked
            # before resolves to the same id.
            video_id = query_ids[0]
        else:
            segments = [part for part in parsed.path.split("/") if part]
            host = (parsed.hostname or "").lower()
            is_short_host = host == "youtu.be" or host.endswith(".youtu.be")
            if is_short_host and segments:
                video_id = segments[0]
            elif len(segments) >= 2 and segments[0] in ("shorts", "embed", "live"):
                video_id = segments[1]
            else:
                return "invalid YouTube URL"
    except Exception:
        return "invalid YouTube URL"

    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        transcript = transcript_list.find_transcript(['en']).fetch()
        # keep only text
        return '\n'.join(t['text'] for t in transcript)
    except (TranscriptsDisabled, NoTranscriptFound, VideoUnavailable) as e:
        return f"transcript unavailable: {str(e)}"
    except Exception as e:
        return f"transcribe_youtube failed: {e}"
def query_image(query: str, image_url: str) -> str:
    """Answer a question about an image using a Vision Language Model.

    Args:
        query (str): the question about the image, e.g. how many animals are on the image?
        image_url (str): the image's URL
    Returns:
        str: the model's text output, or a "query_image failed: ..."
            message if the request raises.
    """
    try:
        # One user message carrying both the question and the image reference.
        user_content = [
            {"type": "input_text", "text": query},
            {"type": "input_image", "image_url": image_url},
        ]
        messages = [{"role": "user", "content": user_content}]
        answer = OpenAI().responses.create(model="gpt-4o-mini", input=messages)
        return answer.output_text
    except Exception as err:
        return f"query_image failed: {err}"
def webpage_content(url: str) -> str:
    """Fetch text from a webpage or PDF file.

    Args:
        url (str): The URL of the webpage or PDF to fetch.
    Returns:
        str: Extracted text, or a "webpage_content failed: ..." message
            if any step raises.
    """
    try:
        # A timeout keeps a stalled server from blocking the caller forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        # Media types are case-insensitive, so normalise before matching.
        content_type = response.headers.get("Content-Type", "").lower()

        # PDF file: join the text of every page; pages without
        # extractable text contribute an empty string.
        if "pdf" in content_type:
            reader = PdfReader(BytesIO(response.content))
            return "\n".join(page.extract_text() or "" for page in reader.pages)

        # HTML file: prefer the <body> text, falling back to the whole
        # document when there is no body element.
        soup = BeautifulSoup(response.text, "html.parser")
        body = soup.body
        return body.get_text(separator="\n", strip=True) if body else soup.get_text(strip=True)
    except Exception as e:
        return f"webpage_content failed: {e}"
def read_excel(file_url: str) -> str:
    """Read an Excel file from a URL and return its content as CSV text.

    pd.read_excel is called without a sheet_name, so only pandas' default
    sheet (the first one) is converted.

    Args:
        file_url (str): URL to the Excel file (.xlsx, .xls)
    Returns:
        str: Content of the Excel sheet as CSV text (no index column), or
            a "read_excel failed: ..." message if any step raises.
    """
    try:
        # A timeout keeps a stalled server from blocking the caller forever.
        response = requests.get(file_url, timeout=30)
        response.raise_for_status()

        # Parse straight from memory; nothing is written to disk.
        df = pd.read_excel(BytesIO(response.content))
        return df.to_csv(index=False)  # CSV string for easy downstream processing
    except Exception as e:
        return f"read_excel failed: {str(e)}"