File size: 4,993 Bytes
9623335
 
 
 
52d1305
73bb16b
52d1305
 
 
790cac2
 
9623335
 
 
 
790cac2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9623335
73bb16b
790cac2
73bb16b
9623335
 
73bb16b
 
9623335
73bb16b
9623335
 
73bb16b
 
 
 
9623335
73bb16b
9623335
 
73bb16b
9623335
 
52d1305
9623335
73bb16b
790cac2
9623335
 
52d1305
73bb16b
9623335
 
73bb16b
9623335
 
73bb16b
 
 
 
 
9623335
73bb16b
9623335
 
 
 
 
 
 
 
 
 
52d1305
9623335
73bb16b
790cac2
73bb16b
9623335
52d1305
73bb16b
9623335
 
73bb16b
9623335
 
 
73bb16b
9623335
 
 
 
52d1305
9623335
 
73bb16b
790cac2
73bb16b
9623335
52d1305
73bb16b
9623335
 
73bb16b
9623335
 
73bb16b
 
9623335
73bb16b
 
9623335
 
 
52d1305
9623335
73bb16b
790cac2
73bb16b
9623335
52d1305
73bb16b
9623335
73bb16b
9623335
 
 
73bb16b
 
9623335
 
52d1305
9623335
73bb16b
790cac2
9623335
 
52d1305
73bb16b
9623335
 
73bb16b
9623335
 
 
73bb16b
9623335
 
 
 
 
 
 
 
790cac2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""
Custom function tools for OpenAI Agents SDK GAIA agent.
"""

from __future__ import annotations

import contextlib
import io
import os
import time
import datetime
from typing import List, Dict

from agents import function_tool

def log(msg):
    """Print *msg* to stdout prefixed with the current local time.

    The prefix has the form ``[YYYY-MM-DD HH:MM:SS]`` followed by one space.
    """
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{stamp}] {msg}")

def log_tool_call(func):
    """Decorator that logs the start, duration and failure of each call to *func*.

    The returned wrapper forwards all positional and keyword arguments to
    *func* unchanged, returns its result, and re-raises any exception after
    logging it.

    The wrapper is built with ``functools.wraps`` so that it keeps the wrapped
    function's ``__name__``, ``__doc__``, ``__annotations__`` and (through
    ``__wrapped__``) its ``inspect.signature``. In this file the decorator sits
    directly underneath ``@function_tool``; without ``wraps`` that outer
    decorator would only ever see a function called ``wrapper`` taking
    ``*args, **kwargs`` and having no docstring.

    Args:
        func: The callable to instrument.

    Returns:
        A callable with the same metadata and call behaviour as *func*.
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import section.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the reported duration cannot go
        # negative or jump if the system clock is adjusted mid-call.
        started = time.perf_counter()
        log(f"Step: {func.__name__} started.")
        try:
            result = func(*args, **kwargs)
            log(f"Step: {func.__name__} completed in {time.perf_counter() - started:.2f}s.")
            return result
        except Exception as e:
            # Log and propagate: callers still see the original exception.
            log(f"Step: {func.__name__} error: {e}")
            raise

    return wrapper

# 1. --------------------------------------------------------------------
@function_tool
@log_tool_call
def python_run(code: str) -> str:
    """Execute trusted Python code and return the captured stdout together with
    the repr() of the last expression (or `_result` variable if set).

    Args:
        code: Python code to execute.
    """
    # Contract details (kept out of the docstring, which is left verbatim):
    #   * stdout written by the code is captured and returned.
    #   * If the code binds a non-None `_result`, its repr() is appended;
    #     otherwise, if the final statement is a bare expression with a
    #     non-None value, that value's repr() is appended.
    #   * Any Exception raised while parsing or running the code (including
    #     SyntaxError) is re-raised as RuntimeError("python_run error: ...").
    # NOTE(review): exec()/eval() run with full interpreter privileges; the
    # docstring states the input is trusted -- confirm that holds for callers.
    import ast

    buf = io.StringIO()
    # A single dict serves as both globals and locals. With separate dicts,
    # top-level names land in the locals mapping while functions and
    # comprehensions defined by the code resolve names through globals, so
    # e.g. `import math` followed by `def f(): return math.pi` raised NameError.
    ns: dict = {}
    last = None
    try:
        tree = ast.parse(code, "<agent-python>", "exec")
        tail = None
        if tree.body and isinstance(tree.body[-1], ast.Expr):
            # Detach a trailing bare expression so its value can be captured
            # with eval() after the preceding statements have run.
            tail = ast.Expression(tree.body.pop().value)
        with contextlib.redirect_stdout(buf):
            exec(compile(tree, "<agent-python>", "exec"), ns)
            if tail is not None:
                last = eval(compile(tail, "<agent-python>", "eval"), ns)
        # An explicitly assigned `_result` takes precedence, which preserves
        # the output for code that already relied on it.
        explicit = ns.get("_result")
        if explicit is not None:
            last = explicit
    except Exception as e:
        raise RuntimeError(f"python_run error: {e}") from e

    out = buf.getvalue()
    if last is not None:
        # Keep the repr on its own line instead of gluing it to stdout that
        # did not end with a newline.
        if out and not out.endswith("\n"):
            out += "\n"
        out += repr(last)
    return out.strip()


# 2. --------------------------------------------------------------------
@function_tool
@log_tool_call
def load_spreadsheet(path: str, sheet: str | int | None = None) -> list[Dict[str, str]]:
    """Read .csv, .xls or .xlsx from disk and return rows as list of dictionaries.

    Args:
        path: Path to spreadsheet file.
        sheet: Sheet name or index (for Excel files only).
    """
    import pandas as pd

    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    ext = os.path.splitext(path)[1].lower()
    if ext == ".csv":
        df = pd.read_csv(path)
        dfs = [df]
    else:
        sheets = pd.read_excel(path, sheet_name=sheet if sheet not in ("", None) else None)
        if isinstance(sheets, dict):
            dfs = sheets.values()
        else:
            dfs = [sheets]
    results = []
    for df in dfs:
        results.extend([{str(k): v for k, v in row.items()} for row in df.to_dict(orient="records")])
    return results


# 3. --------------------------------------------------------------------
@function_tool
@log_tool_call
def youtube_transcript(url: str, lang: str = "en") -> str:
    """Fetch the subtitles of a YouTube video.

    Args:
        url: YouTube video URL.
        lang: Preferred transcript language code (default "en").
    """
    # Returns the transcript chunks joined by single spaces. Raises ValueError
    # when no video id can be extracted from `url`; errors from the transcript
    # library propagate unchanged.
    from urllib.parse import urlparse, parse_qs
    # Import from the package root rather than the private `_api` module.
    from youtube_transcript_api import YouTubeTranscriptApi

    parsed = urlparse(url)
    vid = parse_qs(parsed.query).get("v", [None])[0]
    if not vid:
        # No ?v= parameter: take the last *path* segment. Splitting the raw
        # URL on "/" would keep the query string ("youtu.be/ID?t=30" ->
        # "ID?t=30") and give "" for a trailing slash. This also covers
        # /embed/<id>, /shorts/<id> and a bare video id.
        vid = parsed.path.rstrip("/").rsplit("/", 1)[-1]
    if not vid:
        raise ValueError(f"Could not extract a YouTube video id from: {url!r}")

    # Preferred language first, then English fallbacks; dict.fromkeys drops
    # the duplicate when lang is itself one of the fallbacks, keeping order.
    languages = list(dict.fromkeys([lang, "en", "en-US", "en-GB"]))
    # NOTE(review): get_transcript is the static entry point of older
    # youtube-transcript-api releases -- confirm it exists in the pinned version.
    data = YouTubeTranscriptApi.get_transcript(vid, languages=languages)
    return " ".join(chunk["text"] for chunk in data).strip()


# 4. --------------------------------------------------------------------
@function_tool
@log_tool_call
def transcribe_audio(path: str, model: str = "whisper-1") -> str:
    """Transcribe an audio file using OpenAI Whisper.

    Args:
        path: Path to audio file (wav / mp3 / m4a / etc.).
        model: Whisper model name (default "whisper-1").
    """
    # Docstring kept verbatim: presumably consumed by @function_tool as the
    # tool description -- confirm before rewording.
    import openai

    if not os.path.isfile(path):
        raise FileNotFoundError(path)

    # The key is read from the environment at call time, not at import time.
    api_key = os.getenv("OPENAI_API_KEY")
    whisper = openai.OpenAI(api_key=api_key)

    # The file is opened in binary mode and closed as soon as the request
    # returns.
    with open(path, "rb") as audio:
        response = whisper.audio.transcriptions.create(model=model, file=audio)

    text = response.text
    return text.strip()


# 5. --------------------------------------------------------------------
@function_tool
@log_tool_call
def image_ocr(path: str) -> str:
    """Perform OCR on an image using Tesseract.

    Args:
        path: Path to image file.
    """
    # Returns the recognised text with surrounding whitespace stripped.
    # Raises FileNotFoundError when `path` is not an existing file.
    #
    # The path is checked before the optional imports so a bad path is
    # reported as such even where Pillow or pytesseract is not installed.
    if not os.path.isfile(path):
        raise FileNotFoundError(path)

    from PIL import Image
    import pytesseract

    # The context manager closes the underlying file handle once OCR has
    # finished; the previous bare Image.open(path) left it open.
    with Image.open(path) as img:
        text = pytesseract.image_to_string(img)
    return text.strip()


# 6. --------------------------------------------------------------------
@function_tool
@log_tool_call
def duckduckgo_search(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Search DuckDuckGo and return a list of result dicts with title, href and body.

    Args:
        query: The search query.
        max_results: Maximum results to return (default 5).
    """
    # Docstring kept verbatim: presumably consumed by @function_tool as the
    # tool description -- confirm before rewording.
    from duckduckgo_search import DDGS

    # Only these fields are forwarded; a field missing from a hit becomes "".
    wanted_keys = ("title", "href", "body")

    collected = []
    with DDGS() as client:
        hits = client.text(query, max_results=max_results)
        collected.extend(
            {key: hit.get(key, "") for key in wanted_keys} for hit in hits
        )
    return collected