Spaces:
Sleeping
Sleeping
# Custom tools for OpenAI Agents
from __future__ import annotations | |
import contextlib | |
import io | |
import os | |
from typing import Any, List, Union | |
from openai_agents import function_tool # Using openai_agents | |
import pandas as pd | |
import openai | |
from PIL import Image | |
import pytesseract | |
from duckduckgo_search import DDGS | |
from urllib.parse import urlparse, parse_qs # For youtube_transcript | |
from youtube_transcript_api import YouTubeTranscriptApi # For youtube_transcript, corrected import | |
# ---- 1. PythonRunTool -> python_run function ----------------------------------
def python_run(code: str) -> str:
    """
    Execute trusted Python code and return its printed output plus a result repr.

    The code runs in one fresh namespace, so functions and classes it defines
    can reference each other and any other top-level name of the snippet.
    After the captured stdout, the repr() of a result value is appended:
    the ``_result`` variable if the code set it to a non-None value, otherwise
    the value of a trailing bare expression (when the code ends with one and
    it does not evaluate to None).

    Args:
        code (str): Python code to execute. It is handed to exec()/eval()
            without any sandboxing, so it must come from a trusted source.

    Returns:
        str: Captured stdout followed by the result repr (if any), with
        surrounding whitespace stripped.

    Raises:
        RuntimeError: If the code cannot be parsed or raises an Exception
            while running; the original exception is chained as the cause.
    """
    import ast  # local import: only needed to split off a trailing expression

    buf = io.StringIO()
    ns: dict = {}
    last = None
    try:
        tree = ast.parse(code, "<agent-python>", "exec")
        # Detach a trailing bare expression so its value can be captured.
        tail = None
        if tree.body and isinstance(tree.body[-1], ast.Expr):
            tail = ast.Expression(tree.body.pop().value)
        with contextlib.redirect_stdout(buf):
            # A single dict serves as globals AND locals; with two separate
            # dicts, functions defined by the snippet could not see each other.
            # SECURITY: exec/eval on arbitrary input - trusted code only.
            exec(compile(tree, "<agent-python>", "exec"), ns)
            if tail is not None:
                last = eval(compile(tail, "<agent-python>", "eval"), ns)
        # An explicitly assigned _result wins over the trailing expression.
        explicit = ns.get("_result")
        if explicit is not None:
            last = explicit
    except Exception as e:
        raise RuntimeError(f"PythonRunTool error: {e}") from e
    out = buf.getvalue()
    return (out + (repr(last) if last is not None else "")).strip()
# ---- 2. ExcelLoaderTool -> load_spreadsheet function --------------------------
def load_spreadsheet(path: str, sheet: Union[str, int, None] = None) -> str:
    """
    Read a .csv/.xls/.xlsx file from disk and return its rows as a string.

    Each row becomes a dictionary whose keys are the column labels converted
    to str; the list of row dictionaries is returned as its str() form.

    Args:
        path (str): Path to the .csv/.xls/.xlsx file. Any extension other
            than ".csv" is read with pandas.read_excel.
        sheet (Union[str, int, None], optional): Sheet name or index, used
            for Excel files only. None or "" selects the first sheet.
            Defaults to None.

    Returns:
        str: str() of a list of row dictionaries.

    Raises:
        FileNotFoundError: If ``path`` is not an existing file.
    """
    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    ext = os.path.splitext(path)[1].lower()
    if ext == ".csv":
        df = pd.read_csv(path)
    else:
        # sheet_name=None would make pandas return a dict of ALL sheets, which
        # has no to_dict(orient="records"); fall back to the first sheet.
        if sheet is None or sheet == "":
            sheet = 0
        df = pd.read_excel(path, sheet_name=sheet)
    records = [{str(k): v for k, v in row.items()} for row in df.to_dict(orient="records")]
    # Always return a string
    return str(records)
# ---- 3. YouTubeTranscriptTool -> youtube_transcript function ------------------
def youtube_transcript(url: str, lang: str = "en") -> str:
    """
    Return the subtitles of a YouTube URL using youtube-transcript-api.

    The video id is taken from the ``v`` query parameter when present,
    otherwise from the last segment of the URL path. Query strings,
    fragments and trailing slashes are ignored, so short links such as
    ``https://youtu.be/<id>?t=10`` and bare ids both resolve to ``<id>``.

    Args:
        url (str): YouTube URL (or a bare video id).
        lang (str, optional): Preferred transcript language. English
            variants ("en", "en-US", "en-GB") are tried as fallbacks.
            Defaults to "en".

    Returns:
        str: The transcript entries' text joined with single spaces.

    Raises:
        ValueError: If no video id can be extracted from ``url``.
    """
    parsed = urlparse(url.strip())
    vid = parse_qs(parsed.query).get("v", [None])[0]
    if not vid:
        # Use the parsed path (not the raw URL) so "?t=10" / "#frag" suffixes
        # and a trailing "/" do not end up in the id.
        vid = parsed.path.rstrip("/").rsplit("/", 1)[-1]
    if not vid:
        raise ValueError(f"Could not extract a YouTube video id from: {url!r}")
    # dict.fromkeys de-duplicates while keeping the preference order.
    languages = list(dict.fromkeys([lang, "en", "en-US", "en-GB"]))
    data = YouTubeTranscriptApi.get_transcript(vid, languages=languages)
    text = " ".join(d["text"] for d in data).strip()
    return str(text)
# ---- 4. AudioTranscriptionTool -> transcribe_audio function -------------------
def transcribe_audio(path: str, model: str = "whisper-1") -> str:
    """
    Send an audio file to the OpenAI transcription endpoint and return the text.

    The API key is read from the OPENAI_API_KEY environment variable.

    Args:
        path (str): Location of the audio file on disk.
        model (str, optional): Transcription model name. Defaults to "whisper-1".

    Returns:
        str: The transcript text with surrounding whitespace removed.

    Raises:
        FileNotFoundError: If ``path`` is not an existing file.
    """
    if os.path.isfile(path):
        api_key = os.getenv("OPENAI_API_KEY")
        whisper = openai.OpenAI(api_key=api_key)
        with open(path, "rb") as audio:
            response = whisper.audio.transcriptions.create(model=model, file=audio)
        cleaned = response.text.strip()
        return str(cleaned)
    raise FileNotFoundError(path)
# ---- 5. SimpleOCRTool -> image_ocr function ------------------------------------
def image_ocr(path: str) -> str:
    """
    Return any text spotted in an image via pytesseract OCR.

    Args:
        path (str): Path to image file.

    Returns:
        str: The recognised text with surrounding whitespace stripped.

    Raises:
        FileNotFoundError: If ``path`` is not an existing file.
    """
    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    # Open the image in a context manager so its file handle is released
    # as soon as OCR finishes instead of lingering until garbage collection.
    with Image.open(path) as img:
        text = pytesseract.image_to_string(img)
    return str(text.strip())
# ---- 6. New DuckDuckGo Search Tool ---------------------------------------------
def duckduckgo_search(query: str) -> str:
    """
    Search the web using DuckDuckGo and return a summary of the results.

    Args:
        query (str): The search query.

    Returns:
        str: One "title: body" line per result (at most 5 results), or
        "No results found." when the query is blank or nothing came back.
    """
    # A blank query cannot match anything; skip the network round-trip.
    if not query or not query.strip():
        return "No results found."
    with DDGS() as ddgs:
        # Materialise inside the session: if text() hands back a lazy
        # iterable, a plain truthiness check would never see it as empty.
        results = list(ddgs.text(query, max_results=5) or [])  # Get top 5 results
    if not results:
        return "No results found."
    # .get() keeps one incomplete hit from raising KeyError for the whole search.
    return "\n".join(f"{r.get('title', '')}: {r.get('body', '')}" for r in results)
# ---------------------------------------------------------------------------
# Public API: the tool functions this module exports via `from ... import *`.
__all__ = [
    "python_run", "load_spreadsheet", "youtube_transcript",
    "transcribe_audio", "image_ocr", "duckduckgo_search",
]