File size: 4,378 Bytes
9623335
 
 
 
52d1305
73bb16b
52d1305
 
 
6e06cc8
9623335
 
 
 
73bb16b
 
9623335
 
73bb16b
 
9623335
73bb16b
9623335
 
73bb16b
 
 
 
9623335
73bb16b
9623335
 
73bb16b
9623335
 
52d1305
9623335
73bb16b
6e06cc8
9623335
52d1305
73bb16b
9623335
 
73bb16b
9623335
 
73bb16b
 
 
 
 
9623335
73bb16b
9623335
 
 
 
 
 
 
 
 
 
52d1305
9623335
73bb16b
 
9623335
52d1305
73bb16b
9623335
 
73bb16b
9623335
 
 
73bb16b
9623335
 
 
 
52d1305
9623335
 
73bb16b
 
9623335
52d1305
73bb16b
9623335
 
73bb16b
9623335
 
73bb16b
 
9623335
73bb16b
 
9623335
 
 
52d1305
9623335
73bb16b
 
9623335
52d1305
73bb16b
9623335
73bb16b
9623335
 
 
73bb16b
 
9623335
 
52d1305
9623335
73bb16b
6e06cc8
9623335
52d1305
73bb16b
9623335
 
73bb16b
9623335
 
 
73bb16b
9623335
 
 
 
 
 
 
 
6e06cc8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
Custom function tools for OpenAI Agents SDK GAIA agent.
"""

from __future__ import annotations

import contextlib
import io
import os
from typing import List, Dict

from agents import function_tool

# 1. --------------------------------------------------------------------
@function_tool
def python_run(code: str) -> str:
    """Execute trusted Python code and return the captured stdout together with
    the repr() of the last expression (or `_result` variable if set).

    Args:
        code: Python code to execute.
    """
    buf = io.StringIO()
    ns: dict = {}
    last = None
    try:
        with contextlib.redirect_stdout(buf):
            exec(compile(code, "<agent-python>", "exec"), {}, ns)
        last = ns.get("_result")
    except Exception as e:
        raise RuntimeError(f"python_run error: {e}") from e

    out = buf.getvalue()
    return (out + (repr(last) if last is not None else "")).strip()


# 2. --------------------------------------------------------------------
@function_tool
def load_spreadsheet(path: str, sheet: str | int | None = None) -> list[Dict[str, str]]:
    """Read .csv, .xls or .xlsx from disk and return rows as list of dictionaries.

    Args:
        path: Path to spreadsheet file.
        sheet: Sheet name or index (for Excel files only).
    """
    import pandas as pd

    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    ext = os.path.splitext(path)[1].lower()
    if ext == ".csv":
        df = pd.read_csv(path)
        dfs = [df]
    else:
        sheets = pd.read_excel(path, sheet_name=sheet if sheet not in ("", None) else None)
        if isinstance(sheets, dict):
            dfs = sheets.values()
        else:
            dfs = [sheets]
    results = []
    for df in dfs:
        results.extend([{str(k): v for k, v in row.items()} for row in df.to_dict(orient="records")])
    return results


# 3. --------------------------------------------------------------------
@function_tool
def youtube_transcript(url: str, lang: str = "en") -> str:
    """Fetch the subtitles of a YouTube video.

    Args:
        url: YouTube video URL.
        lang: Preferred transcript language code (default "en").
    """
    from urllib.parse import urlparse, parse_qs
    from youtube_transcript_api._api import YouTubeTranscriptApi

    vid = parse_qs(urlparse(url).query).get("v", [None])[0] or url.split("/")[-1]
    data = YouTubeTranscriptApi.get_transcript(
        vid, languages=[lang, "en", "en-US", "en-GB"]
    )
    return " ".join(chunk["text"] for chunk in data).strip()


# 4. --------------------------------------------------------------------
@function_tool
def transcribe_audio(path: str, model: str = "whisper-1") -> str:
    """Transcribe an audio file using OpenAI Whisper.

    Args:
        path: Path to audio file (wav / mp3 / m4a / etc.).
        model: Whisper model name (default "whisper-1").
    """
    import openai

    if not os.path.isfile(path):
        raise FileNotFoundError(path)

    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    with open(path, "rb") as fp:
        transcript = client.audio.transcriptions.create(model=model, file=fp)
    return transcript.text.strip()


# 5. --------------------------------------------------------------------
@function_tool
def image_ocr(path: str) -> str:
    """Perform OCR on an image using Tesseract.

    Args:
        path: Path to image file.
    """
    from PIL import Image
    import pytesseract

    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    return pytesseract.image_to_string(Image.open(path)).strip()


# 6. --------------------------------------------------------------------
@function_tool
def duckduckgo_search(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Search DuckDuckGo and return a list of result dicts with title, href and body.

    Args:
        query: The search query.
        max_results: Maximum results to return (default 5).
    """
    from duckduckgo_search import DDGS

    results = []
    with DDGS() as ddgs:
        for r in ddgs.text(query, max_results=max_results):
            results.append(
                {
                    "title": r.get("title", ""),
                    "href": r.get("href", ""),
                    "body": r.get("body", ""),
                }
            )
    return results