Spaces:
Sleeping
Sleeping
File size: 6,356 Bytes
1853d1e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
import base64
import json
import inspect
import time
from typing import Callable
from datetime import datetime, timezone
from langchain.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import HumanMessage
from langchain_google_genai.chat_models import ChatGoogleGenerativeAIError
from langchain_tavily import TavilySearch, TavilyExtract
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.utilities.wikipedia import WikipediaAPIWrapper
from langchain_community.tools.wikipedia.tool import WikipediaQueryRun
from youtube_transcript_api import YouTubeTranscriptApi
from basic_agent import print_conversation
from dotenv import load_dotenv
from langchain.globals import set_debug
from urllib.parse import urlparse, parse_qs
set_debug(False)
CUSTOM_DEBUG = True
load_dotenv()
def encode_image_to_base64(path):
with open(path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
def print_tool_call(tool: Callable, tool_name: str, args: dict):
"""Prints the tool call for debugging purposes."""
sig = inspect.signature(tool)
print_conversation(
messages=[
{
'role': 'Tool-Call',
'content': f"Calling `{tool_name}`{sig}"
},
{
'role': 'Tool-Args',
'content': args
}
],
)
def print_tool_response(response: str):
"""Prints the tool response for debugging purposes."""
print_conversation(
messages=[
{
'role': 'Tool-Response',
'content': response
}
],
)
search_tool = TavilySearch(max_results=5)
extract_tool = TavilyExtract()
@tool
def search_and_extract(query: str) -> list[dict]:
"""Performs a web search and returns structured content extracted from top results."""
time.sleep(3) # To avoid hitting the API rate limit in the llm-apis when calling the tool multiple times in a row.
MAX_NUMBER_OF_CHARS = 10_000
if CUSTOM_DEBUG:
print_tool_call(
search_and_extract,
tool_name='search_and_extract',
args={'query': query, 'max_number_of_chars': MAX_NUMBER_OF_CHARS},
)
results = search_tool.invoke({"query": query})
raw_results = results.get("results", [])
urls = [r["url"] for r in raw_results if r.get("url")]
if not urls:
return [{"error": "No URLs found to extract from."}]
extracted = extract_tool.invoke({"urls": urls})
results = extracted.get("results", [])
structured_results = []
raw_contents = [doc.get("raw_content", "") for doc in results]
for result, doc_content in zip(raw_results, raw_contents):
doc_content_trunc = doc_content[0:MAX_NUMBER_OF_CHARS] if len(doc_content) > MAX_NUMBER_OF_CHARS else doc_content
structured_results.append({
"title": result.get("title"),
"url": result.get("url"),
"snippet": result.get("content"),
"raw_content": doc_content_trunc
})
if CUSTOM_DEBUG:
console_structured_results = [{k: v for k, v in result_dicti.items() if k != "raw_content"} for result_dicti in
structured_results]
print_tool_response(json.dumps(console_structured_results))
return structured_results
def extract_video_id(url: str) -> str:
parsed = urlparse(url)
return parse_qs(parsed.query).get("v", [""])[0]
@tool
def load_youtube_transcript(url: str) -> str:
"""Load a YouTube transcript using youtube_transcript_api."""
video_id = extract_video_id(url)
if CUSTOM_DEBUG:
print_tool_call(
load_youtube_transcript,
tool_name='load_youtube_transcript',
args={'url': url},
)
try:
youtube_api_client = YouTubeTranscriptApi()
fetched_transcript = youtube_api_client.fetch(video_id=video_id)
transcript = " ".join(entry.text for entry in fetched_transcript if entry.text.strip())
if transcript and CUSTOM_DEBUG:
print_tool_response(transcript)
return transcript
except Exception as e:
error_str = f"Error loading transcript: {e}. Assuming no transcript for this video."
print_tool_response(error_str)
return error_str
gemini = ChatGoogleGenerativeAI(model="gemini-1.5-flash")
@tool
def image_query_tool(image_path: str, question: str) -> str:
"""
Uses Gemini Vision to answer a question about an image.
- image_path: file path to the image to analyze (.png)
- question: the query to ask about the image
"""
try:
base64_img = encode_image_to_base64(image_path)
except OSError:
response = f"OSError: Invalid argument (invalid image path or file format): {image_path}. Please provide a valid PNG image."
print_tool_response(response)
return response
base64_img_str = f"data:image/png;base64,{base64_img}"
if CUSTOM_DEBUG:
print_tool_call(
image_query_tool,
tool_name='image_query_tool',
args={'base64_image': base64_img_str[:100], 'question': question},
)
msg = HumanMessage(content=[
{"type": "text", "text": question},
{"type": "image_url", "image_url": base64_img_str},
])
try:
response = gemini.invoke([msg])
except ChatGoogleGenerativeAIError:
response = "ChatGoogleGenerativeAIError: Invalid argument provided to Gemini: 400 Provided image is not valid"
print_tool_response(response)
return response
if CUSTOM_DEBUG:
print_tool_response(response.content)
return response.content
@tool
def search_and_extract_from_wikipedia(query: str) -> list:
"""Search Wikipedia for a query and extract useful information."""
wiki_api_wrapper = WikipediaAPIWrapper()
wiki_tool = WikipediaQueryRun(api_wrapper=wiki_api_wrapper)
if CUSTOM_DEBUG:
print_tool_call(
search_and_extract_from_wikipedia,
tool_name='search_and_extract_from_wikipedia',
args={'query': query},
)
response = wiki_tool.invoke(query)
if CUSTOM_DEBUG:
print_tool_response(response)
return response |