# tools.py
import pandas as pd
from langchain_community.tools import DuckDuckGoSearchRun
from pathlib import Path
from PIL import Image
import pytesseract
from state import AgentState
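# `AgentState` is defined in state.py (not shown here). Judging from the keys
# read and written below, it is presumably a TypedDict roughly like the
# following sketch (this shape is an assumption, not the actual definition):
#
#   class AgentState(TypedDict, total=False):
#       web_search_query: str | None
#       web_search_result: str | None
#       ocr_path: str | None
#       ocr_result: str | None
#       excel_path: str | None
#       excel_sheet_name: str | None
#       excel_result: str | None
#       audio_path: str | None
#       transcript: str | None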
def web_search_tool(state: AgentState) -> AgentState:
    """
    Expects: state["web_search_query"] is a non-empty string.
    Returns: {"web_search_query": None, "web_search_result": <string>}
    We also clear web_search_query so we don't loop forever.
    """
    query = state.get("web_search_query", "")
    if not query:
        return {}  # nothing to do
    # Run DuckDuckGo
    ddg = DuckDuckGoSearchRun()
    result_text = ddg.run(query)
    return {
        "web_search_query": None,
        "web_search_result": result_text
    }
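# Illustrative usage (assumes DuckDuckGo is reachable from this environment):
#   web_search_tool({"web_search_query": "LangGraph state machines"})
#   -> {"web_search_query": None, "web_search_result": "<search result text>"}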
def ocr_image_tool(state: AgentState) -> AgentState:
    """
    Expects: state["ocr_path"] is a path to an image file.
    Returns: {"ocr_path": None, "ocr_result": <string>}.
    """
    path = state.get("ocr_path", "")
    if not path:
        return {}
    try:
        img = Image.open(path)
        text = pytesseract.image_to_string(img)
        text = text.strip() or "(no visible text)"
    except Exception as e:
        text = f"Error during OCR: {e}"
    return {
        "ocr_path": None,
        "ocr_result": text
    }
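# Note: pytesseract is only a thin wrapper; the Tesseract OCR binary itself
# must be installed on the host (e.g. `apt-get install tesseract-ocr`) for
# image_to_string above to work.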
def parse_excel_tool(state: AgentState) -> AgentState:
    """
    Expects: state["excel_path"] is a path to an .xlsx file,
    and state["excel_sheet_name"] optionally names a sheet.
    Returns: {"excel_path": None, "excel_sheet_name": None, "excel_result": <string>}.
    """
    path = state.get("excel_path", "")
    sheet = state.get("excel_sheet_name", "")
    if not path:
        return {}
    try:
        xls = pd.ExcelFile(path)
        if sheet and sheet in xls.sheet_names:
            df = pd.read_excel(xls, sheet_name=sheet)
        else:
            df = pd.read_excel(xls, sheet_name=xls.sheet_names[0])
        records = df.to_dict(orient="records")
        text = str(records)
    except Exception as e:
        text = f"Error reading Excel: {e}"
    return {
        "excel_path": None,
        "excel_sheet_name": None,
        "excel_result": text
    }
def run_tools(state: AgentState, tool_out: AgentState) -> AgentState:
    """
    Merges whatever partial state the tool wrapper returned (tool_out)
    into the main state. That is, combine previous keys with new keys:
        new_state = {**state, **tool_out}
    This node should be wired as its own graph node, not as a transition function.
    """
    new_state = {**state, **tool_out}
    return new_state
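# --------------------------------------------------------------------------- #
# Hypothetical wiring sketch (an assumption, not the project's actual graph):
# one way these wrappers could be registered as LangGraph nodes. The node
# names, the linear edge layout, and the helper name build_example_graph are
# all invented for illustration. LangGraph itself merges the partial dicts
# returned by each node into the shared state, which is the same merge that
# run_tools performs manually.
def build_example_graph():
    from langgraph.graph import StateGraph, END  # local import: sketch only

    builder = StateGraph(AgentState)
    builder.add_node("web_search", web_search_tool)
    builder.add_node("ocr", ocr_image_tool)
    builder.add_node("parse_excel", parse_excel_tool)
    # A real agent would route between these nodes conditionally, based on
    # which state keys (web_search_query, ocr_path, excel_path) are populated.
    builder.set_entry_point("web_search")
    builder.add_edge("web_search", "ocr")
    builder.add_edge("ocr", "parse_excel")
    builder.add_edge("parse_excel", END)
    return builder.compile()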
import os
import openai
from state import AgentState
def audio_transcriber_tool(state: AgentState) -> AgentState:
    """
    LangGraph tool for transcribing audio via OpenAI's hosted Whisper API.
    Expects: state["audio_path"] to be a valid path to a .wav/.mp3/.m4a file.
    Returns:
        {
            "audio_path": None,
            "transcript": "<transcribed text or error message>"
        }
    If no valid audio_path is provided, returns {}.
    """
    path = state.get("audio_path", "")
    if not path or not os.path.exists(path):
        return {}
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")
        with open(path, "rb") as audio_file:
            # For OpenAI Python library v0.27.0+:
            response = openai.Audio.transcribe("whisper-1", audio_file)
            # If using an older OpenAI library, use:
            # response = openai.Audio.create_transcription(file=audio_file, model="whisper-1")
        text = response["text"].strip()
    except Exception as e:
        text = f"Error during transcription: {e}"
    return {
        "audio_path": None,
        "transcript": text
    }
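# --------------------------------------------------------------------------- #
# Hypothetical variant (an assumption, not part of the original file): with the
# openai>=1.0 client the legacy openai.Audio.transcribe call above no longer
# exists, and the equivalent request looks roughly like this:
#
#   from openai import OpenAI
#   client = OpenAI()  # reads OPENAI_API_KEY from the environment
#   with open(path, "rb") as audio_file:
#       response = client.audio.transcriptions.create(
#           model="whisper-1",
#           file=audio_file,
#       )
#   text = response.text.strip()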