Spaces:
Sleeping
Sleeping
File size: 4,993 Bytes
9623335 52d1305 73bb16b 52d1305 790cac2 9623335 790cac2 9623335 73bb16b 790cac2 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 52d1305 9623335 73bb16b 790cac2 9623335 52d1305 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 52d1305 9623335 73bb16b 790cac2 73bb16b 9623335 52d1305 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 52d1305 9623335 73bb16b 790cac2 73bb16b 9623335 52d1305 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 52d1305 9623335 73bb16b 790cac2 73bb16b 9623335 52d1305 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 52d1305 9623335 73bb16b 790cac2 9623335 52d1305 73bb16b 9623335 73bb16b 9623335 73bb16b 9623335 790cac2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
"""
Custom function tools for OpenAI Agents SDK GAIA agent.
"""
from __future__ import annotations
import contextlib
import io
import os
import time
import datetime
from typing import List, Dict
from agents import function_tool
def log(msg):
    """Print *msg* to stdout, prefixed with the current local time.

    The prefix has the form ``[YYYY-MM-DD HH:MM:SS]``.

    Args:
        msg: The message to print; it is interpolated into an f-string.
    """
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{stamp}] {msg}")
def log_tool_call(func):
    """Decorate *func* so every call logs its start, duration and any error.

    The returned wrapper forwards all positional and keyword arguments to
    *func* unchanged and returns its result. If *func* raises, the error is
    logged and the same exception is re-raised.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that carries the name, docstring, annotations and
        ``__wrapped__`` reference of *func*.
    """
    import functools

    # functools.wraps keeps the original name, docstring and signature visible
    # on the wrapper. Without it every decorated function looks like
    # `wrapper(*args, **kwargs)` to any decorator stacked on top of this one
    # (in this file: `function_tool`, which presumably inspects that metadata
    # to describe the tool -- confirm against the Agents SDK docs).
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the elapsed time cannot go negative
        # or jump if the system clock is adjusted mid-call.
        started = time.perf_counter()
        log(f"Step: {func.__name__} started.")
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            log(f"Step: {func.__name__} error: {e}")
            raise
        log(f"Step: {func.__name__} completed in {time.perf_counter() - started:.2f}s.")
        return result

    return wrapper
# 1. --------------------------------------------------------------------
@function_tool
@log_tool_call
def python_run(code: str) -> str:
    """Execute trusted Python code and return the captured stdout together with
    the repr() of the last expression (or `_result` variable if set).

    The snippet runs in one fresh namespace, so functions, classes and
    comprehensions it defines can see its own top-level imports and variables.

    Args:
        code: Python code to execute.

    Returns:
        Everything the code printed to stdout, followed by the repr() of
        `_result` when the code set it to a non-None value, otherwise the
        repr() of the trailing bare expression when its value is not None.
        Surrounding whitespace is stripped.

    Raises:
        RuntimeError: If the code cannot be parsed or raises while running;
            the original exception is chained as the cause.
    """
    import ast

    # SECURITY: exec()/eval() run the snippet with the full privileges of this
    # process. This tool must only ever receive trusted code.
    buf = io.StringIO()
    # A single dict serves as both globals and locals. With two separate
    # dicts the snippet is executed like a class body, and functions defined
    # in it cannot resolve names assigned at the snippet's top level.
    ns: dict = {}
    last = None
    try:
        tree = ast.parse(code, "<agent-python>", "exec")
        # Split off a trailing bare expression so its value can be captured,
        # as the docstring promises.
        tail = None
        if tree.body and isinstance(tree.body[-1], ast.Expr):
            tail = ast.Expression(tree.body.pop().value)
        with contextlib.redirect_stdout(buf):
            exec(compile(tree, "<agent-python>", "exec"), ns)
            if tail is not None:
                last = eval(compile(tail, "<agent-python>", "eval"), ns)
        # An explicit `_result` takes precedence over the trailing expression.
        if ns.get("_result") is not None:
            last = ns["_result"]
    except Exception as e:
        raise RuntimeError(f"python_run error: {e}") from e
    out = buf.getvalue()
    return (out + (repr(last) if last is not None else "")).strip()
# 2. --------------------------------------------------------------------
@function_tool
@log_tool_call
def load_spreadsheet(path: str, sheet: str | int | None = None) -> list[Dict[str, str]]:
    """Read .csv, .xls or .xlsx from disk and return rows as list of dictionaries.

    Files ending in ``.csv`` are read with ``pandas.read_csv``; every other
    extension is handed to ``pandas.read_excel``. When ``read_excel`` returns
    a dict of frames, the rows of all frames are concatenated into one list.
    Column names are converted to ``str``; cell values are returned as pandas
    produced them.

    Args:
        path: Path to spreadsheet file.
        sheet: Sheet name or index (for Excel files only). ``None`` or an
            empty string is passed to pandas as ``sheet_name=None``.

    Raises:
        FileNotFoundError: If *path* is not an existing file.
    """
    import pandas as pd

    if not os.path.isfile(path):
        raise FileNotFoundError(path)

    suffix = os.path.splitext(path)[1].lower()
    if suffix == ".csv":
        frames = [pd.read_csv(path)]
    else:
        sheet_name = None if sheet in ("", None) else sheet
        loaded = pd.read_excel(path, sheet_name=sheet_name)
        frames = loaded.values() if isinstance(loaded, dict) else [loaded]

    rows = []
    for frame in frames:
        for record in frame.to_dict(orient="records"):
            rows.append({str(column): value for column, value in record.items()})
    return rows
# 3. --------------------------------------------------------------------
@function_tool
@log_tool_call
def youtube_transcript(url: str, lang: str = "en") -> str:
    """Fetch the subtitles of a YouTube video.

    The video id is taken from the ``v`` query parameter when present,
    otherwise from the last segment of the URL path (short, embed and
    similar links, or a bare video id).

    Args:
        url: YouTube video URL.
        lang: Preferred transcript language code (default "en").

    Returns:
        The transcript chunks joined with single spaces, stripped.

    Raises:
        ValueError: If no video id can be extracted from *url*.
    """
    from urllib.parse import urlparse, parse_qs
    from youtube_transcript_api._api import YouTubeTranscriptApi

    parsed = urlparse(url)
    vid = parse_qs(parsed.query).get("v", [None])[0]
    if not vid:
        # Use the parsed path rather than the raw URL, so a query string or
        # fragment ("youtu.be/<id>?si=...") and a trailing slash do not end
        # up inside the id.
        vid = parsed.path.rstrip("/").split("/")[-1]
    if not vid:
        raise ValueError(f"Could not extract a YouTube video id from {url!r}")

    # Preferred language first, then English fallbacks, without duplicates.
    languages = list(dict.fromkeys([lang, "en", "en-US", "en-GB"]))
    data = YouTubeTranscriptApi.get_transcript(vid, languages=languages)
    return " ".join(chunk["text"] for chunk in data).strip()
# 4. --------------------------------------------------------------------
@function_tool
@log_tool_call
def transcribe_audio(path: str, model: str = "whisper-1") -> str:
    """Transcribe an audio file using OpenAI Whisper.

    The API key is read from the ``OPENAI_API_KEY`` environment variable.

    Args:
        path: Path to audio file (wav / mp3 / m4a / etc.).
        model: Whisper model name (default "whisper-1").

    Returns:
        The ``text`` of the transcription response, stripped.

    Raises:
        FileNotFoundError: If *path* is not an existing file.
    """
    import openai

    if not os.path.isfile(path):
        raise FileNotFoundError(path)

    api_key = os.getenv("OPENAI_API_KEY")
    client = openai.OpenAI(api_key=api_key)
    with open(path, "rb") as audio_file:
        response = client.audio.transcriptions.create(model=model, file=audio_file)
    text = response.text
    return text.strip()
# 5. --------------------------------------------------------------------
@function_tool
@log_tool_call
def image_ocr(path: str) -> str:
    """Perform OCR on an image using Tesseract.

    Args:
        path: Path to image file.

    Returns:
        The text recognised by ``pytesseract.image_to_string``, stripped.

    Raises:
        FileNotFoundError: If *path* is not an existing file.
    """
    from PIL import Image
    import pytesseract

    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    # Open the image in a context manager so the underlying file handle is
    # closed as soon as OCR finishes, instead of waiting for garbage
    # collection.
    with Image.open(path) as img:
        return pytesseract.image_to_string(img).strip()
# 6. --------------------------------------------------------------------
@function_tool
@log_tool_call
def duckduckgo_search(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Search DuckDuckGo and return a list of result dicts with title, href and body.

    Each hit is reduced to exactly the keys ``title``, ``href`` and ``body``;
    a key missing from a hit is filled with an empty string.

    Args:
        query: The search query.
        max_results: Maximum results to return (default 5).
    """
    from duckduckgo_search import DDGS

    wanted_keys = ("title", "href", "body")
    with DDGS() as client:
        hits = [
            {key: hit.get(key, "") for key in wanted_keys}
            for hit in client.text(query, max_results=max_results)
        ]
    return hits
|