File size: 4,747 Bytes
f75ed7d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# coding: utf-8

# Derived from browser_use's DomService; used here as utility functions that support both sync and async pages.

import gc
import json

from typing import Dict, Any, Tuple, Optional

from aworld.utils.async_func import async_func
from examples.tools.browsers.util.dom import DOMElementNode, DOMBaseNode, DOMTextNode, ViewportInfo
from aworld.logs.util import logger


async def async_build_dom_tree(page, js_code: str, args: Dict[str, Any]) -> Tuple[DOMElementNode, Dict[int, DOMElementNode]]:
    """Asynchronously evaluate ``js_code`` in ``page`` and build the DOM tree from the result.

    Args:
        page: Object exposing an awaitable ``evaluate(expression[, arg])``
            (presumably an async browser page -- verify against callers).
        js_code: JavaScript source evaluated in the page to extract the DOM.
        args: Argument passed to the script; its ``"debugMode"`` entry also
            enables logging of the performance metrics.

    Returns:
        The ``(root element, selector map)`` tuple produced by
        ``_construct_dom_tree``.

    Raises:
        ValueError: If the page does not evaluate ``1+1`` to ``2``.
        Exception: Whatever ``page.evaluate(js_code, args)`` raises; it is
            logged and re-raised unchanged.
    """
    # Sanity check: make sure the page can run JavaScript at all.
    sanity_result = await page.evaluate('1+1')
    if sanity_result != 2:
        raise ValueError('The page cannot evaluate javascript code properly')

    # NOTE: The script runs inside the browser and hands back a hash map that
    #       describes the DOM nodes and how they relate to each other.
    try:
        eval_page = await page.evaluate(js_code, args)
    except Exception as e:
        logger.error('Error evaluating JavaScript: %s', e)
        raise

    # Performance metrics are only reported when debug mode was requested.
    if args.get("debugMode"):
        if 'perfMetrics' in eval_page:
            metrics_dump = json.dumps(eval_page['perfMetrics'], indent=2)
            logger.debug('DOM Tree Building Performance Metrics:\n%s', metrics_dump)

    # `async_func` presumably turns the sync builder into an awaitable -- confirm in aworld.utils.
    construct_async = async_func(_construct_dom_tree)
    return await construct_async(eval_page)


def build_dom_tree(page, js_code: str, args: Dict[str, Any]) -> Tuple[DOMElementNode, Dict[int, DOMElementNode]]:
    """Synchronously evaluate ``js_code`` in ``page`` and build the DOM tree from the result.

    Args:
        page: Object exposing a blocking ``evaluate(expression[, arg])``
            (presumably a sync browser page -- verify against callers).
        js_code: JavaScript source evaluated in the page to extract the DOM.
        args: Argument passed to the script; its ``"debugMode"`` entry also
            enables logging of the performance metrics.

    Returns:
        The ``(root element, selector map)`` tuple produced by
        ``_construct_dom_tree``.

    Raises:
        ValueError: If the page does not evaluate ``1+1`` to ``2``.
        Exception: Whatever ``page.evaluate(js_code, args)`` raises; it is
            logged and re-raised unchanged.
    """
    # Sanity check: make sure the page can run JavaScript at all.
    sanity_result = page.evaluate('1+1')
    if sanity_result != 2:
        raise ValueError('The page cannot evaluate javascript code properly')

    # NOTE: The script runs inside the browser and hands back a hash map that
    #       describes the DOM nodes and how they relate to each other.
    try:
        eval_page = page.evaluate(js_code, args)
    except Exception as e:
        logger.error('Error evaluating JavaScript: %s', e)
        raise

    # Performance metrics are only reported when debug mode was requested.
    if args.get("debugMode"):
        if 'perfMetrics' in eval_page:
            metrics_dump = json.dumps(eval_page['perfMetrics'], indent=2)
            logger.debug('DOM Tree Building Performance Metrics:\n%s', metrics_dump)

    dom_tree = _construct_dom_tree(eval_page)
    return dom_tree


def _construct_dom_tree(eval_page: dict, ) -> tuple[DOMElementNode, Dict[int, DOMElementNode]]:
    """Rebuild the Python DOM tree from the node map returned by the in-page script.

    Args:
        eval_page: Result of the JavaScript evaluation. It must provide
            ``'map'`` (node id -> raw node data) and ``'rootId'`` (id of the
            root node; it is looked up in the map as ``str(rootId)``).

    Returns:
        A ``(root, selector_map)`` tuple: ``root`` is the element node stored
        under the root id with its descendants linked in, and ``selector_map``
        maps every non-``None`` ``highlight_index`` to its element node.

    Raises:
        KeyError: If ``eval_page`` has no ``'map'`` or ``'rootId'`` entry.
        ValueError: If the root id has no parsed node (missing from the map or
            skipped because its data was empty) or the root is not an element.
    """
    js_node_map = eval_page['map']
    js_root_id = eval_page['rootId']

    selector_map: Dict[int, DOMElementNode] = {}
    node_map = {}

    for node_id, node_data in js_node_map.items():
        node, children_ids = _parse_node(node_data)
        if node is None:
            continue

        node_map[node_id] = node

        # Only element nodes carry a highlight index or children.
        if not isinstance(node, DOMElementNode):
            continue

        if node.highlight_index is not None:
            selector_map[node.highlight_index] = node

        # NOTE: We rely on the map being ordered bottom-up, i.e. every child
        #       was already parsed before its parent is reached. Children
        #       that are not in `node_map` (skipped or not yet seen) are
        #       silently left out.
        for child_id in children_ids:
            child_node = node_map.get(child_id)
            if child_node is None:
                continue

            child_node.parent = node
            node.children.append(child_node)

    # Use .get() so that a missing root surfaces as the ValueError below
    # rather than as an unexplained KeyError.
    html_to_dict = node_map.get(str(js_root_id))

    del node_map
    del js_node_map
    del js_root_id

    gc.collect()

    if not isinstance(html_to_dict, DOMElementNode):
        raise ValueError('Failed to parse HTML to dictionary')

    return html_to_dict, selector_map


def _parse_node(node_data: dict) -> Tuple[Optional[DOMBaseNode], list[int]]:
    """Convert one raw node record from the in-page script into a DOM node object.

    Args:
        node_data: Raw node dictionary. Text nodes are marked with
            ``type == 'TEXT_NODE'`` and must carry ``text`` and ``isVisible``;
            every other record is treated as an element and must carry
            ``tagName`` and ``xpath``.

    Returns:
        ``(node, children_ids)``. ``node`` is ``None`` for empty input, a
        ``DOMTextNode`` for text records, otherwise a ``DOMElementNode`` with
        no children attached yet. ``children_ids`` is the record's
        ``'children'`` list for elements and ``[]`` in all other cases.

    Raises:
        KeyError: If a required key of the record is missing.
    """
    # Empty / missing records produce no node at all.
    if not node_data:
        return None, []

    # Text nodes are leaves: build them right away, they have no children.
    if node_data.get('type') == 'TEXT_NODE':
        return DOMTextNode(
            text=node_data['text'],
            is_visible=node_data['isVisible'],
            parent=None,
        ), []

    # Viewport dimensions are optional on element records.
    if 'viewport' not in node_data:
        viewport_info = None
    else:
        raw_viewport = node_data['viewport']
        viewport_info = ViewportInfo(
            width=raw_viewport['width'],
            height=raw_viewport['height'],
        )

    # Boolean-like element fields all fall back to False when absent.
    flag_keys = (
        ('is_visible', 'isVisible'),
        ('is_interactive', 'isInteractive'),
        ('is_top_element', 'isTopElement'),
        ('is_in_viewport', 'isInViewport'),
        ('shadow_root', 'shadowRoot'),
    )
    flags = {field: node_data.get(js_key, False) for field, js_key in flag_keys}

    element_node = DOMElementNode(
        tag_name=node_data['tagName'],
        xpath=node_data['xpath'],
        attributes=node_data.get('attributes', {}),
        children=[],
        highlight_index=node_data.get('highlightIndex'),
        parent=None,
        viewport_info=viewport_info,
        **flags,
    )

    return element_node, node_data.get('children', [])