File size: 4,714 Bytes
73bb16b
52d1305
73bb16b
52d1305
 
 
73bb16b
 
 
 
 
 
 
 
 
 
52d1305
73bb16b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52d1305
73bb16b
 
 
40ad9f8
73bb16b
52d1305
73bb16b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52d1305
73bb16b
 
 
40ad9f8
73bb16b
52d1305
73bb16b
 
 
 
 
 
 
 
 
52d1305
73bb16b
 
 
40ad9f8
73bb16b
52d1305
73bb16b
 
 
 
 
 
 
 
 
 
52d1305
73bb16b
 
 
40ad9f8
73bb16b
52d1305
73bb16b
 
 
 
 
 
52d1305
73bb16b
 
 
40ad9f8
73bb16b
52d1305
73bb16b
 
 
 
 
 
 
52d1305
 
 
73bb16b
 
 
 
 
 
365b711
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Custom tools for OpenAI Agents
from __future__ import annotations

import contextlib
import io
import os
from typing import Any, List, Union

from openai_agents import function_tool # Using openai_agents
import pandas as pd
import openai
from PIL import Image
import pytesseract
from duckduckgo_search import DDGS
from urllib.parse import urlparse, parse_qs # For youtube_transcript
from youtube_transcript_api import YouTubeTranscriptApi # For youtube_transcript, corrected import

# ---- 1. PythonRunTool -> python_run function ----------------------------------
@function_tool
def python_run(code: str) -> str:
    """
    Execute trusted Python code and return its printed output plus a result value.

    The result value is the ``_result`` variable when the code sets it to
    something other than None; otherwise it is the value of a trailing bare
    expression, if the code ends with one. A result of None is omitted.

    Args:
        code (str): Python code to execute.

    Returns:
        str: Captured stdout followed by ``repr()`` of the result value, with
        surrounding whitespace stripped.

    Raises:
        RuntimeError: If the code fails to parse/compile or raises while running.
    """
    import ast  # function-scope import keeps this block self-contained

    # SECURITY: exec/eval give the code full interpreter access; only pass
    # trusted input to this tool.
    stdout_buf = io.StringIO()
    # A single dict is used as the globals namespace. With separate globals and
    # locals, functions and comprehensions defined by the code cannot see the
    # code's own top-level names and fail with NameError.
    namespace = {}
    last = None
    try:
        tree = ast.parse(code, "<agent-python>", "exec")
        tail = None
        if tree.body and isinstance(tree.body[-1], ast.Expr):
            # Split off a trailing bare expression so its value can be captured.
            tail = ast.Expression(tree.body.pop().value)
        with contextlib.redirect_stdout(stdout_buf):
            exec(compile(tree, "<agent-python>", "exec"), namespace)
            if tail is not None:
                last = eval(compile(tail, "<agent-python>", "eval"), namespace)
        # An explicit, non-None _result takes precedence over the trailing expression.
        if namespace.get("_result") is not None:
            last = namespace["_result"]
    except Exception as e:
        raise RuntimeError(f"PythonRunTool error: {e}") from e
    printed = stdout_buf.getvalue()
    # Always return a string
    return (printed + (repr(last) if last is not None else "")).strip()

# ---- 2. ExcelLoaderTool -> load_spreadsheet function --------------------------
@function_tool
def load_spreadsheet(path: str, sheet: Union[str, int, None] = None) -> str:
    """
    Read a .csv/.xls/.xlsx file from disk and return its rows as a string.

    Each row is converted to a dictionary with string keys; the return value is
    ``str()`` of the list of those dictionaries.

    Args:
        path (str): Path to .csv/.xls/.xlsx file.
        sheet (Union[str, int, None], optional): Sheet name or index, used for Excel files only. None or an empty string selects the first sheet. Defaults to None.

    Returns:
        str: String form of the list of row dictionaries.

    Raises:
        FileNotFoundError: If ``path`` is not an existing file.
    """
    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    ext = os.path.splitext(path)[1].lower()
    if ext == ".csv":
        df = pd.read_csv(path)
    else:
        # pandas interprets sheet_name=None as "load every sheet" and returns a
        # dict of DataFrames, which has no to_dict(orient=...). Fall back to the
        # first sheet (index 0) when no sheet was requested.
        use_first_sheet = sheet is None or sheet == ""
        df = pd.read_excel(path, sheet_name=0 if use_first_sheet else sheet)
    records = [{str(k): v for k, v in row.items()} for row in df.to_dict(orient="records")]
    # Always return a string
    return str(records)

# ---- 3. YouTubeTranscriptTool -> youtube_transcript function ------------------
@function_tool
def youtube_transcript(url: str, lang: str = "en") -> str:
    """
    Return the subtitles of a YouTube URL using youtube-transcript-api.

    The video id is taken from the ``v`` query parameter when present,
    otherwise from the last segment of the URL path.

    Args:
        url (str): YouTube URL, or a bare video id.
        lang (str, optional): Preferred transcript language; English variants are tried as fallbacks. Defaults to "en".

    Returns:
        str: The transcript segments joined with single spaces.

    Raises:
        ValueError: If no video id can be extracted from ``url``.
    """
    parsed = urlparse(url.strip())
    video_id = parse_qs(parsed.query).get("v", [None])[0]
    if not video_id:
        # Take the last segment of the parsed *path* so that a query string,
        # fragment or trailing slash (e.g. youtu.be/<id>?t=30) does not end up
        # inside the id.
        video_id = parsed.path.rstrip("/").split("/")[-1]
    if not video_id:
        raise ValueError(f"Could not extract a YouTube video id from: {url!r}")
    # dict.fromkeys drops duplicates (e.g. lang == "en") while keeping priority order.
    languages = list(dict.fromkeys([lang, "en", "en-US", "en-GB"]))
    data = YouTubeTranscriptApi.get_transcript(video_id, languages=languages)
    text = " ".join(d["text"] for d in data).strip()
    return str(text)

# ---- 4. AudioTranscriptionTool -> transcribe_audio function -------------------
@function_tool
def transcribe_audio(path: str, model: str = "whisper-1") -> str:
    """
    Send an audio file to the OpenAI transcription endpoint and return the text.

    Args:
        path (str): Path to audio file.
        model (str, optional): Model name for transcription. Defaults to "whisper-1".

    Returns:
        str: The transcribed text with surrounding whitespace removed.

    Raises:
        FileNotFoundError: If ``path`` is not an existing file.
    """
    audio_exists = os.path.isfile(path)
    if not audio_exists:
        raise FileNotFoundError(path)

    # The API key is read from the OPENAI_API_KEY environment variable.
    api_key = os.getenv("OPENAI_API_KEY")
    whisper_client = openai.OpenAI(api_key=api_key)

    with open(path, "rb") as audio_file:
        response = whisper_client.audio.transcriptions.create(model=model, file=audio_file)

    plain_text = response.text.strip()
    return str(plain_text)

# ---- 5. SimpleOCRTool -> image_ocr function ------------------------------------
@function_tool
def image_ocr(path: str) -> str:
    """
    Return any text spotted in an image via pytesseract OCR.

    Args:
        path (str): Path to image file.

    Returns:
        str: The recognised text with surrounding whitespace removed.

    Raises:
        FileNotFoundError: If ``path`` is not an existing file.
    """
    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    # Open the image in a context manager so the underlying file handle is
    # closed as soon as OCR finishes instead of leaking until garbage collection.
    with Image.open(path) as img:
        text = pytesseract.image_to_string(img)
    return str(text.strip())

# ---- 6. New DuckDuckGo Search Tool ---------------------------------------------
@function_tool
def duckduckgo_search(query: str) -> str:
    """
    Searches the web using DuckDuckGo and returns a summary of results.

    Args:
        query (str): The search query.

    Returns:
        str: One "title: body" line per result (at most 5 results), or
        "No results found." when the search yields nothing.
    """
    with DDGS() as ddgs:
        # Materialise the results while the session is still open. Depending on
        # the installed duckduckgo_search release, text() may hand back a lazy
        # iterator, which is always truthy and would defeat the emptiness check.
        results = list(ddgs.text(query, max_results=5) or [])  # Get top 5 results
    if not results:
        return "No results found."
    # .get() tolerates a result that lacks a title or body instead of raising KeyError.
    return "\n".join(f"{r.get('title', '')}: {r.get('body', '')}" for r in results)

# ---------------------------------------------------------------------------
# Public API of this module: the names exported by a star-import.
__all__ = [
    "python_run",
    "load_spreadsheet",
    "youtube_transcript",
    "transcribe_audio",
    "image_ocr",
    "duckduckgo_search",
]