# NOTE(review): removed non-code residue (file-viewer chrome: commit hashes and
# a column of line numbers) that was accidentally pasted above the module.
# tools.py
import os
import re
from pathlib import Path

import pandas as pd
import pytesseract
from langchain.schema import HumanMessage
from langchain_community.tools import DuckDuckGoSearchRun
from PIL import Image

from state import AgentState
def web_search_tool(state: AgentState) -> AgentState:
    """
    Run a DuckDuckGo web search for the query queued in state.

    Expects: state["web_search_query"] is a non-empty string.
    Returns: {"web_search_query": None, "web_search_result": <string>};
    the query key is reset to None so the graph does not loop forever.
    When no query is pending, returns {} (no state change).
    """
    print("reached web search tool")
    search_terms = state.get("web_search_query", "")
    if not search_terms:
        # Nothing queued for this tool; leave the state untouched.
        return {}
    answer = DuckDuckGoSearchRun().run(search_terms)
    print(f"web_search_result: {answer}")
    return {"web_search_query": None, "web_search_result": answer}
def ocr_image_tool(state: AgentState) -> AgentState:
    """
    OCR the image at state["ocr_path"] using pytesseract.

    Expects: state["ocr_path"] is a path to an image file.
    Returns: {"ocr_path": None, "ocr_result": <string>}; the path key is
    cleared so the tool is not re-triggered. When no path is set, returns {}
    (no state change). OCR/IO failures are captured in "ocr_result" as an
    error string rather than raised.
    """
    print("reached ocr image tool")
    path = state.get("ocr_path", "")
    if not path:
        return {}
    try:
        # Use a context manager so the image file handle is closed promptly
        # (the original left it open until garbage collection).
        with Image.open(path) as img:
            text = pytesseract.image_to_string(img)
        text = text.strip() or "(no visible text)"
    except Exception as e:
        text = f"Error during OCR: {e}"
    print(f"ocr_result: {text}")
    return {
        "ocr_path": None,
        "ocr_result": text
    }
def parse_excel_tool(state: AgentState) -> AgentState:
    """
    Read tabular data for the agent, preferring a real Excel file.

    Attempts to read the .xlsx file at state["excel_path"], using
    state["excel_sheet_name"] when it names an existing sheet (otherwise the
    first sheet). If the file is missing or unreadable, falls back to scanning
    the HumanMessages in state["messages"] for a Markdown-style table.

    Returns:
        {
            "excel_path": None,
            "excel_sheet_name": None,
            "excel_result": "<records-as-string or extracted Markdown table>"
        }
    When neither a real file nor a table block is found, "excel_result"
    carries an error message. When no path was requested, returns {}.

    Note: requires the module-level `re` and `os` imports.
    """
    path = state.get("excel_path", "")
    sheet = state.get("excel_sheet_name", "")
    if not path:
        return {}

    # 1) Try reading the real file first.
    if os.path.exists(path):
        try:
            xls = pd.ExcelFile(path)
            # Fall back to the first sheet when the requested one is absent.
            target = sheet if sheet and sheet in xls.sheet_names else xls.sheet_names[0]
            df = pd.read_excel(xls, sheet_name=target)
            return {
                "excel_path": None,
                "excel_sheet_name": None,
                "excel_result": str(df.to_dict(orient="records")),
            }
        except Exception as e:
            # I/O or parsing error: fall through to Markdown-table extraction.
            print(f">>> parse_excel_tool: Error reading Excel file {path}: {e}")

    # 2) Fallback: extract a Markdown table from any HumanMessage.
    header_re = re.compile(r"^\s*\|\s*[-A-Za-z0-9]")  # first row of a table
    row_re = re.compile(r"^\s*\|")                     # any table row
    sep_re = re.compile(r"^\s*\|\s*-+")                # "| --- | --- |" rows

    table_lines = []
    collecting = False
    for msg in state.get("messages", []):
        if not isinstance(msg, HumanMessage):
            continue
        for line in msg.content.splitlines():
            # Start collecting at the first table header row we see.
            if header_re.match(line):
                collecting = True
            if collecting:
                if not row_re.match(line):
                    # A blank or non-table line ends the block.
                    collecting = False
                    break
                table_lines.append(line)
        if table_lines:
            break

    if not table_lines:
        return {
            "excel_path": None,
            "excel_sheet_name": None,
            "excel_result": "Error: No Excel file found and no Markdown table detected in prompt."
        }

    # Drop separator rows like "| ---- | ---- |" so only header + data remain.
    clean_rows = [row for row in table_lines if not sep_re.match(row)]
    return {
        "excel_path": None,
        "excel_sheet_name": None,
        "excel_result": "\n".join(clean_rows).strip(),
    }
def run_tools(state: AgentState, tool_out: AgentState) -> AgentState:
    """
    Merge a tool's partial state dict into the main graph state.

    Keys in tool_out take precedence over existing keys in state — the
    result is equivalent to {**state, **tool_out}. Wire this as its own
    graph node, not as a transition function.
    """
    merged = dict(state)
    merged.update(tool_out)
    return merged
import os
import os
import openai
from state import AgentState
def audio_transcriber_tool(state: AgentState) -> AgentState:
    """
    Transcribe an audio file via OpenAI's hosted Whisper API.

    Expects: state["audio_path"] is a valid path to a .wav/.mp3/.m4a file.
    Returns:
        {
            "audio_path": None,
            "transcript": "<transcribed text or error message>"
        }
    The path key is cleared so the tool is not re-triggered. If no valid
    audio_path is provided, returns {} (no state change). All failures
    (missing OPENAI_API_KEY, API errors) are reported via "transcript"
    rather than raised.
    """
    print("reached audio transcriber tool")
    path = state.get("audio_path", "")
    if not path or not os.path.exists(path):
        return {}
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")
        with open(path, "rb") as audio_file:
            # For OpenAI Python library v0.27.0+:
            response = openai.Audio.transcribe("whisper-1", audio_file)
            # If using an older OpenAI library, use:
            # response = openai.Audio.create_transcription(file=audio_file, model="whisper-1")
        text = response["text"].strip()
    except Exception as e:
        text = f"Error during transcription: {e}"
    print(f"transcript: {text}")
    # NOTE(review): removed a stray trailing "|" that had been fused onto the
    # final line (a syntax error introduced by copy/paste).
    return {
        "audio_path": None,
        "transcript": text
    }