# tools.py
import os
import time
from pathlib import Path

import openai
import pandas as pd
import regex as re
import requests
from duckduckgo_search import DDGS
from langchain_core.tools import tool
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader

DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


def _download_file_for_task(task_id: str, ext: str) -> str:
    """
    Helper: attempt to GET the remote file for a given task_id.
    Saves under ./hf_files/{task_id}.{ext}.
    Returns the local path on success, or an empty string if there is no file
    or the download failed.
    """
    print("reached _download_file_for_task")
    os.makedirs("hf_files", exist_ok=True)
    local_path = os.path.join("hf_files", f"{task_id}.{ext}")
    url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200 and resp.content:
            print(f"Downloaded file from {url} to {local_path}")
            with open(local_path, "wb") as f:
                f.write(resp.content)
            return local_path
    except Exception:
        print(f"Error downloading file from {url} to {local_path}")
    # If we get here, the file was missing (e.g. 404) or the download errored.
    return ""


@tool
def image_tool(task_id: str) -> str:
    """
    Expects: task_id is a string.
    Returns: a brief caption describing the image, or an error message.
    """
    print(f"DEBUG: image_tool called with task_id: {task_id}")
    local_img = None

    # 1) Try to download the image file with different extensions
    for ext in ("png", "jpg", "jpeg"):
        print(f"DEBUG: Trying to download {task_id}.{ext}")
        candidate = _download_file_for_task(task_id, ext)
        if candidate:
            local_img = candidate
            print(f"DEBUG: Successfully downloaded image: {local_img}")
            break
        else:
            print(f"DEBUG: Failed to download {task_id}.{ext}")

    if not local_img or not os.path.exists(local_img):
        error_msg = f"Error: No image file found for task_id {task_id} (tried png, jpg, jpeg extensions)"
        print(f"DEBUG: {error_msg}")
        return error_msg

    # 2) Read raw bytes
    try:
        print(f"DEBUG: Reading image file: {local_img}")
        with open(local_img, "rb") as f:
            image_bytes = f.read()
        print(f"DEBUG: Successfully read {len(image_bytes)} bytes from image")
    except Exception as e:
        error_msg = f"Error reading image file: {e}"
        print(f"DEBUG: {error_msg}")
        return error_msg

    # 3) Prepare HF Inference headers
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        error_msg = "Error: HF_TOKEN not set in environment."
print(f"DEBUG: {error_msg}") return error_msg headers = {"Authorization": f"Bearer {hf_token}"} print("DEBUG: HF token found, proceeding with API calls") # Try different HF models for image analysis models_to_try = [ "nlpconnect/vit-gpt2-image-captioning", "Salesforce/blip-image-captioning-large", "microsoft/git-base-coco", "microsoft/git-large-coco" ] result_text = "" success = False for model_name in models_to_try: try: print(f"DEBUG: Trying model: {model_name}") resp = requests.post( f"https://api-inference.huggingface.co/models/{model_name}", headers=headers, files={"file": image_bytes}, timeout=30 ) print(f"DEBUG: {model_name} response status: {resp.status_code}") if resp.status_code == 200: resp_json = resp.json() print(f"DEBUG: {model_name} response: {resp_json}") # Handle different response formats if isinstance(resp_json, list) and len(resp_json) > 0: result_text = resp_json[0].get("generated_text", "").strip() elif isinstance(resp_json, dict): result_text = resp_json.get("generated_text", "").strip() if result_text: print(f"DEBUG: Successfully got result from {model_name}: {result_text}") success = True break else: print(f"DEBUG: {model_name} failed with status {resp.status_code}") except Exception as e: print(f"DEBUG: {model_name} failed with error: {e}") continue if not success or not result_text: result_text = "Unable to analyze image - all HuggingFace models failed or returned empty results" # Format the result final_result = f"Image Analysis Result:\n{result_text}" print(f"DEBUG: Final result: {final_result}") return final_result @tool def excel_tool(task_id: str) -> str: """ Downloads .xlsx (if any) and returns a stringified list of records from the specified sheet. No fallback to user-supplied tables. Expected keys in `task_id`: • task_id – required (used to download the file) returns: stringified list of records from the specified sheet """ print("reached excel_tool") sheet = "Sheet1" local_xlsx = _download_file_for_task(task_id, "xlsx") if not local_xlsx or not os.path.exists(local_xlsx): return "Error: Excel file not found for this task." try: xls = pd.ExcelFile(local_xlsx) df = pd.read_excel( xls, sheet_name=sheet if sheet and sheet in xls.sheet_names else xls.sheet_names[0] ) print(f"Excel file read successfully: {str(df.to_dict(orient='records'))}") return str(df.to_dict(orient="records")) except Exception as e: return f"Error reading Excel file: {e}" import openai @tool def audio_transcriber_tool(task_id: str) -> str: """ LangGraph tool for transcribing audio via OpenAI's Whisper API. Expects: task_id is a string Returns: "" Always attempts to download the file for the given path or task ID. """ print("reached audio_transcriber_tool") # Always attempt to download the file, regardless of local existence local_audio = "" for ext in ("mp3", "wav", "m4a"): candidate = _download_file_for_task(task_id, ext) if candidate: local_audio = candidate break if not local_audio or not os.path.exists(local_audio): return "Error: No audio file found (download failed)." 
    # Send to OpenAI Whisper
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")
        with open(local_audio, "rb") as audio_file:
            print("reached openai.audio.transcriptions.create")
            response = openai.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
            )
            print("reached response")
            text = response.text.strip()
    except Exception as e:
        text = f"Error during transcription: {e}"

    print(f"Transcription result: {text}")
    return text


@tool
def wikipedia_search_tool(wiki_query: str) -> str:
    """
    Searches Wikipedia for the given query and returns up to 2 matching pages.
    Expects: wiki_query is a non-empty string.
    Returns: text of the matching pages, or an error message.
    """
    print(f"DEBUG: reached wikipedia_search_tool with query: {wiki_query}")
    try:
        docs = WikipediaLoader(query=wiki_query, load_max_docs=2).load()
        print(f"DEBUG: WikipediaLoader returned {len(docs)} documents")
        result = ""
        counter = 1
        for doc in docs:
            print(f"DEBUG: Processing Wikipedia document {counter}")
            print(f"DEBUG: Document metadata: {doc.metadata}")
            print(f"DEBUG: Document content length: {len(doc.page_content)}")
            # Handle different metadata structures
            title = "Unknown Title"
            if hasattr(doc, 'metadata') and doc.metadata:
                # Try different possible title keys
                if 'title' in doc.metadata:
                    title = doc.metadata['title']
                elif 'Title' in doc.metadata:
                    title = doc.metadata['Title']
                elif 'source' in doc.metadata:
                    title = doc.metadata['source']
                else:
                    # Use first available key as title
                    first_key = list(doc.metadata.keys())[0]
                    title = f"Wikipedia: {doc.metadata[first_key]}"
            print(f"DEBUG: Using Wikipedia title: {title}")
            # Truncate content if too long
            content = doc.page_content[:2000] if len(doc.page_content) > 2000 else doc.page_content
            result += f"\n\nDocument{counter}: {title}\n{content}"
            counter += 1
        if not result.strip():
            return "No Wikipedia results found for the given query"
        print(f"DEBUG: Final Wikipedia result length: {len(result)}")
        return result
    except Exception as e:
        error_msg = f"Error during Wikipedia search: {str(e)}"
        print(f"DEBUG: {error_msg}")
        return error_msg


@tool
def arxiv_search_tool(arxiv_query: str) -> str:
    """
    Searches arXiv for the given query and returns up to 2 matching papers.
    Expects: arxiv_query is a non-empty string.
    Returns: text of the matching papers, or an error message.
    """
    print(f"DEBUG: reached arxiv_search_tool with query: {arxiv_query}")
    try:
        docs = ArxivLoader(query=arxiv_query, load_max_docs=2).load()
        print(f"DEBUG: ArxivLoader returned {len(docs)} documents")
        result = ""
        counter = 1
        for doc in docs:
            print(f"DEBUG: Processing document {counter}")
            print(f"DEBUG: Document metadata: {doc.metadata}")
            print(f"DEBUG: Document content length: {len(doc.page_content)}")
            # Handle different metadata structures
            title = "Unknown Title"
            if hasattr(doc, 'metadata') and doc.metadata:
                # Try different possible title keys
                if 'title' in doc.metadata:
                    title = doc.metadata['title']
                elif 'Title' in doc.metadata:
                    title = doc.metadata['Title']
                elif 'entry_id' in doc.metadata:
                    title = doc.metadata['entry_id']
                elif 'summary' in doc.metadata:
                    title = f"ArXiv Paper {counter}"
                else:
                    # Use first available key as title
                    first_key = list(doc.metadata.keys())[0]
                    title = f"{first_key}: {doc.metadata[first_key]}"
            print(f"DEBUG: Using title: {title}")
            # Truncate content if too long
            content = doc.page_content[:2000] if len(doc.page_content) > 2000 else doc.page_content
            result += f"\n\nDocument{counter}: {title}\n{content}"
            counter += 1
        if not result.strip():
            return "No ArXiv results found for the given query"
        print(f"DEBUG: Final ArXiv result length: {len(result)}")
        return result
    except Exception as e:
        error_msg = f"Error during Arxiv search: {str(e)}"
        print(f"DEBUG: {error_msg}")
        return error_msg


from langchain_openai import ChatOpenAI
from langchain.schema import SystemMessage, HumanMessage

LLM = ChatOpenAI(model_name="gpt-4.1-mini", temperature=0.2)


@tool
def analyze_code_tool(task_id: str) -> str:
    """
    Expects: task_id is a string (used to download the task's .py file).
    Reads the code (max 400 lines / 10 kB) and asks the LLM for:
        • plain-language summary
        • list of key functions/classes
        • obvious bugs or style smells
    Returns that analysis as a string.
    """
    print("reached analyze_code_tool")
    code_txt = ""
    if not task_id:
        code_txt = "No code provided."
    else:
        path = _download_file_for_task(task_id, "py")
        if not path:
            return "Error: .py file not found for this task."
        code_txt = Path(path).read_text(encoding="utf-8", errors="ignore")

    # Truncate for safety
    lines = code_txt.splitlines()[:400]
    code_sample = "\n".join(lines)[:10_000]

    prompt = [
        SystemMessage(content="You are a senior Python code reviewer."),
        HumanMessage(content=(
            "Please analyse the following code. "
            "Summarise what it does, list key functions/classes, "
            "and point out any obvious bugs, performance issues or style problems. "
            "If you can, also work out what the code outputs and include that in your answer.\n\n"
            f"```python\n{code_sample}\n```"
        ))
    ]
    return LLM.invoke(prompt).content.strip()


# def web_search_tool(state: AgentState) -> AgentState:
#     """
#     Expects: state["web_search_query"] is a non-empty string.
#     Returns: {"web_search_query": None, "web_search_result": <search result string>}.
#     Retries up to 5 times on either a DuckDuckGo "202 Ratelimit" response or any exception (e.g. timeout).
# """ # print("reached web_search_tool") # query = state.get("web_search_query", "") # if not query: # return {} # nothing to do # ddg = DDGS() # max_retries = 5 # result_text = "" # for attempt in range(1, max_retries + 1): # try: # result_text = str(ddg.text(query, max_results=5)) # except Exception as e: # # Network error or timeout—retry up to max_retries # if attempt < max_retries: # print(f"web_search_tool: exception '{e}', retrying in 4 seconds ({attempt}/{max_retries})") # time.sleep(4) # continue # else: # # Final attempt failed # return { # "web_search_query": None, # "web_search_result": f"Error during DuckDuckGo search: {e}" # } # # Check for DuckDuckGo rate‐limit indicator # if "202 Ratelimit" in result_text: # if attempt < max_retries: # print(f"web_search_tool: received '202 Ratelimit', retrying in 4 seconds ({attempt}/{max_retries})") # time.sleep(4) # continue # else: # # Final attempt still rate‐limited # break # # Successful response (no exception and no rate‐limit text) # break # return { # "web_search_query": None, # "web_search_result": result_text # }