# tools.py
"""LangGraph tool wrappers: web search, OCR, Excel parsing, state merging,
audio transcription, and Wikipedia lookup.

Each tool reads its trigger key from the shared ``AgentState`` (e.g.
``web_search_query``) and returns a *partial* state that clears that trigger
and stores the result, so the graph does not re-invoke the tool forever.
Tools return ``{}`` when their trigger key is absent/empty (nothing to do).
"""

import os
import re

import openai
import pandas as pd
import pytesseract
import requests
from langchain.schema import HumanMessage
from langchain_community.tools import DuckDuckGoSearchRun
from pathlib import Path  # unused here; kept for backward compatibility
from PIL import Image

from state import AgentState


def web_search_tool(state: AgentState) -> AgentState:
    """Search the web with DuckDuckGo.

    Expects:
        state["web_search_query"]: a non-empty string.

    Returns:
        {"web_search_query": None, "web_search_result": <result text>}.
        The query key is cleared so we don't loop forever.
        Returns {} when no query is present.
    """
    print("reached web search tool")
    query = state.get("web_search_query", "")
    if not query:
        return {}  # nothing to do

    # Run DuckDuckGo
    ddg = DuckDuckGoSearchRun()
    result_text = ddg.run(query)
    print(f"web_search_result: {result_text}")

    return {
        "web_search_query": None,
        "web_search_result": result_text
    }


def ocr_image_tool(state: AgentState) -> AgentState:
    """Extract text from an image with Tesseract OCR.

    Expects:
        state["ocr_path"]: a path to an image file.

    Returns:
        {"ocr_path": None, "ocr_result": <extracted text>}. The result is
        "(no visible text)" for blank output, or an error string if OCR
        fails. Returns {} when no path is present.
    """
    print("reached ocr image tool")
    path = state.get("ocr_path", "")
    if not path:
        return {}
    try:
        img = Image.open(path)
        text = pytesseract.image_to_string(img)
        text = text.strip() or "(no visible text)"
    except Exception as e:
        # Capture any PIL/tesseract failure into the result string.
        text = f"Error during OCR: {e}"
    print(f"ocr_result: {text}")
    return {
        "ocr_path": None,
        "ocr_result": text
    }


def parse_excel_tool(state: AgentState) -> AgentState:
    """Read an Excel file, or fall back to a Markdown table in the chat.

    Attempts to read a real .xlsx file at state["excel_path"], using
    state["excel_sheet_name"] when it names an existing sheet (otherwise
    the first sheet). If the file is missing or unreadable, scans the
    conversation history for a Markdown-style table inside a HumanMessage
    and returns that block instead.

    Returns:
        {
          "excel_path": None,
          "excel_sheet_name": None,
          "excel_result": <records-as-string | table block | error message>
        }
        Returns {} when no path is present.
    """
    path = state.get("excel_path", "")
    sheet = state.get("excel_sheet_name", "")
    if not path:
        return {}

    # 1) Try reading the real file first
    if os.path.exists(path):
        try:
            xls = pd.ExcelFile(path)
            if sheet and sheet in xls.sheet_names:
                df = pd.read_excel(xls, sheet_name=sheet)
            else:
                # Unknown/absent sheet name: default to the first sheet.
                df = pd.read_excel(xls, sheet_name=xls.sheet_names[0])
            records = df.to_dict(orient="records")
            text = str(records)
            return {
                "excel_path": None,
                "excel_sheet_name": None,
                "excel_result": text
            }
        except Exception as e:
            # If there's an I/O or parsing error, fall through to table extraction
            print(f">>> parse_excel_tool: Error reading Excel file {path}: {e}")

    # 2) Fallback: extract a Markdown table from any HumanMessage in state["messages"]
    messages = state.get("messages", [])
    table_lines = []
    collecting = False
    for msg in messages:
        if isinstance(msg, HumanMessage):
            for line in msg.content.splitlines():
                # Start collecting when we see the first table header row
                if re.match(r"^\s*\|\s*[-A-Za-z0-9]", line):
                    collecting = True
                if collecting:
                    if not re.match(r"^\s*\|", line):
                        # stop when the block ends (blank line or non-table line)
                        collecting = False
                        break
                    table_lines.append(line)
        if table_lines:
            break

    if not table_lines:
        return {
            "excel_path": None,
            "excel_sheet_name": None,
            "excel_result": "Error: No Excel file found and no Markdown table detected in prompt."
        }

    # Remove any separator rows like "| ---- | ---- |"
    clean_rows = [row for row in table_lines if not re.match(r"^\s*\|\s*-+", row)]
    table_block = "\n".join(clean_rows).strip()
    return {
        "excel_path": None,
        "excel_sheet_name": None,
        "excel_result": table_block
    }


def run_tools(state: AgentState, tool_out: AgentState) -> AgentState:
    """Merge a tool's partial output into the main state.

    Combines previous keys with new keys: new_state = {**state, **tool_out},
    so keys in tool_out override those already in state. This should be
    wired as its own graph node, not as a transition function.
    """
    new_state = {**state, **tool_out}
    return new_state


def audio_transcriber_tool(state: AgentState) -> AgentState:
    """Transcribe audio via OpenAI's hosted Whisper API.

    Expects:
        state["audio_path"]: a valid path to a .wav/.mp3/.m4a file.

    Returns:
        {"audio_path": None, "transcript": <transcribed text>}; the
        transcript is an error string on failure. Returns {} when the
        path is missing or the file does not exist.
    """
    print("reached audio transcriber tool")
    path = state.get("audio_path", "")
    if not path or not os.path.exists(path):
        return {}
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")
        with open(path, "rb") as audio_file:
            # For OpenAI Python library v0.27.0+ (legacy pre-1.0 API):
            response = openai.Audio.transcribe("whisper-1", audio_file)
            # NOTE(review): openai>=1.0 removed openai.Audio; there the call
            # is client.audio.transcriptions.create(model="whisper-1",
            # file=audio_file) — confirm which library version is pinned.
        text = response["text"].strip()
    except Exception as e:
        text = f"Error during transcription: {e}"
    print(f"transcript: {text}")
    return {
        "audio_path": None,
        "transcript": text
    }


def wikipedia_search_tool(state: AgentState) -> AgentState:
    """Search Wikipedia and return a summary of the best-matching page.

    Expects:
        state["wiki_query"]: a non-empty string.

    Returns:
        {"wiki_query": None, "wiki_result": <summary or error text>}.
        Returns {} when no query is present. Network and parsing errors
        are captured into wiki_result rather than raised.
    """
    query = state.get("wiki_query", "").strip()
    if not query:
        return {}
    try:
        # 1) Use the MediaWiki API to search for page titles matching the query
        search_params = {
            "action": "query",
            "list": "search",
            "srsearch": query,
            "format": "json",
            "utf8": 1
        }
        search_resp = requests.get(
            "https://en.wikipedia.org/w/api.php",
            params=search_params,
            timeout=10
        )
        search_resp.raise_for_status()
        search_data = search_resp.json()
        search_results = search_data.get("query", {}).get("search", [])
        if not search_results:
            return {"wiki_query": None, "wiki_result": f"No Wikipedia page found for '{query}'."}

        # 2) Take the first search result's title
        first_title = search_results[0].get("title", "")
        if not first_title:
            return {"wiki_query": None, "wiki_result": "Unexpected format from Wikipedia search."}

        # 3) Fetch the page summary for that title via the REST summary endpoint
        title_for_url = requests.utils.requote_uri(first_title)
        summary_url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{title_for_url}"
        summary_resp = requests.get(summary_url, timeout=10)
        summary_resp.raise_for_status()
        summary_data = summary_resp.json()

        # 4) Extract either the "extract" field or a fallback message
        summary_text = summary_data.get("extract")
        if not summary_text:
            summary_text = summary_data.get("description", "No summary available.")

        return {
            "wiki_query": None,
            "wiki_result": f"Title: {first_title}\n\n{summary_text}"
        }
    except requests.exceptions.RequestException as e:
        return {"wiki_query": None, "wiki_result": f"Wikipedia search error: {e}"}
    except Exception as e:
        return {"wiki_query": None, "wiki_result": f"Unexpected error in wikipedia_search_tool: {e}"}