Spaces:
Sleeping
Sleeping
File size: 8,368 Bytes
1f5cba5 0e29657 1f5cba5 9fb6d05 51b14d9 3563dd6 0e29657 66102de 0e29657 9d6ba16 0e29657 1f5cba5 0e29657 1f5cba5 0e29657 1f5cba5 66102de 0e29657 1f5cba5 0e29657 1f5cba5 0e29657 9d6ba16 0e29657 7dbc634 0e29657 1f5cba5 7dbc634 0e29657 7dbc634 0e29657 65abbbc 7fb0070 92c94e2 7fb0070 92c94e2 7fb0070 09b1a3d 7fb0070 09b1a3d 7fb0070 09b1a3d 7fb0070 66102de 7fb0070 09b1a3d 7fb0070 09b1a3d 7fb0070 9d6ba16 7fb0070 a59a680 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# tools.py
import pandas as pd
from langchain_community.tools import DuckDuckGoSearchRun
from pathlib import Path
from PIL import Image
import pytesseract
from state import AgentState
from langchain.schema import HumanMessage
import regex as re
def web_search_tool(state: AgentState) -> AgentState:
    """
    Run a DuckDuckGo search for the query stored in the agent state.

    Expects: state["web_search_query"] is a non-empty string.
    Returns: {"web_search_query": None, "web_search_result": <string>}
        The query is cleared so the graph does not loop on the same search forever.
        If the query is missing or empty, returns {} (nothing to do).
        If the search raises (network failure, rate limiting, ...), the error is
        reported in "web_search_result" instead of propagating out of the node.
    """
    print("reached web search tool")
    query = state.get("web_search_query", "")
    if not query:
        return {}  # nothing to do

    try:
        # Run DuckDuckGo
        ddg = DuckDuckGoSearchRun()
        result_text = ddg.run(query)
    except Exception as e:
        # Report the failure as the tool result; the query is still cleared below
        # so a failing search cannot be retried endlessly.
        result_text = f"Error during web search: {e}"

    print(f"web_search_result: {result_text}")
    return {
        "web_search_query": None,
        "web_search_result": result_text,
    }
def ocr_image_tool(state: AgentState) -> AgentState:
    """
    Extract text from the image referenced by the agent state using Tesseract.

    Expects: state["ocr_path"] is a path to an image file.
    Returns: {"ocr_path": None, "ocr_result": <string>}.
        "ocr_result" is the stripped OCR text, "(no visible text)" when OCR
        produced nothing, or an "Error during OCR: ..." message on failure.
        If no path is set, returns {}.
    """
    print("reached ocr image tool")
    path = state.get("ocr_path", "")
    if not path:
        return {}

    try:
        # Image.open keeps the underlying file open; the context manager
        # guarantees the handle is released once OCR has finished.
        with Image.open(path) as img:
            text = pytesseract.image_to_string(img)
        text = text.strip() or "(no visible text)"
    except Exception as e:
        text = f"Error during OCR: {e}"

    print(f"ocr_result: {text}")
    return {
        "ocr_path": None,
        "ocr_result": text,
    }
def _extract_markdown_table(messages):
    """Return the lines of the first Markdown table found in any HumanMessage, or []."""
    for msg in messages:
        if not isinstance(msg, HumanMessage):
            continue
        content = msg.content
        if not isinstance(content, str):
            # Non-text content (e.g. a list of content parts) cannot be split into lines.
            continue
        rows = []
        for line in content.splitlines():
            # Skip everything until the first row that looks like a table header.
            if not rows and not re.match(r"^\s*\|\s*[-A-Za-z0-9]", line):
                continue
            # Once collecting, the block ends at the first blank / non-table line.
            if not re.match(r"^\s*\|", line):
                break
            rows.append(line)
        if rows:
            return rows
    return []


def parse_excel_tool(state: AgentState) -> AgentState:
    """
    Attempts to read an actual .xlsx file at state["excel_path"]. If the file isn't found
    (or cannot be parsed), scans the conversation history for a Markdown-style table and
    returns that instead.

    The requested state["excel_sheet_name"] is used when it exists in the workbook;
    otherwise the first sheet is read.

    Returns:
    {
        "excel_path": None,
        "excel_sheet_name": None,
        "excel_result": "<str() of the sheet's row records, or the extracted Markdown table>"
    }
    If neither a real file nor a table block is found, "excel_result" is an error message.
    If state["excel_path"] is missing or empty, returns {}.
    """
    path = state.get("excel_path", "")
    sheet = state.get("excel_sheet_name", "")
    if not path:
        return {}

    # 1) Try reading the real file first
    if os.path.exists(path):
        try:
            # The context manager closes the workbook handle even if parsing fails.
            with pd.ExcelFile(path) as xls:
                if sheet and sheet in xls.sheet_names:
                    target_sheet = sheet
                else:
                    target_sheet = xls.sheet_names[0]
                df = pd.read_excel(xls, sheet_name=target_sheet)
            records = df.to_dict(orient="records")
            return {
                "excel_path": None,
                "excel_sheet_name": None,
                "excel_result": str(records),
            }
        except Exception as e:
            # If there's an I/O or parsing error, fall through to table extraction
            print(f">>> parse_excel_tool: Error reading Excel file {path}: {e}")

    # 2) Fallback: extract a Markdown table from any HumanMessage in state["messages"]
    table_lines = _extract_markdown_table(state.get("messages") or [])
    if not table_lines:
        return {
            "excel_path": None,
            "excel_sheet_name": None,
            "excel_result": "Error: No Excel file found and no Markdown table detected in prompt.",
        }

    # Remove separator rows like "| ---- | :---: |". Every cell must consist of dashes
    # (with optional alignment colons) so that data rows whose first cell merely starts
    # with "-" (e.g. a negative number) are kept.
    separator_row = r"^\s*\|?\s*:?-+:?\s*(\|\s*:?-+:?\s*)*\|?\s*$"
    clean_rows = [row for row in table_lines if not re.match(separator_row, row)]
    table_block = "\n".join(clean_rows).strip()
    return {
        "excel_path": None,
        "excel_sheet_name": None,
        "excel_result": table_block,
    }
def run_tools(state: AgentState, tool_out: AgentState) -> AgentState:
    """
    Merges whatever partial state the tool wrapper returned (tool_out)
    into the main state. That is, combine previous keys with new keys:
        new_state = { **state, **tool_out }
    Keys present in tool_out override those in state; neither input is mutated.
    A tool_out (or state) of None is treated as an empty mapping so that a tool
    with nothing to report cannot crash the merge.
    This node should be wired as its own graph node, not as a transition function.
    """
    return {**(state or {}), **(tool_out or {})}
import os
import os
import openai
from state import AgentState
def audio_transcriber_tool(state: AgentState) -> AgentState:
    """
    LangGraph tool for transcribing audio via OpenAI's hosted Whisper API.

    Works with both generations of the OpenAI Python library: the client-based
    API (openai>=1.0, where `openai.Audio` was removed) and the legacy
    module-level `openai.Audio.transcribe` call.

    Expects: state["audio_path"] to be a valid path to a .wav/.mp3/.m4a file.
    Returns:
    {
        "audio_path": None,
        "transcript": "<transcribed text or error message>"
    }
    If no valid audio_path is provided (missing, empty, or the file does not
    exist), returns {}.
    """
    print("reached audio transcriber tool")
    path = state.get("audio_path", "")
    if not path or not os.path.exists(path):
        return {}

    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")

        with open(path, "rb") as audio_file:
            if hasattr(openai, "OpenAI"):
                # openai>=1.0: transcription lives on a client instance and the
                # response is an object exposing `.text`.
                client = openai.OpenAI(api_key=openai.api_key)
                response = client.audio.transcriptions.create(
                    model="whisper-1",
                    file=audio_file,
                )
                raw_text = response.text
            else:
                # Legacy library (0.27.x): module-level call, dict-like response.
                response = openai.Audio.transcribe("whisper-1", audio_file)
                raw_text = response["text"]
        text = raw_text.strip()
    except Exception as e:
        text = f"Error during transcription: {e}"

    print(f"transcript: {text}")
    return {
        "audio_path": None,
        "transcript": text,
    }
# tools.py
import re
import requests
from state import AgentState
def wikipedia_search_tool(state: AgentState) -> AgentState:
    """
    LangGraph wrapper for searching Wikipedia.

    Searches the MediaWiki API for state["wiki_query"], then fetches the REST
    summary of the first matching page.

    Expects: state["wiki_query"] to be a non-empty string.
    Returns:
    {
        "wiki_query": None,
        "wiki_result": "<text summary of first matching page or an error message>"
    }
    If no valid wiki_query is provided (missing, None, or blank), returns {}.
    """
    from urllib.parse import quote

    # The key may be present with value None (this tool itself writes None back
    # after every run), so `.get(key, "")` alone is not enough to guarantee a string.
    query = (state.get("wiki_query") or "").strip()
    if not query:
        return {}

    try:
        # 1) Use the MediaWiki API to search for page titles matching the query
        search_params = {
            "action": "query",
            "list": "search",
            "srsearch": query,
            "format": "json",
            "utf8": 1,
        }
        search_resp = requests.get(
            "https://en.wikipedia.org/w/api.php",
            params=search_params,
            timeout=10,
        )
        search_resp.raise_for_status()
        search_results = search_resp.json().get("query", {}).get("search", [])
        if not search_results:
            return {"wiki_query": None, "wiki_result": f"No Wikipedia page found for '{query}'."}

        # 2) Take the first search result's title
        first_title = search_results[0].get("title", "")
        if not first_title:
            return {"wiki_query": None, "wiki_result": "Unexpected format from Wikipedia search."}

        # 3) Fetch the page summary for that title via the REST summary endpoint.
        #    The title is a single path segment, so every reserved character
        #    ("/", "?", "#", ...) must be percent-encoded; otherwise titles such as
        #    "AC/DC" would be split into extra path components or a query string.
        title_for_url = quote(first_title.replace(" ", "_"), safe="")
        summary_url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{title_for_url}"
        summary_resp = requests.get(summary_url, timeout=10)
        summary_resp.raise_for_status()
        summary_data = summary_resp.json()

        # 4) Extract either the "extract" field or a fallback message
        summary_text = summary_data.get("extract")
        if not summary_text:
            summary_text = summary_data.get("description", "No summary available.")

        return {
            "wiki_query": None,
            "wiki_result": f"Title: {first_title}\n\n{summary_text}",
        }
    except requests.exceptions.RequestException as e:
        return {"wiki_query": None, "wiki_result": f"Wikipedia search error: {e}"}
    except Exception as e:
        return {"wiki_query": None, "wiki_result": f"Unexpected error in wikipedia_search_tool: {e}"}
|