# tools.py
"""Tool nodes for a LangGraph agent.

Each tool reads its input from the shared ``AgentState`` dict, performs one
task (web search, OCR, Excel parsing, audio transcription, Wikipedia lookup)
and returns a *partial* state update; ``run_tools`` merges that partial
update back into the main state.
"""

import os
import re
import time
from pathlib import Path  # noqa: F401  (kept: part of this module's public surface)

import openai
import pandas as pd
import requests
from duckduckgo_search import DDGS
from langchain.schema import HumanMessage

from old.old2state import AgentState

# Base URL of the scoring service that also hosts per-task attachment files.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


def _download_file_for_task(task_id: str, ext: str) -> str:
    """Try to GET the remote file attached to *task_id*.

    Saves the payload under ``./hf_files/{task_id}.{ext}``.

    Returns:
        The local path on success, or ``""`` when the server has no file
        for this task (non-200 / empty body) or the download failed.
        Callers treat ``""`` as "no attachment" and fall back accordingly.
    """
    print("reached _download_file_for_task")
    os.makedirs("hf_files", exist_ok=True)
    local_path = os.path.join("hf_files", f"{task_id}.{ext}")
    url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200 and resp.content:
            print(f"Downloaded file from {url} to {local_path}")
            with open(local_path, "wb") as f:
                f.write(resp.content)
            return local_path
    except requests.RequestException:
        # Best-effort by design: network problems are treated the same as
        # "no file exists for this task".
        pass
    # Either a 404/non-200 response, an empty body, or a download error.
    return ""


def web_search_tool(state: AgentState) -> AgentState:
    """Run a DuckDuckGo text search for ``state["web_search_query"]``.

    Returns:
        ``{"web_search_query": None, "web_search_result": <str>}`` — the
        query is cleared so the graph does not re-trigger this tool.
        Returns ``{}`` when no query is present.

    Retries up to 5 times (4 s apart) on either a DuckDuckGo
    "202 Ratelimit" response or any exception (e.g. a timeout).
    """
    print("reached web_search_tool")
    query = state.get("web_search_query", "")
    if not query:
        return {}  # nothing to do

    ddg = DDGS()
    max_retries = 5
    result_text = ""
    for attempt in range(1, max_retries + 1):
        try:
            result_text = str(ddg.text(query, max_results=5))
        except Exception as e:
            # Network error or timeout — retry up to max_retries.
            if attempt < max_retries:
                print(f"web_search_tool: exception '{e}', retrying in 4 seconds ({attempt}/{max_retries})")
                time.sleep(4)
                continue
            # Final attempt failed: surface the error in the result slot.
            return {
                "web_search_query": None,
                "web_search_result": f"Error during DuckDuckGo search: {e}"
            }

        # Check for DuckDuckGo's rate-limit indicator in the response text.
        if "202 Ratelimit" in result_text:
            if attempt < max_retries:
                print(f"web_search_tool: received '202 Ratelimit', retrying in 4 seconds ({attempt}/{max_retries})")
                time.sleep(4)
                continue
            break  # final attempt still rate-limited: return what we have

        break  # success: no exception and no rate-limit text

    return {
        "web_search_query": None,
        "web_search_result": result_text
    }


def ocr_image_tool(state: AgentState) -> AgentState:
    """OCR and caption the image attached to the current task.

    Downloads ``{DEFAULT_API_URL}/files/{task_id}`` trying .png/.jpg/.jpeg
    extensions, then runs OCR and image captioning via the HF Inference API.

    NOTE(review): ``state["ocr_path"]`` is read but the short-circuit on a
    missing value is commented out upstream, so this tool currently always
    attempts the download by task id — confirm that is intended.

    Returns:
        ``{"ocr_path": None, "ocr_result": "<OCR text + image caption>"}``
        (``ocr_result`` carries an error string on failure).
    """
    print("reached ocr_image_tool")
    path_or_id = state.get("ocr_path", "")

    # 1) Download the image by task id, trying common extensions.
    #    BUG FIX: local_img must be initialised before the loop; previously a
    #    failed download raised NameError instead of returning the error dict.
    local_img = ""
    for ext in ("png", "jpg", "jpeg"):
        candidate = _download_file_for_task(state.get("task_id"), ext)
        if candidate:
            local_img = candidate
            break
    if not local_img or not os.path.exists(local_img):
        return {
            "ocr_path": None,
            "ocr_result": "Error: No image file found (local nonexistent or download failed)."
        }

    # 2) Read raw bytes.
    try:
        with open(local_img, "rb") as f:
            image_bytes = f.read()
    except Exception as e:
        return {
            "ocr_path": None,
            "ocr_result": f"Error reading image file: {e}"
        }

    # 3) Prepare HF Inference headers.
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        # Message fixed to name the env var actually checked (was
        # "HUGGINGFACE_API_KEY", which this code never reads).
        return {
            "ocr_path": None,
            "ocr_result": "Error: HF_TOKEN not set in environment."
        }
    headers = {"Authorization": f"Bearer {hf_token}"}

    # 4) Call HF's vision OCR to extract text.
    ocr_text = ""
    try:
        ocr_resp = requests.post(
            "https://api-inference.huggingface.co/models/google/vit-ocr",
            headers=headers,
            files={"file": image_bytes},
            timeout=30
        )
        ocr_resp.raise_for_status()
        ocr_json = ocr_resp.json()
        # The JSON has "pages" -> list of blocks -> "lines" -> each line has "text".
        lines = []
        for page in ocr_json.get("pages", []):
            for line in page.get("lines", []):
                lines.append(line.get("text", "").strip())
        ocr_text = "\n".join(lines).strip() or "(no visible text)"
    except Exception as e:
        ocr_text = f"Error during HF OCR: {e}"

    # 5) Call HF's image-captioning model for a brief description.
    caption = ""
    try:
        cap_resp = requests.post(
            "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base",
            headers=headers,
            files={"file": image_bytes},
            timeout=30
        )
        cap_resp.raise_for_status()
        cap_json = cap_resp.json()
        # The response looks like: {"generated_text": "...caption..."}
        caption = cap_json.get("generated_text", "").strip()
        if not caption:
            caption = "(no caption returned)"
    except Exception as e:
        caption = f"Error during HF captioning: {e}"

    # 6) Combine OCR + caption into one result string.
    combined = f"OCR text:\n{ocr_text}\n\nImage caption:\n{caption}"
    print("combined: ")
    return {
        "ocr_path": None,
        "ocr_result": combined
    }


def parse_excel_tool(state: AgentState) -> AgentState:
    """Load the task's .xlsx attachment, or a Markdown table from the prompt.

    ``state["excel_path"]`` may be a local path or a Task ID; the file is
    fetched via ``GET {DEFAULT_API_URL}/files/{task_id}``. If no file can be
    read, falls back to scanning HumanMessages for a Markdown-style table.

    Returns:
        ``{"excel_path": None, "excel_sheet_name": None, "excel_result": <str>}``
        or ``{}`` when state carries no ``excel_path``.
    """
    print("reached parse_excel_tool")
    path_or_id = state.get("excel_path", "")
    sheet = state.get("excel_sheet_name", "")
    # Check the input *before* downloading, so a no-op call does not hit
    # the network (previously the download ran unconditionally first).
    if not path_or_id:
        return {}

    # Always attempt to download the file, regardless of local existence.
    local_xlsx = _download_file_for_task(state.get("task_id"), "xlsx")

    # If we finally have a real file, read the requested sheet (or the first).
    if local_xlsx and os.path.exists(local_xlsx):
        try:
            print("reached excel file found")
            xls = pd.ExcelFile(local_xlsx)
            if sheet and sheet in xls.sheet_names:
                df = pd.read_excel(xls, sheet_name=sheet)
            else:
                df = pd.read_excel(xls, sheet_name=xls.sheet_names[0])
            records = df.to_dict(orient="records")
            text = str(records)
            print("reached excel file found: ")
            print(text)
            print()
            return {
                "excel_path": None,
                "excel_sheet_name": None,
                "excel_result": text
            }
        except Exception as e:
            print(f">>> parse_excel_tool: Error reading Excel file {local_xlsx}: {e}")
            # Fall back to scanning for a Markdown table below.

    # Fallback: scan any HumanMessage for a Markdown-style table.
    messages = state.get("messages", [])
    table_lines = []
    collecting = False
    for msg in messages:
        if isinstance(msg, HumanMessage):
            for line in msg.content.splitlines():
                if re.match(r"^\s*\|\s*[-A-Za-z0-9]", line):
                    collecting = True
                if collecting:
                    if not re.match(r"^\s*\|", line):
                        collecting = False
                        break
                    table_lines.append(line)
        if table_lines:
            break

    if not table_lines:
        return {
            "excel_path": None,
            "excel_sheet_name": None,
            "excel_result": "Error: No Excel file found and no Markdown table detected in prompt."
        }

    # Drop separator rows like "| --- | --- |".
    clean_rows = [row for row in table_lines if not re.match(r"^\s*\|\s*-+", row)]
    table_block = "\n".join(clean_rows).strip()
    print(f"Parsed excel as excel_result: {table_block}")
    return {
        "excel_path": None,
        "excel_sheet_name": None,
        "excel_result": table_block
    }


def audio_transcriber_tool(state: AgentState) -> AgentState:
    """Transcribe the task's audio attachment with OpenAI's Whisper API.

    Tries downloading ``{DEFAULT_API_URL}/files/{task_id}`` with .mp3, .wav
    and .m4a extensions, then sends the file to the ``whisper-1`` model.

    Returns:
        ``{"audio_path": None, "transcript": "<text or error message>"}``
        or ``{}`` when state carries no ``audio_path``.
    """
    print("reached audio_transcriber_tool")
    path_or_id = state.get("audio_path", "")
    if not path_or_id:
        return {}

    # Always attempt to download the file, regardless of local existence.
    local_audio = ""
    for ext in ("mp3", "wav", "m4a"):
        candidate = _download_file_for_task(state.get("task_id"), ext)
        if candidate:
            local_audio = candidate
            break
    if not local_audio or not os.path.exists(local_audio):
        return {
            "audio_path": None,
            "transcript": "Error: No audio file found (download failed)."
        }

    # Send to OpenAI Whisper.
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")
        with open(local_audio, "rb") as audio_file:
            print("reached openai.audio.transcriptions.create")
            response = openai.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
            )
        print("reached response")
        text = response.text.strip()
    except Exception as e:
        text = f"Error during transcription: {e}"

    print(f"Transcripted as transcript: {text}")
    return {
        "audio_path": None,
        "transcript": text
    }


def wikipedia_search_tool(state: AgentState) -> AgentState:
    """Look up ``state["wiki_query"]`` on Wikipedia and return a summary.

    Returns:
        ``{"wiki_query": None, "wiki_result": "<summary or error message>"}``
        or ``{}`` when no query is present.
    """
    print("reached wikipedia search tool")
    query = state.get("wiki_query", "").strip()
    if not query:
        return {}

    try:
        # 1) Use the MediaWiki API to search for page titles matching the query.
        search_params = {
            "action": "query",
            "list": "search",
            "srsearch": query,
            "format": "json",
            "utf8": 1
        }
        search_resp = requests.get("https://en.wikipedia.org/w/api.php", params=search_params, timeout=10)
        search_resp.raise_for_status()
        search_data = search_resp.json()
        search_results = search_data.get("query", {}).get("search", [])
        if not search_results:
            return {"wiki_query": None, "wiki_result": f"No Wikipedia page found for '{query}'."}

        # 2) Take the first search result's title.
        first_title = search_results[0].get("title", "")
        if not first_title:
            return {"wiki_query": None, "wiki_result": "Unexpected format from Wikipedia search."}

        # 3) Fetch the page summary for that title via the REST summary endpoint.
        title_for_url = requests.utils.requote_uri(first_title)
        summary_url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{title_for_url}"
        summary_resp = requests.get(summary_url, timeout=10)
        summary_resp.raise_for_status()
        summary_data = summary_resp.json()

        # 4) Extract the "extract" field, falling back to the short description.
        summary_text = summary_data.get("extract")
        if not summary_text:
            summary_text = summary_data.get("description", "No summary available.")

        return {
            "wiki_query": None,
            "wiki_result": f"Title: {first_title}\n\n{summary_text}"
        }
    except requests.exceptions.RequestException as e:
        return {"wiki_query": None, "wiki_result": f"Wikipedia search error: {e}"}
    except Exception as e:
        return {"wiki_query": None, "wiki_result": f"Unexpected error in wikipedia_search_tool: {e}"}


def run_tools(state: AgentState, tool_out: AgentState) -> AgentState:
    """Merge a tool's partial state update into the main state.

    Keys in *tool_out* override keys already present in *state*
    (``new_state = {**state, **tool_out}``). This should be wired as its
    own graph node, not as a transition function.
    """
    new_state = {**state, **tool_out}
    return new_state