# Page-scrape residue ("Spaces" / "Sleeping" — Hugging Face Spaces status text);
# not part of the module source.
# tools.py
import os
import re
from pathlib import Path

import pandas as pd
import pytesseract
from langchain.schema import HumanMessage
from langchain_community.tools import DuckDuckGoSearchRun
from PIL import Image

from state import AgentState
def web_search_tool(state: AgentState) -> AgentState:
    """
    Run a DuckDuckGo search for the query stored in the agent state.

    Expects: state["web_search_query"] is a non-empty string.
    Returns: {"web_search_query": None, "web_search_result": <string>}.
    The query key is reset to None so the graph does not loop forever;
    an empty/missing query yields {} (no state change).
    """
    print("reached web search tool")
    query = state.get("web_search_query", "")
    if not query:
        # Nothing to search for — leave the state untouched.
        return {}
    result_text = DuckDuckGoSearchRun().run(query)
    print(f"web_search_result: {result_text}")
    return {
        "web_search_query": None,
        "web_search_result": result_text,
    }
def ocr_image_tool(state: AgentState) -> AgentState:
    """
    OCR the image referenced by the agent state using pytesseract.

    Expects: state["ocr_path"] is a path to an image file.
    Returns: {"ocr_path": None, "ocr_result": <string>}; the path key is
    cleared so the node is not re-entered. A missing path yields {}.
    """
    print("reached ocr image tool")
    image_path = state.get("ocr_path", "")
    if not image_path:
        return {}
    try:
        extracted = pytesseract.image_to_string(Image.open(image_path)).strip()
        # Substitute a placeholder when OCR finds nothing.
        text = extracted or "(no visible text)"
    except Exception as e:
        # Surface the failure in the result rather than crashing the graph.
        text = f"Error during OCR: {e}"
    print(f"ocr_result: {text}")
    return {
        "ocr_path": None,
        "ocr_result": text,
    }
def parse_excel_tool(state: AgentState) -> AgentState:
    """
    Read the .xlsx file at state["excel_path"]; if the file is missing or
    unreadable, fall back to extracting a Markdown-style table from the
    conversation history in state["messages"].

    Returns:
        {
            "excel_path": None,
            "excel_sheet_name": None,
            "excel_result": "<records dump of the sheet, or the extracted Markdown table>"
        }
    If neither a real file nor a table block is found, excel_result carries
    an error message instead. A missing excel_path yields {}.
    """
    path = state.get("excel_path", "")
    sheet = state.get("excel_sheet_name", "")
    if not path:
        return {}

    # 1) Prefer the actual workbook on disk.
    if os.path.exists(path):
        try:
            workbook = pd.ExcelFile(path)
            # Use the requested sheet when it exists, otherwise the first one.
            if sheet and sheet in workbook.sheet_names:
                chosen = sheet
            else:
                chosen = workbook.sheet_names[0]
            frame = pd.read_excel(workbook, sheet_name=chosen)
            return {
                "excel_path": None,
                "excel_sheet_name": None,
                "excel_result": str(frame.to_dict(orient="records")),
            }
        except Exception as e:
            # I/O or parsing failure: fall through to table extraction.
            print(f">>> parse_excel_tool: Error reading Excel file {path}: {e}")

    # 2) Fallback: pull a Markdown table out of any HumanMessage.
    table_lines = []
    collecting = False
    for msg in state.get("messages", []):
        if not isinstance(msg, HumanMessage):
            continue
        for line in msg.content.splitlines():
            # A row that starts with "|" followed by text/dashes begins the block.
            if re.match(r"^\s*\|\s*[-A-Za-z0-9]", line):
                collecting = True
            if collecting:
                if not re.match(r"^\s*\|", line):
                    # Block ended (blank line or non-table line).
                    collecting = False
                    break
                table_lines.append(line)
        if table_lines:
            break

    if not table_lines:
        return {
            "excel_path": None,
            "excel_sheet_name": None,
            "excel_result": "Error: No Excel file found and no Markdown table detected in prompt."
        }

    # Drop separator rows such as "| ---- | ---- |".
    data_rows = [row for row in table_lines if not re.match(r"^\s*\|\s*-+", row)]
    return {
        "excel_path": None,
        "excel_sheet_name": None,
        "excel_result": "\n".join(data_rows).strip(),
    }
def run_tools(state: AgentState, tool_out: AgentState) -> AgentState:
    """
    Merge a tool wrapper's partial output into the main graph state.

    Keys in tool_out override keys already present in state, i.e. the
    result equals {**state, **tool_out}. Wire this as its own graph node,
    not as a transition function.
    """
    merged = dict(state)
    merged.update(tool_out)
    return merged
# Dependencies for audio_transcriber_tool. (Fix: `import os` was duplicated
# on two consecutive lines; keep a single copy.)
import os

import openai

from state import AgentState
def audio_transcriber_tool(state: AgentState) -> AgentState:
    """
    Transcribe an audio file via OpenAI's hosted Whisper API.

    Expects: state["audio_path"] to be a valid path to a .wav/.mp3/.m4a file.
    Returns:
        {
            "audio_path": None,
            "transcript": "<transcribed text or error message>"
        }
    If no valid audio_path is provided, returns {}.
    """
    print("reached audio transcriber tool")
    audio_path = state.get("audio_path", "")
    if not audio_path or not os.path.exists(audio_path):
        return {}
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")
        # openai-python v0.27.0+ interface; older releases used
        # openai.Audio.create_transcription(file=audio_file, model="whisper-1").
        with open(audio_path, "rb") as audio_file:
            response = openai.Audio.transcribe("whisper-1", audio_file)
        text = response["text"].strip()
    except Exception as e:
        # Report failures (missing key, API errors) in the transcript field.
        text = f"Error during transcription: {e}"
    print(f"transcript: {text}")
    return {
        "audio_path": None,
        "transcript": text,
    }