# NOTE: "Spaces: / Sleeping / Sleeping" was a Hugging Face Space status banner
# accidentally pasted into this file; commented out so the module stays valid Python.
# tools.py | |
import pandas as pd | |
from pathlib import Path | |
import requests | |
import regex as re | |
import time | |
import os | |
from duckduckgo_search import DDGS | |
from langchain_core.tools import tool | |
from langchain_community.document_loaders import ArxivLoader | |
import arxiv | |
import fitz # PyMuPDF | |
import tempfile | |
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
# Removed complex safety wrapper - keeping things simple | |
def _download_file_for_task(task_id: str, ext: str) -> str:
    """
    Attempt to GET the remote file for a given task_id.

    Saves under ./hf_files/{task_id}.{ext}.

    Args:
        task_id: Identifier used both in the download URL and the local filename.
        ext: File extension (without the leading dot) for the local copy.

    Returns:
        The local path if the download succeeded, or an empty string when the
        server returned a non-200 status, an empty body, or an error occurred.
    """
    print("reached _download_file_for_task")
    os.makedirs("hf_files", exist_ok=True)
    local_path = os.path.join("hf_files", f"{task_id}.{ext}")
    url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200 and resp.content:
            with open(local_path, "wb") as f:
                f.write(resp.content)
            # Announce success only after the file has actually been written.
            print(f"\n Downloaded file from {url} to {local_path} \n")
            return local_path
    except (requests.RequestException, OSError) as e:
        # Narrowed catch: network failures or local write errors; include the cause.
        print(f"Error downloading file from {url} to {local_path}: {e}")
    # If we get here, either non-200/empty response or a download error.
    return ""
def image_tool(task_id: str) -> str:
    """
    TOOL NAME: Image Analysis Tool
    Purpose: When the user asks about images, photos, or visual content, use this tool to get a description of the image.
    Input: A task_id string that identifies the specific image to analyze.
    Returns: "Image Caption:\\n<caption>" on success, or an "Error: ..." message.
    Example usage:
        - "What is shown in this image?"
        - "Describe the contents of the picture"
        - "What objects are visible in the photo?"
    """
    import requests, os
    # Try downloading the image with one of the allowed extensions.
    for ext in ("png", "jpg", "jpeg"):
        file_path = _download_file_for_task(task_id, ext)
        if file_path and os.path.exists(file_path):
            break
    else:  # no extension produced a local file
        return f"Error: Image file for task_id '{task_id}' not found."
    # Read the image bytes.
    try:
        with open(file_path, "rb") as f:
            image_bytes = f.read()
    except OSError as e:
        return f"Error reading image: {str(e)}"
    # Load HF token.
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        return "Error: HF_TOKEN not set in environment."
    # Use a single reliable model.
    model = "Salesforce/blip-image-captioning-base"
    headers = {
        "Authorization": f"Bearer {hf_token}",
        "Content-Type": "application/octet-stream",
    }
    try:
        # BUGFIX: the HF Inference API expects the raw image bytes as the
        # request body (data=...), not a multipart "files" upload.
        response = requests.post(
            f"https://api-inference.huggingface.co/models/{model}",
            headers=headers,
            data=image_bytes,
            timeout=30
        )
    except Exception as e:
        return f"Error calling HuggingFace API: {e}"
    # Parse response.
    if response.status_code != 200:
        return f"Error from model ({model}): {response.status_code} - {response.text}"
    try:
        result = response.json()
        # The captioning endpoint usually returns [{"generated_text": ...}],
        # but tolerate a bare dict as well.
        if isinstance(result, list) and result:
            caption = result[0].get("generated_text", "").strip()
        elif isinstance(result, dict):
            caption = result.get("generated_text", "").strip()
        else:
            caption = ""
    except Exception as e:
        return f"Error parsing response: {e}"
    if not caption:
        return "No caption generated by model."
    return f"Image Caption:\n{caption}"
def excel_tool(task_id: str, sheet: str = "Sheet1") -> str:
    """
    TOOL NAME: Excel Data Analysis Tool
    Purpose: When the user asks about data in spreadsheets, tables, or Excel files, use this tool to read and analyze the data.
    Input: A task_id string that identifies the specific Excel file to analyze.
           Optionally a sheet name (defaults to "Sheet1"); when that sheet is
           absent, the first sheet in the workbook is used instead.
    Returns: str() of the sheet as a list of row dicts, or an error message.
    Example usage:
        - "What data is in this spreadsheet?"
        - "Analyze the Excel file contents"
        - "Show me the data from the table"
    """
    print("reached excel_tool")
    local_xlsx = _download_file_for_task(task_id, "xlsx")
    if not local_xlsx or not os.path.exists(local_xlsx):
        return "Error: Excel file not found for this task."
    try:
        # Context manager ensures the workbook's file handle is closed.
        with pd.ExcelFile(local_xlsx) as xls:
            target = sheet if sheet and sheet in xls.sheet_names else xls.sheet_names[0]
            df = pd.read_excel(xls, sheet_name=target)
        print(f"Excel file read successfully: {str(df.to_dict(orient='records'))}")
        return str(df.to_dict(orient="records"))
    except Exception as e:
        return f"Error reading Excel file: {e}"
import openai | |
def audio_transcriber_tool(task_id: str) -> str:
    """
    TOOL NAME: Audio Transcription Tool
    Purpose: When the user asks about audio files, speech, or wants to know what was said in an audio recording, use this tool.
    Input: A task_id string that identifies the specific audio file to transcribe.
    Example usage:
        - "What is said in this audio file?"
        - "Transcribe the speech from the recording"
        - "Convert the audio to text"
    """
    print("reached audio_transcriber_tool")
    # Always attempt the download, taking the first extension that succeeds.
    local_audio = next(
        (p for p in (_download_file_for_task(task_id, e) for e in ("mp3", "wav", "m4a")) if p),
        "",
    )
    if not (local_audio and os.path.exists(local_audio)):
        print("Error: No audio file found (download failed).")
        return "Error: No audio file found (download failed)."
    # Hand the file to OpenAI Whisper for transcription.
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")
        with open(local_audio, "rb") as audio_file:
            print("reached openai.audio.transcriptions.create")
            result = openai.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
            )
        text = result.text.strip()
    except Exception as err:
        text = f"Error during transcription: {err}"
    print(f"Transcripted as transcript: {text}")
    return text
# tools.py | |
import re | |
import requests | |
def wikipedia_search_tool(wiki_query: str) -> str:
    """
    TOOL NAME: Wikipedia Search Tool
    Purpose: When the user asks about general knowledge, facts, or wants to know about a specific topic, use this tool.
    Input: A string describing the topic to search for on Wikipedia.
    Example usage:
        - "What is the capital of France?"
        - "Find information about quantum computing"
        - "What is the history of the internet?"
    If no valid wiki_query is provided, returns an empty string.
    """
    print("reached wikipedia search tool")
    # Lazily create a per-session cache on the function object itself.
    cache = getattr(wikipedia_search_tool, "_cache", None)
    if cache is None:
        cache = {}
        wikipedia_search_tool._cache = cache
    query = wiki_query.strip()
    if not query:
        return ""
    if query in cache:
        print("Returning cached Wikipedia result for query:", query)
        return cache[query]
    try:
        # Step 1: MediaWiki full-text search for page titles matching the query.
        search_resp = requests.get(
            "https://en.wikipedia.org/w/api.php",
            params={
                "action": "query",
                "list": "search",
                "srsearch": query,
                "format": "json",
                "utf8": 1,
            },
            timeout=10,
        )
        search_resp.raise_for_status()
        hits = search_resp.json().get("query", {}).get("search", [])
        if not hits:
            msg = f"No Wikipedia page found for '{query}'. [END_OF_SEARCH]"
            cache[query] = msg
            return msg
        # Step 2: take the first hit's title.
        first_title = hits[0].get("title", "")
        if not first_title:
            msg = "Unexpected format from Wikipedia search. [END_OF_SEARCH]"
            cache[query] = msg
            return msg
        # Step 3: fetch that page's summary via the REST summary endpoint.
        summary_resp = requests.get(
            "https://en.wikipedia.org/api/rest_v1/page/summary/"
            + requests.utils.requote_uri(first_title),
            timeout=10,
        )
        summary_resp.raise_for_status()
        summary_data = summary_resp.json()
        # Step 4: prefer the "extract" field, falling back to "description".
        summary_text = summary_data.get("extract") or summary_data.get("description", "No summary available.")
        result = f"Title: {first_title}\n\n{summary_text}\n\n[END_OF_SEARCH]"
        cache[query] = result
        print("Submitted wiki successfully")
        return result
    except requests.exceptions.RequestException as e:
        print("Wikipedia search error: ", e)
        return f"Wikipedia search error: {e} [END_OF_SEARCH]"
    except Exception as e:
        print("Unexpected error in wikipedia_search_tool: ", e)
        return f"Unexpected error in wikipedia_search_tool: {e} [END_OF_SEARCH]"
def arxiv_search_tool(query: str) -> str:
    """
    TOOL NAME: ArXiv Academic Search Tool
    Purpose: When the user asks for academic research, scientific papers, or technical information, use this tool.
    Input: A string describing the academic topic to search for on ArXiv.
    Returns: The top result's title plus up to 3000 chars of its PDF text,
             or an error/no-results string ending in [END_OF_SEARCH].
    Example usage:
        - "Find research papers about machine learning"
        - "What are recent studies on climate change?"
        - "Search for papers on quantum computing"
    """
    print("Reached ArXiv tool, with query = ", query)
    try:
        # Search arXiv for the single most relevant result.
        search = arxiv.Search(query=query, max_results=1, sort_by=arxiv.SortCriterion.Relevance)
        result = next(search.results(), None)
        if not result:
            print("No arXiv result found")
            return "No results found. [END_OF_SEARCH]"
        # Download the PDF. BUGFIX: timeout added so a stalled server cannot
        # hang the tool indefinitely.
        response = requests.get(result.pdf_url, timeout=30)
        response.raise_for_status()
        # Save to a temp file and extract text with PyMuPDF.
        # NOTE(review): reopening a NamedTemporaryFile by name can fail on
        # Windows while the file is still open; fine on POSIX.
        text = ""
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tmp:
            tmp.write(response.content)
            tmp.flush()
            doc = fitz.open(tmp.name)
            try:
                for page in doc:
                    text += page.get_text()
            finally:
                # BUGFIX: close the document so its handle on the temp file is
                # released before the file is deleted.
                doc.close()
        # Collapse whitespace and trim to a manageable summary length.
        text = " ".join(text.split())
        summary = text[:3000] + "..." if len(text) > 3000 else text
        return f"Title: {result.title}\n\nSummary:\n{summary}\n\n[END_OF_SEARCH]"
    except Exception as e:
        return f"Error fetching arXiv content: {e} [END_OF_SEARCH]"
from langchain_openai import ChatOpenAI | |
from langchain.schema import SystemMessage, HumanMessage | |
LLM = ChatOpenAI(model_name="gpt-4o-mini", temperature=0.2) | |
def analyze_code_tool(task_id: str) -> str:
    """
    TOOL NAME: Code Analysis Tool
    Purpose: When the user asks about code, programming files, or wants to understand what a script does, use this tool.
    Input: A task_id string that identifies the specific code file to analyze.
    Returns: The LLM's review of the downloaded .py file, or an error message.
    Example usage:
        - "What does this Python code do?"
        - "Analyze the code file for bugs"
        - "Explain the functions in this script"
    """
    print("Reached analyze_code_tool")
    if not task_id:
        code_txt = "No code provided."
    else:
        path = _download_file_for_task(task_id, "py")
        if not path:
            print("Error: .py file not found for this task.")
            return "Error: .py file not found for this task."
        code_txt = Path(path).read_text(encoding="utf-8", errors="ignore")
    # (Removed a no-op splitlines()/join round-trip of the file contents.)
    prompt = [
        SystemMessage(content="You are a senior Python code reviewer."),
        HumanMessage(content=(
            "Please analyse the following code. "
            "Summarise what it does, list key functions/classes, "
            "and point out any obvious bugs, performance issues or style problems.\n\n"
            f"```python\n{code_txt}\n```\n\n"
            # BUGFIX: a blank line now separates the code fence from the final
            # instruction; previously the sentence was fused onto the ``` line.
            "If you can then find the output of the code and return it in the output."
        ))
    ]
    return LLM.invoke(prompt).content.strip()
# ─────────────────────────── Math Tools ─────────────────────────────── | |
def add_tool(a: float, b: float) -> str:
    """
    TOOL NAME: Addition Tool
    Purpose: When the user asks to add numbers or perform addition calculations, use this tool.
    Input: Two numbers (a and b) to add together.
    Example usage:
        - "What is 25 + 17?"
        - "Add 3.14 and 2.86"
        - "Calculate the sum of 100 and 250"
    """
    print("Reached add_tool")
    # Compute and report the sum in a single formatted line.
    return f"Addition result: {a} + {b} = {a + b}"
def subtract_tool(a: float, b: float) -> str:
    """
    TOOL NAME: Subtraction Tool
    Purpose: When the user asks to subtract numbers or perform subtraction calculations, use this tool.
    Input: Two numbers (a and b) where b is subtracted from a.
    Example usage:
        - "What is 50 - 23?"
        - "Subtract 15.5 from 40.2"
        - "Calculate 1000 minus 347"
    """
    print("Reached subtract_tool")
    # Compute and report the difference in a single formatted line.
    return f"Subtraction result: {a} - {b} = {a - b}"
def multiply_tool(a: float, b: float) -> str:
    """
    TOOL NAME: Multiplication Tool
    Purpose: When the user asks to multiply numbers or perform multiplication calculations, use this tool.
    Input: Two numbers (a and b) to multiply together.
    Example usage:
        - "What is 8 × 7?"
        - "Multiply 12.5 by 4"
        - "Calculate the product of 15 and 20"
    """
    print("Reached multiply_tool")
    # Compute and report the product in a single formatted line.
    return f"Multiplication result: {a} × {b} = {a * b}"
def divide_tool(a: float, b: float) -> str:
    """
    TOOL NAME: Division Tool
    Purpose: When the user asks to divide numbers or perform division calculations, use this tool.
    Input: Two numbers (a and b) where a is divided by b.
    Example usage:
        - "What is 100 ÷ 4?"
        - "Divide 75 by 3"
        - "Calculate 144 divided by 12"
    """
    print("Reached divide_tool")
    # Guard clause: dividing by zero is reported, not raised.
    if b == 0:
        return "Division error: Cannot divide by zero"
    return f"Division result: {a} ÷ {b} = {a / b}"
def web_search_tool(query: str) -> str:
    """
    TOOL NAME: Web Search Tool
    Purpose: When the user asks for current information, recent news, or topics not covered by Wikipedia, use this tool.
    Input: A string describing what to search for on the web.
    Returns: str() of up to 5 DuckDuckGo results followed by [END_OF_SEARCH],
             or an error/rate-limit message.
    """
    print("reached web_search_tool")
    # Per-session cache stored on the function object itself.
    if not hasattr(web_search_tool, "_cache"):
        web_search_tool._cache = {}
    query = query.strip()
    if not query:
        return "No search query provided."
    if query in web_search_tool._cache:
        print("Returning cached web search result for query:", query)
        return web_search_tool._cache[query]
    ddg = DDGS()
    max_retries = 5
    result_text = ""
    for attempt in range(1, max_retries + 1):
        try:
            result_text = str(ddg.text(query, max_results=5))
        except Exception as e:
            if attempt < max_retries:
                print(f"web_search_tool: exception '{e}', retrying in 4 seconds ({attempt}/{max_retries})")
                time.sleep(4)
                continue
            else:
                return f"Error during DuckDuckGo search: {e} [END_OF_SEARCH]"
        if "202 Ratelimit" in result_text:
            if attempt < max_retries:
                print(f"web_search_tool: received '202 Ratelimit', retrying in 4 seconds ({attempt}/{max_retries})")
                time.sleep(4)
                continue
            else:
                break  # give up: return the rate-limit text, but don't cache it
        break  # Successful
    result_text += "\n\n[END_OF_SEARCH]"
    # BUGFIX: previously a final-attempt "202 Ratelimit" response was cached,
    # poisoning the session cache so this query could never succeed again.
    if "202 Ratelimit" not in result_text:
        web_search_tool._cache[query] = result_text
    print("Submitted web search successfully")
    return result_text