# joaomorossini's picture
# Add new agents and tools for Devid and BrowsingAgent; remove obsolete ResearchAndReportAgent
# 670dd87
import base64
import re
from typing_extensions import override
from agency_swarm.agents import Agent
from agency_swarm.tools.oai import FileSearch
class BrowsingAgent(Agent):
    """Agent that browses the web through a Selenium web driver.

    Besides the regular agent configuration, this class hooks into response
    validation: when the model's message contains one of the bracketed
    commands below, a screenshot of the current page is taken, uploaded, and
    handed back by raising ``ValueError`` with a text + image payload.

    Recognised commands (matched case-insensitively, first match wins):
        ``[send screenshot]``, ``[highlight clickable elements]``,
        ``[highlight text fields]``, ``[highlight dropdowns]``.
    """

    # File the latest screenshot is written to and later uploaded from.
    SCREENSHOT_FILE_NAME = "screenshot.jpg"

    def __init__(self, selenium_config=None, **kwargs):
        """Configure the agent and, optionally, the Selenium driver.

        Args:
            selenium_config: Optional configuration forwarded to
                ``set_selenium_config``; skipped when ``None``.
            **kwargs: Extra keyword arguments passed to ``Agent.__init__``.
        """
        from .tools.util.selenium import set_selenium_config

        super().__init__(
            name="BrowsingAgent",
            description="This agent is designed to navigate and search web effectively.",
            instructions="./instructions.md",
            files_folder="./files",
            schemas_folder="./schemas",
            tools=[],
            tools_folder="./tools",
            temperature=0,
            max_prompt_tokens=16000,
            model="gpt-4o",
            validation_attempts=25,
            **kwargs,
        )
        if selenium_config is not None:
            set_selenium_config(selenium_config)

        # Last message with bracketed commands stripped; used to detect repeats.
        self.prev_message = ""

    @override
    def response_validator(self, message):
        """Validate the model's message and serve screenshot commands.

        Args:
            message: The text produced by the model.

        Returns:
            ``message`` unchanged when it contains no recognised command.

        Raises:
            ValueError: If the message (ignoring bracketed parts) repeats the
                previous one, or - carrying a list with a text part and an
                ``image_file`` part - when a screenshot command was handled.
                NOTE(review): presumably the framework relays this payload
                back to the model as validation feedback; confirm against
                the ``Agent`` base class.
        """
        from selenium.webdriver.common.by import By
        from selenium.webdriver.support.select import Select

        from .tools.util import (
            highlight_elements_with_labels,
            remove_highlight_and_labels,
        )
        from .tools.util.selenium import get_web_driver, set_web_driver

        # Filter out everything in square brackets
        filtered_message = re.sub(r"\[.*?\]", "", message).strip()

        if filtered_message and self.prev_message == filtered_message:
            raise ValueError(
                "Do not repeat yourself. If you are stuck, try a different approach or search in google for the page you are looking for directly."
            )

        self.prev_message = filtered_message

        # Commands are matched case-insensitively; lower-case once up front.
        lowered = message.lower()

        if "[send screenshot]" in lowered:
            wd = get_web_driver()
            # Plain screenshot: clear any labels left by an earlier command.
            remove_highlight_and_labels(wd)
            self.take_screenshot()
            response_text = "Here is the screenshot of the current web page:"

        elif "[highlight clickable elements]" in lowered:
            clickable_selector = (
                'a, button, div[onclick], div[role="button"], div[tabindex], '
                'span[onclick], span[role="button"], span[tabindex]'
            )
            wd = get_web_driver()
            highlight_elements_with_labels(wd, clickable_selector)
            self._shared_state.set("elements_highlighted", clickable_selector)

            self.take_screenshot()

            all_elements = wd.find_elements(By.CSS_SELECTOR, ".highlighted-element")

            # Labels are 1-based positions; elements without text are dropped
            # from the listing.
            element_texts_json = {
                str(index): self.remove_unicode(element.text)
                for index, element in enumerate(all_elements, start=1)
            }
            element_texts_json = {k: v for k, v in element_texts_json.items() if v}

            element_texts_formatted = ", ".join(
                f"{k}: {v}" for k, v in element_texts_json.items()
            )

            response_text = (
                "Here is the screenshot of the current web page with highlighted clickable elements. \n\n"
                "Texts of the elements are: " + element_texts_formatted + ".\n\n"
                "Elements without text are not shown, but are available on screenshot. \n"
                "Please make sure to analyze the screenshot to find the clickable element you need to click on."
            )

        elif "[highlight text fields]" in lowered:
            wd = get_web_driver()
            highlight_elements_with_labels(wd, "input, textarea")
            self._shared_state.set("elements_highlighted", "input, textarea")

            self.take_screenshot()

            all_elements = wd.find_elements(By.CSS_SELECTOR, ".highlighted-element")

            # Unlike clickable elements, empty texts are kept here so every
            # field label appears in the listing.
            element_texts_json = {
                str(index): self.remove_unicode(element.text)
                for index, element in enumerate(all_elements, start=1)
            }

            element_texts_formatted = ", ".join(
                f"{k}: {v}" for k, v in element_texts_json.items()
            )

            response_text = (
                "Here is the screenshot of the current web page with highlighted text fields: \n"
                "Texts of the elements are: " + element_texts_formatted + ".\n"
                "Please make sure to analyze the screenshot to find the text field you need to fill."
            )

        elif "[highlight dropdowns]" in lowered:
            wd = get_web_driver()
            highlight_elements_with_labels(wd, "select")
            self._shared_state.set("elements_highlighted", "select")

            self.take_screenshot()

            all_elements = wd.find_elements(By.CSS_SELECTOR, ".highlighted-element")

            all_selector_values = {}
            # Each dropdown gets its own 1-based label. (Previously the label
            # counter was never advanced, so every dropdown overwrote key "1".)
            for label, element in enumerate(all_elements, start=1):
                selector_values = {}
                for j, option in enumerate(Select(element).options):
                    selector_values[str(j)] = option.text
                    # Cap the listing per dropdown to keep the reply short.
                    if j > 10:
                        break
                all_selector_values[str(label)] = selector_values

            # Dropdowns without options are left out of the listing.
            all_selector_values = {k: v for k, v in all_selector_values.items() if v}
            all_selector_values_formatted = ", ".join(
                f"{k}: {v}" for k, v in all_selector_values.items()
            )

            response_text = (
                "Here is the screenshot with highlighted dropdowns. \n"
                "Selector values are: " + all_selector_values_formatted + ".\n"
                "Please make sure to analyze the screenshot to find the dropdown you need to select."
            )

        else:
            return message

        set_web_driver(wd)
        content = self.create_response_content(response_text)
        raise ValueError(content)

    def take_screenshot(self):
        """Save a screenshot of the current page to ``SCREENSHOT_FILE_NAME``."""
        from .tools.util import get_b64_screenshot
        from .tools.util.selenium import get_web_driver

        wd = get_web_driver()
        screenshot = get_b64_screenshot(wd)
        # The helper returns base64 text; decode it to raw image bytes.
        screenshot_data = base64.b64decode(screenshot)
        with open(self.SCREENSHOT_FILE_NAME, "wb") as screenshot_file:
            screenshot_file.write(screenshot_data)

    def create_response_content(self, response_text):
        """Upload the saved screenshot and build the message content.

        Args:
            response_text: Text to accompany the screenshot.

        Returns:
            A two-item list: a ``text`` part holding ``response_text`` and an
            ``image_file`` part referencing the uploaded file's id.
        """
        with open(self.SCREENSHOT_FILE_NAME, "rb") as file:
            file_id = self.client.files.create(
                file=file,
                purpose="vision",
            ).id

        content = [
            {"type": "text", "text": response_text},
            {"type": "image_file", "image_file": {"file_id": file_id}},
        ]
        return content

    def remove_unicode(self, data):
        """Return ``data`` with every run of non-ASCII characters removed."""
        return re.sub(r"[^\x00-\x7F]+", "", data)