|
from fastapi import FastAPI, APIRouter, HTTPException, Body |
|
from playwright.async_api import async_playwright, Browser, Page, ElementHandle |
|
from pydantic import BaseModel |
|
from typing import Optional, List, Dict, Any, Union |
|
import asyncio |
|
import json |
|
import logging |
|
import re |
|
import base64 |
|
from dataclasses import dataclass, field |
|
from datetime import datetime |
|
import os |
|
import random |
|
from functools import cached_property |
|
import traceback |
|
import pytesseract |
|
from PIL import Image |
|
import io |
|
|
|
|
|
|
|
|
|
|
|
class Position(BaseModel):
    """A pair of integer x/y values."""

    x: int
    y: int
|
|
|
class ClickElementAction(BaseModel):
    """Request body for clicking an element by its selector-map index."""

    index: int
|
|
|
class ClickCoordinatesAction(BaseModel):
    """Request body for a mouse click at the given x/y position."""

    x: int
    y: int
|
|
|
class GoToUrlAction(BaseModel):
    """Request body carrying the URL to navigate the current page to."""

    url: str
|
|
|
class InputTextAction(BaseModel):
    """Request body for typing ``text`` into the element at ``index``."""

    index: int
    text: str
|
|
|
class ScrollAction(BaseModel):
    """Request body for a scroll action; ``amount`` is optional."""

    amount: Optional[int] = None
|
|
|
class SendKeysAction(BaseModel):
    """Request body naming the key (or key combination) to press."""

    keys: str
|
|
|
class SearchGoogleAction(BaseModel):
    """Request body carrying the Google search query."""

    query: str
|
|
|
class SwitchTabAction(BaseModel):
    """Request body selecting the tab (index into the page list) to activate."""

    page_id: int
|
|
|
class OpenTabAction(BaseModel):
    """Request body carrying the URL to load in a new tab."""

    url: str
|
|
|
class CloseTabAction(BaseModel):
    """Request body identifying a tab by ``page_id``."""

    page_id: int
|
|
|
class NoParamsAction(BaseModel):
    """Empty request body for actions that take no parameters."""
|
|
|
class DragDropAction(BaseModel):
    """Request body for a drag-and-drop action.

    Source and target can each be given either as an element reference
    (with an optional offset) or as explicit coordinates; every field is
    optional at the schema level.
    """

    # Element-based addressing.
    element_source: Optional[str] = None
    element_target: Optional[str] = None
    element_source_offset: Optional[Position] = None
    element_target_offset: Optional[Position] = None

    # Coordinate-based addressing.
    coord_source_x: Optional[int] = None
    coord_source_y: Optional[int] = None
    coord_target_x: Optional[int] = None
    coord_target_y: Optional[int] = None

    # Movement tuning.
    steps: Optional[int] = 10
    delay_ms: Optional[int] = 5
|
|
|
class DoneAction(BaseModel):
    """Request body signalling completion, with a success flag and text."""

    success: bool = True
    text: str = ""
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class CoordinateSet:
    """Rectangle described by its origin (x, y) and its width/height."""

    x: int = 0
    y: int = 0
    width: int = 0
    height: int = 0
|
|
|
@dataclass
class ViewportInfo:
    """Viewport size together with its scroll offsets."""

    width: int = 0
    height: int = 0
    scroll_x: int = 0
    scroll_y: int = 0
|
|
|
@dataclass
class HashedDomElement:
    """Reduced snapshot of an element: tag, attributes, visibility, position."""

    tag_name: str
    attributes: Dict[str, str]
    is_visible: bool
    page_coordinates: Optional[CoordinateSet] = None
|
|
|
@dataclass
class DOMBaseNode:
    """Common base for DOM tree nodes: visibility flag plus parent link."""

    is_visible: bool
    # Forward reference: DOMElementNode is declared after this class.
    parent: Optional['DOMElementNode'] = None
|
|
|
@dataclass
class DOMTextNode(DOMBaseNode):
    """Leaf node that holds a piece of text."""

    text: str = field(default="")
    type: str = 'TEXT_NODE'

    def has_parent_with_highlight_index(self) -> bool:
        """Return True when any ancestor has a non-None ``highlight_index``."""
        ancestor = self.parent
        # Climb until we run out of ancestors or hit a highlighted one.
        while ancestor is not None and ancestor.highlight_index is None:
            ancestor = ancestor.parent
        return ancestor is not None
|
|
|
@dataclass
class DOMElementNode(DOMBaseNode):
    """Element node of the DOM tree.

    Holds the tag name, attributes and child nodes plus the flags and
    coordinates collected for the element. ``highlight_index`` is the key
    under which an interactive element is addressed; it is None for
    elements that are not addressable.
    """

    tag_name: str = field(default="")
    xpath: str = field(default="")
    attributes: Dict[str, str] = field(default_factory=dict)
    children: List['DOMBaseNode'] = field(default_factory=list)

    is_interactive: bool = False
    is_top_element: bool = False
    is_in_viewport: bool = False
    shadow_root: bool = False
    highlight_index: Optional[int] = None
    viewport_coordinates: Optional[CoordinateSet] = None
    page_coordinates: Optional[CoordinateSet] = None
    viewport_info: Optional[ViewportInfo] = None

    def __repr__(self) -> str:
        """Render the opening tag with attributes and a bracketed flag list."""
        attrs = ''.join(f' {key}="{value}"' for key, value in self.attributes.items())
        tag_str = f'<{self.tag_name}{attrs}>'

        extras = []
        if self.is_interactive:
            extras.append('interactive')
        if self.is_top_element:
            extras.append('top')
        if self.highlight_index is not None:
            extras.append(f'highlight:{self.highlight_index}')

        if extras:
            tag_str += f' [{", ".join(extras)}]'

        return tag_str

    @cached_property
    def hash(self) -> HashedDomElement:
        """Snapshot of tag, attributes, visibility and page coordinates.

        Computed once per instance and cached; the attributes dict is
        shared with this node, not copied.
        """
        return HashedDomElement(
            tag_name=self.tag_name,
            attributes=self.attributes,
            is_visible=self.is_visible,
            page_coordinates=self.page_coordinates
        )

    def get_all_text_till_next_clickable_element(self, max_depth: int = -1) -> str:
        """Collect descendant text, stopping at nested highlighted elements.

        Args:
            max_depth: Maximum depth to descend below this node; -1 means
                unlimited.

        Returns:
            The collected text pieces joined with newlines and stripped.
        """
        text_parts = []

        def collect_text(node: DOMBaseNode, current_depth: int) -> None:
            if max_depth != -1 and current_depth > max_depth:
                return

            # Identity check, not ``!=``: the dataclass-generated __eq__
            # compares every field (including parent/children links), which
            # is both costly and not what "is this the starting node" means.
            if (
                isinstance(node, DOMElementNode)
                and node is not self
                and node.highlight_index is not None
            ):
                return

            if isinstance(node, DOMTextNode):
                text_parts.append(node.text)
            elif isinstance(node, DOMElementNode):
                for child in node.children:
                    collect_text(child, current_depth + 1)

        collect_text(self, 0)
        return '\n'.join(text_parts).strip()

    def clickable_elements_to_string(self, include_attributes: list[str] | None = None) -> str:
        """Render the subtree as a line-oriented list of highlighted elements.

        Each element with a ``highlight_index`` yields one line of the form
        ``[index]<tag id=".." href=".."> label </>``, where the label is the
        element's text, else selected attribute values, else the upper-cased
        tag name. Visible text outside any highlighted element is emitted on
        its own line.

        Args:
            include_attributes: Attribute names whose values may serve as
                the label when the element has no text.

        Returns:
            The rendered lines joined by newlines, or
            "No interactive elements found" when nothing was rendered.
        """
        formatted_text = []

        def process_node(node: DOMBaseNode) -> None:
            if isinstance(node, DOMElementNode):
                if node.highlight_index is not None:
                    attributes_str = ''
                    text = node.get_all_text_till_next_clickable_element()

                    display_attributes = []
                    if include_attributes:
                        for key, value in node.attributes.items():
                            if key in include_attributes and value and value != node.tag_name:
                                # Skip values already present in the text.
                                if text and value in text:
                                    continue
                                display_attributes.append(str(value))

                    attributes_str = ';'.join(display_attributes)

                    line = f'[{node.highlight_index}]<{node.tag_name}'

                    for attr_name in ['id', 'href', 'name', 'value', 'type']:
                        if attr_name in node.attributes and node.attributes[attr_name]:
                            line += f' {attr_name}="{node.attributes[attr_name]}"'

                    if text:
                        line += f'> {text}'
                    elif attributes_str:
                        line += f'> {attributes_str}'
                    else:
                        line += f'> {node.tag_name.upper()}'

                    line += ' </>'
                    formatted_text.append(line)

                for child in node.children:
                    process_node(child)

            elif isinstance(node, DOMTextNode):
                # Text under a highlighted element is already part of that
                # element's line; only emit free-standing visible text.
                if not node.has_parent_with_highlight_index() and node.is_visible:
                    if node.text and node.text.strip():
                        formatted_text.append(node.text)

        process_node(self)
        result = '\n'.join(formatted_text)
        return result if result.strip() else "No interactive elements found"
|
|
|
@dataclass
class DOMState:
    """Snapshot of a page: element tree, index map, URL/title, scroll extents."""

    element_tree: DOMElementNode
    selector_map: Dict[int, DOMElementNode]
    url: str = ""
    title: str = ""
    pixels_above: int = 0
    pixels_below: int = 0
|
|
|
|
|
|
|
|
|
|
|
class BrowserActionResult(BaseModel):
    """Response model returned by the automation endpoints."""

    # Outcome of the action.
    success: bool = True
    message: str = ""
    error: str = ""

    # Page state captured after the action.
    url: Optional[str] = None
    title: Optional[str] = None
    elements: Optional[str] = None
    screenshot_base64: Optional[str] = None
    pixels_above: int = 0
    pixels_below: int = 0
    content: Optional[str] = None
    ocr_text: Optional[str] = None

    # Extra metadata about the page.
    element_count: int = 0
    interactive_elements: Optional[List[Dict[str, Any]]] = None
    viewport_width: Optional[int] = None
    viewport_height: Optional[int] = None

    class Config:
        arbitrary_types_allowed = True
|
|
|
|
|
|
|
|
|
|
|
class BrowserAutomation: |
|
def __init__(self):
    """Set up state, the screenshot directory, lifecycle hooks and routes."""
    self.router = APIRouter()
    self.browser: Browser = None
    self.pages: List[Page] = []
    self.current_page_index: int = 0
    self.logger = logging.getLogger("browser_automation")
    self.include_attributes = ["id", "href", "src", "alt", "aria-label", "placeholder", "name", "role", "title", "value"]
    self.screenshot_dir = os.path.join(os.getcwd(), "screenshots")
    os.makedirs(self.screenshot_dir, exist_ok=True)

    # Lifecycle hooks.
    self.router.on_startup.append(self.startup)
    self.router.on_shutdown.append(self.shutdown)

    # Every endpoint is a POST under /automation/ whose path segment equals
    # the handler method's name; registration order is preserved.
    endpoint_names = (
        # Basic navigation
        "navigate_to", "search_google", "go_back", "wait",
        # Element interaction
        "click_element", "click_coordinates", "input_text", "send_keys",
        # Tab management
        "switch_tab", "open_tab", "close_tab",
        # Content actions
        "extract_content", "save_pdf",
        # Scrolling
        "scroll_down", "scroll_up", "scroll_to_text",
        # Dropdowns
        "get_dropdown_options", "select_dropdown_option",
        # Drag and drop
        "drag_drop",
    )
    for endpoint_name in endpoint_names:
        handler = getattr(self, endpoint_name)
        self.router.post(f"/automation/{endpoint_name}")(handler)
|
|
|
async def startup(self):
    """Initialize the browser instance on startup.

    Starts Playwright, launches Chromium (retrying once with minimal
    options if the first launch fails) and makes sure at least one page
    is available as the current page.

    Raises:
        RuntimeError: If any step of the initialization fails; the
            original exception is chained as the cause.
    """
    try:
        print("Starting browser initialization...")
        playwright = await async_playwright().start()
        print("Playwright started, launching browser...")

        launch_options = {
            "headless": False,
            "timeout": 60000
        }

        try:
            self.browser = await playwright.chromium.launch(**launch_options)
            print("Browser launched successfully")
        except Exception as browser_error:
            print(f"Failed to launch browser: {browser_error}")
            # Second attempt: library defaults with a longer timeout.
            print("Retrying with minimal options...")
            launch_options = {"timeout": 90000}
            self.browser = await playwright.chromium.launch(**launch_options)
            print("Browser launched with minimal options")

        try:
            # get_current_page raises when no page has been registered yet.
            await self.get_current_page()
            print("Found existing page, using it")
            self.current_page_index = 0
        except Exception as page_error:
            print(f"Error finding existing page, creating new one. ( {page_error})")
            page = await self.browser.new_page()
            print("New page created successfully")
            self.pages.append(page)
            self.current_page_index = 0
            # No navigation happens here; the new page stays blank.

        print("Browser initialization completed successfully")
    except Exception as e:
        print(f"Browser startup error: {str(e)}")
        traceback.print_exc()
        raise RuntimeError(f"Browser initialization failed: {str(e)}") from e
|
|
|
async def shutdown(self):
    """Close the browser on shutdown, if one was launched."""
    browser = self.browser
    if not browser:
        return
    await browser.close()
|
|
|
async def get_current_page(self) -> Page:
    """Get the current active page.

    Returns:
        The page at ``self.current_page_index`` in ``self.pages``.

    Raises:
        HTTPException: 500 if no pages are registered, or if the current
            index no longer points at a registered page.
    """
    if not self.pages:
        raise HTTPException(status_code=500, detail="No browser pages available")
    try:
        return self.pages[self.current_page_index]
    except IndexError:
        # A stale index should surface as the same kind of error as the
        # empty case, not as a bare IndexError.
        raise HTTPException(
            status_code=500,
            detail=f"Current page index {self.current_page_index} is out of range",
        ) from None
|
|
|
async def get_selector_map(self) -> Dict[int, DOMElementNode]:
    """Get a map of selectable elements on the page.

    Runs a script in the page that lists visible interactive elements and
    wraps each one in a DOMElementNode keyed by its 1-based index. If
    anything fails, the error is printed and a placeholder entry is stored
    under index 1.
    """
    page = await self.get_current_page()

    selector_map = {}

    def _to_coordinate_set(raw) -> CoordinateSet:
        # Missing keys fall back to 0.
        return CoordinateSet(
            x=raw.get('x', 0),
            y=raw.get('y', 0),
            width=raw.get('width', 0),
            height=raw.get('height', 0)
        )

    try:
        elements_js = """
        (() => {
            // Helper function to get all attributes as an object
            function getAttributes(el) {
                const attributes = {};
                for (const attr of el.attributes) {
                    attributes[attr.name] = attr.value;
                }
                return attributes;
            }

            // Find all potentially interactive elements
            const interactiveElements = Array.from(document.querySelectorAll(
                'a, button, input, select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="radio"], [tabindex]:not([tabindex="-1"])'
            ));

            // Filter for visible elements
            const visibleElements = interactiveElements.filter(el => {
                const style = window.getComputedStyle(el);
                const rect = el.getBoundingClientRect();
                return style.display !== 'none' &&
                       style.visibility !== 'hidden' &&
                       style.opacity !== '0' &&
                       rect.width > 0 &&
                       rect.height > 0;
            });

            // Map to our expected structure
            return visibleElements.map((el, index) => {
                const rect = el.getBoundingClientRect();
                const isInViewport = rect.top >= 0 &&
                                     rect.left >= 0 &&
                                     rect.bottom <= window.innerHeight &&
                                     rect.right <= window.innerWidth;

                return {
                    index: index + 1,
                    tagName: el.tagName.toLowerCase(),
                    text: el.innerText || el.value || '',
                    attributes: getAttributes(el),
                    isVisible: true,
                    isInteractive: true,
                    pageCoordinates: {
                        x: rect.left + window.scrollX,
                        y: rect.top + window.scrollY,
                        width: rect.width,
                        height: rect.height
                    },
                    viewportCoordinates: {
                        x: rect.left,
                        y: rect.top,
                        width: rect.width,
                        height: rect.height
                    },
                    isInViewport: isInViewport
                };
            });
        })();
        """

        elements = await page.evaluate(elements_js)
        print(f"Found {len(elements)} interactive elements in selector map")

        # Synthetic parent that every discovered element hangs off.
        root = DOMElementNode(
            is_visible=True,
            tag_name="body",
            is_interactive=False,
            is_top_element=True
        )

        for position, raw in enumerate(elements, start=1):
            page_coordinates = (
                _to_coordinate_set(raw['pageCoordinates'])
                if 'pageCoordinates' in raw else None
            )
            viewport_coordinates = (
                _to_coordinate_set(raw['viewportCoordinates'])
                if 'viewportCoordinates' in raw else None
            )

            # Prefer the index reported by the page; fall back to position.
            element_node = DOMElementNode(
                is_visible=raw.get('isVisible', True),
                tag_name=raw.get('tagName', 'div'),
                attributes=raw.get('attributes', {}),
                is_interactive=raw.get('isInteractive', True),
                is_in_viewport=raw.get('isInViewport', False),
                highlight_index=raw.get('index', position),
                page_coordinates=page_coordinates,
                viewport_coordinates=viewport_coordinates
            )

            if raw.get('text'):
                text_node = DOMTextNode(is_visible=True, text=raw.get('text', ''))
                text_node.parent = element_node
                element_node.children.append(text_node)

            selector_map[raw.get('index', position)] = element_node
            root.children.append(element_node)
            element_node.parent = root

    except Exception as e:
        print(f"Error getting selector map: {e}")
        traceback.print_exc()

        # Fallback: a single placeholder link stored under index 1.
        dummy = DOMElementNode(
            is_visible=True,
            tag_name="a",
            attributes={'href': '#'},
            is_interactive=True,
            highlight_index=1
        )
        dummy_text = DOMTextNode(is_visible=True, text="Dummy Element")
        dummy_text.parent = dummy
        dummy.children.append(dummy_text)
        selector_map[1] = dummy

    return selector_map
|
|
|
async def get_current_dom_state(self) -> DOMState:
    """Get the current DOM state including element tree and selector map.

    Returns:
        A DOMState whose ``element_tree`` is a synthetic ``body`` root
        holding every element of the selector map, plus the page URL,
        title and the scroll extents above/below the viewport. On any
        failure a placeholder state titled "Error page" is returned.
    """
    page = None
    try:
        page = await self.get_current_page()
        selector_map = await self.get_selector_map()

        root = DOMElementNode(
            is_visible=True,
            tag_name="body",
            is_interactive=False,
            is_top_element=True
        )

        # Re-parent every element under the root returned to the caller.
        # get_selector_map already links elements to its own private root,
        # so attaching only parent-less elements would leave this tree empty.
        for element in selector_map.values():
            element.parent = root
            root.children.append(element)

        url = page.url
        try:
            title = await page.title()
        except Exception:
            # Best effort: a title lookup failure must not fail the snapshot.
            title = "Unknown Title"

        try:
            scroll_info = await page.evaluate("""
            () => {
                const body = document.body;
                const html = document.documentElement;
                const totalHeight = Math.max(
                    body.scrollHeight, body.offsetHeight,
                    html.clientHeight, html.scrollHeight, html.offsetHeight
                );
                const scrollY = window.scrollY || window.pageYOffset;
                const windowHeight = window.innerHeight;

                return {
                    pixelsAbove: scrollY,
                    pixelsBelow: Math.max(0, totalHeight - scrollY - windowHeight),
                    totalHeight: totalHeight,
                    viewportHeight: windowHeight
                };
            }
            """)
            pixels_above = scroll_info.get('pixelsAbove', 0)
            pixels_below = scroll_info.get('pixelsBelow', 0)
        except Exception as e:
            print(f"Error getting scroll info: {e}")
            pixels_above = 0
            pixels_below = 0

        return DOMState(
            element_tree=root,
            selector_map=selector_map,
            url=url,
            title=title,
            pixels_above=pixels_above,
            pixels_below=pixels_below
        )
    except Exception as e:
        print(f"Error getting DOM state: {e}")
        traceback.print_exc()

        dummy_root = DOMElementNode(
            is_visible=True,
            tag_name="body",
            is_interactive=False,
            is_top_element=True
        )
        dummy_map = {1: dummy_root}
        return DOMState(
            element_tree=dummy_root,
            selector_map=dummy_map,
            # page stays None when even the page lookup failed.
            url=page.url if page is not None else "about:blank",
            title="Error page",
            pixels_above=0,
            pixels_below=0
        )
|
|
|
async def take_screenshot(self) -> str:
    """Capture the current viewport as JPEG and return it base64-encoded.

    Returns an empty string if the capture fails.
    """
    try:
        current = await self.get_current_page()
        raw_jpeg = await current.screenshot(type='jpeg', quality=60, full_page=False)
        encoded = base64.b64encode(raw_jpeg)
        return encoded.decode('utf-8')
    except Exception as e:
        print(f"Error taking screenshot: {e}")
        return ""
|
|
|
async def save_screenshot_to_file(self) -> str:
    """Save a JPEG screenshot of the viewport and return the file path.

    The file name combines a timestamp with a random four-digit id.
    Returns an empty string if saving fails.
    """
    try:
        current = await self.get_current_page()
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        suffix = random.randint(1000, 9999)
        target_path = os.path.join(
            self.screenshot_dir,
            f"screenshot_{stamp}_{suffix}.jpg",
        )
        await current.screenshot(path=target_path, type='jpeg', quality=60, full_page=False)
    except Exception as e:
        print(f"Error saving screenshot: {e}")
        return ""
    return target_path
|
|
|
async def extract_ocr_text_from_screenshot(self, screenshot_base64: str) -> str:
    """Extract text from screenshot using OCR.

    Args:
        screenshot_base64: Base64-encoded image data; empty or None
            yields an empty result without running OCR.

    Returns:
        The recognized text with surrounding whitespace stripped, or an
        empty string when there is no input or OCR fails.
    """
    if not screenshot_base64:
        return ""

    def _run_ocr() -> str:
        # Decoding and OCR are blocking; keep them together so the whole
        # unit can run off the event loop.
        image_bytes = base64.b64decode(screenshot_base64)
        image = Image.open(io.BytesIO(image_bytes))
        return pytesseract.image_to_string(image).strip()

    try:
        # Run in a worker thread so other requests are not stalled while
        # OCR is in progress.
        return await asyncio.to_thread(_run_ocr)
    except Exception as e:
        print(f"Error performing OCR: {e}")
        traceback.print_exc()
        return ""
|
|
|
async def get_updated_browser_state(self, action_name: str) -> tuple:
    """Collect the browser state after an action has been performed.

    Returns a tuple of (dom_state, screenshot, elements, metadata). If
    anything fails, the error is printed and (None, "", "", {}) is returned.
    """
    copied_attributes = ['id', 'href', 'src', 'alt', 'placeholder', 'name', 'role', 'title', 'type']
    try:
        # Short pause before reading the page.
        await asyncio.sleep(0.5)

        dom_state = await self.get_current_dom_state()
        screenshot = await self.take_screenshot()
        elements = dom_state.element_tree.clickable_elements_to_string(
            include_attributes=self.include_attributes
        )

        page = await self.get_current_page()

        # One summary dict per addressable element.
        interactive_elements = []
        for idx, element in dom_state.selector_map.items():
            summary = {
                'index': idx,
                'tag_name': element.tag_name,
                'text': element.get_all_text_till_next_clickable_element(),
                'is_in_viewport': element.is_in_viewport
            }
            summary.update(
                (name, element.attributes[name])
                for name in copied_attributes
                if name in element.attributes
            )
            interactive_elements.append(summary)

        metadata = {
            'element_count': len(dom_state.selector_map),
            'interactive_elements': interactive_elements,
        }

        try:
            viewport = await page.evaluate("""
            () => {
                return {
                    width: window.innerWidth,
                    height: window.innerHeight
                };
            }
            """)
            width = viewport.get('width', 0)
            height = viewport.get('height', 0)
        except Exception as e:
            print(f"Error getting viewport dimensions: {e}")
            width = 0
            height = 0
        metadata['viewport_width'] = width
        metadata['viewport_height'] = height

        # OCR only when a screenshot was actually captured.
        metadata['ocr_text'] = (
            await self.extract_ocr_text_from_screenshot(screenshot) if screenshot else ""
        )

        print(f"Got updated state after {action_name}: {len(dom_state.selector_map)} elements")
        return dom_state, screenshot, elements, metadata
    except Exception as e:
        print(f"Error getting updated state after {action_name}: {e}")
        traceback.print_exc()
        return None, "", "", {}
|
|
|
def build_action_result(self, success: bool, message: str, dom_state, screenshot: str,
                        elements: str, metadata: dict, error: str = "", content: str = None,
                        fallback_url: str = None) -> BrowserActionResult:
    """Assemble a BrowserActionResult from the pieces of an action outcome.

    Page fields come from ``dom_state`` when it is truthy; otherwise the
    URL falls back to ``fallback_url`` (or "") and the rest to empty/zero.
    """
    if dom_state:
        url = dom_state.url
        title = dom_state.title
        above = dom_state.pixels_above
        below = dom_state.pixels_below
    else:
        url = fallback_url or ""
        title = ""
        above = 0
        below = 0

    return BrowserActionResult(
        success=success,
        message=message,
        error=error,
        url=url,
        title=title,
        elements="" if elements is None else elements,
        screenshot_base64=screenshot,
        pixels_above=above,
        pixels_below=below,
        content=content,
        ocr_text=metadata.get('ocr_text', ""),
        element_count=metadata.get('element_count', 0),
        interactive_elements=metadata.get('interactive_elements', []),
        viewport_width=metadata.get('viewport_width', 0),
        viewport_height=metadata.get('viewport_height', 0)
    )
|
|
|
|
|
|
|
async def navigate_to(self, action: GoToUrlAction = Body(...)):
    """Navigate to a specified URL.

    Waits for DOMContentLoaded, then gives the network up to 10 s to go
    idle. A page that never goes idle is still treated as loaded.

    Returns:
        A BrowserActionResult; on failure it carries the error text and,
        when obtainable, the current page state.
    """
    try:
        page = await self.get_current_page()
        await page.goto(action.url, wait_until="domcontentloaded")
        try:
            await page.wait_for_load_state("networkidle", timeout=10000)
        except Exception as wait_error:
            # The document has loaded; a busy network is not a navigation
            # failure (same tolerance click_element applies after a click).
            print(f"Timeout or error waiting for network idle after navigation: {wait_error}")

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"navigate_to({action.url})")

        result = self.build_action_result(
            True,
            f"Navigated to {action.url}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )

        print(f"Navigation result: success={result.success}, url={result.url}")
        return result
    except Exception as e:
        print(f"Navigation error: {str(e)}")
        traceback.print_exc()

        try:
            dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("navigate_error_recovery")
            return self.build_action_result(
                False,
                str(e),
                dom_state,
                screenshot,
                elements,
                metadata,
                error=str(e),
                content=None
            )
        except Exception:
            # Not a bare except: task cancellation must propagate.
            return self.build_action_result(
                False,
                str(e),
                None,
                "",
                "",
                {},
                error=str(e),
                content=None
            )
|
|
|
async def search_google(self, action: SearchGoogleAction = Body(...)):
    """Search Google with the provided query.

    The query is URL-encoded before being placed in the search URL, so
    characters such as ``&``, ``#`` or ``+`` are searched for literally.

    Returns:
        A BrowserActionResult; on failure it carries the error text and,
        when obtainable, the current page state.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote_plus

    try:
        page = await self.get_current_page()
        search_url = f"https://www.google.com/search?q={quote_plus(action.query)}"
        await page.goto(search_url)
        await page.wait_for_load_state()

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"search_google({action.query})")

        return self.build_action_result(
            True,
            f"Searched for '{action.query}' in Google",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        print(f"Search error: {str(e)}")
        traceback.print_exc()

        try:
            dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("search_error_recovery")
            return self.build_action_result(
                False,
                str(e),
                dom_state,
                screenshot,
                elements,
                metadata,
                error=str(e),
                content=None
            )
        except Exception:
            # Not a bare except: task cancellation must propagate.
            return self.build_action_result(
                False,
                str(e),
                None,
                "",
                "",
                {},
                error=str(e),
                content=None
            )
|
|
|
async def go_back(self, _: NoParamsAction = Body(...)):
    """Go one step back in the current page's history and report the state."""
    try:
        current = await self.get_current_page()
        await current.go_back()
        await current.wait_for_load_state()

        state = await self.get_updated_browser_state("go_back")
        dom_state, screenshot, elements, metadata = state
    except Exception as e:
        failure_text = str(e)
        return self.build_action_result(
            False, failure_text, None, "", "", {},
            error=failure_text,
            content=None,
        )
    return self.build_action_result(
        True, "Navigated back", dom_state, screenshot, elements, metadata,
        error="",
        content=None,
    )
|
|
|
async def wait(self, seconds: int = Body(3)):
    """Sleep for ``seconds`` seconds, then report the browser state."""
    try:
        await asyncio.sleep(seconds)

        state = await self.get_updated_browser_state(f"wait({seconds} seconds)")
        dom_state, screenshot, elements, metadata = state
    except Exception as e:
        failure_text = str(e)
        return self.build_action_result(
            False, failure_text, None, "", "", {},
            error=failure_text,
            content=None,
        )
    return self.build_action_result(
        True, f"Waited for {seconds} seconds", dom_state, screenshot, elements, metadata,
        error="",
        content=None,
    )
|
|
|
|
|
|
|
async def click_coordinates(self, action: ClickCoordinatesAction = Body(...)):
    """Click at specific x,y coordinates on the page.

    After the click the network is given up to 5 s to go idle; a page
    that never goes idle does not turn the click into a failure.

    Returns:
        A BrowserActionResult; on failure it carries the error text and,
        when obtainable, the current page state.
    """
    try:
        page = await self.get_current_page()

        await page.mouse.click(action.x, action.y)

        try:
            await page.wait_for_load_state("networkidle", timeout=5000)
        except Exception as wait_error:
            # The click already happened; mirror click_element and carry on.
            print(f"Timeout or error waiting for network idle after click: {wait_error}")
            await asyncio.sleep(1)

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"click_coordinates({action.x}, {action.y})")

        return self.build_action_result(
            True,
            f"Clicked at coordinates ({action.x}, {action.y})",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        print(f"Error in click_coordinates: {e}")
        traceback.print_exc()

        try:
            dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("click_coordinates_error_recovery")
            return self.build_action_result(
                False,
                str(e),
                dom_state,
                screenshot,
                elements,
                metadata,
                error=str(e),
                content=None
            )
        except Exception:
            # Not a bare except: task cancellation must propagate.
            return self.build_action_result(
                False,
                str(e),
                None,
                "",
                "",
                {},
                error=str(e),
                content=None
            )
|
|
|
async def click_element(self, action: ClickElementAction = Body(...)):
    """Click on an element by index.

    The index is first validated against the current selector map. The
    element is then re-located in the page by running the same
    query/visibility filter and taking the entry at that 1-based
    position, and clicked with a 5 s timeout.

    Returns:
        A BrowserActionResult describing whether the click succeeded,
        together with the page state captured afterwards.
    """
    page = None
    try:
        page = await self.get_current_page()

        initial_dom_state = await self.get_current_dom_state()
        selector_map = initial_dom_state.selector_map

        if action.index not in selector_map:
            dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"click_element_error (index {action.index} not found)")
            return self.build_action_result(
                False,
                f"Element with index {action.index} not found",
                dom_state,
                screenshot,
                elements,
                metadata,
                error=f"Element with index {action.index} not found"
            )

        element_to_click = selector_map[action.index]
        print(f"Attempting to click element: {element_to_click}")

        js_selector_script = """
        (targetElementInfo) => {
            const interactiveElements = Array.from(document.querySelectorAll(
                'a, button, input, select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="radio"], [tabindex]:not([tabindex="-1"])'
            ));

            const visibleElements = interactiveElements.filter(el => {
                const style = window.getComputedStyle(el);
                const rect = el.getBoundingClientRect();
                return style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0' && rect.width > 0 && rect.height > 0;
            });

            if (targetElementInfo.index > 0 && targetElementInfo.index <= visibleElements.length) {
                // Return the element at the specified index (1-based)
                return visibleElements[targetElementInfo.index - 1];
            }
            return null; // Element not found at the expected index
        }
        """

        element_info = {'index': action.index}

        target_element_handle = await page.evaluate_handle(js_selector_script, element_info)

        click_success = False
        error_message = ""

        if await target_element_handle.evaluate("node => node !== null"):
            try:
                await target_element_handle.click(timeout=5000)
                click_success = True
                print(f"Successfully clicked element handle for index {action.index}")
            except Exception as click_error:
                error_message = f"Error clicking element handle: {click_error}"
                print(error_message)
        else:
            error_message = f"Could not locate the target element handle for index {action.index} using JS script."
            print(error_message)

        # A page that never goes network-idle is not a click failure.
        try:
            await page.wait_for_load_state("networkidle", timeout=5000)
        except Exception as wait_error:
            print(f"Timeout or error waiting for network idle after click: {wait_error}")
            await asyncio.sleep(1)

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"click_element({action.index})")

        return self.build_action_result(
            click_success,
            f"Clicked element with index {action.index}" if click_success else f"Attempted to click element {action.index} but failed. Error: {error_message}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error=error_message if not click_success else "",
            content=None
        )

    except Exception as e:
        print(f"Error in click_element: {e}")
        traceback.print_exc()

        try:
            dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("click_element_error_recovery")
            return self.build_action_result(
                False,
                str(e),
                dom_state,
                screenshot,
                elements,
                metadata,
                error=str(e),
                content=None
            )
        except Exception:
            # Not a bare except: task cancellation must propagate.
            current_url = "unknown"
            if page is not None:
                try:
                    current_url = page.url
                except Exception:
                    # Deliberate best effort: keep "unknown" as the URL.
                    pass
            return self.build_action_result(
                False,
                str(e),
                None,
                "",
                "",
                {},
                error=str(e),
                content=None,
                fallback_url=current_url
            )
|
|
|
async def input_text(self, action: InputTextAction = Body(...)):
    """Input text into an element.

    The element is addressed by its selector-map index. It is re-located
    in the page with the same query and visibility filter that produced
    the index, so the text goes into exactly that element rather than
    into whatever an id/class/XPath guess happens to match.

    Returns:
        A BrowserActionResult; failures (unknown index, element no longer
        present, fill error) carry the error text and no page state.
    """
    try:
        page = await self.get_current_page()
        selector_map = await self.get_selector_map()

        if action.index not in selector_map:
            return self.build_action_result(
                False,
                f"Element with index {action.index} not found",
                None,
                "",
                "",
                {},
                error=f"Element with index {action.index} not found"
            )

        await page.wait_for_timeout(500)

        # Same selector list and visibility filter as the selector map,
        # so position N here is the element listed under index N.
        js_selector_script = """
        (targetIndex) => {
            const interactiveElements = Array.from(document.querySelectorAll(
                'a, button, input, select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="radio"], [tabindex]:not([tabindex="-1"])'
            ));

            const visibleElements = interactiveElements.filter(el => {
                const style = window.getComputedStyle(el);
                const rect = el.getBoundingClientRect();
                return style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0' && rect.width > 0 && rect.height > 0;
            });

            if (targetIndex > 0 && targetIndex <= visibleElements.length) {
                return visibleElements[targetIndex - 1];
            }
            return null;
        }
        """

        handle = await page.evaluate_handle(js_selector_script, action.index)
        # as_element() yields None when the script returned null.
        target = handle.as_element()
        if target is None:
            error_message = f"Could not locate the target element handle for index {action.index} using JS script."
            return self.build_action_result(
                False,
                error_message,
                None,
                "",
                "",
                {},
                error=error_message
            )

        await target.fill(action.text)

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"input_text({action.index}, '{action.text}')")

        return self.build_action_result(
            True,
            f"Input '{action.text}' into element with index {action.index}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        return self.build_action_result(
            False,
            str(e),
            None,
            "",
            "",
            {},
            error=str(e),
            content=None
        )
|
|
|
async def send_keys(self, action: SendKeysAction = Body(...)):
    """Press the given key(s) on the current page and report the state."""
    try:
        current = await self.get_current_page()
        await current.keyboard.press(action.keys)

        state = await self.get_updated_browser_state(f"send_keys({action.keys})")
        dom_state, screenshot, elements, metadata = state
    except Exception as e:
        failure_text = str(e)
        return self.build_action_result(
            False, failure_text, None, "", "", {},
            error=failure_text,
            content=None,
        )
    return self.build_action_result(
        True, f"Sent keys: {action.keys}", dom_state, screenshot, elements, metadata,
        error="",
        content=None,
    )
|
|
|
|
|
|
|
async def switch_tab(self, action: SwitchTabAction = Body(...)):
    """Make the tab at position ``action.page_id`` the current tab.

    The id is an index into ``self.pages``. An out-of-range id yields a failed
    result ("Tab N not found"); otherwise the current page index is updated,
    the page's load state is awaited and the refreshed browser state returned.
    Any exception is reported as a failed result carrying ``str(e)``.
    """
    try:
        tab_index = action.page_id

        # Guard clause: reject ids that do not address an open tab.
        if not (0 <= tab_index < len(self.pages)):
            not_found = f"Tab {tab_index} not found"
            return self.build_action_result(
                False, not_found, None, "", "", {}, error=not_found
            )

        self.current_page_index = tab_index
        active_page = await self.get_current_page()
        await active_page.wait_for_load_state()

        state = await self.get_updated_browser_state(f"switch_tab({tab_index})")
        dom_state, screenshot, elements, metadata = state

        return self.build_action_result(
            True,
            f"Switched to tab {tab_index}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        reason = str(e)
        return self.build_action_result(
            False, reason, None, "", "", {}, error=reason, content=None
        )
|
|
|
async def open_tab(self, action: OpenTabAction = Body(...)):
    """Open a new tab, navigate it to ``action.url`` and make it current.

    The page is created on ``self.browser``, navigated (waiting first for
    ``domcontentloaded`` and then up to 10 s for network idle), appended to
    ``self.pages`` and selected as the current page.

    Returns the action result with the refreshed browser state on success.
    On any exception a failed result carrying ``str(e)`` is returned, and a
    page that was created but never registered in ``self.pages`` is closed so
    it does not linger as an untracked tab.
    """
    new_page = None
    registered = False
    try:
        print(f"Attempting to open new tab with URL: {action.url}")

        new_page = await self.browser.new_page()
        print("New page created successfully")

        await new_page.goto(action.url, wait_until="domcontentloaded")
        await new_page.wait_for_load_state("networkidle", timeout=10000)
        print(f"Navigated to URL in new tab: {action.url}")

        # Only track the tab once navigation has succeeded.
        self.pages.append(new_page)
        registered = True
        self.current_page_index = len(self.pages) - 1
        print(f"New tab added as index {self.current_page_index}")

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"open_tab({action.url})")

        return self.build_action_result(
            True,
            f"Opened new tab with URL: {action.url}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        print("****"*10)
        print(f"Error opening tab: {e}")
        print(traceback.format_exc())
        print("****"*10)

        # The page exists in the browser but is not in self.pages, so nothing
        # else could ever close it. Release it here (best effort).
        if new_page is not None and not registered:
            try:
                await new_page.close()
            except Exception as close_error:
                print(f"Error closing orphaned tab: {close_error}")

        return self.build_action_result(
            False,
            str(e),
            None,
            "",
            "",
            {},
            error=str(e),
            content=None
        )
|
|
|
async def close_tab(self, action: CloseTabAction = Body(...)):
    """Close the tab at position ``action.page_id`` and re-point the current tab.

    The id is an index into ``self.pages``; an out-of-range id yields a failed
    result ("Tab N not found"). After the page is closed and removed, the
    current page index is clamped to the shortened list, or shifted down by
    one when it was at or after the removed position. Any exception is
    reported as a failed result carrying ``str(e)``.
    """
    try:
        closing_index = action.page_id

        if not (0 <= closing_index < len(self.pages)):
            not_found = f"Tab {closing_index} not found"
            return self.build_action_result(
                False, not_found, None, "", "", {}, error=not_found
            )

        doomed_page = self.pages[closing_index]
        url = doomed_page.url
        await doomed_page.close()
        self.pages.pop(closing_index)

        # Keep the current index valid after the list shrank.
        remaining = len(self.pages)
        if self.current_page_index >= remaining:
            self.current_page_index = max(0, remaining - 1)
        elif self.current_page_index >= closing_index:
            self.current_page_index = max(0, self.current_page_index - 1)

        # Called for its effect only; the returned page is not needed here.
        await self.get_current_page()
        state = await self.get_updated_browser_state(f"close_tab({closing_index})")
        dom_state, screenshot, elements, metadata = state

        return self.build_action_result(
            True,
            f"Closed tab {closing_index} with URL: {url}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        reason = str(e)
        return self.build_action_result(
            False, reason, None, "", "", {}, error=reason, content=None
        )
|
|
|
|
|
|
|
async def extract_content(self, goal: str = Body(...)):
    """Collect the visible text of the current page.

    Runs a script in the page that gathers ``innerText`` from every visible
    ``p``, ``h1``-``h6``, ``li``, ``span`` and ``div`` element and joins the
    pieces with blank lines. ``goal`` is only echoed in the result message and
    the state label; it does not filter the text.

    Returns a successful action result whose ``content`` is the extracted
    text, or a failed result carrying ``str(e)`` if anything raises.
    """
    try:
        page = await self.get_current_page()

        # NOTE(review): containers and their descendants are both selected, so
        # nested text can appear more than once in the output.
        extracted_text = await page.evaluate("""
            Array.from(document.querySelectorAll('p, h1, h2, h3, h4, h5, h6, li, span, div'))
                .filter(el => {
                    const style = window.getComputedStyle(el);
                    return style.display !== 'none' &&
                        style.visibility !== 'hidden' &&
                        style.opacity !== '0' &&
                        el.innerText &&
                        el.innerText.trim().length > 0;
                })
                .map(el => el.innerText.trim())
                .join('\\n\\n');
        """)

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"extract_content({goal})")

        return self.build_action_result(
            True,
            f"Content extracted based on goal: {goal}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=extracted_text
        )
    except Exception as e:
        return self.build_action_result(
            False,
            str(e),
            None,
            "",
            "",
            {},
            error=str(e),
            content=None
        )
|
|
|
async def save_pdf(self):
    """Render the current page to a PDF file under ``self.screenshot_dir``.

    The file is named ``page_<YYYYmmdd_HHMMSS>_<4-digit random>.pdf``. The
    result message contains the full path; any exception is reported as a
    failed result carrying ``str(e)``.
    """
    try:
        active_page = await self.get_current_page()

        # Timestamp plus a random suffix keeps repeated saves from colliding.
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        suffix = random.randint(1000, 9999)
        filepath = os.path.join(self.screenshot_dir, f"page_{stamp}_{suffix}.pdf")

        await active_page.pdf(path=filepath)

        state = await self.get_updated_browser_state("save_pdf")
        dom_state, screenshot, elements, metadata = state

        return self.build_action_result(
            True,
            f"Saved page as PDF: {filepath}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        reason = str(e)
        return self.build_action_result(
            False, reason, None, "", "", {}, error=reason, content=None
        )
|
|
|
|
|
|
|
async def scroll_down(self, action: ScrollAction = Body(...)):
    """Scroll the current page downwards.

    Scrolls by ``action.amount`` pixels when given, otherwise by one viewport
    height, then waits 500 ms before returning the refreshed browser state.
    Any exception is reported as a failed result carrying ``str(e)``.
    """
    try:
        active_page = await self.get_current_page()

        if action.amount is None:
            # No explicit distance: move by exactly one screenful.
            await active_page.evaluate("window.scrollBy(0, window.innerHeight);")
            amount_str = "one page"
        else:
            await active_page.evaluate(f"window.scrollBy(0, {action.amount});")
            amount_str = f"{action.amount} pixels"

        await active_page.wait_for_timeout(500)

        state = await self.get_updated_browser_state(f"scroll_down({amount_str})")
        dom_state, screenshot, elements, metadata = state

        return self.build_action_result(
            True,
            f"Scrolled down by {amount_str}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        reason = str(e)
        return self.build_action_result(
            False, reason, None, "", "", {}, error=reason, content=None
        )
|
|
|
async def scroll_up(self, action: ScrollAction = Body(...)):
    """Scroll the current page upwards.

    Scrolls by ``action.amount`` pixels when given, otherwise by one viewport
    height, then waits 500 ms before returning the refreshed browser state.

    The pixel delta is negated in Python and passed to the page as an
    argument rather than spliced into the script text. Splicing ``-`` in
    front of a negative amount produced ``--N``, which is a JavaScript
    syntax error.

    Any exception is reported as a failed result carrying ``str(e)``.
    """
    try:
        page = await self.get_current_page()
        if action.amount is not None:
            await page.evaluate("(dy) => window.scrollBy(0, dy)", -action.amount)
            amount_str = f"{action.amount} pixels"
        else:
            await page.evaluate("window.scrollBy(0, -window.innerHeight);")
            amount_str = "one page"

        await page.wait_for_timeout(500)

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"scroll_up({amount_str})")

        return self.build_action_result(
            True,
            f"Scrolled up by {amount_str}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        return self.build_action_result(
            False,
            str(e),
            None,
            "",
            "",
            {},
            error=str(e),
            content=None
        )
|
|
|
async def scroll_to_text(self, text: str = Body(...)):
    """Scroll the first visible element containing ``text`` into view.

    Three locator strategies are tried in order: ``get_by_text`` (non-exact),
    the ``text=`` selector, and an XPath ``contains(text(), ...)`` query. A
    strategy that raises is skipped. The first one with a visible match is
    scrolled into view, followed by a 0.5 s pause.

    Returns an action result whose success flag tells whether a match was
    found; the refreshed browser state is included either way. Unexpected
    exceptions are reported as a failed result carrying ``str(e)``.
    """

    def xpath_literal(value):
        # XPath 1.0 has no escape character inside string literals, so pick a
        # quote style the value does not contain, or fall back to concat().
        if "'" not in value:
            return f"'{value}'"
        if '"' not in value:
            return f'"{value}"'
        pieces = ", \"'\", ".join(f"'{part}'" for part in value.split("'"))
        return f"concat({pieces})"

    try:
        page = await self.get_current_page()
        locators = [
            page.get_by_text(text, exact=False),
            page.locator(f"text={text}"),
            page.locator(f"//*[contains(text(), {xpath_literal(text)})]"),
        ]

        found = False
        for locator in locators:
            try:
                if await locator.count() > 0 and await locator.first.is_visible():
                    await locator.first.scroll_into_view_if_needed()
                    await asyncio.sleep(0.5)
                    found = True
                    break
            except Exception:
                # Best effort: a strategy that fails just yields to the next.
                continue

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"scroll_to_text({text})")

        message = f"Scrolled to text: {text}" if found else f"Text '{text}' not found or not visible on page"

        return self.build_action_result(
            found,
            message,
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        return self.build_action_result(
            False,
            str(e),
            None,
            "",
            "",
            {},
            error=str(e),
            content=None
        )
|
|
|
|
|
|
|
async def get_dropdown_options(self, index: int = Body(...)):
    """List the options offered by the dropdown at selector-map ``index``.

    For a native ``<select>`` the options are read in the page: the element
    is looked up by its ``id`` attribute when it has one, otherwise by
    position among the page's ``<select>`` elements (``index - 1``, the same
    heuristic the previous implementation used). For any other element the
    element is clicked (only when it has an ``id``), then every visible
    ``.dropdown-item``, ``[role="option"]`` or ``li`` is collected and
    Escape is pressed.

    Returns a successful action result whose ``content`` is a JSON array of
    ``{"index", "text", "value"}`` objects. If the index is unknown, the
    ``<select>`` cannot be located, or anything raises, a failed result is
    returned. Placeholder options are never fabricated.
    """
    try:
        page = await self.get_current_page()
        selector_map = await self.get_selector_map()

        if index not in selector_map:
            return self.build_action_result(
                False,
                f"Element with index {index} not found",
                None,
                "",
                "",
                {},
                error=f"Element with index {index} not found"
            )

        element = selector_map[index]
        element_id = element.attributes.get('id')

        if element.tag_name.lower() == 'select':
            # Values are passed as an argument, not formatted into the script,
            # so the JS braces need no escaping and ids need no quoting.
            options = await page.evaluate(
                """([elementId, position]) => {
                    const select = elementId
                        ? document.getElementById(elementId)
                        : document.querySelectorAll('select')[position];
                    if (!select || !select.options) {
                        return null;
                    }
                    return Array.from(select.options).map((option, i) => ({
                        index: i,
                        text: option.text,
                        value: option.value
                    }));
                }""",
                [element_id, index - 1],
            )
            if options is None:
                raise ValueError(f"Could not locate <select> for element with index {index}")
        else:
            # Custom dropdown: open it (only possible when it has an id), give
            # it time to render, then read whatever option-like nodes show.
            if element_id:
                await page.click(f"#{element_id}")
            await page.wait_for_timeout(500)

            options_js = """
                Array.from(document.querySelectorAll('.dropdown-item, [role="option"], li'))
                    .filter(el => {
                        const style = window.getComputedStyle(el);
                        return style.display !== 'none' && style.visibility !== 'hidden';
                    })
                    .map((option, index) => ({
                        index: index,
                        text: option.innerText.trim(),
                        value: option.getAttribute('value') || option.getAttribute('data-value') || option.innerText.trim()
                    }));
            """
            options = await page.evaluate(options_js)

            # Close the dropdown again.
            await page.keyboard.press("Escape")

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"get_dropdown_options({index})")

        return self.build_action_result(
            True,
            f"Retrieved {len(options)} options from dropdown",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=json.dumps(options)
        )
    except Exception as e:
        self.logger.error(f"Error getting dropdown options: {e}")
        return self.build_action_result(
            False,
            str(e),
            None,
            "",
            "",
            {},
            error=str(e),
            content=None
        )
|
|
|
async def select_dropdown_option(self, index: int = Body(...), option_text: str = Body(...)):
    """Choose the option labelled ``option_text`` in the dropdown at ``index``.

    ``index`` is a key of the selector map. A native ``<select>`` is driven
    with ``page.select_option(..., label=option_text)``. Any other element is
    clicked to open it, then the element matching ``text=<option_text>`` is
    clicked after a 500 ms pause. Elements are addressed by ``#id`` when they
    have an ``id`` attribute, otherwise by a positional XPath.

    Returns the action result with the refreshed browser state, or a failed
    result if the index is unknown or anything raises.
    """
    try:
        page = await self.get_current_page()
        selector_map = await self.get_selector_map()

        if index not in selector_map:
            return self.build_action_result(
                False,
                f"Element with index {index} not found",
                None,
                "",
                "",
                {},
                error=f"Element with index {index} not found"
            )

        element = selector_map[index]
        element_id = element.attributes.get('id')

        # NOTE(review): the XPath fallbacks reuse the selector-map index as a
        # positional predicate; presumably a heuristic. Confirm it matches
        # how the selector map numbers elements.
        if element.tag_name.lower() == 'select':
            await page.select_option(
                f"#{element_id}" if element_id else f"//select[{index}]",
                label=option_text
            )
        else:
            if element_id:
                await page.click(f"#{element_id}")
            else:
                await page.click(f"//{element.tag_name}[{index}]")

            # Let the option list render before clicking into it.
            await page.wait_for_timeout(500)

            await page.click(f"text={option_text}")

        await page.wait_for_timeout(500)

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"select_dropdown_option({index}, '{option_text}')")

        return self.build_action_result(
            True,
            f"Selected option '{option_text}' from dropdown with index {index}",
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        return self.build_action_result(
            False,
            str(e),
            None,
            "",
            "",
            {},
            error=str(e),
            content=None
        )
|
|
|
|
|
|
|
async def drag_drop(self, action: DragDropAction = Body(...)):
    """Drag from a source to a target, by selectors or by coordinates.

    Two modes, checked in this order:

    * Selector mode: both ``element_source`` and ``element_target`` are set.
      The drag is performed with ``page.drag_and_drop``;
      ``element_source_offset`` / ``element_target_offset`` are forwarded as
      the source/target positions when given.
    * Coordinate mode: all four ``coord_*`` values are set. The mouse is
      pressed at the source, moved to the target in ``steps`` linear
      increments with ``delay_ms`` between them, and released.

    ``steps`` defaults to 10 and is clamped to at least 1; ``delay_ms``
    defaults to 5 and is clamped to at least 0. An explicit 0 is honoured.

    Returns the action result with the refreshed browser state. A failed
    result is returned when neither mode is fully specified or when
    anything raises.
    """
    try:
        page = await self.get_current_page()

        if action.element_source and action.element_target:
            source_desc = action.element_source
            target_desc = action.element_target

            # Only forward the offsets that were actually supplied.
            drag_options = {}
            if action.element_source_offset is not None:
                drag_options["source_position"] = {
                    "x": action.element_source_offset.x,
                    "y": action.element_source_offset.y,
                }
            if action.element_target_offset is not None:
                drag_options["target_position"] = {
                    "x": action.element_target_offset.x,
                    "y": action.element_target_offset.y,
                }

            await page.drag_and_drop(source_desc, target_desc, **drag_options)

            message = f"Dragged element '{source_desc}' to '{target_desc}'"

        elif all(coord is not None for coord in [
            action.coord_source_x, action.coord_source_y,
            action.coord_target_x, action.coord_target_y
        ]):
            source_x = action.coord_source_x
            source_y = action.coord_source_y
            target_x = action.coord_target_x
            target_y = action.coord_target_y

            # "is None" rather than "or": 0 is a legitimate explicit value.
            steps = 10 if action.steps is None else max(1, action.steps)
            delay_ms = 5 if action.delay_ms is None else max(0, action.delay_ms)

            await page.mouse.move(source_x, source_y)
            await page.mouse.down()
            try:
                for i in range(1, steps + 1):
                    ratio = i / steps
                    intermediate_x = int(source_x + (target_x - source_x) * ratio)
                    intermediate_y = int(source_y + (target_y - source_y) * ratio)
                    await page.mouse.move(intermediate_x, intermediate_y)
                    if delay_ms > 0:
                        await asyncio.sleep(delay_ms / 1000)

                await page.mouse.move(target_x, target_y)
            finally:
                # Always release the button, even if a move failed, so the
                # page is not left in a dragging state.
                await page.mouse.up()

            message = f"Dragged from ({source_x}, {source_y}) to ({target_x}, {target_y})"
        else:
            return self.build_action_result(
                False,
                "Must provide either source/target selectors or coordinates",
                None,
                "",
                "",
                {},
                error="Must provide either source/target selectors or coordinates"
            )

        dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"drag_drop({action.element_source}, {action.element_target})")

        return self.build_action_result(
            True,
            message,
            dom_state,
            screenshot,
            elements,
            metadata,
            error="",
            content=None
        )
    except Exception as e:
        return self.build_action_result(
            False,
            str(e),
            None,
            "",
            "",
            {},
            error=str(e),
            content=None
        )
|
|
|
|
|
# Single shared automation instance; its router provides the action endpoints.
automation_service = BrowserAutomation()

# ASGI application served by uvicorn.
api_app = FastAPI()


@api_app.get("/api")
async def health_check():
    """Report that the API server is up."""
    payload = {"status": "ok", "message": "API server is running"}
    return payload


# Mount the automation routes under the same "/api" prefix.
api_app.include_router(automation_service.router, prefix="/api")
|
|
|
async def test_browser_api():
    """Run a manual end-to-end smoke test of the automation service.

    Starts the browser, then exercises in order: navigation, OCR output,
    Google search, scrolling, clicking an element by index, clicking by
    coordinates, content extraction and opening a new tab. Progress is
    printed to stdout. The run stops early only if the initial navigation
    fails. Exceptions are printed with a traceback, and the browser is
    always shut down in ``finally``.
    """

    def status(ok):
        # One place for the pass/fail label used by every step report.
        return '✅ Success' if ok else '❌ Failed'

    try:
        print("\n=== Starting Browser Automation Test ===")
        await automation_service.startup()
        print("✅ Browser started successfully")

        print("\n--- Testing Navigation ---")
        result = await automation_service.navigate_to(GoToUrlAction(url="https://www.youtube.com"))
        print(f"Navigation status: {status(result.success)}")
        if not result.success:
            print(f"Error: {result.error}")
            return

        print(f"URL: {result.url}")
        print(f"Title: {result.title}")

        print(f"\nFound {result.element_count} interactive elements")
        if result.elements and result.elements.strip():
            print("Elements:")
            print(result.elements)
        else:
            print("No formatted elements found, but DOM was processed")

        if result.interactive_elements and len(result.interactive_elements) > 0:
            print("\nInteractive elements summary:")
            for el in result.interactive_elements:
                print(f"  [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}")

        print(f"\nScreenshot captured: {'Yes' if result.screenshot_base64 else 'No'}")
        print(f"Viewport size: {result.viewport_width}x{result.viewport_height}")

        print("\n--- Testing OCR Text Extraction ---")
        if result.ocr_text:
            print("OCR text extracted from screenshot:")
            print("=== OCR TEXT START ===")
            print(result.ocr_text)
            print("=== OCR TEXT END ===")
            print(f"OCR text length: {len(result.ocr_text)} characters")
        else:
            print("No OCR text extracted from screenshot")

        await asyncio.sleep(2)

        print("\n--- Testing Search ---")
        result = await automation_service.search_google(SearchGoogleAction(query="browser automation"))
        print(f"Search status: {status(result.success)}")
        if not result.success:
            print(f"Error: {result.error}")
        else:
            print(f"Found {result.element_count} elements after search")
            print(f"Page title: {result.title}")

            if result.ocr_text:
                print("\nOCR text from search results:")
                print("=== OCR TEXT START ===")
                print(result.ocr_text)
                print("=== OCR TEXT END ===")
            else:
                print("\nNo OCR text extracted from search results")

        await asyncio.sleep(2)

        print("\n--- Testing Scrolling ---")
        result = await automation_service.scroll_down(ScrollAction(amount=300))
        print(f"Scroll status: {status(result.success)}")
        if result.success:
            print(f"Pixels above viewport: {result.pixels_above}")
            print(f"Pixels below viewport: {result.pixels_below}")

        await asyncio.sleep(2)

        print("\n--- Testing Element Click ---")
        # Uses the element count reported by the scroll step just above.
        if result.element_count > 0:
            click_result = await automation_service.click_element(ClickElementAction(index=1))
            print(f"Click status: {status(click_result.success)}")
            print(f"Message: {click_result.message}")
            print(f"New URL after click: {click_result.url}")
        else:
            print("Skipping click test - no elements found")

        await asyncio.sleep(2)

        print("\n--- Testing Click Coordinates ---")
        coord_click_result = await automation_service.click_coordinates(ClickCoordinatesAction(x=100, y=100))
        print(f"Coordinate click status: {status(coord_click_result.success)}")
        print(f"Message: {coord_click_result.message}")
        print(f"URL after coordinate click: {coord_click_result.url}")

        await asyncio.sleep(2)

        print("\n--- Testing Content Extraction ---")
        content_result = await automation_service.extract_content("test goal")
        print(f"Content extraction status: {status(content_result.success)}")
        if content_result.content:
            content_preview = content_result.content[:100] + "..." if len(content_result.content) > 100 else content_result.content
            print(f"Content sample: {content_preview}")
            print(f"Total content length: {len(content_result.content)} chars")
        else:
            print("No content was extracted")

        print("\n--- Testing Tab Management ---")
        tab_result = await automation_service.open_tab(OpenTabAction(url="https://www.example.org"))
        print(f"New tab status: {status(tab_result.success)}")
        if tab_result.success:
            print(f"New tab title: {tab_result.title}")
            print(f"Interactive elements: {tab_result.element_count}")

        print("\n✅ All tests completed successfully!")

    except Exception as e:
        print(f"\n❌ Test failed: {str(e)}")
        traceback.print_exc()
    finally:
        print("\n--- Cleaning up ---")
        await automation_service.shutdown()
        print("Browser closed")
|
|
|
async def test_browser_api_2():
    """Run a manual smoke test of element clicking against the chess demo page.

    Starts the browser, navigates to the chess page, prints the detected
    elements, clicks element 5 (when more than four elements were found) and
    then element 1, printing the element list after each click. Exceptions
    are printed with a traceback, and the browser is always shut down in
    ``finally``.
    """

    def status(ok):
        # One place for the pass/fail label used by every step report.
        return '✅ Success' if ok else '❌ Failed'

    def print_interactive(elements_summary):
        # One line per interactive element: index, tag and a short text preview.
        for el in elements_summary:
            print(f"  [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}")

    try:
        print("\n=== Starting Browser Automation Test 2 (Chess Page) ===")
        await automation_service.startup()
        print("✅ Browser started successfully")

        print("\n--- Testing Navigation to Chess Page ---")
        test_url = "https://dat-lequoc.github.io/chess-for-suna/chess.html"
        result = await automation_service.navigate_to(GoToUrlAction(url=test_url))
        print(f"Navigation status: {status(result.success)}")
        if not result.success:
            print(f"Error: {result.error}")
            return

        print(f"URL: {result.url}")
        print(f"Title: {result.title}")

        print(f"\nFound {result.element_count} interactive elements")
        if result.elements and result.elements.strip():
            print("Elements:")
            print(result.elements)
        else:
            print("No formatted elements found, but DOM was processed")

        if result.interactive_elements and len(result.interactive_elements) > 0:
            print("\nInteractive elements summary:")
            print_interactive(result.interactive_elements)

        print(f"\nScreenshot captured: {'Yes' if result.screenshot_base64 else 'No'}")
        print(f"Viewport size: {result.viewport_width}x{result.viewport_height}")

        await asyncio.sleep(2)

        print("\n--- Testing Element Click (element 5) ---")
        if result.element_count > 4:
            click_index = 5
            click_result = await automation_service.click_element(ClickElementAction(index=click_index))
            print(f"Click status for element {click_index}: {status(click_result.success)}")
            print(f"Message: {click_result.message}")
            print(f"URL after click: {click_result.url}")

            print(f"\n--- Retrieving elements after clicking element {click_index} ---")
            if click_result.elements and click_result.elements.strip():
                print("Updated Elements:")
                print(click_result.elements)
            else:
                print("No formatted elements found after click.")

            if click_result.interactive_elements and len(click_result.interactive_elements) > 0:
                print("\nUpdated interactive elements summary:")
                print_interactive(click_result.interactive_elements)
            else:
                print("No interactive elements found after click.")

            print("\n--- Testing Element Click (element 1 after clicking 5) ---")
            if click_result.element_count > 0:
                click_index_2 = 1
                click_result_2 = await automation_service.click_element(ClickElementAction(index=click_index_2))
                print(f"Click status for element {click_index_2}: {status(click_result_2.success)}")
                print(f"Message: {click_result_2.message}")
                print(f"URL after click: {click_result_2.url}")

                print(f"\n--- Retrieving elements after clicking element {click_index_2} ---")
                if click_result_2.elements and click_result_2.elements.strip():
                    print("Elements after second click:")
                    print(click_result_2.elements)
                else:
                    print("No formatted elements found after second click.")

                if click_result_2.interactive_elements and len(click_result_2.interactive_elements) > 0:
                    print("\nInteractive elements summary after second click:")
                    print_interactive(click_result_2.interactive_elements)
                else:
                    print("No interactive elements found after second click.")
            else:
                print("Skipping second element click test - no elements found after first click.")

        else:
            print("Skipping element click test - fewer than 5 elements found.")

        await asyncio.sleep(2)

        print("\n✅ Chess Page Test Completed!")
        # NOTE(review): long pause before cleanup; presumably keeps the
        # browser open for manual inspection. Confirm it is still wanted.
        await asyncio.sleep(100)

    except Exception as e:
        print(f"\n❌ Chess Page Test failed: {str(e)}")
        traceback.print_exc()
    finally:
        print("\n--- Cleaning up ---")
        await automation_service.shutdown()
        print("Browser closed")
|
|
|
if __name__ == '__main__':
    import sys

    import uvicorn

    # "--test" takes precedence over "--test2"; with neither flag the API
    # server is started.
    cli_args = sys.argv

    if "--test" in cli_args:
        print("Running in test mode 1")
        asyncio.run(test_browser_api())
    elif "--test2" in cli_args:
        print("Running in test mode 2 (Chess Page)")
        asyncio.run(test_browser_api_2())
    else:
        print("Starting API server")
        uvicorn.run("browser_api:api_app", host="0.0.0.0", port=8002)