import os
import base64
import mimetypes
import re

import pandas as pd
import requests
import whisper
from bs4 import BeautifulSoup, Tag, Comment
from markdownify import markdownify as md
from smolagents import tool, LiteLLMModel
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import JSONFormatter
# import wikipediaapi  # only needed if the commented-out get_wikipedia_page_content tool below is revived

# This could be done more cleanly via a managed agent, but it is a quick hack to get it working.
def describe_image_file(local_image_path: str) -> str:
    """
    Describe the contents of a local image file in detail and return the description as text.

    Args:
        local_image_path (str): The path to the local image file to be described.

    Returns:
        str: A detailed description of the image contents.
    """
    model = LiteLLMModel(
        model_id='ollama/gemma3:27b',
        api_base="https://192.168.5.217:8000",  # replace with a remote OpenAI-compatible server if necessary
        api_key=os.getenv("OLLAMA_REVPROXY_SRVML"),
        num_ctx=16384,  # ollama's default is 2048, which will often fail horribly. 8192 works for easy tasks; more is better. Check https://huggingface.co/spaces/NyxKrage/LLM-Model-VRAM-Calculator to estimate how much VRAM this needs for the selected model.
        ssl_verify=False,  # explicitly disable SSL verification
        extra_headers={
            "Authorization": f"Bearer {os.getenv('OLLAMA_REVPROXY_SRVML')}",  # explicitly set auth header
        },
        flatten_messages_as_text=False
    )
    text_prompt = "What is in this image? Describe it in detail."
    try:
        if not os.path.exists(local_image_path):
            raise FileNotFoundError(f"Image file not found at {local_image_path}. Please ensure it was downloaded correctly.")

        # 1. Read the image content from the local file
        with open(local_image_path, "rb") as image_file:
            image_content_bytes = image_file.read()

        # 2. Base64-encode the image content
        base64_image_bytes = base64.b64encode(image_content_bytes)
        base64_image_string = base64_image_bytes.decode('utf-8')

        # 3. Determine the MIME type from the file extension
        if local_image_path.lower().endswith('.png'):
            content_type = 'image/png'
        elif local_image_path.lower().endswith(('.jpg', '.jpeg')):
            content_type = 'image/jpeg'
        elif local_image_path.lower().endswith('.gif'):
            content_type = 'image/gif'
        elif local_image_path.lower().endswith('.bmp'):
            content_type = 'image/bmp'
        elif local_image_path.lower().endswith('.webp'):
            content_type = 'image/webp'
        else:
            content_type = mimetypes.guess_type(local_image_path)[0] or 'application/octet-stream'
        print(f"Using MIME type: {content_type}")

        # 4. Construct the data URI
        data_uri = f"data:{content_type};base64,{base64_image_string}"

        # Construct the messages payload
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": text_prompt},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": data_uri  # use the base64 data URI here
                        }
                    }
                ]
            }
        ]
        response = model.generate(messages)
        return response.content  # return the text content of the model's reply
    except FileNotFoundError as fnf_err:
        print(f"File error: {fnf_err}")
        return f"File error: {fnf_err}"
    except Exception as e:
        print(f"An error occurred: {e}")
        return f"An error occurred while describing the image: {e}"
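
# Example usage (a minimal sketch; "question_image.png" is a hypothetical local file,
# and the Ollama endpoint configured above must be reachable):
#
#     description = describe_image_file("question_image.png")
#     print(description)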

def get_youtube_video_transcript(video_id: str) -> str:
    """
    Fetches the transcript of a YouTube video by its ID and returns it in JSON format.
    The video ID can be found in the YouTube video URL:
    https://www.youtube.com/watch?v=VIDEO_ID, where VIDEO_ID is the part after "v=".
    Example: for the URL https://www.youtube.com/watch?v=L1vXCYZAYYM the video_id is "L1vXCYZAYYM".

    Args:
        video_id (str): The YouTube video ID.

    Returns:
        str: The transcript in JSON format.
    """
    ytt_api = YouTubeTranscriptApi()
    transcript = ytt_api.fetch(video_id)
    formatter = JSONFormatter()
    # .format_transcript(transcript) turns the transcript into a JSON string.
    json_formatted = formatter.format_transcript(transcript)
    return json_formatted
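
# Example usage (a minimal sketch; the video ID is the one from the docstring example):
#
#     transcript_json = get_youtube_video_transcript("L1vXCYZAYYM")
#     print(transcript_json[:500])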

def transcribe_mp3(mp3_path: str, model_size: str = "base") -> str:
    """
    Transcribe an MP3 file to text using Whisper.

    Args:
        mp3_path (str): Path to the MP3 file.
        model_size (str): Whisper model size (tiny, base, small, medium, large).

    Returns:
        str: Transcribed text.
    """
    transcription_path = mp3_path.replace(".mp3", "_transcript.txt")
    # Return a cached transcription if one already exists next to the MP3
    if os.path.exists(transcription_path):
        with open(transcription_path, 'r', encoding='utf-8') as f:
            return f.read()
    # Load the Whisper model
    model = whisper.load_model(model_size)
    # Transcribe the audio file
    result = model.transcribe(mp3_path)
    transcription = result["text"]
    # Save the transcription to a sidecar file for reuse
    with open(transcription_path, 'w', encoding='utf-8') as f:
        f.write(transcription)
    # Return the text
    return transcription
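
# Example usage (a minimal sketch; "interview.mp3" is a hypothetical local file,
# and the chosen Whisper model is downloaded on first use):
#
#     text = transcribe_mp3("interview.mp3", model_size="base")
#     print(text)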

def get_text_from_ascii_file(filepath: str) -> str:
    """
    Reads the content of an ASCII text file and returns it as a string.

    Args:
        filepath (str): The path to the ASCII text file.

    Returns:
        str: The content of the file as a string.
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"The file at {filepath} does not exist.")
    with open(filepath, "r") as f:
        return f.read()
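
# Example usage (a minimal sketch; "notes.txt" is a hypothetical local text file):
#
#     print(get_text_from_ascii_file("notes.txt"))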

# @tool
# def get_wikipedia_page_content(page_title: str, lang: str = 'en') -> str:
#     """
#     This function uses the `wikipediaapi` library to retrieve the content of a specified Wikipedia page in a given language.
#     For example: for the URL 'https://en.wikipedia.org/wiki/Python_(programming_language)' the page_title would be 'Python_(programming_language)' and the lang would be 'en'.
#     It returns the content of the page as a Markdown-formatted string.
#     Args:
#         page_title (str): The title of the Wikipedia page to fetch.
#         lang (str): The language of the Wikipedia page (default is 'en' for English).
#     Returns:
#         str: The content of the Wikipedia page.
#     """
#     MY_EMAIL = os.getenv("MY_EMAIL", None)
#     if MY_EMAIL is None:
#         raise ValueError("MY_EMAIL environment variable is not set. Please set it to your email address.")
#     wiki_wiki = wikipediaapi.Wikipedia(user_agent=f'Wiki Agent ({MY_EMAIL})', language=lang)
#     page = wiki_wiki.page(page_title)
#     if not page.exists():
#         raise ValueError(f"The Wikipedia page '{page_title}' does not exist.")
#     return md(page.text)

def get_wikipedia_markdown(
    title: str,
    lang: str = 'en',
    ignore_references: bool = True,
    ignore_links: bool = True
) -> str:
    """
    Fetches the main content of a Wikipedia page and returns it as Markdown,
    excluding infoboxes, navigation templates, images, and, if requested, the
    References, Further reading, and External links sections. It is recommended
    to start with ignore_references=True and ignore_links=True to reduce the
    output to the essential information.

    Args:
        title (str): Wikipedia page title (e.g., "Mercedes_Sosa").
        lang (str): Language code (default 'en').
        ignore_references (bool): If True, drop the "References", "Further reading",
            and "External links" sections entirely.
        ignore_links (bool): If True, strip out all <a> tags entirely.

    Returns:
        str: Markdown-formatted content of the main article body.
    """
    # 1. Fetch raw HTML
    url = f"https://{lang}.wikipedia.org/wiki/{title}"
    try:
        response = requests.get(url)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # Use Wikipedia's search API to suggest existing pages with a similar title
        api_url = f"https://{lang}.wikipedia.org/w/api.php"
        search_params = {
            'list': 'search',
            'srprop': '',
            'srlimit': 10,
            'limit': 10,
            'srsearch': title.replace("_", " "),
            'srinfo': 'suggestion',
            'format': 'json',
            'action': 'query'
        }
        headers = {
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
        }
        r = requests.get(api_url, params=search_params, headers=headers)
        raw_results = r.json()
        search_results = [d['title'].replace(" ", "_") for d in raw_results['query']['search']]
        if ('searchinfo' in raw_results['query']) and ('suggestion' in raw_results['query']['searchinfo']):
            search_results.insert(0, raw_results['query']['searchinfo']['suggestion'].replace(" ", "_"))
        errorMsg = f"Could not fetch page '{title}' for language '{lang}' (HTTP {response.status_code})."
        if search_results:
            errorMsg += f" Did you mean one of these pages? {', '.join(search_results)}"
        raise ValueError(errorMsg) from e
    html = response.text

    # 2. Parse with BeautifulSoup and isolate the article's main <div>
    soup = BeautifulSoup(html, "lxml")
    content_div = soup.find("div", class_="mw-parser-output")
    if content_div is None:
        raise ValueError(f"Could not find main content for page '{title}'")

    # 2a. Remove all "[edit]" links (<span class="mw-editsection">...)
    for edit_span in content_div.find_all("span", class_="mw-editsection"):
        edit_span.decompose()

    # 2b. Remove any superscript footnote markers (<sup class="reference">...)
    for sup in content_div.find_all("sup", class_="reference"):
        sup.decompose()

    # 2c. Remove any parser-debug comments (e.g., "NewPP limit report...", "Transclusion expansion time report...")
    for comment in content_div.find_all(string=lambda text: isinstance(text, Comment)):
        comment_text = str(comment)
        # If the comment contains debug keywords, extract it
        if (
            "NewPP limit report" in comment_text
            or "Transclusion expansion time report" in comment_text
            or "Saved in parser cache" in comment_text
        ):
            comment.extract()

    # 3. Remove unwanted "boilerplate" elements:
    # a) Infoboxes (sidebars)
    for infobox in content_div.find_all("table", class_=re.compile(r"infobox")):
        infobox.decompose()
    # b) Table of contents
    toc = content_div.find("div", id="toc")
    if toc:
        toc.decompose()
    # c) Navigation templates (navbox/vertical-navbox/metadata)
    for nav in content_div.find_all(
        ["div", "table"],
        class_=re.compile(r"navbox|vertical-navbox|metadata")
    ):
        nav.decompose()
    # d) Thumbnails / image wrappers
    for thumb in content_div.find_all("div", class_=re.compile(r"thumb")):
        thumb.decompose()
    # e) Raw <img> tags
    for img in content_div.find_all("img"):
        img.decompose()

    # 4. Convert any remaining <table> into a Markdown table in-place
    def table_to_markdown(table_tag: Tag) -> str:
        """
        Converts a <table> into a Markdown-formatted table, preserving <th> headers.
        """
        headers = []
        header_row = table_tag.find("tr")
        if header_row:
            for th in header_row.find_all("th"):
                headers.append(th.get_text(strip=True))
        md_table = ""
        if headers:
            md_table += "| " + " | ".join(headers) + " |\n"
            md_table += "| " + " | ".join("---" for _ in headers) + " |\n"
        # Now process data rows (skip the first <tr> if it was the header row)
        for row in table_tag.find_all("tr")[1:]:
            cells = row.find_all(["td", "th"])
            if not cells:
                continue
            row_texts = [cell.get_text(strip=True) for cell in cells]
            md_table += "| " + " | ".join(row_texts) + " |\n"
        return md_table.rstrip()

    for table in content_div.find_all("table"):
        # Skip infobox/navigation tables (already removed above)
        if "infobox" in table.get("class", []) or table.get("role") == "navigation":
            continue
        markdown_table = table_to_markdown(table)
        new_node = soup.new_string("\n\n" + markdown_table + "\n\n")
        table.replace_with(new_node)

    # 5. Remove the "References", "Further reading" & "External links" sections if requested
    if ignore_references:
        section_ids = {"references", "further_reading", "external_links"}
        # Look for the wrapper <div class="mw-heading mw-heading2"> or mw-heading3
        for wrapper in content_div.find_all("div", class_=re.compile(r"mw-heading mw-heading[23]")):
            heading_tag = wrapper.find(re.compile(r"^h[2-3]$"))
            if heading_tag and heading_tag.get("id", "").strip().lower() in section_ids:
                # Collect every sibling until the next wrapper of the same form
                siblings_to_remove = []
                for sib in wrapper.find_next_siblings():
                    if (
                        sib.name == "div"
                        and "mw-heading" in (sib.get("class") or [])
                        and re.match(r"mw-heading mw-heading[23]", " ".join(sib.get("class") or []))
                    ):
                        break
                    siblings_to_remove.append(sib)
                # First delete those siblings
                for node in siblings_to_remove:
                    node.decompose()
                # Finally delete the wrapper itself
                wrapper.decompose()

    # 6. Convert the cleaned HTML into Markdown
    markdown_options = {}
    if ignore_links:
        markdown_options["strip"] = ["a"]  # strip all <a> tags (keep only their text)
    raw_html = "".join(str(child) for child in content_div.children)
    markdown_text = md(raw_html, **markdown_options)

    # 7. Collapse 3+ blank lines into exactly two
    markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text).strip()
    return markdown_text
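
# Example usage (a minimal sketch; "Mercedes_Sosa" is the title from the docstring example):
#
#     article_md = get_wikipedia_markdown("Mercedes_Sosa", lang="en",
#                                         ignore_references=True, ignore_links=True)
#     print(article_md[:1000])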

def read_xls_File(file_path: str) -> object:
    """This tool loads an Excel (.xls/.xlsx) file into pandas and returns it.

    Args:
        file_path (str): File path to the Excel file.

    Returns:
        object: The loaded spreadsheet as a pandas DataFrame.
    """
    return pd.read_excel(file_path)
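
# Example usage (a minimal sketch; "sales.xlsx" is a hypothetical local file, and
# pandas needs an Excel engine such as openpyxl installed):
#
#     df = read_xls_File("sales.xlsx")
#     print(df.head())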