Spaces:
Sleeping
Sleeping
# coding: utf-8 | |
# Derived from browser_use DomService, we use it as a utility method, and supports sync and async. | |
import gc | |
import json | |
from typing import Dict, Any, Tuple, Optional | |
from aworld.utils.async_func import async_func | |
from examples.tools.browsers.util.dom import DOMElementNode, DOMBaseNode, DOMTextNode, ViewportInfo | |
from aworld.logs.util import logger | |
async def async_build_dom_tree(page, js_code: str, args: Dict[str, Any]) -> Tuple[DOMElementNode, Dict[int, DOMElementNode]]: | |
if await page.evaluate('1+1') != 2: | |
raise ValueError('The page cannot evaluate javascript code properly') | |
# NOTE: We execute JS code in the browser to extract important DOM information. | |
# The returned hash map contains information about the DOM tree and the | |
# relationship between the DOM elements. | |
try: | |
eval_page = await page.evaluate(js_code, args) | |
except Exception as e: | |
logger.error('Error evaluating JavaScript: %s', e) | |
raise | |
# Only log performance metrics in debug mode | |
if args.get("debugMode") and 'perfMetrics' in eval_page: | |
logger.debug('DOM Tree Building Performance Metrics:\n%s', json.dumps(eval_page['perfMetrics'], indent=2)) | |
return await async_func(_construct_dom_tree)(eval_page) | |
def build_dom_tree(page, js_code: str, args: Dict[str, Any]) -> Tuple[DOMElementNode, Dict[int, DOMElementNode]]: | |
if page.evaluate('1+1') != 2: | |
raise ValueError('The page cannot evaluate javascript code properly') | |
# NOTE: We execute JS code in the browser to extract important DOM information. | |
# The returned hash map contains information about the DOM tree and the | |
# relationship between the DOM elements. | |
try: | |
eval_page = page.evaluate(js_code, args) | |
except Exception as e: | |
logger.error('Error evaluating JavaScript: %s', e) | |
raise | |
# Only log performance metrics in debug mode | |
if args.get("debugMode") and 'perfMetrics' in eval_page: | |
logger.debug('DOM Tree Building Performance Metrics:\n%s', json.dumps(eval_page['perfMetrics'], indent=2)) | |
return _construct_dom_tree(eval_page) | |
def _construct_dom_tree(eval_page: dict, ) -> tuple[DOMElementNode, Dict[int, DOMElementNode]]: | |
js_node_map = eval_page['map'] | |
js_root_id = eval_page['rootId'] | |
selector_map = {} | |
node_map = {} | |
for id, node_data in js_node_map.items(): | |
node, children_ids = _parse_node(node_data) | |
if node is None: | |
continue | |
node_map[id] = node | |
if isinstance(node, DOMElementNode) and node.highlight_index is not None: | |
selector_map[node.highlight_index] = node | |
# NOTE: We know that we are building the tree bottom up | |
# and all children are already processed. | |
if isinstance(node, DOMElementNode): | |
for child_id in children_ids: | |
if child_id not in node_map: | |
continue | |
child_node = node_map[child_id] | |
child_node.parent = node | |
node.children.append(child_node) | |
html_to_dict = node_map[str(js_root_id)] | |
del node_map | |
del js_node_map | |
del js_root_id | |
gc.collect() | |
if html_to_dict is None or not isinstance(html_to_dict, DOMElementNode): | |
raise ValueError('Failed to parse HTML to dictionary') | |
return html_to_dict, selector_map | |
def _parse_node(node_data: dict, ) -> Tuple[Optional[DOMBaseNode], list[int]]: | |
if not node_data: | |
return None, [] | |
# Process text nodes immediately | |
if node_data.get('type') == 'TEXT_NODE': | |
text_node = DOMTextNode( | |
text=node_data['text'], | |
is_visible=node_data['isVisible'], | |
parent=None, | |
) | |
return text_node, [] | |
# Process coordinates if they exist for element nodes | |
viewport_info = None | |
if 'viewport' in node_data: | |
viewport_info = ViewportInfo( | |
width=node_data['viewport']['width'], | |
height=node_data['viewport']['height'], | |
) | |
element_node = DOMElementNode( | |
tag_name=node_data['tagName'], | |
xpath=node_data['xpath'], | |
attributes=node_data.get('attributes', {}), | |
children=[], | |
is_visible=node_data.get('isVisible', False), | |
is_interactive=node_data.get('isInteractive', False), | |
is_top_element=node_data.get('isTopElement', False), | |
is_in_viewport=node_data.get('isInViewport', False), | |
highlight_index=node_data.get('highlightIndex'), | |
shadow_root=node_data.get('shadowRoot', False), | |
parent=None, | |
viewport_info=viewport_info, | |
) | |
children_ids = node_data.get('children', []) | |
return element_node, children_ids | |