Spaces:
Sleeping
Sleeping
# coding: utf-8 | |
# Copyright (c) 2025 inclusionAI. | |
import os | |
import traceback | |
import asyncio | |
import time | |
from typing import Tuple, Any | |
from examples.tools.tool_action import BrowserAction | |
from aworld.core.tool.action_factory import ActionFactory | |
from aworld.core.common import ActionModel, ActionResult, Observation | |
from examples.tools.browsers.util.dom import DOMElementNode | |
from aworld.logs.util import logger | |
from examples.tools.browsers.action.utils import DomUtil | |
from aworld.core.tool.action import ExecutableAction | |
from aworld.utils import import_packages | |
from aworld.models.llm import get_llm_model, call_llm_model | |
def get_page(**kwargs):
    """Return the page an action should operate on.

    When a ``tool`` keyword argument is supplied its ``page`` attribute is
    used; otherwise the ``page`` keyword argument is returned (``None`` when
    it is absent).
    """
    owner = kwargs.get("tool")
    return kwargs.get('page') if owner is None else owner.page
def get_browser(**kwargs):
    """Return the browser context an action should operate on.

    When a ``tool`` keyword argument is supplied its ``context`` attribute is
    used; otherwise the ``browser`` keyword argument is returned (``None``
    when it is absent).
    """
    owner = kwargs.get("tool")
    return kwargs.get('browser') if owner is None else owner.context
class GotoUrl(ExecutableAction):
    """Navigate the current page to the address in ``action.params['url']``."""

    @staticmethod
    def _normalize_url(url: str) -> str:
        """Turn a scheme-less local path into a ``file://`` URL.

        Anything containing ``://`` is returned unchanged. Otherwise the value
        is treated as a filesystem path: relative paths are resolved against
        the current working directory, and absolute paths are kept as they are
        (``os.path.join`` drops the cwd prefix for an absolute second
        argument). Previously an absolute path was handed to ``goto`` without
        any scheme.
        """
        if '://' in url:
            return url
        return "file://" + os.path.join(os.getcwd(), url)

    def _prepare(self, action: ActionModel, kwargs: dict):
        """Validate the inputs shared by the sync and async entry points.

        Returns:
            ``(page, url, failure)``. ``failure`` is an ``ActionResult`` when
            navigation cannot proceed (no page, or empty url) and ``None``
            otherwise; ``url`` is already normalized when ``failure`` is None.
        """
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.GO_TO_URL.name} page is none")
            return page, None, ActionResult(content="no page", keep=True)

        # Tolerate an action without params instead of raising AttributeError.
        url = (action.params or {}).get("url")
        if not url:
            logger.warning("empty url, go to nothing.")
            return page, None, ActionResult(content="empty url", keep=True)
        return page, self._normalize_url(url), None

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously navigate and wait for the page load state.

        Returns:
            The action result and the page that was used.
        """
        logger.info(f"exec {BrowserAction.GO_TO_URL.value.name} action")
        page, url, failure = self._prepare(action, kwargs)
        if failure is not None:
            return failure, page

        page.goto(url)
        page.wait_for_load_state()
        msg = f'Navigated to {url}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.GO_TO_URL.value.name} action")
        page, url, failure = self._prepare(action, kwargs)
        if failure is not None:
            return failure, page

        await page.goto(url)
        await page.wait_for_load_state()
        msg = f'Navigated to {url}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page
class InputText(ExecutableAction):
    """Type text into the DOM element selected by ``action.params['index']``."""

    @staticmethod
    def _resolve_target(action: ActionModel, kwargs: dict):
        """Parse and validate the parameters shared by ``act`` and ``async_act``.

        Returns:
            ``(index, text, element_node)``.

        Raises:
            RuntimeError: no observation was supplied, or the index is not in
                the observation's element map.
            ValueError: the text to type is empty.
        """
        params = action.params
        # compatible with int and str datatype; a missing index means element 0
        # (the async path previously had no default and failed on int(None)).
        index = int(params.get("index", 0))
        text = params.get("text", "")

        ob: Observation = kwargs.get("observation")
        if not ob or index not in ob.dom_tree.element_map:
            raise RuntimeError(f'Element index {index} does not exist')
        if not text:
            raise ValueError('No input to the page')
        return index, text, ob.dom_tree.element_map[index]

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously type the requested text into the target element.

        Returns:
            The action result and the page that was used.
        """
        logger.info(f"exec {BrowserAction.INPUT_TEXT.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.INPUT_TEXT.name} page is none")
            return ActionResult(content="input text no page", keep=True), page

        index, text, element_node = self._resolve_target(action, kwargs)
        self.input_to_element(text, page, element_node)

        msg = f'Input {text} into index {index}'
        logger.info(f"action {msg}")
        logger.debug(f'Element xpath: {element_node.xpath}')
        return ActionResult(content=msg, keep=True), page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.INPUT_TEXT.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.INPUT_TEXT.name} page is none")
            return ActionResult(content="input text no page", keep=True), page

        index, text, element_node = self._resolve_target(action, kwargs)
        await self.async_input_to_element(text, page, element_node)

        msg = f'Input {text} into index {index}'
        logger.info(f"action {msg}")
        logger.debug(f'Element xpath: {element_node.xpath}')
        return ActionResult(content=msg, keep=True), page

    def input_to_element(self, input: str, page, element_node: DOMElementNode):
        """Locate ``element_node`` on ``page`` and enter ``input`` into it.

        Content-editable elements are cleared and typed into key by key; all
        other elements are filled in one call.

        Raises:
            RuntimeError: the element could not be located or the input
                failed; the underlying exception is attached as the cause.
        """
        try:
            element_handle = DomUtil.get_locate_element(page, element_node)
            if element_handle is None:
                raise RuntimeError(f'Element: {repr(element_node)} not found')

            # Best effort only: an element that never settles may still accept input.
            try:
                element_handle.wait_for_element_state('stable', timeout=1000)
                element_handle.scroll_into_view_if_needed(timeout=1000)
            except Exception as prep_err:
                logger.debug(f'Element not ready before input, continuing: {prep_err}')

            # Different handling for contenteditable vs input fields
            is_contenteditable = element_handle.get_property('isContentEditable')
            if is_contenteditable.json_value():
                element_handle.evaluate('el => el.textContent = ""')
                element_handle.type(input, delay=5)
            else:
                element_handle.fill(input)
        except Exception as e:
            logger.warning(f'Failed to input text into element: {repr(element_node)}. Error: {str(e)}')
            raise RuntimeError(f'Failed to input text into index {element_node.highlight_index}') from e

    async def async_input_to_element(self, input: str, page, element_node: DOMElementNode):
        """Asynchronous counterpart of :meth:`input_to_element`."""
        try:
            element_handle = await DomUtil.async_get_locate_element(page, element_node)
            if element_handle is None:
                raise RuntimeError(f'Element: {repr(element_node)} not found')

            # Best effort only: an element that never settles may still accept input.
            try:
                await element_handle.wait_for_element_state('stable', timeout=1000)
                await element_handle.scroll_into_view_if_needed(timeout=1000)
            except Exception as prep_err:
                logger.debug(f'Element not ready before input, continuing: {prep_err}')

            # Different handling for contenteditable vs input fields
            is_contenteditable = await element_handle.get_property('isContentEditable')
            if await is_contenteditable.json_value():
                await element_handle.evaluate('el => el.textContent = ""')
                await element_handle.type(input, delay=5)
            else:
                await element_handle.fill(input)
        except Exception as e:
            logger.warning(f'Failed to input text into element: {repr(element_node)}. Error: {str(e)}')
            raise RuntimeError(f'Failed to input text into index {element_node.highlight_index}') from e
class ClickElement(ExecutableAction):
    """Click the DOM element selected by ``action.params['index']``.

    If the click causes the browser context to gain a page, the last page of
    the context is brought to the front and returned as the current page.
    """

    def __init__(self):
        import_packages(['playwright', 'markdownify'])

    @staticmethod
    def _resolve_element(action: ActionModel, kwargs: dict):
        """Return ``(index, element_node)`` for the requested element.

        Raises:
            RuntimeError: no observation was supplied, or the index is not in
                the observation's element map.
        """
        # compatible with int and str datatype
        index = int(action.params.get("index"))
        ob: Observation = kwargs.get("observation")
        if not ob or index not in ob.dom_tree.element_map:
            raise RuntimeError(f'Element index {index} does not exist')
        # The former `if not input:` check tested the builtin `input` function
        # (always truthy), so it could never fire and has been removed.
        return index, ob.dom_tree.element_map[index]

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously click the target element.

        Returns:
            The action result and the page to continue with (the newly opened
            tab when the click created one). Click failures are reported via
            ``ActionResult.error`` rather than raised.
        """
        from playwright.sync_api import BrowserContext

        logger.info(f"exec {BrowserAction.CLICK_ELEMENT.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.CLICK_ELEMENT.name} page is none")
            return ActionResult(content="click element no page", keep=True), page

        browser: BrowserContext = get_browser(**kwargs)
        if browser is None:
            logger.warning(f"{BrowserAction.CLICK_ELEMENT.name} browser context is none")
            return ActionResult(content="none browser context", keep=True), page

        index, element_node = self._resolve_element(action, kwargs)

        try:
            pages_before = len(browser.pages)
            DomUtil.click_element(page, element_node, browser=browser)
            # Log only after the click went through, matching the async path.
            msg = f'Clicked button with index {index}: {element_node.get_all_text_till_next_clickable_element(max_depth=2)}'
            logger.info(msg)
            logger.debug(f'Element xpath: {element_node.xpath}')

            if len(browser.pages) > pages_before:
                new_tab_msg = 'Open the new tab'
                msg += f' - {new_tab_msg}'
                logger.info(new_tab_msg)
                page = browser.pages[-1]
                page.bring_to_front()
                page.wait_for_load_state(timeout=60000)
            return ActionResult(content=msg, keep=True), page
        except Exception as e:
            logger.warning(f'Element not clickable with index {index} - most likely the page changed')
            return ActionResult(error=str(e)), page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.CLICK_ELEMENT.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.CLICK_ELEMENT.name} page is none")
            return ActionResult(content="click element no page", keep=True), page

        browser = get_browser(**kwargs)
        if browser is None:
            logger.warning(f"{BrowserAction.CLICK_ELEMENT.name} browser context is none")
            return ActionResult(content="none browser context", keep=True), page

        index, element_node = self._resolve_element(action, kwargs)

        try:
            pages_before = len(browser.pages)
            await DomUtil.async_click_element(page, element_node, browser=browser)
            msg = f'Clicked button with index {index}: {element_node.get_all_text_till_next_clickable_element(max_depth=2)}'
            logger.info(msg)
            logger.debug(f'Element xpath: {element_node.xpath}')

            if len(browser.pages) > pages_before:
                new_tab_msg = 'Open the new tab'
                msg += f' - {new_tab_msg}'
                logger.info(new_tab_msg)
                page = browser.pages[-1]
                await page.bring_to_front()
                await page.wait_for_load_state(timeout=60000)
            return ActionResult(content=msg, keep=True), page
        except Exception as e:
            logger.warning(f'Element not clickable with index {index} - most likely the page changed')
            return ActionResult(error=str(e)), page
# Search URL prefixes keyed by engine name; the query text is appended to the
# prefix. Both the default ("") and the "google" entry currently point at Bing.
# The previously used Google prefix was "https://www.google.com/search?udm=14&q=".
SEARCH_ENGINE = dict.fromkeys(("", "google"), "https://www.bing.com/search?q=")
class Search(ExecutableAction):
    """Open a search-results page for ``action.params['query']``.

    ``action.params['engine']`` selects the URL prefix from ``SEARCH_ENGINE``.
    """

    @staticmethod
    def _build_target(action: ActionModel):
        """Return ``(prefix, query, target_url)`` for the requested search.

        ``target_url`` is ``None`` when no query was supplied. An engine name
        missing from ``SEARCH_ENGINE`` falls back to the default ("") entry
        instead of producing a URL that starts with the text "None". The query
        is URL-encoded so characters such as ``&``, ``#`` or ``+`` stay part
        of the search text.
        """
        from urllib.parse import quote_plus

        params = action.params if action.params else {}
        prefix = SEARCH_ENGINE.get(params.get("engine", "")) or SEARCH_ENGINE.get("")
        query = params.get("query")
        if not query:
            return prefix, query, None
        return prefix, query, f'{prefix}{quote_plus(str(query))}'

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously run the search and wait for the page load state.

        Returns:
            The action result and the page that was used.
        """
        logger.info(f"exec {BrowserAction.SEARCH.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.SEARCH.name} page is none")
            return ActionResult(content="search no page", keep=True), page

        url, query, target = self._build_target(action)
        if target is None:
            logger.warning("empty query, search nothing.")
            return ActionResult(content="empty query", keep=True), page

        page.goto(target)
        page.wait_for_load_state()
        msg = f'Searched for "{query}" in {url}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.SEARCH.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.SEARCH.name} page is none")
            return ActionResult(content="search no page", keep=True), page

        url, query, target = self._build_target(action)
        if target is None:
            logger.warning("empty query, search nothing.")
            return ActionResult(content="empty query", keep=True), page

        await page.goto(target)
        await page.wait_for_load_state()
        msg = f'Searched for "{query}" in {url}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page
class SearchGoogle(ExecutableAction):
    """Open a search-results page for ``action.params['query']``.

    The URL prefix is the default ("") entry of ``SEARCH_ENGINE``.
    """

    @staticmethod
    def _build_target(action: ActionModel):
        """Return ``(query, target_url)``; ``target_url`` is None without a query.

        The query is URL-encoded so characters such as ``&``, ``#`` or ``+``
        stay part of the search text.
        """
        from urllib.parse import quote_plus

        # Tolerate an action without params, as Search does.
        params = action.params if action.params else {}
        query = params.get("query")
        if not query:
            return query, None
        return query, f'{SEARCH_ENGINE.get("")}{quote_plus(str(query))}'

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously run the search and wait for the page load state.

        Returns:
            The action result and the page that was used.
        """
        logger.info(f"exec {BrowserAction.SEARCH_GOOGLE.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.SEARCH_GOOGLE.name} page is none")
            return ActionResult(content="search no page", keep=True), page

        query, target = self._build_target(action)
        if target is None:
            logger.warning("empty query, search nothing.")
            return ActionResult(content="empty query", keep=True), page

        page.goto(target)
        page.wait_for_load_state()
        msg = f'Searched for "{query}" in Google'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.SEARCH_GOOGLE.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.SEARCH_GOOGLE.name} page is none")
            return ActionResult(content="search no page", keep=True), page

        query, target = self._build_target(action)
        if target is None:
            logger.warning("empty query, search nothing.")
            return ActionResult(content="empty query", keep=True), page

        await page.goto(target)
        await page.wait_for_load_state()
        msg = f'Searched for "{query}" in Google'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page
class NewTab(ExecutableAction):
    """Open a new page in the browser context, optionally navigating to a URL."""

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously open a new tab.

        Returns:
            The action result and the newly opened page. When no browser
            context is available, a result describing that and the current
            page are returned instead.
        """
        logger.info(f"exec {BrowserAction.NEW_TAB.value.name} action")
        browser = get_browser(**kwargs)
        if browser is None:
            # Same guard as SwitchTab; previously this failed with AttributeError.
            logger.warning(f"{BrowserAction.NEW_TAB.name} browser context is none")
            return ActionResult(content="new tab no browser context", keep=True), get_page(**kwargs)

        url = action.params.get("url")
        new_page = browser.new_page()
        new_page.wait_for_load_state()
        if url:
            new_page.goto(url)
            DomUtil.wait_for_stable_network(new_page)

        msg = f'Opened new tab with {url}'
        logger.debug(msg)
        return ActionResult(content=msg, keep=True), new_page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.NEW_TAB.value.name} action")
        browser = get_browser(**kwargs)
        if browser is None:
            logger.warning(f"{BrowserAction.NEW_TAB.name} browser context is none")
            return ActionResult(content="new tab no browser context", keep=True), get_page(**kwargs)

        url = action.params.get("url")
        new_page = await browser.new_page()
        await new_page.wait_for_load_state()
        if url:
            await new_page.goto(url)
            # NOTE(review): this is the same helper the sync path calls and it
            # is not awaited here - confirm it works with an async page or
            # switch to an async variant if DomUtil provides one.
            DomUtil.wait_for_stable_network(new_page)

        msg = f'Opened new tab with {url}'
        logger.debug(msg)
        # Return the new tab, as the sync path does; the previous code handed
        # back the old page even though the message reports a new tab.
        return ActionResult(content=msg, keep=True), new_page
class GoBack(ExecutableAction):
    """Navigate the current page one step back in its history."""

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously call ``go_back`` on the current page.

        Returns:
            The action result and the page that was used.
        """
        logger.info(f"exec {BrowserAction.GO_BACK.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.GO_BACK.name} page is none")
            # Message used to read "search no page" (copied from the search action).
            return ActionResult(content="go back no page", keep=True), page

        page.go_back()
        msg = 'Navigated back'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.GO_BACK.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.GO_BACK.name} page is none")
            return ActionResult(content="go back no page", keep=True), page

        await page.go_back()
        msg = 'Navigated back'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page
class ExtractContent(ExecutableAction):
    """Convert the current page to markdown and extract goal-relevant content.

    When ``llm_config`` (keyword argument) carries an API key, the markdown is
    sent to the model together with ``action.params['goal']``; otherwise, or
    when the model call fails, the (possibly truncated) markdown itself is
    returned.
    """

    _PROMPT = 'Your task is to extract the content of the page. You will be given a page and a goal and you should extract all relevant information around this goal from the page. If the goal is vague, summarize the page. Respond in json format. Extraction goal: {goal}, Page: {page}'
    _PROMPT_WITH_LIMIT = _PROMPT + ' \n\n#The length of the returned result must be less than {max_extract_content_output_tokens} characters.'

    @staticmethod
    def _raw_result(content: str) -> ActionResult:
        """Build the fallback result that carries the page markdown itself."""
        msg = f'Extracted from page\n: {content}\n'
        logger.info(msg)
        return ActionResult(content=msg)

    def _extract(self, content: str, goal, kwargs: dict) -> ActionResult:
        """Run the extraction shared by the sync and async entry points.

        Args:
            content: Markdown rendering of the page.
            goal: Extraction goal taken from the action parameters.
            kwargs: Keyword arguments of the action call; ``llm_config``,
                ``max_extract_content_input_tokens`` and
                ``max_extract_content_output_tokens`` are read from it.

        Returns:
            The model's answer (``keep=True``), or the raw markdown when no
            model is configured or the model call raises.
        """
        llm_config = kwargs.get("llm_config")
        max_output = kwargs.get("max_extract_content_output_tokens")
        max_input = kwargs.get("max_extract_content_input_tokens")

        # The limits are compared against character counts (len of the string).
        if max_input and len(content) > max_input:
            logger.warning(
                f"Content length ({len(content)}) exceeds max input tokens ({max_input}). Truncating content.")
            content = content[:max_input]

        # Without a usable model configuration hand back the markdown directly,
        # rather than relying on a failed model call to reach the fallback.
        if not (llm_config and llm_config.llm_api_key):
            return self._raw_result(content)

        from langchain_core.prompts import PromptTemplate

        llm = get_llm_model(llm_config)
        try:
            template = PromptTemplate(input_variables=['goal', 'page'], template=self._PROMPT)
            messages = [{'role': 'user', 'content': template.format(goal=goal, page=content)}]
            output = call_llm_model(llm,
                                    messages=messages,
                                    model=llm_config.llm_model_name,
                                    temperature=llm_config.llm_temperature)
            result_content = output.content

            # Retry once with an explicit length instruction if the answer is too long.
            if max_output and len(result_content) > max_output:
                logger.warning(
                    f"Output exceeds maximum length ({len(result_content)} > {max_output}). Retrying with limited prompt.")
                template_with_limit = PromptTemplate(
                    input_variables=['goal', 'page', 'max_extract_content_output_tokens'],
                    template=self._PROMPT_WITH_LIMIT
                )
                # Only the template's own variables are passed; the former
                # extra `max_tokens` keyword is not a placeholder of the template.
                messages = [{'role': 'user', 'content': template_with_limit.format(
                    goal=goal,
                    page=content,
                    max_extract_content_output_tokens=max_output
                )}]
                output = call_llm_model(llm,
                                        messages=messages,
                                        model=llm_config.llm_model_name,
                                        temperature=llm_config.llm_temperature)
                result_content = output.content

            msg = f'Extracted from page\n: {result_content}\n'
            logger.info(msg)
            return ActionResult(content=msg, keep=True)
        except Exception as e:
            # Deliberate best effort: fall back to the raw markdown, but make
            # the failure visible in the logs.
            logger.warning(f'Error extracting content: {e}')
        return self._raw_result(content)

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously extract content from the current page.

        Returns:
            The action result and the page that was used.
        """
        import markdownify

        logger.info(f"exec {BrowserAction.EXTRACT_CONTENT.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.EXTRACT_CONTENT.name} page is none")
            return ActionResult(content="extract content no page", keep=True), page

        goal = action.params.get("goal")
        content = markdownify.markdownify(page.content())
        return self._extract(content, goal, kwargs), page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`.

        Only the page content is fetched with ``await``; the model call goes
        through the same ``call_llm_model`` function as the sync path.
        """
        import markdownify

        logger.info(f"exec {BrowserAction.EXTRACT_CONTENT.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.EXTRACT_CONTENT.name} page is none")
            return ActionResult(content="extract content no page", keep=True), page

        goal = action.params.get("goal")
        content = markdownify.markdownify(await page.content())
        return self._extract(content, goal, kwargs), page
class ScrollDown(ExecutableAction):
    """Scroll the current page down by a pixel amount, or by one viewport height."""

    @staticmethod
    def _plan(raw_amount):
        """Return ``(script, label)`` for the requested amount.

        A falsy amount scrolls by ``window.innerHeight``. ``label`` is the
        human-readable distance used in the result message.
        """
        if raw_amount:
            pixels = int(raw_amount)
            label = f'{pixels} pixels' if pixels else 'one page'
            return f'window.scrollBy(0, {pixels});', label
        return 'window.scrollBy(0, window.innerHeight);', 'one page'

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously scroll down; returns the result and the page used."""
        logger.info(f"exec {BrowserAction.SCROLL_DOWN.value.name} action")
        current = get_page(**kwargs)
        if current is None:
            logger.warning(f"{BrowserAction.SCROLL_DOWN.name} page is none")
            return ActionResult(content="scroll no page", keep=True), current

        script, label = self._plan(action.params.get("amount"))
        current.evaluate(script)

        msg = f'Scrolled down the page by {label}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), current

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.SCROLL_DOWN.value.name} action")
        current = get_page(**kwargs)
        if current is None:
            logger.warning(f"{BrowserAction.SCROLL_DOWN.name} page is none")
            return ActionResult(content="scroll no page", keep=True), current

        script, label = self._plan(action.params.get("amount"))
        await current.evaluate(script)

        msg = f'Scrolled down the page by {label}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), current
class ScrollUp(ExecutableAction):
    """Scroll the current page up by a pixel amount, or by one viewport height."""

    @staticmethod
    def _plan(raw_amount):
        """Return ``(script, label)`` for the requested amount.

        A falsy amount scrolls by ``window.innerHeight``. The sign is applied
        in Python, so a negative amount no longer yields the invalid
        JavaScript ``--N``.
        """
        if raw_amount:
            pixels = int(raw_amount)
            label = f'{pixels} pixels' if pixels else 'one page'
            return f'window.scrollBy(0, {-pixels});', label
        return 'window.scrollBy(0, -window.innerHeight);', 'one page'

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously scroll up; returns the result and the page used."""
        logger.info(f"exec {BrowserAction.SCROLL_UP.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.SCROLL_UP.name} page is none")
            return ActionResult(content="scroll no page", keep=True), page

        script, label = self._plan(action.params.get("amount"))
        page.evaluate(script)

        # The message previously said "Scrolled down" (copied from ScrollDown).
        msg = f'Scrolled up the page by {label}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.SCROLL_UP.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.SCROLL_UP.name} page is none")
            return ActionResult(content="scroll no page", keep=True), page

        script, label = self._plan(action.params.get("amount"))
        await page.evaluate(script)

        msg = f'Scrolled up the page by {label}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page
class Wait(ExecutableAction):
    """Pause for ``action.params['seconds']`` (or ``'duration'``) whole seconds."""

    @staticmethod
    def _seconds(action: ActionModel) -> int:
        """Read the wait time; ``seconds`` wins, ``duration`` is the fallback, default 0."""
        seconds = action.params.get("seconds")
        if not seconds:
            seconds = action.params.get("duration", 0)
        return int(seconds)

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Block the calling thread for the requested time.

        Returns:
            The action result and the current page. The page is resolved with
            ``get_page`` like every other action, so a page owned by a
            ``tool`` keyword argument is returned instead of ``None``.
        """
        seconds = self._seconds(action)
        msg = f'Waiting for {seconds} seconds'
        logger.info(msg)
        time.sleep(seconds)
        return ActionResult(content=msg, keep=True), get_page(**kwargs)

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`, using ``asyncio.sleep``."""
        seconds = self._seconds(action)
        msg = f'Waiting for {seconds} seconds'
        logger.info(msg)
        await asyncio.sleep(seconds)
        return ActionResult(content=msg, keep=True), get_page(**kwargs)
class SwitchTab(ExecutableAction):
    """Bring the tab selected by ``action.params['page_id']`` to the front."""

    @staticmethod
    def _select(browser, action: ActionModel):
        """Return ``(page_id, page)`` for the requested tab.

        ``page_id`` defaults to 0 and indexes into ``browser.pages``.

        Raises:
            RuntimeError: ``page_id`` is not smaller than the number of pages.
        """
        page_id = int(action.params.get("page_id", 0))
        open_pages = browser.pages
        if page_id >= len(open_pages):
            raise RuntimeError(f'No tab found with page_id: {page_id}')
        return page_id, open_pages[page_id]

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously switch tabs; returns the result and the selected page."""
        logger.info(f"exec {BrowserAction.SWITCH_TAB.value.name} action")
        context = get_browser(**kwargs)
        if context is None:
            logger.warning(f"{BrowserAction.SWITCH_TAB.name} browser context is none")
            return ActionResult(content="switch tab no browser context", keep=True), get_page(**kwargs)

        page_id, target = self._select(context, action)
        target.bring_to_front()
        target.wait_for_load_state()

        msg = f'Switched to tab {page_id}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), target

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.SWITCH_TAB.value.name} action")
        context = get_browser(**kwargs)
        if context is None:
            logger.warning(f"{BrowserAction.SWITCH_TAB.name} browser context is none")
            return ActionResult(content="switch tab no browser context", keep=True), get_page(**kwargs)

        page_id, target = self._select(context, action)
        await target.bring_to_front()
        await target.wait_for_load_state()

        msg = f'Switched to tab {page_id}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), target
class SendKeys(ExecutableAction):
    """Press the key (or key combination) in ``action.params['keys']`` on the page."""

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Synchronously press the keys via ``page.keyboard.press``.

        Returns:
            The action result and the page that was used. A missing ``keys``
            parameter yields an unsuccessful result instead of an exception.

        Raises:
            Exception: whatever ``page.keyboard.press`` raised, after logging.
        """
        logger.info(f"exec {BrowserAction.SEND_KEYS.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.SEND_KEYS.name} page is none")
            # Message used to read "scroll no page" (copied from the scroll actions).
            return ActionResult(content="send keys no page", keep=True), page

        keys = action.params.get("keys")
        if not keys:
            return ActionResult(success=False, content="no keys", keep=True), page

        try:
            page.keyboard.press(keys)
        except Exception:
            logger.warning(f"{keys} press fail. \n{traceback.format_exc()}")
            raise
        return ActionResult(content=f"Sent keys: {keys}", keep=True), page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`."""
        logger.info(f"exec {BrowserAction.SEND_KEYS.value.name} action")
        page = get_page(**kwargs)
        if page is None:
            logger.warning(f"{BrowserAction.SEND_KEYS.name} page is none")
            return ActionResult(content="send keys no page", keep=True), page

        keys = action.params.get("keys")
        if not keys:
            return ActionResult(success=False, content="no keys", keep=True), page

        try:
            await page.keyboard.press(keys)
        except Exception:
            logger.warning(f"{keys} press fail. \n{traceback.format_exc()}")
            raise
        return ActionResult(content=f"Sent keys: {keys}", keep=True), page
class WriteToFile(ExecutableAction):
    """Write ``action.params['content']`` plus a trailing newline to a file."""

    # Used when no path parameter is given or its value is empty.
    _DEFAULT_PATH = "tmp_result.md"
    # Accepted parameter names for the target path, in order of precedence.
    _PATH_KEYS = ("file_path", "file_name", "filename")

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Write the content to the requested file.

        Parameters read from ``action.params``:
            file_path / file_name / filename: target path (first one present wins).
            content: text to write; non-string values are converted with ``str``.
            mode: mode passed to ``open``; defaults to ``"a"`` (append).

        Returns:
            The action result and the current page. I/O failures are reported
            through ``ActionResult.error`` rather than raised.
        """
        params = action.params or {}

        file_path = self._DEFAULT_PATH
        for key in self._PATH_KEYS:
            if key in params:
                # A present-but-empty value falls back to the default, so the
                # path is always a string before it reaches os.path.abspath.
                file_path = str(params.get(key) or self._DEFAULT_PATH)
                break

        content = params.get("content", "")
        if content is None:
            content = ""
        elif not isinstance(content, str):
            content = str(content)
        mode = params.get("mode", "a")  # Default to append mode

        # NOTE(review): the path comes straight from the action parameters and
        # is not confined to any directory - confirm this is acceptable for
        # the environments this action runs in.
        abs_file_path = os.path.abspath(file_path)
        page = get_page(**kwargs)

        try:
            with open(file_path, mode, encoding='utf-8') as f:
                f.write(content + '\n')
        except Exception as e:
            error_msg = f'Failed to write to file {abs_file_path}: {str(e)}'
            logger.error(error_msg)
            return ActionResult(content=error_msg, keep=True, error=error_msg), page

        msg = f'Successfully wrote content to {abs_file_path}'
        logger.info(msg)
        return ActionResult(content=msg, keep=True), page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Delegate to :meth:`act`; the file write is performed synchronously."""
        return self.act(action, **kwargs)
class Done(ExecutableAction):
    """Terminal action: reports a successful, finished result."""

    def act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Return a done/success result together with the current page."""
        logger.info(f"exec {BrowserAction.DONE.value.name} action")
        outcome = ActionResult(is_done=True, success=True, content="done", keep=True)
        current_page = get_page(**kwargs)
        return outcome, current_page

    async def async_act(self, action: ActionModel, **kwargs) -> Tuple[ActionResult, Any]:
        """Asynchronous counterpart of :meth:`act`; performs no awaiting."""
        logger.info(f"exec {BrowserAction.DONE.value.name} action")
        outcome = ActionResult(is_done=True, success=True, content="done", keep=True)
        current_page = get_page(**kwargs)
        return outcome, current_page