Duibonduil's picture
Upload 2 files
f75ed7d verified
# coding: utf-8
# Derived from browser_use DomService, we use it as a utility method, and supports sync and async.
import gc
import json
from typing import Dict, Any, Tuple, Optional
from aworld.utils.async_func import async_func
from examples.tools.browsers.util.dom import DOMElementNode, DOMBaseNode, DOMTextNode, ViewportInfo
from aworld.logs.util import logger
async def async_build_dom_tree(page, js_code: str, args: Dict[str, Any]) -> Tuple[DOMElementNode, Dict[int, DOMElementNode]]:
if await page.evaluate('1+1') != 2:
raise ValueError('The page cannot evaluate javascript code properly')
# NOTE: We execute JS code in the browser to extract important DOM information.
# The returned hash map contains information about the DOM tree and the
# relationship between the DOM elements.
try:
eval_page = await page.evaluate(js_code, args)
except Exception as e:
logger.error('Error evaluating JavaScript: %s', e)
raise
# Only log performance metrics in debug mode
if args.get("debugMode") and 'perfMetrics' in eval_page:
logger.debug('DOM Tree Building Performance Metrics:\n%s', json.dumps(eval_page['perfMetrics'], indent=2))
return await async_func(_construct_dom_tree)(eval_page)
def build_dom_tree(page, js_code: str, args: Dict[str, Any]) -> Tuple[DOMElementNode, Dict[int, DOMElementNode]]:
if page.evaluate('1+1') != 2:
raise ValueError('The page cannot evaluate javascript code properly')
# NOTE: We execute JS code in the browser to extract important DOM information.
# The returned hash map contains information about the DOM tree and the
# relationship between the DOM elements.
try:
eval_page = page.evaluate(js_code, args)
except Exception as e:
logger.error('Error evaluating JavaScript: %s', e)
raise
# Only log performance metrics in debug mode
if args.get("debugMode") and 'perfMetrics' in eval_page:
logger.debug('DOM Tree Building Performance Metrics:\n%s', json.dumps(eval_page['perfMetrics'], indent=2))
return _construct_dom_tree(eval_page)
def _construct_dom_tree(eval_page: dict, ) -> tuple[DOMElementNode, Dict[int, DOMElementNode]]:
js_node_map = eval_page['map']
js_root_id = eval_page['rootId']
selector_map = {}
node_map = {}
for id, node_data in js_node_map.items():
node, children_ids = _parse_node(node_data)
if node is None:
continue
node_map[id] = node
if isinstance(node, DOMElementNode) and node.highlight_index is not None:
selector_map[node.highlight_index] = node
# NOTE: We know that we are building the tree bottom up
# and all children are already processed.
if isinstance(node, DOMElementNode):
for child_id in children_ids:
if child_id not in node_map:
continue
child_node = node_map[child_id]
child_node.parent = node
node.children.append(child_node)
html_to_dict = node_map[str(js_root_id)]
del node_map
del js_node_map
del js_root_id
gc.collect()
if html_to_dict is None or not isinstance(html_to_dict, DOMElementNode):
raise ValueError('Failed to parse HTML to dictionary')
return html_to_dict, selector_map
def _parse_node(node_data: dict, ) -> Tuple[Optional[DOMBaseNode], list[int]]:
if not node_data:
return None, []
# Process text nodes immediately
if node_data.get('type') == 'TEXT_NODE':
text_node = DOMTextNode(
text=node_data['text'],
is_visible=node_data['isVisible'],
parent=None,
)
return text_node, []
# Process coordinates if they exist for element nodes
viewport_info = None
if 'viewport' in node_data:
viewport_info = ViewportInfo(
width=node_data['viewport']['width'],
height=node_data['viewport']['height'],
)
element_node = DOMElementNode(
tag_name=node_data['tagName'],
xpath=node_data['xpath'],
attributes=node_data.get('attributes', {}),
children=[],
is_visible=node_data.get('isVisible', False),
is_interactive=node_data.get('isInteractive', False),
is_top_element=node_data.get('isTopElement', False),
is_in_viewport=node_data.get('isInViewport', False),
highlight_index=node_data.get('highlightIndex'),
shadow_root=node_data.get('shadowRoot', False),
parent=None,
viewport_info=viewport_info,
)
children_ids = node_data.get('children', [])
return element_node, children_ids