Spaces:
Sleeping
Sleeping
import base64 | |
import json | |
import os | |
import pandas as pd | |
import re | |
import requests | |
import whisper | |
from datetime import datetime | |
from dotenv import find_dotenv, load_dotenv | |
from langchain.chains import RetrievalQA | |
from langchain.chat_models import init_chat_model | |
from langchain_community.document_loaders import ( | |
UnstructuredPDFLoader, UnstructuredPowerPointLoader, | |
UnstructuredWordDocumentLoader, WebBaseLoader) | |
from langchain_community.tools import DuckDuckGoSearchResults | |
from langchain_community.utilities import GoogleSerperAPIWrapper | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_core.tools import tool | |
from langchain_tavily import TavilySearch | |
from typing import Optional | |
from youtube_transcript_api import YouTubeTranscriptApi | |
from yt_dlp import YoutubeDL | |
from retrieval import build_retriever, create_retrieval_qa | |
from web_utilities import get_wikipedia_article, parse_sections, fetch_wikipedia_page, MarkdownWebBaseLoader | |
def get_weather_info(location: str) -> str: | |
"""Fetches weather information for a given location. | |
Usage: | |
``` | |
# Initialize the tool | |
weather_info_tool = Tool( | |
name="get_weather_info", | |
func=get_weather_info, | |
description="Fetches weather information for a given location.") | |
``` | |
""" | |
load_dotenv(find_dotenv()) | |
api_key = os.getenv("OPENWEATHERMAP_API_KEY") | |
url = ( | |
f"https://api.openweathermap.org/data/2.5/" | |
f"weather?q={location}&appid={api_key}&units=metric" | |
) | |
res = requests.get(url, timeout=15) | |
data = res.json() | |
humidity = data["main"]["humidity"] | |
pressure = data["main"]["pressure"] | |
wind = data["wind"]["speed"] | |
description = data["weather"][0]["description"] | |
temp = data["main"]["temp"] | |
min_temp = data["main"]["temp_min"] | |
max_temp = data["main"]["temp_max"] | |
return ( | |
f"Weather in {location}: {description}, " | |
f"Temperature: {temp}°C, Min: {min_temp}°C, Max: {max_temp}°C, " | |
f"Humidity: {humidity}%, Pressure: {pressure} hPa, " | |
f"Wind Speed: {wind} m/s" | |
) | |
def add(a: int, b: int) -> int: | |
"""Adds two numbers together. | |
Args: | |
a (int): The first number. | |
b (int): The second number. | |
""" | |
return a + b | |
def get_sum(list_of_numbers: list[int]) -> int: | |
"""Sums a list of numbers. | |
Args: | |
list_of_numbers (list[int]): The list of numbers to sum. | |
""" | |
return sum(list_of_numbers) | |
def subtract(a: int, b: int) -> int: | |
"""Subtracts the second number from the first. | |
Args: | |
a (int): The first number. | |
b (int): The second number. | |
""" | |
return a - b | |
def multiply(a: int, b: int) -> int: | |
"""Multiplies two numbers together. | |
Args: | |
a (int): The first number. | |
b (int): The second number. | |
""" | |
return a * b | |
def divide(a: int, b: int) -> float: | |
"""Divides the first number by the second. | |
Args: | |
a (int): The first number. | |
b (int): The second number. | |
""" | |
if b == 0: | |
raise ValueError("Cannot divide by zero.") | |
return a / b | |
def get_current_time_and_date() -> str: | |
"""Returns the current time and date in ISO format.""" | |
return datetime.now().isoformat() | |
def reverse_text(text: str) -> str: | |
"""Reverses the given text. | |
Args: | |
text (str): The text to reverse. | |
""" | |
return text[::-1] | |
def wiki_search_qa(query: str, question: str) -> str: | |
"""Searches Wikipedia for a specific article and answers a question based on its content. | |
The function retrieves a Wikipedia article based on the provided query, converts it to Markdown, | |
and uses a retrieval-based QA system to answer the specified question. | |
Args: | |
query (str): A concise topic name with optional keywords, ideally matching the relevant Wikipedia page title. | |
question (str): The question to answer using the article. | |
Returns: | |
str: The answer to the question based on the retrieved article. | |
""" | |
article = get_wikipedia_article(query) | |
markdown = article["markdown"] | |
retriever = build_retriever(markdown) | |
qa = create_retrieval_qa(retriever=retriever) | |
return qa.invoke(question) | |
def wiki_search_article(query: str) -> str: | |
"""Search Wikipedia and return page_key plus a full table of contents (sections + subsections). | |
Args: | |
query (str): A concise topic name with optional keywords, ideally matching the relevant Wikipedia page title. | |
""" | |
article = get_wikipedia_article(query) | |
page_key = article["page_key"] | |
markdown = article["markdown"] | |
sections = parse_sections(markdown) | |
toc = [ | |
{"section": sec, "subsections": list(info["subsections"].keys())} | |
for sec, info in sections.items() | |
] | |
return json.dumps({"page_key": page_key, "toc": toc}) | |
def wiki_get_section( | |
page_key: str, section: str, subsection: Optional[str] = None | |
) -> str: | |
""" | |
Fetches the Markdown for a given top-level section or an optional subsection. | |
Args: | |
page_key: the article’s key (from wiki_search) | |
section: one of the top-level headings (## ...) | |
subsection: an optional subheading (### ...) under that section | |
Returns: | |
Markdown string of either the entire section or just the named subsection. | |
""" | |
result_dict = fetch_wikipedia_page(page_key=page_key) | |
markdown = result_dict.get("markdown") | |
sections = parse_sections(markdown) | |
sec_info = sections.get(section) | |
if not sec_info: | |
return f"Error: section '{section}' not found." | |
if subsection: | |
sub_md = sec_info["subsections"].get(subsection) | |
if not sub_md: | |
return f"Error: subsection '{subsection}' not found under '{section}'." | |
return sub_md | |
# no subsection requested → return the full section (with all its subsections) | |
return sec_info["full"] | |
def web_search(query: str, max_results: int = 5) -> str: | |
"""Searches the web for a given query and returns relevant results. | |
Args: | |
query (str): The search query. | |
max_results (int): The maximum number of results to return. Default is 3. | |
""" | |
if os.getenv("SERPER_API_KEY"): | |
# Preferred choice: Use Google Serper API for search | |
search_tool = GoogleSerperAPIWrapper() | |
results_dict = search_tool.results(query) | |
results = "\n".join( | |
[ | |
f"Title: {result['title']}\n" | |
f"URL: {result['link']}\n" | |
f"Content: {result['snippet']}\n" | |
for result in results_dict["organic"][:max_results] | |
] | |
) | |
elif os.getenv("TAVILY_API_KEY"): | |
search_tool = TavilySearch( | |
max_results=max_results, | |
topic="general", | |
) | |
results_dict = search_tool.invoke(query) | |
results = "\n".join( | |
[ | |
f"Title: {result['title']}\n" | |
f"URL: {result['url']}\n" | |
f"Content: {result['content']}\n" | |
for result in results_dict["results"] | |
] | |
) | |
else: | |
search_tool = DuckDuckGoSearchResults() | |
results = search_tool.invoke(query) | |
if results: | |
# Clean up the results to remove any unnecessary spaces or newlines, e.g. \n\n\n | |
results = re.sub(r"\n{2,}", "\n", results.strip()) | |
return results | |
else: | |
return "No results found." | |
def visit_website(url: str) -> str: | |
"""Visits a website and returns the content. | |
Args: | |
url (str): The URL of the website to visit. | |
""" | |
try: | |
page_content = MarkdownWebBaseLoader(url).load()[0].page_content | |
# Use retrieval chain if page_content is large | |
return page_content | |
except Exception as e: | |
return f"Could not retrieve website content. Error: {e}" | |
def get_youtube_video_info(video_url: str) -> str: | |
"""Fetches information about a YouTube video and its transcript if it is available. | |
Args: | |
video_url (str): The URL of the YouTube video. | |
""" | |
# Get information about the video using yt-dlp | |
try: | |
ydl_opts = { | |
"quiet": True, | |
"skip_download": True, | |
} | |
with YoutubeDL(ydl_opts) as ydl: | |
info = ydl.extract_info(video_url, download=False) | |
video_info = { | |
"Title": info.get("title"), | |
"Description": info.get("description"), | |
"Uploader": info.get("uploader"), | |
"Upload date": info.get("upload_date"), | |
"Duration": info.get("duration"), | |
"View count": info.get("view_count"), | |
"Like count": info.get("like_count"), | |
} | |
video_info_filtered = {k: v for k, v in video_info.items() if v is not None} | |
video_info_str = "\n".join( | |
[f"{k}: {v}" for k, v in video_info_filtered.items()] | |
) | |
except Exception as e: | |
print(f"Error fetching video info: {e}") | |
video_info_str = "" | |
try: | |
video_id = video_url.split("v=")[-1] | |
ytt_api = YouTubeTranscriptApi() | |
# We could add the option to load the transcript in a specific language | |
transcript = ytt_api.fetch(video_id) | |
sentences = [] | |
for t in transcript: | |
start = t.start | |
end = start + t.duration | |
sentences.append(f"{start:.2f} - {end:.2f}: {t.text}") | |
transcript_with_timestamps = "\n".join(sentences) | |
except Exception as e: | |
print(f"Error fetching transcript: {e}") | |
transcript_with_timestamps = "" | |
# Check if neither piece of data was fetched | |
if not video_info_str and not transcript_with_timestamps: | |
return "Could not fetch video information or transcript." | |
# Use fallbacks for whichever is missing | |
info = video_info_str or "Video information not available." | |
transcript_section = ( | |
f"\n\nTranscript:\n{transcript_with_timestamps}" | |
if transcript_with_timestamps | |
else "\n\nTranscript not available." | |
) | |
return f"{info}{transcript_section}" | |
def encode_image(image_path): | |
with open(image_path, "rb") as image_file: | |
return base64.b64encode(image_file.read()).decode("utf-8") | |
def ask_about_image(image_path: str, question: str) -> str: | |
"""Performs vision-based question answering on an image. | |
Args: | |
image_path (str): The path to the image file. | |
question (str): Your question about the image, as a natural language sentence. Provide as much context as possible. | |
""" | |
load_dotenv(find_dotenv()) | |
llm = init_chat_model("groq:meta-llama/llama-4-maverick-17b-128e-instruct") | |
prompt = ChatPromptTemplate( | |
[ | |
{ | |
"role": "user", | |
"content": [ | |
{ | |
"type": "text", | |
"text": "Please write a concise caption for the image that helps answer the following question: {question}", | |
}, | |
{ | |
"type": "image_url", | |
"image_url": { | |
"url": "data:image/{image_format};base64,{base64_image}", | |
}, | |
}, | |
], | |
} | |
] | |
) | |
file_suffix = os.path.splitext(image_path)[-1] | |
if file_suffix == ".png": | |
image_format = "png" | |
else: | |
# We could handle other formats explicitly, but for simplicity we assume JPEG | |
image_format = "jpeg" | |
chain = prompt | llm | |
response = chain.invoke( | |
{ | |
"question": question, | |
"base64_image": encode_image(image_path), | |
"image_format": image_format, | |
} | |
) | |
return response.text() | |
def transcribe_audio(audio_path: str) -> str: | |
"""Transcribes audio to text. | |
Args: | |
audio_path (str): The path to the audio file. | |
""" | |
model = whisper.load_model("base") | |
result = model.transcribe(audio_path) | |
text = result.get("text") | |
return text | |
def get_table_description(table: pd.DataFrame) -> str: | |
"""Generates a description of the table. If applicable, calculates sum and mean of numeric | |
columns. | |
Args: | |
table (pd.DataFrame): The table to describe. | |
""" | |
if table.empty: | |
return "The table is empty." | |
description = [] | |
total_sum = 0 | |
for column in table.select_dtypes(include=[int, float]).columns: | |
column_sum = table[column].sum() | |
column_mean = table[column].mean() | |
description.append( | |
f"Column '{column}': Sum = {column_sum}, Mean = {column_mean:.2f}" | |
) | |
total_sum += column_sum | |
if total_sum: | |
description.append(f"Total Sum of all numeric columns: {total_sum}") | |
if description: | |
description = "\n".join(description) | |
else: | |
description = "No numeric columns to summarize." | |
# Add the number of rows and columns | |
description += f"\n\nTable has {table.shape[0]} rows and {table.shape[1]} columns." | |
df_as_markdown = table.to_markdown() | |
description += f"\n\nTable:\n{df_as_markdown}" | |
return description | |
def inspect_file_as_text(file_path: str) -> str: | |
"""This tool reads a file as markdown text. It handles [".csv", ".xlsx", ".pptx", ".pdf", ".docx"], | |
and all other types of text files. IT DOES NOT HANDLE IMAGES. | |
Args: | |
file_path (str): The path to the file you want to read as text. If it is an image, use `vision_qa` tool. | |
""" | |
# TODO we could also pass the file content to a retrieval chain | |
try: | |
suffix = os.path.splitext(file_path)[-1] | |
if suffix in [".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff"]: | |
raise Exception( | |
"Cannot use inspect_file_as_text tool with images: use `vision_qa` tool instead!" | |
) | |
elif suffix in [".mp3", ".wav", ".flac", ".m4a"]: | |
raise Exception( | |
"Cannot use inspect_file_as_text tool with audio files: use `transcribe_audio` tool instead!" | |
) | |
elif suffix in [".csv", ".tsv", ".xlsx"]: | |
if suffix == ".csv": | |
df = pd.read_csv(file_path) | |
elif suffix == ".tsv": | |
df = pd.read_csv(file_path, sep="\t") | |
elif suffix == ".xlsx": | |
df = pd.read_excel(file_path) | |
else: | |
raise Exception(f"Unsupported file type: {suffix}") | |
table_description = get_table_description(df) | |
return table_description | |
elif suffix == ".pptx": | |
doc = UnstructuredPowerPointLoader(file_path) | |
return doc.load()[0].page_content | |
elif suffix == ".pdf": | |
doc = UnstructuredPDFLoader(file_path) | |
return doc.load()[0].page_content | |
elif suffix == ".docx": | |
doc = UnstructuredWordDocumentLoader(file_path) | |
return doc.load()[0].page_content | |
else: | |
# All other text files | |
with open(file_path, "r", encoding="utf-8") as file: | |
content = file.read() | |
return content | |
except Exception as e: | |
return f"Error file: {e}" | |