# New_Final_Assignment / toolsold.py
# naman1102's picture
# React_graph
# 14fa0cc
# tools.py
import pandas as pd
from pathlib import Path
import requests
import regex as re
import time
import os
from duckduckgo_search import DDGS
from langchain_core.tools import tool
# Base URL of the remote API; _download_file_for_task fetches task files from
# f"{DEFAULT_API_URL}/files/{task_id}".
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
def _download_file_for_task(task_id: str, ext: str) -> str:
    """
    Helper: attempt to GET the remote file for a given task_id.

    The file is requested from ``{DEFAULT_API_URL}/files/{task_id}`` and, on a
    200 response with a non-empty body, saved under ./hf_files/{task_id}.{ext}.
    ``ext`` only names the local copy; it is not part of the request URL.

    Returns the local path if successful, or an empty string if no file was
    available or the download / write failed. Failures are logged, not raised.
    """
    print("reached _download_file_for_task")
    local_path = os.path.join("hf_files", f"{task_id}.{ext}")
    url = f"{DEFAULT_API_URL}/files/{task_id}"

    try:
        os.makedirs("hf_files", exist_ok=True)
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200 and resp.content:
            with open(local_path, "wb") as f:
                f.write(resp.content)
            # Report success only once the bytes are actually on disk.
            print(f"Downloaded file from {url} to {local_path}")
            return local_path
        print(f"No file downloaded from {url} (HTTP {resp.status_code})")
    except Exception as exc:
        # Deliberately best-effort: callers treat "" as "no file", so log the
        # cause (instead of discarding it) and fall through.
        print(f"Error downloading file from {url} to {local_path}: {exc}")

    # If we get here, either there was no file or the download failed
    return ""
@tool
def image_tool(task_id: str) -> str:
    """
    Downloads the image attached to the task and describes it.
    Expects: task_id is a string
    Returns: "OCR text + brief caption or an error message"
    """
    print("reached image_tool")

    # 1) Fetch the task's attachment. Start from "" so that a failed download
    #    reaches the error return below instead of raising UnboundLocalError.
    local_img = ""
    for ext in ("png", "jpg", "jpeg"):
        candidate = _download_file_for_task(task_id, ext)
        if candidate:
            local_img = candidate
            break

    if not local_img or not os.path.exists(local_img):
        # Plain string (not a dict) to match the declared return type and the
        # other error paths of this tool.
        return "Error: No image file found (local nonexistent or download failed)."

    # 2) Read raw bytes
    try:
        with open(local_img, "rb") as f:
            image_bytes = f.read()
    except Exception as e:
        return f"Error reading image file: {e}"

    # 3) Prepare HF Inference headers
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        # Name the variable that is actually read above.
        return "Error: HF_TOKEN not set in environment."
    headers = {"Authorization": f"Bearer {hf_token}"}

    # NOTE(review): both requests below send the image as multipart form data
    # (`files=`); confirm the endpoints accept that rather than a raw-bytes body.

    # 4) Call the OCR model to extract text
    try:
        ocr_resp = requests.post(
            "https://api-inference.huggingface.co/models/google/vit-ocr",
            headers=headers,
            files={"file": image_bytes},
            timeout=30,
        )
        ocr_resp.raise_for_status()
        ocr_json = ocr_resp.json()

        # Expected JSON: "pages" -> each page has "lines" -> each line has "text"
        lines = []
        for page in ocr_json.get("pages", []):
            for line in page.get("lines", []):
                lines.append(line.get("text", "").strip())
        ocr_text = "\n".join(lines).strip() or "(no visible text)"
    except Exception as e:
        # A failed OCR call is reported inline so the caption can still be returned.
        ocr_text = f"Error during HF OCR: {e}"

    # 5) Call the image-captioning model to get a brief description
    try:
        cap_resp = requests.post(
            "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base",
            headers=headers,
            files={"file": image_bytes},
            timeout=30,
        )
        cap_resp.raise_for_status()
        cap_json = cap_resp.json()

        # Accept both a bare {"generated_text": "..."} object and a list
        # wrapping such objects; use the first entry of a list.
        if isinstance(cap_json, list):
            cap_json = cap_json[0] if cap_json else {}
        caption = cap_json.get("generated_text", "").strip()
        if not caption:
            caption = "(no caption returned)"
    except Exception as e:
        caption = f"Error during HF captioning: {e}"

    # 6) Combine OCR + caption
    combined = f"OCR text:\n{ocr_text}\n\nImage caption:\n{caption}"
    print("combined: ")
    return combined
@tool
def excel_tool(task_id: str) -> str:
    """
    Downloads <task_id>.xlsx (if any) and returns a stringified list of
    records from the workbook. No fallback to user-supplied tables.
    Expects: task_id is a string (used to download the file)
    Returns: stringified list of records from "Sheet1", or from the first
    sheet when the workbook has no "Sheet1"; an error message on failure
    """
    print("reached excel_tool")
    sheet = "Sheet1"

    local_xlsx = _download_file_for_task(task_id, "xlsx")
    if not local_xlsx or not os.path.exists(local_xlsx):
        return "Error: Excel file not found for this task."

    try:
        # The context manager closes the workbook handle even if parsing fails.
        with pd.ExcelFile(local_xlsx) as xls:
            target = sheet if sheet in xls.sheet_names else xls.sheet_names[0]
            df = pd.read_excel(xls, sheet_name=target)

        # Serialise once and reuse the string for both the log line and the result.
        records = str(df.to_dict(orient="records"))
        print(f"Excel file read successfully: {records}")
        return records
    except Exception as e:
        return f"Error reading Excel file: {e}"
import openai
@tool
def audio_transcriber_tool(task_id: str) -> str:
    """
    LangGraph tool for transcribing audio via OpenAI's Whisper API.
    Expects: task_id is a string
    Returns:
    "<text or error message>"
    Always attempts to download the file for the given path or task ID.
    """
    # NOTE(review): the docstring above is kept verbatim; presumably @tool
    # exposes it as the tool description, which would make it runtime text.
    print("reached audio_transcriber_tool")

    # Try each supported extension in order. The generator is lazy, so no
    # further download is attempted once one of them yields a path.
    attempts = (
        _download_file_for_task(task_id, suffix)
        for suffix in ("mp3", "wav", "m4a")
    )
    audio_path = next((path for path in attempts if path), "")

    if not (audio_path and os.path.exists(audio_path)):
        return "Error: No audio file found (download failed)."

    # Send to OpenAI Whisper; any failure (including a missing API key) is
    # turned into the returned text rather than raised.
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")

        with open(audio_path, "rb") as stream:
            print("reached openai.audio.transcriptions.create")
            result = openai.audio.transcriptions.create(
                model="whisper-1",
                file=stream,
            )
            print("reached response")
            transcript = result.text.strip()
    except Exception as err:
        transcript = f"Error during transcription: {err}"

    print(f"Transcripted as transcript: {transcript}")
    return transcript
# tools.py
import re
import requests
@tool
def wikipedia_search_tool(wiki_query: str) -> str:
    """
    LangGraph wrapper for searching Wikipedia.
    Expects: wiki_query is a non-empty string.
    Returns: text summary of the first matching page, or an error message.
    """
    # Local import so this block brings its own dependency into scope.
    from urllib.parse import quote

    print("reached wikipedia search tool")
    query = (wiki_query or "").strip()
    if not query:
        # Return a string (not {}) to match the declared return type.
        return "Error: no Wikipedia query provided."

    try:
        # 1) Use the MediaWiki API to search for page titles matching the query
        search_params = {
            "action": "query",
            "list": "search",
            "srsearch": query,
            "format": "json",
            "utf8": 1,
        }
        search_resp = requests.get(
            "https://en.wikipedia.org/w/api.php",
            params=search_params,
            timeout=10,
        )
        search_resp.raise_for_status()
        search_data = search_resp.json()

        search_results = search_data.get("query", {}).get("search", [])
        if not search_results:
            print(f"No Wikipedia page found for '{query}'.")
            return f"No Wikipedia page found for '{query}'."

        # 2) Take the first search result's title
        first_title = search_results[0].get("title", "")
        if not first_title:
            print("Unexpected format from Wikipedia search.")
            return "Unexpected format from Wikipedia search."

        # 3) Fetch the page summary for that title via the REST summary endpoint.
        #    The title is a single path segment, so every reserved character
        #    ("/", "?", "#", ...) must be percent-encoded; safe="" does that.
        title_for_url = quote(first_title, safe="")
        summary_url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{title_for_url}"
        summary_resp = requests.get(summary_url, timeout=10)
        summary_resp.raise_for_status()
        summary_data = summary_resp.json()

        # 4) Extract either the "extract" field or a fallback message
        summary_text = summary_data.get("extract")
        if not summary_text:
            summary_text = summary_data.get("description", "No summary available.")

        print(f"Title: {first_title}\n\n{summary_text}")
        return f"Title: {first_title}\n\n{summary_text}"

    except requests.exceptions.RequestException as e:
        return f"Wikipedia search error: {e}"
    except Exception as e:
        return f"Unexpected error in wikipedia_search_tool: {e}"
from langchain_openai import ChatOpenAI
from langchain.schema import SystemMessage, HumanMessage
# Shared chat model used by analyze_code_tool; instantiated once when the
# module is imported.
LLM = ChatOpenAI(model_name="gpt-4.1-mini", temperature=0.2)
@tool
def analyze_code_tool(task_id: str) -> str:
    """
    Expects: task_id is a string (used to download the task's .py file)
    Reads the code (max 400 lines / 10 kB) and asks the LLM for:
    • plain-language summary
    • list of key functions/classes
    • obvious bugs or style smells
    Returns that analysis as a string, or an error message if the file
    could not be downloaded or read.
    """
    print("reached analyze_code_tool")

    if not task_id:
        # No file can be fetched without an id; the placeholder is still sent to the LLM.
        code_txt = "No code provided."
    else:
        path = _download_file_for_task(task_id, "py")
        if not path:
            return "Error: .py file not found for this task."
        try:
            code_txt = Path(path).read_text(encoding="utf-8", errors="ignore")
        except OSError as e:
            # Report read failures as text, like the other tools in this file.
            return f"Error reading .py file: {e}"

    # Truncate for safety: first 400 lines, then at most 10,000 characters
    lines = code_txt.splitlines()[:400]
    code_sample = "\n".join(lines)[:10_000]

    prompt = [
        SystemMessage(content="You are a senior Python code reviewer."),
        HumanMessage(content=(
            "Please analyse the following code. "
            "Summarise what it does, list key functions/classes, "
            "and point out any obvious bugs, performance issues or style problems.\n\n"
            # The blank line after the closing fence keeps the follow-up
            # instruction from being glued onto the fence line.
            f"```python\n{code_sample}\n```\n\n"
            "If you can then find the output of the code and return it in the output."
        )),
    ]
    return LLM.invoke(prompt).content.strip()
# def web_search_tool(state: AgentState) -> AgentState:
# """
# Expects: state["web_search_query"] is a non‐empty string.
# Returns: {"web_search_query": None, "web_search_result": <string>}.
# Retries up to 5 times on either a DuckDuckGo “202 Ratelimit” response or any exception (e.g. timeout).
# """
# print("reached web_search_tool")
# query = state.get("web_search_query", "")
# if not query:
# return {} # nothing to do
# ddg = DDGS()
# max_retries = 5
# result_text = ""
# for attempt in range(1, max_retries + 1):
# try:
# result_text = str(ddg.text(query, max_results=5))
# except Exception as e:
# # Network error or timeout—retry up to max_retries
# if attempt < max_retries:
# print(f"web_search_tool: exception '{e}', retrying in 4 seconds ({attempt}/{max_retries})")
# time.sleep(4)
# continue
# else:
# # Final attempt failed
# return {
# "web_search_query": None,
# "web_search_result": f"Error during DuckDuckGo search: {e}"
# }
# # Check for DuckDuckGo rate‐limit indicator
# if "202 Ratelimit" in result_text:
# if attempt < max_retries:
# print(f"web_search_tool: received '202 Ratelimit', retrying in 4 seconds ({attempt}/{max_retries})")
# time.sleep(4)
# continue
# else:
# # Final attempt still rate‐limited
# break
# # Successful response (no exception and no rate‐limit text)
# break
# return {
# "web_search_query": None,
# "web_search_result": result_text
# }