File size: 18,950 Bytes
ea7486e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
# coding: utf-8
# Copyright (c) 2025 inclusionAI.

import re
import time
import traceback
from typing import Optional

from examples.tools.browsers.util.dom import DOMElementNode
from aworld.logs.util import logger
from aworld.utils import import_package


class DomUtil:
    def __init__(self):
        import_package("playwright")

    @staticmethod
    async def async_click_element(page, element_node: DOMElementNode, **kwargs) -> Optional[str]:
        from playwright.async_api import ElementHandle as AElementHandle, BrowserContext as ABrowserContext

        try:
            element_handle: AElementHandle = await DomUtil.async_get_locate_element(page, element_node)
            if element_handle is None:
                raise Exception(f'Element: {repr(element_node)} not found')

            bound = await element_handle.bounding_box()
            try:
                # todo: iframe.
                center_x = bound['x'] + bound['width'] / 2
                center_y = bound['y'] + bound['height'] / 2

                try:
                    browser: ABrowserContext = kwargs.get('browser')
                    async with browser.expect_page() as new_page_info:
                        await page.mouse.click(center_x, center_y)
                    await page.mouse.click(center_x, center_y)
                    await page.wait_for_load_state()
                except:
                    logger.warning(traceback.format_exc())
            except:
                logger.info(f"click {element_handle}!!")
                if await element_handle.text_content():
                    browser: ABrowserContext = kwargs.get('browser')
                    if browser:
                        try:
                            async with browser.expect_page() as new_page_info:
                                await page.click(f"text={element_handle.text_content()}")
                            page = await new_page_info.value
                            await page.wait_for_load_state()
                        except:
                            logger.warning(traceback.format_exc())
                    else:
                        await element_handle.click()
                        await page.wait_for_load_state()
                else:
                    await element_handle.click()
                    await page.wait_for_load_state()
        except Exception as e:
            logger.error(traceback.format_exc())
            raise Exception(f'Failed to click element: {repr(element_node)}. Error: {str(e)}')

    @staticmethod
    def click_element(page, element_node: DOMElementNode, **kwargs) -> Optional[str]:
        from playwright.sync_api import ElementHandle, BrowserContext

        try:
            element_handle: ElementHandle = DomUtil.get_locate_element(page, element_node)
            if element_handle is None:
                raise Exception(f'Element: {repr(element_node)} not found')

            bound = element_handle.bounding_box()
            try:
                # todo: iframe.
                center_x = bound['x'] + bound['width'] / 2
                center_y = bound['y'] + bound['height'] / 2

                try:
                    browser: BrowserContext = kwargs.get('browser')
                    with browser.expect_page() as new_page_info:
                        page.mouse.click(center_x, center_y)
                    page = new_page_info.value
                    page.wait_for_load_state()
                except:
                    logger.warning(traceback.format_exc())
            except:
                logger.info(f"click {element_handle}!!")
                if element_handle.text_content():
                    browser: BrowserContext = kwargs.get('browser')
                    if browser:
                        try:
                            with browser.expect_page() as new_page_info:
                                page.click(f"text={element_handle.text_content()}")
                            page = new_page_info.value
                            page.wait_for_load_state()
                        except:
                            logger.warning(traceback.format_exc())
                    else:
                        element_handle.click()
                        page.wait_for_load_state()
                else:
                    element_handle.click()
                    page.wait_for_load_state()
        except Exception as e:
            logger.error(traceback.format_exc())
            raise Exception(f'Failed to click element: {repr(element_node)}. Error: {str(e)}')

    @staticmethod
    async def async_get_locate_element(current_frame, element: DOMElementNode):
        # Start with the target element and collect all parents, return Optional[AElementHandle]
        from playwright.async_api import FrameLocator as AFrameLocator

        parents: list[DOMElementNode] = []
        current = element
        while current.parent is not None:
            parent = current.parent
            parents.append(parent)
            current = parent

        # Reverse the parents list to process from top to bottom
        parents.reverse()

        # Process all iframe parents in sequence
        iframes = [item for item in parents if item.tag_name == 'iframe']
        for parent in iframes:
            css_selector = DomUtil._enhanced_css_selector_for_element(
                parent,
                include_dynamic_attributes=True,
            )
            current_frame = current_frame.frame_locator(css_selector)

        css_selector = DomUtil._enhanced_css_selector_for_element(
            element, include_dynamic_attributes=True
        )

        try:
            if isinstance(current_frame, AFrameLocator):
                element_handle = await current_frame.locator(css_selector).element_handle()
                return element_handle
            else:
                # Try to scroll into view if hidden
                element_handle = await current_frame.query_selector(css_selector)
                if element_handle:
                    await element_handle.scroll_into_view_if_needed()
                    return element_handle
                return None
        except Exception as e:
            logger.error(f'Failed to locate element: {str(e)}')
            return None

    @staticmethod
    def get_locate_element(current_frame, element: DOMElementNode):
        # Start with the target element and collect all parents
        from playwright.sync_api import FrameLocator

        parents: list[DOMElementNode] = []
        current = element
        while current.parent is not None:
            parent = current.parent
            parents.append(parent)
            current = parent

        # Reverse the parents list to process from top to bottom
        parents.reverse()

        # Process all iframe parents in sequence
        iframes = [item for item in parents if item.tag_name == 'iframe']
        for parent in iframes:
            css_selector = DomUtil._enhanced_css_selector_for_element(
                parent,
                include_dynamic_attributes=True,
            )
            current_frame = current_frame.frame_locator(css_selector)

        css_selector = DomUtil._enhanced_css_selector_for_element(
            element, include_dynamic_attributes=True
        )

        try:
            if isinstance(current_frame, FrameLocator):
                element_handle = current_frame.locator(css_selector).element_handle()
                return element_handle
            else:
                # Try to scroll into view if hidden
                element_handle = current_frame.query_selector(css_selector)
                if element_handle:
                    element_handle.scroll_into_view_if_needed()
                    return element_handle
                return None
        except Exception as e:
            logger.error(f'Failed to locate element: {str(e)}')
            return None

    @staticmethod
    def wait_for_stable_network(page, **kwargs):
        pending_requests = set()
        last_activity = time.time()

        # Define relevant resource types and content types
        RELEVANT_RESOURCE_TYPES = {
            'document',
            'stylesheet',
            'image',
            'font',
            'script',
            'iframe',
        }

        RELEVANT_CONTENT_TYPES = {
            'text/html',
            'text/css',
            'application/javascript',
            'image/',
            'font/',
            'application/json',
        }

        # Additional patterns to filter out
        IGNORED_URL_PATTERNS = {
            # Analytics and tracking
            'analytics',
            'tracking',
            'telemetry',
            'beacon',
            'metrics',
            # Ad-related
            'doubleclick',
            'adsystem',
            'adserver',
            'advertising',
            # Social media widgets
            'facebook.com/plugins',
            'platform.twitter',
            'linkedin.com/embed',
            # Live chat and support
            'livechat',
            'zendesk',
            'intercom',
            'crisp.chat',
            'hotjar',
            # Push notifications
            'push-notifications',
            'onesignal',
            'pushwoosh',
            # Background sync/heartbeat
            'heartbeat',
            'ping',
            'alive',
            # WebRTC and streaming
            'webrtc',
            'rtmp://',
            'wss://',
            # Common CDNs for dynamic content
            'cloudfront.net',
            'fastly.net',
        }

        def on_request(request):
            # Filter by resource type
            if request.resource_type not in RELEVANT_RESOURCE_TYPES:
                return

            # Filter out streaming, websocket, and other real-time requests
            if request.resource_type in {
                'websocket',
                'media',
                'eventsource',
                'manifest',
                'other',
            }:
                return

            # Filter out by URL patterns
            url = request.url.lower()
            if any(pattern in url for pattern in IGNORED_URL_PATTERNS):
                return

            # Filter out data URLs and blob URLs
            if url.startswith(('data:', 'blob:')):
                return

            # Filter out requests with certain headers
            headers = request.headers
            if headers.get('purpose') == 'prefetch' or headers.get('sec-fetch-dest') in [
                'video',
                'audio',
            ]:
                return

            nonlocal last_activity
            pending_requests.add(request)
            last_activity = time.time()

        def on_response(response):
            request = response.request
            if request not in pending_requests:
                return

            # Filter by content type if available
            content_type = response.headers.get('content-type', '').lower()

            # Skip if content type indicates streaming or real-time data
            if any(t in content_type
                   for t in [
                       'streaming',
                       'video',
                       'audio',
                       'webm',
                       'mp4',
                       'event-stream',
                       'websocket',
                       'protobuf']):
                pending_requests.remove(request)
                return

            # Only process relevant content types
            if not any(ct in content_type for ct in RELEVANT_CONTENT_TYPES):
                pending_requests.remove(request)
                return

            # Skip if response is too large (likely not essential for page load)
            content_length = response.headers.get('content-length')
            if content_length and int(content_length) > 5 * 1024 * 1024:  # 5MB
                pending_requests.remove(request)
                return

            nonlocal last_activity
            pending_requests.remove(request)
            last_activity = time.time()

        # Attach event listeners
        page.on('request', on_request)
        page.on('response', on_response)

        try:
            start_time = time.time()
            while True:
                time.sleep(0.1)
                now = time.time()

                if len(pending_requests) == 0 and (now - last_activity) >= kwargs.get('idle_wait_time', 0.5):
                    break
                if now - start_time > kwargs.get('max_wait_time', 5):
                    logger.debug(
                        f'Network timeout after {kwargs.get("max_wait_time", 5)}s with {len(pending_requests)} '
                        f'pending requests: {[r.url for r in pending_requests]}'
                    )
                    break

        finally:
            # Clean up event listeners
            page.remove_listener('request', on_request)
            page.remove_listener('response', on_response)
        logger.debug(f'Network stabilized for {kwargs.get("idle_wait_time", 0.5)} seconds')

    @staticmethod
    def _enhanced_css_selector_for_element(element: DOMElementNode, include_dynamic_attributes: bool = True) -> str:
        """Creates a CSS selector for a DOM element, handling various edge cases and special characters.

        Args:
                element: The DOM element to create a selector for

        Returns:
                A valid CSS selector string
        """
        try:
            # Get base selector from XPath
            css_selector = DomUtil._convert_simple_xpath_to_css_selector(element.xpath)

            # Handle class attributes
            if 'class' in element.attributes and element.attributes['class'] and include_dynamic_attributes:
                # Define a regex pattern for valid class names in CSS
                valid_class_name_pattern = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_-]*$')

                # Iterate through the class attribute values
                classes = element.attributes['class'].split()
                for class_name in classes:
                    # Skip empty class names
                    if not class_name.strip():
                        continue

                    # Check if the class name is valid
                    if valid_class_name_pattern.match(class_name):
                        # Append the valid class name to the CSS selector
                        css_selector += f'.{class_name}'
                    else:
                        # Skip invalid class names
                        continue

            # Expanded set of safe attributes that are stable and useful for selection
            SAFE_ATTRIBUTES = {
                # Data attributes (if they're stable in your application)
                'id',
                # Standard HTML attributes
                'name',
                'type',
                'placeholder',
                # Accessibility attributes
                'aria-label',
                'aria-labelledby',
                'aria-describedby',
                'role',
                # Common form attributes
                'for',
                'autocomplete',
                'required',
                'readonly',
                # Media attributes
                'alt',
                'title',
                'src',
                # Custom stable attributes (add any application-specific ones)
                'href',
                'target',
            }

            if include_dynamic_attributes:
                dynamic_attributes = {
                    'data-id',
                    'data-qa',
                    'data-cy',
                    'data-testid',
                }
                SAFE_ATTRIBUTES.update(dynamic_attributes)

            # Handle other attributes
            for attribute, value in element.attributes.items():
                if attribute == 'class':
                    continue

                # Skip invalid attribute names
                if not attribute.strip():
                    continue

                if attribute not in SAFE_ATTRIBUTES:
                    continue

                # Escape special characters in attribute names
                safe_attribute = attribute.replace(':', r'\:')

                # Handle different value cases
                if value == '':
                    css_selector += f'[{safe_attribute}]'
                elif any(char in value for char in '"\'<>`\n\r\t'):
                    # Use contains for values with special characters
                    # Regex-substitute *any* whitespace with a single space, then strip.
                    collapsed_value = re.sub(r'\s+', ' ', value).strip()
                    # Escape embedded double-quotes.
                    safe_value = collapsed_value.replace('"', '\\"')
                    css_selector += f'[{safe_attribute}*="{safe_value}"]'
                else:
                    css_selector += f'[{safe_attribute}="{value}"]'

            return css_selector

        except Exception:
            # Fallback to a more basic selector if something goes wrong
            tag_name = element.tag_name or '*'
            return f"{tag_name}[highlight_index='{element.highlight_index}']"

    @staticmethod
    def _convert_simple_xpath_to_css_selector(xpath: str) -> str:
        """Converts simple XPath expressions to CSS selectors."""
        if not xpath:
            return ''

        # Remove leading slash if present
        xpath = xpath.lstrip('/')

        # Split into parts
        parts = xpath.split('/')
        css_parts = []

        for part in parts:
            if not part:
                continue

            # Handle index notation [n]
            if '[' in part:
                base_part = part[: part.find('[')]
                index_part = part[part.find('['):]

                # Handle multiple indices
                indices = [i.strip('[]') for i in index_part.split(']')[:-1]]

                for idx in indices:
                    try:
                        # Handle numeric indices
                        if idx.isdigit():
                            index = int(idx) - 1
                            base_part += f':nth-of-type({index + 1})'
                        # Handle last() function
                        elif idx == 'last()':
                            base_part += ':last-of-type'
                        # Handle position() functions
                        elif 'position()' in idx:
                            if '>1' in idx:
                                base_part += ':nth-of-type(n+2)'
                    except ValueError:
                        continue

                css_parts.append(base_part)
            else:
                css_parts.append(part)

        base_selector = ' > '.join(css_parts)
        return base_selector