# NOTE: the following lines are Hugging Face blob-viewer chrome that was
# captured when this file was copied; commented out so the module parses.
# naman1102's picture
# wiki
# a59a680
# raw
# history blame
# 8.37 kB
# tools.py
import pandas as pd
from langchain_community.tools import DuckDuckGoSearchRun
from pathlib import Path
from PIL import Image
import pytesseract
from state import AgentState
from langchain.schema import HumanMessage
import regex as re
def web_search_tool(state: AgentState) -> AgentState:
    """
    Run a DuckDuckGo search for state["web_search_query"].

    Expects state["web_search_query"] to be a non-empty string. Returns the
    partial state {"web_search_query": None, "web_search_result": <text>};
    the query key is reset to None so the graph does not re-enter this tool.
    Returns {} when there is no pending query.
    """
    print("reached web search tool")
    search_query = state.get("web_search_query", "")
    if not search_query:
        # No query queued — this node has nothing to do.
        return {}
    # Delegate the actual lookup to the LangChain DuckDuckGo wrapper.
    searcher = DuckDuckGoSearchRun()
    search_output = searcher.run(search_query)
    print(f"web_search_result: {search_output}")
    return {"web_search_query": None, "web_search_result": search_output}
def ocr_image_tool(state: AgentState) -> AgentState:
    """
    OCR the image at state["ocr_path"] using pytesseract.

    Returns the partial state {"ocr_path": None, "ocr_result": <text>} where
    <text> is the stripped OCR output, "(no visible text)" when nothing was
    recognized, or an error message when loading/OCR fails.
    Returns {} when no path is present.
    """
    print("reached ocr image tool")
    image_path = state.get("ocr_path", "")
    if not image_path:
        return {}
    try:
        extracted = pytesseract.image_to_string(Image.open(image_path)).strip()
        if not extracted:
            extracted = "(no visible text)"
    except Exception as err:
        # Surface the failure as the tool's result instead of crashing the graph.
        extracted = f"Error during OCR: {err}"
    print(f"ocr_result: {extracted}")
    return {"ocr_path": None, "ocr_result": extracted}
def _extract_markdown_table(messages) -> list:
    """Return the lines of the first Markdown-style table found in any
    HumanMessage of *messages*, or an empty list if none is present."""
    table_lines = []
    for msg in messages:
        if not isinstance(msg, HumanMessage):
            continue
        collecting = False
        for line in msg.content.splitlines():
            # Start collecting at the first row that looks like a table header.
            if re.match(r"^\s*\|\s*[-A-Za-z0-9]", line):
                collecting = True
            if collecting:
                if not re.match(r"^\s*\|", line):
                    # Table block ended (blank line or non-table line).
                    break
                table_lines.append(line)
        if table_lines:
            # Only the first table found across all messages is returned.
            return table_lines
    return table_lines


def parse_excel_tool(state: "AgentState") -> "AgentState":
    """
    Read tabular data for the agent, preferring a real spreadsheet on disk.

    Expects:
        state["excel_path"]: path to an .xlsx file (may be missing/empty).
        state["excel_sheet_name"]: optional sheet name; falls back to the
            first sheet when absent or not found in the workbook.

    Returns the partial state:
        {
            "excel_path": None,
            "excel_sheet_name": None,
            "excel_result": <str of the sheet's list-of-row-dicts, or the
                             Markdown table text extracted from the prompt>
        }
    Returns {} when no path was requested at all. If neither a readable file
    nor a Markdown table is found, "excel_result" carries an error message.
    """
    path = state.get("excel_path", "")
    sheet = state.get("excel_sheet_name", "")
    if not path:
        return {}

    # 1) Prefer the real file. Path (imported at the top of this module)
    #    replaces os.path.exists — `os` is only imported further down the
    #    file, which made this a fragile forward dependency.
    if Path(path).exists():
        try:
            xls = pd.ExcelFile(path)
            if sheet and sheet in xls.sheet_names:
                df = pd.read_excel(xls, sheet_name=sheet)
            else:
                df = pd.read_excel(xls, sheet_name=xls.sheet_names[0])
            return {
                "excel_path": None,
                "excel_sheet_name": None,
                # Stringified list of row dicts, e.g. "[{'a': 1}, ...]".
                "excel_result": str(df.to_dict(orient="records")),
            }
        except Exception as e:
            # I/O or parse failure: fall through to Markdown-table extraction.
            print(f">>> parse_excel_tool: Error reading Excel file {path}: {e}")

    # 2) Fallback: pull a Markdown-style table out of the conversation history.
    table_lines = _extract_markdown_table(state.get("messages", []))
    if not table_lines:
        return {
            "excel_path": None,
            "excel_sheet_name": None,
            "excel_result": "Error: No Excel file found and no Markdown table detected in prompt.",
        }
    # Drop separator rows such as "| ---- | ---- |".
    clean_rows = [row for row in table_lines if not re.match(r"^\s*\|\s*-+", row)]
    return {
        "excel_path": None,
        "excel_sheet_name": None,
        "excel_result": "\n".join(clean_rows).strip(),
    }
def run_tools(state: AgentState, tool_out: AgentState) -> AgentState:
    """
    Merge a tool's partial output into the full agent state.

    Keys in ``tool_out`` override the corresponding keys in ``state``;
    everything else is carried over unchanged. Wire this as its own graph
    node, not as a transition function.
    """
    merged = dict(state)
    merged.update(tool_out)
    return merged
import os
import os
import openai
from state import AgentState
def audio_transcriber_tool(state: AgentState) -> AgentState:
    """
    Transcribe the audio file at state["audio_path"] with OpenAI's hosted
    Whisper API.

    Expects state["audio_path"] to point at an existing .wav/.mp3/.m4a file.
    Returns {"audio_path": None, "transcript": <text or error message>};
    returns {} when the path is missing or does not exist on disk.
    """
    print("reached audio transcriber tool")
    audio_path = state.get("audio_path", "")
    if not audio_path or not os.path.exists(audio_path):
        return {}
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")
        # openai-python v0.27+ hosted-Whisper call; older releases used
        # openai.Audio.create_transcription(file=..., model="whisper-1").
        with open(audio_path, "rb") as audio_file:
            response = openai.Audio.transcribe("whisper-1", audio_file)
        transcript_text = response["text"].strip()
    except Exception as err:
        transcript_text = f"Error during transcription: {err}"
    print(f"transcript: {transcript_text}")
    return {"audio_path": None, "transcript": transcript_text}
# tools.py
import re
import requests
from state import AgentState
def wikipedia_search_tool(state: "AgentState") -> "AgentState":
    """
    LangGraph wrapper for searching Wikipedia.

    Expects:
        state["wiki_query"]: a non-empty search string. ``None`` (the value
        this module's other tools reset consumed keys to) is tolerated and
        treated the same as "no query".

    Returns the partial state:
        {
            "wiki_query": None,
            "wiki_result": "<summary of the first matching page or an error message>"
        }
    or {} when there is no query to run.
    """
    # BUG FIX: the key may hold None rather than a string (other tools in
    # this file set consumed keys to None), so guard the .strip() call.
    query = (state.get("wiki_query") or "").strip()
    if not query:
        return {}
    try:
        # 1) MediaWiki search API: find page titles matching the query.
        search_params = {
            "action": "query",
            "list": "search",
            "srsearch": query,
            "format": "json",
            "utf8": 1
        }
        search_resp = requests.get("https://en.wikipedia.org/w/api.php", params=search_params, timeout=10)
        search_resp.raise_for_status()
        search_data = search_resp.json()
        search_results = search_data.get("query", {}).get("search", [])
        if not search_results:
            return {"wiki_query": None, "wiki_result": f"No Wikipedia page found for '{query}'."}
        # 2) Take the first search result's title.
        first_title = search_results[0].get("title", "")
        if not first_title:
            return {"wiki_query": None, "wiki_result": "Unexpected format from Wikipedia search."}
        # 3) Fetch the page summary via the REST summary endpoint
        #    (title must be URL-encoded).
        title_for_url = requests.utils.requote_uri(first_title)
        summary_url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{title_for_url}"
        summary_resp = requests.get(summary_url, timeout=10)
        summary_resp.raise_for_status()
        summary_data = summary_resp.json()
        # 4) Prefer the full "extract"; fall back to the short description.
        summary_text = summary_data.get("extract")
        if not summary_text:
            summary_text = summary_data.get("description", "No summary available.")
        return {
            "wiki_query": None,
            "wiki_result": f"Title: {first_title}\n\n{summary_text}"
        }
    except requests.exceptions.RequestException as e:
        return {"wiki_query": None, "wiki_result": f"Wikipedia search error: {e}"}
    except Exception as e:
        return {"wiki_query": None, "wiki_result": f"Unexpected error in wikipedia_search_tool: {e}"}