File size: 3,383 Bytes
e836bd4 06074b9 a3a06d3 06074b9 188a166 06074b9 188a166 06074b9 4695b90 06074b9 6d51abb 06074b9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
import os
import requests
import base64
from langchain_openai import ChatOpenAI
from langchain_community.tools import DuckDuckGoSearchRun
from langchain.agents import initialize_agent, Tool
from langchain.agents.agent_types import AgentType
from langchain.memory import ConversationBufferMemory
from langchain_core.messages import HumanMessage
# Base URL used to build per-task file download URLs of the form
# "{DEFAULT_API_URL}/files/{task_id}" (see BasicAgent.fetch_file).
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
class BasicAgent:
    """LangChain ReAct agent that answers questions with search and image tools.

    The agent registers two tools -- a DuckDuckGo search tool and an image
    analyzer backed by the same chat model -- and is invoked by calling the
    instance with a question and the task id the question belongs to.
    """

    def __init__(self):
        """Build the chat model, the tool list and the agent executor."""
        print("BasicAgent initialized.")
        # Two tools are exposed to the agent: web search and image analysis.
        tools = [
            Tool(
                name="DuckDuckGo Search",
                func=DuckDuckGoSearchRun().run,
                description="Use this tool to find factual information or recent events."
            ),
            Tool(
                name="Image Analyzer",
                # NOTE: describe_image fetches its input with an HTTP GET, so
                # the "path" the agent passes in has to be a reachable URL.
                func=self.describe_image,
                description="Analyzes and describes what's in an image. Input is an image path."
            ),
        ]
        # NOTE(review): the zero-shot ReAct agent type is used below; confirm
        # that its prompt actually consumes the "chat_history" memory key.
        memory = ConversationBufferMemory(memory_key="chat_history")
        self.model = ChatOpenAI(model="gpt-4.1-mini", temperature=0)
        self.agent = initialize_agent(
            tools=tools,
            llm=self.model,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True,
            memory=memory,
        )

    @staticmethod
    def _image_data_url(image_bytes, content_type=""):
        """Encode raw image bytes as a base64 ``data:`` URL.

        Args:
            image_bytes: The raw bytes of the image.
            content_type: The ``Content-Type`` header the image was served
                with. Parameters such as ``; charset=...`` are ignored.

        Returns:
            A ``data:<mime>;base64,<payload>`` string. When ``content_type``
            is missing or is not an ``image/*`` type, ``image/png`` is used,
            which matches the previously hard-coded behavior.
        """
        mime = (content_type or "").split(";", 1)[0].strip().lower()
        if not mime.startswith("image/"):
            mime = "image/png"
        encoded = base64.b64encode(image_bytes).decode("utf-8")
        return f"data:{mime};base64,{encoded}"

    def describe_image(self, img_path: str) -> str:
        """Download an image and ask the chat model for the best chess move.

        Args:
            img_path: URL of the image; it is downloaded with an HTTP GET.

        Returns:
            The model's reply with surrounding whitespace removed, or an
            empty string if the download or the model call fails (the error
            is printed, never raised, so the agent loop keeps running).
        """
        try:
            r = requests.get(img_path, timeout=10)
            # Fail on 4xx/5xx instead of sending an error page to the model
            # disguised as image data.
            r.raise_for_status()
            data_url = self._image_data_url(
                r.content, r.headers.get("Content-Type", "")
            )
            message = [
                HumanMessage(
                    content=[
                        {
                            "type": "text",
                            "text": (
                                "You're a chess assistant. Answer only with the best move in algebraic notation (e.g., Qd1#)."
                            ),
                        },
                        {
                            "type": "image_url",
                            "image_url": {"url": data_url},
                        },
                    ]
                )
            ]
            response = self.model.invoke(message)
            return response.content.strip()
        except Exception as e:
            # Tool boundary: report the failure and hand the agent an empty
            # observation rather than aborting the whole run.
            print(f"Error analyzing image: {e}")
            return ""

    def fetch_file(self, task_id):
        """Try to download the file attached to a task.

        Args:
            task_id: Identifier appended to ``DEFAULT_API_URL`` as
                ``/files/{task_id}``.

        Returns:
            A ``(url, content, content_type)`` tuple on success, where
            ``content`` is the raw response body and ``content_type`` is the
            ``Content-Type`` header (``""`` if absent). Returns
            ``(None, None, None)`` when the request fails or the server
            answers with an error status.
        """
        url = f"{DEFAULT_API_URL}/files/{task_id}"
        try:
            r = requests.get(url, timeout=10)
            r.raise_for_status()
        except requests.RequestException as e:
            # Only network/HTTP failures mean "no file"; anything else
            # (KeyboardInterrupt, programming errors) must propagate.
            print(f"No file fetched for task {task_id}: {e}")
            return None, None, None
        return url, r.content, r.headers.get("Content-Type", "")

    def __call__(self, question: str, task_id: str) -> str:
        """Answer a question, mentioning the task's file URL when one exists.

        Args:
            question: The question text handed to the agent.
            task_id: Task identifier used to look up an attached file.

        Returns:
            The value returned by the agent's ``run`` call.
        """
        print(f"Agent received question (first 50 chars) {task_id}: {question[:50]}...")
        file_url, _file_content, file_type = self.fetch_file(task_id)
        if file_url is not None:
            print(f"Fetched file {file_type}")
            # The agent only receives the URL; tools download the file
            # themselves when they need its contents.
            question = f"{question} This task has assigned file with URL: {file_url}"
        fixed_answer = self.agent.run(question)
        print(f"Agent returning fixed answer: {fixed_answer}")
        return fixed_answer