import base64
import os
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI

api_key = os.getenv("GEMINI_API_KEY")
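
# Optional fail-fast guard (sketch): os.getenv returns None when the variable
# is unset, which would otherwise surface as an opaque auth error on the
# first model call.
#   if api_key is None:
#       raise RuntimeError("GEMINI_API_KEY environment variable is not set")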

# Create the multimodal LLM client
vision_llm = ChatGoogleGenerativeAI(
    model="gemini-2.5-flash-preview-05-20",
    temperature=0,
    max_retries=2,
    google_api_key=api_key
)
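
# Quick sanity check (sketch; assumes GEMINI_API_KEY is set and that the
# preview model named above is still being served):
#   print(vision_llm.invoke("Reply with OK").content)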

@tool("extract_text_tool", parse_docstring=True)
def extract_text(img_path: str) -> str:
    """Extract text from an image file using a multimodal model.

    Args:
        img_path (str): The path to the image file from which to extract text.
    
    Returns:
        str: The extracted text from the image, or an empty string if an error occurs.
    """
    try:
        # Read image and encode as base64
        with open(img_path, "rb") as image_file:
            image_bytes = image_file.read()

        image_base64 = base64.b64encode(image_bytes).decode("utf-8")

        # Prepare the prompt with the base64 image (note: the data URL
        # declares image/png regardless of the actual file type)
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": (
                            "Extract all the text from this image. "
                            "Return only the extracted text, no explanations."
                        ),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{image_base64}"
                        },
                    },
                ]
            )
        ]

        # Call the vision-capable model
        response = vision_llm.invoke(message)

        # Return the extracted text, stripped of surrounding whitespace
        return response.content.strip()
    except Exception as e:
        # A butler should handle errors gracefully
        error_msg = f"Error extracting text: {str(e)}"
        print(error_msg)
        return ""
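
# Example usage (sketch; "receipt.png" is a placeholder path): the @tool
# decorator wraps extract_text as a LangChain StructuredTool, so it is
# invoked with a dict of arguments rather than called directly.
#   text = extract_text.invoke({"img_path": "receipt.png"})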
    
@tool("analyze_image_tool", parse_docstring=True)
def analyze_image_tool(user_query: str, img_path: str) -> str:
    """Answer the question reasoning on the image.
    
    Args:
        user_query (str): The question to be answered based on the image.
        img_path (str): Path to the image file to be analyzed.

    Returns:
        str: The answer to the query based on image content, or an empty string if an error occurs.
    """
    try:
        # Read image and encode as base64
        with open(img_path, "rb") as image_file:
            image_bytes = image_file.read()

        image_base64 = base64.b64encode(image_bytes).decode("utf-8")

        # Prepare the prompt with the base64 image (note: the data URL
        # declares image/png regardless of the actual file type)
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": (
                            f"User query: {user_query}"
                        ),
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/png;base64,{image_base64}"
                        },
                    },
                ]
            )
        ]

        # Call the vision-capable model
        response = vision_llm.invoke(message)

        # Return the model's answer, stripped of surrounding whitespace
        return response.content.strip()
    except Exception as e:
        # A butler should handle errors gracefully
        error_msg = f"Error analyzing image: {str(e)}"
        print(error_msg)
        return ""
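
# Example usage (sketch; the query and "chart.png" are placeholders):
#   answer = analyze_image_tool.invoke(
#       {"user_query": "What trend does this chart show?", "img_path": "chart.png"}
#   )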
    
@tool("analyze_audio_tool", parse_docstring=True)
def analyze_audio_tool(user_query: str, audio_path: str) -> str:
    """Answer the question by reasoning on the provided audio file.
    
    Args:
        user_query (str): The question to be answered based on the audio content.
        audio_path (str): Path to the audio file (e.g., .mp3, .wav, .flac, .aac, .ogg).

    Returns:
        str: The answer to the query based on audio content, or an error message/empty string if an error occurs.
    """
    try:
        # Determine MIME type from file extension
        _filename, file_extension = os.path.splitext(audio_path)
        file_extension = file_extension.lower()

        supported_formats = {
            ".mp3": "audio/mp3", ".wav": "audio/wav", ".flac": "audio/flac",
            ".aac": "audio/aac", ".ogg": "audio/ogg"
        }

        if file_extension not in supported_formats:
            return (f"Error: Unsupported audio file format '{file_extension}'. "
                    f"Supported extensions: {', '.join(supported_formats.keys())}.")
        mime_type = supported_formats[file_extension]

        # Read audio file and encode as base64
        with open(audio_path, "rb") as audio_file:
            audio_bytes = audio_file.read()
        audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")

        # Prepare the prompt including the base64 audio data
        message = [
            HumanMessage(
                content=[
                    {
                        "type": "text",
                        "text": f"User query: {user_query}",
                    },
                    {
                        "type": "audio",
                        "source_type": "base64",
                        "mime_type": mime_type,
                        "data": audio_base64,
                    },
                ]
            )
        ]

        # Call the multimodal model (Gemini accepts audio input as well)
        response = vision_llm.invoke(message)
        return response.content.strip()
    except Exception as e:
        error_msg = f"Error analyzing audio: {str(e)}"
        print(error_msg)
        return ""
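
if __name__ == "__main__":
    # Minimal smoke test (sketch): "sample.png" and "sample.mp3" are
    # placeholders that must point at real local files, and GEMINI_API_KEY
    # must be set in the environment. Each call makes a live model request.
    print(extract_text.invoke({"img_path": "sample.png"}))
    print(analyze_image_tool.invoke(
        {"user_query": "Describe this image.", "img_path": "sample.png"}
    ))
    print(analyze_audio_tool.invoke(
        {"user_query": "Summarize this recording.", "audio_path": "sample.mp3"}
    ))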