import base64
import os
from typing import Optional, Tuple

import requests
from langchain.agents import initialize_agent, Tool
from langchain.agents.agent_types import AgentType
from langchain.memory import ConversationBufferMemory
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI


# Base URL of the remote API. Files attached to a task are requested from
# "<DEFAULT_API_URL>/files/<task_id>", so the value carries no trailing slash.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"


class BasicAgent:
    """LangChain ReAct agent that answers a question, optionally given a task file.

    The agent is built with two tools: a DuckDuckGo web search and an image
    analyzer backed by the same chat model. Calling the instance with a
    question and a task id looks up the task's file, appends its URL to the
    question when one exists, and returns the agent's answer.
    """

    def __init__(self) -> None:
        """Create the chat model, the tools and the agent executor."""
        print("BasicAgent initialized.")

        tools = [
            Tool(
                name="DuckDuckGo Search",
                func=DuckDuckGoSearchRun().run,
                description="Use this tool to find factual information or recent events.",
            ),
            Tool(
                name="Image Analyzer",
                func=self.describe_image,
                # The tool downloads its input over HTTP, so the model must be
                # told to pass a URL rather than a local path.
                description="Analyzes and describes what's in an image. Input is an image URL.",
            ),
        ]

        # NOTE(review): the zero-shot ReAct prompt presumably does not read
        # "chat_history" -- confirm this memory actually influences the agent.
        memory = ConversationBufferMemory(memory_key="chat_history")
        self.model = ChatOpenAI(model="gpt-4.1-mini", temperature=0)

        self.agent = initialize_agent(
            tools=tools,
            llm=self.model,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True,
            memory=memory,
        )

    @staticmethod
    def _image_mime_type(content_type: str) -> str:
        """Return the image MIME type from a Content-Type header value.

        Parameters such as ``; charset=...`` are dropped. Anything that is not
        an ``image/*`` type (including an empty or missing header) falls back
        to ``image/png``.
        """
        mime = (content_type or "").split(";", 1)[0].strip().lower()
        return mime if mime.startswith("image/") else "image/png"

    @staticmethod
    def _with_file_reference(question: str, file_url: Optional[str]) -> str:
        """Return the question, with the task file URL appended when there is one."""
        if file_url is None:
            return question
        return f"{question} This task has assigned file with URL: {file_url}"

    def describe_image(self, img_path: str) -> str:
        """Download an image and ask the chat model about it.

        Args:
            img_path: URL of the image; it is fetched with an HTTP GET.

        Returns:
            The model's reply with surrounding whitespace removed, or an empty
            string if the download or the model call fails (the error is
            printed, not raised, so the agent loop keeps running).
        """
        try:
            # Agent-produced tool input may carry stray surrounding whitespace.
            r = requests.get(img_path.strip(), timeout=10)
            # Without this an HTTP error page would be sent to the model as if
            # it were image data.
            r.raise_for_status()

            mime_type = self._image_mime_type(r.headers.get("Content-Type", ""))
            image_base64 = base64.b64encode(r.content).decode("utf-8")

            # NOTE(review): the prompt is chess-specific although the tool is
            # advertised as a general image describer -- confirm intent.
            message = [
                HumanMessage(
                    content=[
                        {
                            "type": "text",
                            "text": (
                                "You're a chess assistant. Answer only with the best move in algebraic notation (e.g., Qd1#)."
                            ),
                        },
                        {
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:{mime_type};base64,{image_base64}"
                            },
                        },
                    ]
                )
            ]
            response = self.model.invoke(message)
            return response.content.strip()
        except Exception as e:
            # Tool boundary: report the failure and hand back an empty result.
            error_msg = f"Error describing image: {e}"
            print(error_msg)
            return ""

    def fetch_file(self, task_id) -> Tuple[Optional[str], Optional[bytes], Optional[str]]:
        """Download the file attached to a task.

        Args:
            task_id: Identifier interpolated into ``/files/<task_id>``.

        Returns:
            ``(url, content, content_type)`` on success, where ``content_type``
            is the response's Content-Type header (``""`` if absent), or
            ``(None, None, None)`` when the request fails or returns an HTTP
            error status.
        """
        url = f"{DEFAULT_API_URL}/files/{task_id}"
        try:
            r = requests.get(url, timeout=10)
            r.raise_for_status()
        except requests.RequestException as e:
            # Only request failures mean "no file"; interrupts and programming
            # errors are no longer swallowed.
            print(f"No file fetched for task {task_id}: {e}")
            return None, None, None
        return url, r.content, r.headers.get("Content-Type", "")

    def __call__(self, question: str, task_id: str) -> str:
        """Answer a question, pointing the agent at the task's file if it has one.

        Args:
            question: The question text.
            task_id: Task identifier used to look up an attached file.

        Returns:
            The value returned by the agent's ``run`` call.
        """
        print(f"Agent received question (first 50 chars) {task_id}: {question[:50]}...")
        # Only the URL is forwarded; the downloaded bytes are not used here.
        file_url, _file_content, file_type = self.fetch_file(task_id)
        print(f"Fetched file {file_type}")
        question = self._with_file_reference(question, file_url)
        fixed_answer = self.agent.run(question)
        print(f"Agent returning fixed answer: {fixed_answer}")
        return fixed_answer