Spaces:
Sleeping
Sleeping
| # tools.py | |
| import pandas as pd | |
| from pathlib import Path | |
| import requests | |
| import regex as re | |
| import time | |
| import os | |
| from duckduckgo_search import DDGS | |
| from langchain_core.tools import tool | |
| from langchain_community.document_loaders import WikipediaLoader, ArxivLoader | |
| import arxiv | |
| import fitz # PyMuPDF | |
| import tempfile | |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
| # Removed complex safety wrapper - keeping things simple | |
def _download_file_for_task(task_id: str, ext: str) -> str:
    """
    Download the remote file attached to a task, best-effort.

    Fetches ``{DEFAULT_API_URL}/files/{task_id}`` and saves the body under
    ``./hf_files/{task_id}.{ext}``.

    Args:
        task_id: Identifier of the task whose file should be fetched.
        ext: File extension (without the dot) used for the local filename.

    Returns:
        The local file path on success, or ``""`` when the server has no file
        for this task (non-200 / empty body) or the download/write fails.
    """
    print("reached _download_file_for_task")
    os.makedirs("hf_files", exist_ok=True)
    local_path = os.path.join("hf_files", f"{task_id}.{ext}")
    url = f"{DEFAULT_API_URL}/files/{task_id}"
    try:
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200 and resp.content:
            # Write first, then log success, so the log never claims a
            # download that actually failed during the write.
            with open(local_path, "wb") as f:
                f.write(resp.content)
            print(f"Downloaded file from {url} to {local_path}")
            return local_path
    except (requests.RequestException, OSError):
        # Best-effort by design: a missing or unreachable file is an expected
        # case, signalled to callers via the empty-string return value.
        print(f"Error downloading file from {url} to {local_path}")
    # If we get here, either 404/empty body or a download/write error.
    return ""
def image_tool(task_id: str) -> str:
    """
    TOOL NAME: Image Analysis Tool
    Purpose: When the user asks about images, photos, or visual content, use this tool to get a description of the image.
    Input: A task_id string that identifies the specific image to analyze.
    Example usage:
    - "What is shown in this image?"
    - "Describe the contents of the picture"
    - "What objects are visible in the photo?"
    """
    # Try downloading the image with one of the allowed extensions.
    for ext in ("png", "jpg", "jpeg"):
        file_path = _download_file_for_task(task_id, ext)
        if file_path and os.path.exists(file_path):
            break
    else:
        # for/else: no `break` means every extension failed to download.
        return f"Error: Image file for task_id '{task_id}' not found."
    # Read the image bytes.
    try:
        with open(file_path, "rb") as f:
            image_bytes = f.read()
    except Exception as e:
        return f"Error reading image: {str(e)}"
    # Load HF token.
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        return "Error: HF_TOKEN not set in environment."
    # Use a single reliable captioning model.
    model = "Salesforce/blip-image-captioning-base"
    headers = {"Authorization": f"Bearer {hf_token}"}
    try:
        # The HF serverless Inference API for vision models expects the raw
        # image bytes as the request body (``data=``), not a multipart
        # ``files=`` upload — the previous form sent a form-encoded payload
        # the endpoint cannot decode as an image.
        response = requests.post(
            f"https://api-inference.huggingface.co/models/{model}",
            headers=headers,
            data=image_bytes,
            timeout=30,
        )
    except Exception as e:
        return f"Error calling HuggingFace API: {e}"
    # Parse response.
    if response.status_code != 200:
        return f"Error from model ({model}): {response.status_code} - {response.text}"
    try:
        result = response.json()
        if isinstance(result, list) and result:
            caption = result[0].get("generated_text", "").strip()
        elif isinstance(result, dict):
            caption = result.get("generated_text", "").strip()
        else:
            caption = ""
    except Exception as e:
        return f"Error parsing response: {e}"
    if not caption:
        return "No caption generated by model."
    return f"Image Caption:\n{caption}"
def excel_tool(task_id: str) -> str:
    """
    TOOL NAME: Excel Data Analysis Tool
    Purpose: When the user asks about data in spreadsheets, tables, or Excel files, use this tool to read and analyze the data.
    Input: A task_id string that identifies the specific Excel file to analyze.
    Example usage:
    - "What data is in this spreadsheet?"
    - "Analyze the Excel file contents"
    - "Show me the data from the table"
    """
    print("reached excel_tool")
    preferred_sheet = "Sheet1"
    local_xlsx = _download_file_for_task(task_id, "xlsx")
    if not (local_xlsx and os.path.exists(local_xlsx)):
        return "Error: Excel file not found for this task."
    try:
        workbook = pd.ExcelFile(local_xlsx)
        # Prefer "Sheet1"; otherwise fall back to the workbook's first sheet.
        target = (
            preferred_sheet
            if preferred_sheet in workbook.sheet_names
            else workbook.sheet_names[0]
        )
        frame = pd.read_excel(workbook, sheet_name=target)
        records = str(frame.to_dict(orient="records"))
        print(f"Excel file read successfully: {records}")
        return records
    except Exception as e:
        return f"Error reading Excel file: {e}"
| import openai | |
def audio_transcriber_tool(task_id: str) -> str:
    """
    TOOL NAME: Audio Transcription Tool
    Purpose: When the user asks about audio files, speech, or wants to know what was said in an audio recording, use this tool.
    Input: A task_id string that identifies the specific audio file to transcribe.
    Example usage:
    - "What is said in this audio file?"
    - "Transcribe the speech from the recording"
    - "Convert the audio to text"
    """
    print("reached audio_transcriber_tool")
    # Always attempt the download, regardless of any pre-existing local copy;
    # take the first extension that yields a file.
    local_audio = ""
    for extension in ("mp3", "wav", "m4a"):
        downloaded = _download_file_for_task(task_id, extension)
        if downloaded:
            local_audio = downloaded
            break
    if not (local_audio and os.path.exists(local_audio)):
        return "Error: No audio file found (download failed)."
    # Hand the audio off to OpenAI Whisper; any failure is reported in the
    # returned text rather than raised, so the agent always gets an answer.
    try:
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if not openai.api_key:
            raise RuntimeError("OPENAI_API_KEY is not set in environment.")
        with open(local_audio, "rb") as audio_file:
            print("reached openai.audio.transcriptions.create")
            response = openai.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file,
            )
        print("reached response")
        text = response.text.strip()
    except Exception as e:
        text = f"Error during transcription: {e}"
    print(f"Transcripted as transcript: {text}")
    return text
| # tools.py | |
| import re | |
| import requests | |
def wikipedia_search_tool(wiki_query: str) -> str:
    """
    TOOL NAME: Wikipedia Search Tool
    Purpose: When the user asks for historical, biographical, scientific, or factual information, use this tool.
    Input: A string describing a topic to search on Wikipedia.
    Example usage:
    - "Who was Marie Curie?"
    - "Explain quantum entanglement"
    - "Tell me about the French Revolution"
    """
    print(f"DEBUG: reached wikipedia_search_tool with query: {wiki_query}")
    MAX_DOCS = 2    # only this many results are ever included in the answer
    MAX_CHARS = 800  # per-document content budget to keep the response small
    try:
        # Load exactly as many documents as we use — the previous version
        # fetched 3 but then broke out of the loop after consuming 2.
        docs = WikipediaLoader(query=wiki_query, load_max_docs=MAX_DOCS).load()
        print(f"DEBUG: WikipediaLoader returned {len(docs)} documents")
        result = ""
        for counter, doc in enumerate(docs[:MAX_DOCS], start=1):
            print(f"DEBUG: Processing Wikipedia document {counter}")
            print(f"DEBUG: Document metadata: {doc.metadata}")
            print(f"DEBUG: Document content length: {len(doc.page_content)}")
            # The metadata key holding the page title varies between loader
            # versions, so probe the likely keys in order.
            title = "Unknown Title"
            meta = getattr(doc, "metadata", None) or {}
            if "title" in meta:
                title = meta["title"]
            elif "Title" in meta:
                title = meta["Title"]
            elif "source" in meta:
                title = meta["source"]
            elif meta:
                # Fall back to the first available metadata value.
                first_key = next(iter(meta))
                title = f"Wikipedia: {meta[first_key]}"
            print(f"DEBUG: Using Wikipedia title: {title}")
            # Trim content to key information only; append an ellipsis only
            # when something was actually cut off (the old code always did).
            truncated = len(doc.page_content) > MAX_CHARS
            content = doc.page_content[:MAX_CHARS]
            suffix = "..." if truncated else ""
            result += f"\n\nWikipedia Result {counter}: {title}\nSummary: {content}{suffix}"
        if not result.strip():
            return "No Wikipedia results found for the given query. [END_OF_SEARCH]"
        # Clear end marker so the agent knows the search is complete.
        result += "\n\n[END_OF_SEARCH] - Wikipedia search complete. Use this information to answer the question."
        print(f"DEBUG: Final Wikipedia result length: {len(result)}")
        return result
    except Exception as e:
        error_msg = f"Error during Wikipedia search: {str(e)} [END_OF_SEARCH]"
        print(f"DEBUG: {error_msg}")
        return error_msg
def arxiv_search_tool(query: str) -> str:
    """
    TOOL NAME: ArXiv Academic Search Tool
    Purpose: When the user asks for academic research, scientific papers, or technical information, use this tool.
    Input: A string describing the academic topic to search for on ArXiv.
    Example usage:
    - "Find research papers about machine learning"
    - "What are recent studies on climate change?"
    - "Search for papers on quantum computing"
    """
    try:
        # Search arXiv for the single most relevant result.
        search = arxiv.Search(query=query, max_results=1, sort_by=arxiv.SortCriterion.Relevance)
        result = next(search.results(), None)
        if not result:
            return "No results found. [END_OF_SEARCH]"
        # Download the PDF into memory (bounded by a timeout so a stalled
        # server cannot hang the agent).
        response = requests.get(result.pdf_url, timeout=30)
        response.raise_for_status()
        # Open the PDF straight from the downloaded bytes. The previous
        # NamedTemporaryFile(delete=True) round-trip broke on Windows (the
        # open handle blocks reopening by name) and never closed the fitz
        # document; the context manager guarantees cleanup.
        with fitz.open(stream=response.content, filetype="pdf") as doc:
            text = "".join(page.get_text() for page in doc)
        # Normalize whitespace and trim to a manageable summary length.
        text = " ".join(text.split())
        summary = text[:3000] + "..." if len(text) > 3000 else text
        return f"Title: {result.title}\n\nSummary:\n{summary}\n\n[END_OF_SEARCH]"
    except Exception as e:
        return f"Error fetching arXiv content: {e} [END_OF_SEARCH]"
| from langchain_openai import ChatOpenAI | |
| from langchain.schema import SystemMessage, HumanMessage | |
| LLM = ChatOpenAI(model_name="gpt-4o-mini", temperature=0.2) | |
def analyze_code_tool(task_id: str) -> str:
    """
    TOOL NAME: Code Analysis Tool
    Purpose: When the user asks about code, programming files, or wants to understand what a script does, use this tool.
    Input: A task_id string that identifies the specific code file to analyze.
    Example usage:
    - "What does this Python code do?"
    - "Analyze the code file for bugs"
    - "Explain the functions in this script"
    """
    print("reached analyze_code_tool")
    if not task_id:
        code_txt = "No code provided."
    else:
        path = _download_file_for_task(task_id, "py")
        if not path:
            return "Error: .py file not found for this task."
        code_txt = Path(path).read_text(encoding="utf-8", errors="ignore")
    # Truncate for safety: at most 400 lines / 10k characters go to the LLM.
    lines = code_txt.splitlines()[:400]
    code_sample = "\n".join(lines)[:10_000]
    prompt = [
        SystemMessage(content="You are a senior Python code reviewer."),
        HumanMessage(content=(
            "Please analyse the following code. "
            "Summarise what it does, list key functions/classes, "
            "and point out any obvious bugs, performance issues or style problems.\n\n"
            # Fix: the closing fence was previously fused directly into the
            # next sentence ("```If you can..."), corrupting the markdown.
            f"```python\n{code_sample}\n```\n\n"
            "If you can then find the output of the code and return it in the output."
        )),
    ]
    return LLM.invoke(prompt).content.strip()
| # ─────────────────────────── Math Tools ─────────────────────────────── | |
def add_tool(a: float, b: float) -> str:
    """
    TOOL NAME: Addition Tool
    Purpose: When the user asks to add numbers or perform addition calculations, use this tool.
    Input: Two numbers (a and b) to add together.
    Example usage:
    - "What is 25 + 17?"
    - "Add 3.14 and 2.86"
    - "Calculate the sum of 100 and 250"
    """
    # Report the operands alongside their sum.
    return "Addition result: {} + {} = {}".format(a, b, a + b)
def subtract_tool(a: float, b: float) -> str:
    """
    TOOL NAME: Subtraction Tool
    Purpose: When the user asks to subtract numbers or perform subtraction calculations, use this tool.
    Input: Two numbers (a and b) where b is subtracted from a.
    Example usage:
    - "What is 50 - 23?"
    - "Subtract 15.5 from 40.2"
    - "Calculate 1000 minus 347"
    """
    # Report the operands alongside their difference (a minus b).
    difference = a - b
    return "Subtraction result: {} - {} = {}".format(a, b, difference)
def multiply_tool(a: float, b: float) -> str:
    """
    TOOL NAME: Multiplication Tool
    Purpose: When the user asks to multiply numbers or perform multiplication calculations, use this tool.
    Input: Two numbers (a and b) to multiply together.
    Example usage:
    - "What is 8 × 7?"
    - "Multiply 12.5 by 4"
    - "Calculate the product of 15 and 20"
    """
    # Report the operands alongside their product.
    product = a * b
    return f"Multiplication result: {a} × {b} = {product}"
def divide_tool(a: float, b: float) -> str:
    """
    TOOL NAME: Division Tool
    Purpose: When the user asks to divide numbers or perform division calculations, use this tool.
    Input: Two numbers (a and b) where a is divided by b.
    Example usage:
    - "What is 100 ÷ 4?"
    - "Divide 75 by 3"
    - "Calculate 144 divided by 12"
    """
    # Guard clause: a zero divisor is reported, never raised.
    if b == 0:
        return "Division error: Cannot divide by zero"
    quotient = a / b
    return f"Division result: {a} ÷ {b} = {quotient}"
| # def web_search_tool(state: AgentState) -> AgentState: | |
| # """ | |
| # Expects: state["web_search_query"] is a non‐empty string. | |
| # Returns: {"web_search_query": None, "web_search_result": <string>}. | |
| # Retries up to 5 times on either a DuckDuckGo "202 Ratelimit" response or any exception (e.g. timeout). | |
| # """ | |
| # print("reached web_search_tool") | |
| # query = state.get("web_search_query", "") | |
| # if not query: | |
| # return {} # nothing to do | |
| # ddg = DDGS() | |
| # max_retries = 5 | |
| # result_text = "" | |
| # for attempt in range(1, max_retries + 1): | |
| # try: | |
| # result_text = str(ddg.text(query, max_results=5)) | |
| # except Exception as e: | |
| # # Network error or timeout—retry up to max_retries | |
| # if attempt < max_retries: | |
| # print(f"web_search_tool: exception '{e}', retrying in 4 seconds ({attempt}/{max_retries})") | |
| # time.sleep(4) | |
| # continue | |
| # else: | |
| # # Final attempt failed | |
| # return { | |
| # "web_search_query": None, | |
| # "web_search_result": f"Error during DuckDuckGo search: {e}" | |
| # } | |
| # # Check for DuckDuckGo rate‐limit indicator | |
| # if "202 Ratelimit" in result_text: | |
| # if attempt < max_retries: | |
| # print(f"web_search_tool: received '202 Ratelimit', retrying in 4 seconds ({attempt}/{max_retries})") | |
| # time.sleep(4) | |
| # continue | |
| # else: | |
| # # Final attempt still rate‐limited | |
| # break | |
| # # Successful response (no exception and no rate‐limit text) | |
| # break | |
| # return { | |
| # "web_search_query": None, | |
| # "web_search_result": result_text | |
| # } |