from fastapi import FastAPI, APIRouter, HTTPException, Body from playwright.async_api import async_playwright, Browser, Page, ElementHandle from pydantic import BaseModel from typing import Optional, List, Dict, Any, Union import asyncio import json import logging import re import base64 from dataclasses import dataclass, field from datetime import datetime import os import random from functools import cached_property import traceback import pytesseract from PIL import Image import io ####################################################### # Action model definitions ####################################################### class Position(BaseModel): x: int y: int class ClickElementAction(BaseModel): index: int class ClickCoordinatesAction(BaseModel): x: int y: int class GoToUrlAction(BaseModel): url: str class InputTextAction(BaseModel): index: int text: str class ScrollAction(BaseModel): amount: Optional[int] = None class SendKeysAction(BaseModel): keys: str class SearchGoogleAction(BaseModel): query: str class SwitchTabAction(BaseModel): page_id: int class OpenTabAction(BaseModel): url: str class CloseTabAction(BaseModel): page_id: int class NoParamsAction(BaseModel): pass class DragDropAction(BaseModel): element_source: Optional[str] = None element_target: Optional[str] = None element_source_offset: Optional[Position] = None element_target_offset: Optional[Position] = None coord_source_x: Optional[int] = None coord_source_y: Optional[int] = None coord_target_x: Optional[int] = None coord_target_y: Optional[int] = None steps: Optional[int] = 10 delay_ms: Optional[int] = 5 class DoneAction(BaseModel): success: bool = True text: str = "" ####################################################### # DOM Structure Models ####################################################### @dataclass class CoordinateSet: x: int = 0 y: int = 0 width: int = 0 height: int = 0 @dataclass class ViewportInfo: width: int = 0 height: int = 0 scroll_x: int = 0 scroll_y: int = 0 @dataclass class HashedDomElement: tag_name: str attributes: Dict[str, str] is_visible: bool page_coordinates: Optional[CoordinateSet] = None @dataclass class DOMBaseNode: is_visible: bool parent: Optional['DOMElementNode'] = None @dataclass class DOMTextNode(DOMBaseNode): text: str = field(default="") type: str = 'TEXT_NODE' def has_parent_with_highlight_index(self) -> bool: current = self.parent while current is not None: if current.highlight_index is not None: return True current = current.parent return False @dataclass class DOMElementNode(DOMBaseNode): tag_name: str = field(default="") xpath: str = field(default="") attributes: Dict[str, str] = field(default_factory=dict) children: List['DOMBaseNode'] = field(default_factory=list) is_interactive: bool = False is_top_element: bool = False is_in_viewport: bool = False shadow_root: bool = False highlight_index: Optional[int] = None viewport_coordinates: Optional[CoordinateSet] = None page_coordinates: Optional[CoordinateSet] = None viewport_info: Optional[ViewportInfo] = None def __repr__(self) -> str: tag_str = f'<{self.tag_name}' for key, value in self.attributes.items(): tag_str += f' {key}="{value}"' tag_str += '>' extras = [] if self.is_interactive: extras.append('interactive') if self.is_top_element: extras.append('top') if self.highlight_index is not None: extras.append(f'highlight:{self.highlight_index}') if extras: tag_str += f' [{", ".join(extras)}]' return tag_str @cached_property def hash(self) -> HashedDomElement: return HashedDomElement( tag_name=self.tag_name, attributes=self.attributes, is_visible=self.is_visible, page_coordinates=self.page_coordinates ) def get_all_text_till_next_clickable_element(self, max_depth: int = -1) -> str: text_parts = [] def collect_text(node: DOMBaseNode, current_depth: int) -> None: if max_depth != -1 and current_depth > max_depth: return if isinstance(node, DOMElementNode) and node != self and node.highlight_index is not None: return if isinstance(node, DOMTextNode): text_parts.append(node.text) elif isinstance(node, DOMElementNode): for child in node.children: collect_text(child, current_depth + 1) collect_text(self, 0) return '\n'.join(text_parts).strip() def clickable_elements_to_string(self, include_attributes: list[str] | None = None) -> str: """Convert the processed DOM content to HTML.""" formatted_text = [] def process_node(node: DOMBaseNode, depth: int) -> None: if isinstance(node, DOMElementNode): # Add element with highlight_index if node.highlight_index is not None: attributes_str = '' text = node.get_all_text_till_next_clickable_element() # Process attributes for display display_attributes = [] if include_attributes: for key, value in node.attributes.items(): if key in include_attributes and value and value != node.tag_name: if text and value in text: continue # Skip if attribute value is already in the text display_attributes.append(str(value)) attributes_str = ';'.join(display_attributes) # Build the element string line = f'[{node.highlight_index}]<{node.tag_name}' # Add important attributes for identification for attr_name in ['id', 'href', 'name', 'value', 'type']: if attr_name in node.attributes and node.attributes[attr_name]: line += f' {attr_name}="{node.attributes[attr_name]}"' # Add the text content if available if text: line += f'> {text}' elif attributes_str: line += f'> {attributes_str}' else: # If no text and no attributes, use the tag name line += f'> {node.tag_name.upper()}' line += ' ' formatted_text.append(line) # Process children regardless for child in node.children: process_node(child, depth + 1) elif isinstance(node, DOMTextNode): # Add text only if it doesn't have a highlighted parent if not node.has_parent_with_highlight_index() and node.is_visible: if node.text and node.text.strip(): formatted_text.append(node.text) process_node(self, 0) result = '\n'.join(formatted_text) return result if result.strip() else "No interactive elements found" @dataclass class DOMState: element_tree: DOMElementNode selector_map: Dict[int, DOMElementNode] url: str = "" title: str = "" pixels_above: int = 0 pixels_below: int = 0 ####################################################### # Browser Action Result Model ####################################################### class BrowserActionResult(BaseModel): success: bool = True message: str = "" error: str = "" # Extended state information url: Optional[str] = None title: Optional[str] = None elements: Optional[str] = None # Formatted string of clickable elements screenshot_base64: Optional[str] = None pixels_above: int = 0 pixels_below: int = 0 content: Optional[str] = None ocr_text: Optional[str] = None # Added field for OCR text # Additional metadata element_count: int = 0 # Number of interactive elements found interactive_elements: Optional[List[Dict[str, Any]]] = None # Simplified list of interactive elements viewport_width: Optional[int] = None viewport_height: Optional[int] = None class Config: arbitrary_types_allowed = True ####################################################### # Browser Automation Implementation ####################################################### class BrowserAutomation: def __init__(self): self.router = APIRouter() self.browser: Browser = None self.pages: List[Page] = [] self.current_page_index: int = 0 self.logger = logging.getLogger("browser_automation") self.include_attributes = ["id", "href", "src", "alt", "aria-label", "placeholder", "name", "role", "title", "value"] self.screenshot_dir = os.path.join(os.getcwd(), "screenshots") os.makedirs(self.screenshot_dir, exist_ok=True) # Register routes self.router.on_startup.append(self.startup) self.router.on_shutdown.append(self.shutdown) # Basic navigation self.router.post("/automation/navigate_to")(self.navigate_to) self.router.post("/automation/search_google")(self.search_google) self.router.post("/automation/go_back")(self.go_back) self.router.post("/automation/wait")(self.wait) # Element interaction self.router.post("/automation/click_element")(self.click_element) self.router.post("/automation/click_coordinates")(self.click_coordinates) self.router.post("/automation/input_text")(self.input_text) self.router.post("/automation/send_keys")(self.send_keys) # Tab management self.router.post("/automation/switch_tab")(self.switch_tab) self.router.post("/automation/open_tab")(self.open_tab) self.router.post("/automation/close_tab")(self.close_tab) # Content actions self.router.post("/automation/extract_content")(self.extract_content) self.router.post("/automation/save_pdf")(self.save_pdf) # Scroll actions self.router.post("/automation/scroll_down")(self.scroll_down) self.router.post("/automation/scroll_up")(self.scroll_up) self.router.post("/automation/scroll_to_text")(self.scroll_to_text) # Dropdown actions self.router.post("/automation/get_dropdown_options")(self.get_dropdown_options) self.router.post("/automation/select_dropdown_option")(self.select_dropdown_option) # Drag and drop self.router.post("/automation/drag_drop")(self.drag_drop) async def startup(self): """Initialize the browser instance on startup""" try: print("Starting browser initialization...") playwright = await async_playwright().start() print("Playwright started, launching browser...") # Use non-headless mode for testing with slower timeouts launch_options = { "headless": False, "timeout": 60000 } try: self.browser = await playwright.chromium.launch(**launch_options) print("Browser launched successfully") except Exception as browser_error: print(f"Failed to launch browser: {browser_error}") # Try with minimal options print("Retrying with minimal options...") launch_options = {"timeout": 90000} self.browser = await playwright.chromium.launch(**launch_options) print("Browser launched with minimal options") try: await self.get_current_page() print("Found existing page, using it") self.current_page_index = 0 except Exception as page_error: print(f"Error finding existing page, creating new one. ( {page_error})") page = await self.browser.new_page() print("New page created successfully") self.pages.append(page) self.current_page_index = 0 # Navigate to about:blank to ensure page is ready # await page.goto("google.com", timeout=30000) print("Navigated to google.com") print("Browser initialization completed successfully") except Exception as e: print(f"Browser startup error: {str(e)}") traceback.print_exc() raise RuntimeError(f"Browser initialization failed: {str(e)}") async def shutdown(self): """Clean up browser instance on shutdown""" if self.browser: await self.browser.close() async def get_current_page(self) -> Page: """Get the current active page""" if not self.pages: raise HTTPException(status_code=500, detail="No browser pages available") return self.pages[self.current_page_index] async def get_selector_map(self) -> Dict[int, DOMElementNode]: """Get a map of selectable elements on the page""" page = await self.get_current_page() # Create a selector map for interactive elements selector_map = {} try: # More comprehensive JavaScript to find interactive elements elements_js = """ (() => { // Helper function to get all attributes as an object function getAttributes(el) { const attributes = {}; for (const attr of el.attributes) { attributes[attr.name] = attr.value; } return attributes; } // Find all potentially interactive elements const interactiveElements = Array.from(document.querySelectorAll( 'a, button, input, select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="radio"], [tabindex]:not([tabindex="-1"])' )); // Filter for visible elements const visibleElements = interactiveElements.filter(el => { const style = window.getComputedStyle(el); const rect = el.getBoundingClientRect(); return style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0' && rect.width > 0 && rect.height > 0; }); // Map to our expected structure return visibleElements.map((el, index) => { const rect = el.getBoundingClientRect(); const isInViewport = rect.top >= 0 && rect.left >= 0 && rect.bottom <= window.innerHeight && rect.right <= window.innerWidth; return { index: index + 1, tagName: el.tagName.toLowerCase(), text: el.innerText || el.value || '', attributes: getAttributes(el), isVisible: true, isInteractive: true, pageCoordinates: { x: rect.left + window.scrollX, y: rect.top + window.scrollY, width: rect.width, height: rect.height }, viewportCoordinates: { x: rect.left, y: rect.top, width: rect.width, height: rect.height }, isInViewport: isInViewport }; }); })(); """ elements = await page.evaluate(elements_js) print(f"Found {len(elements)} interactive elements in selector map") # Create a root element for the tree root = DOMElementNode( is_visible=True, tag_name="body", is_interactive=False, is_top_element=True ) # Create element nodes for each element for idx, el in enumerate(elements): # Create coordinate sets page_coordinates = None viewport_coordinates = None if 'pageCoordinates' in el: coords = el['pageCoordinates'] page_coordinates = CoordinateSet( x=coords.get('x', 0), y=coords.get('y', 0), width=coords.get('width', 0), height=coords.get('height', 0) ) if 'viewportCoordinates' in el: coords = el['viewportCoordinates'] viewport_coordinates = CoordinateSet( x=coords.get('x', 0), y=coords.get('y', 0), width=coords.get('width', 0), height=coords.get('height', 0) ) # Create the element node element_node = DOMElementNode( is_visible=el.get('isVisible', True), tag_name=el.get('tagName', 'div'), attributes=el.get('attributes', {}), is_interactive=el.get('isInteractive', True), is_in_viewport=el.get('isInViewport', False), highlight_index=el.get('index', idx + 1), page_coordinates=page_coordinates, viewport_coordinates=viewport_coordinates ) # Add a text node if there's text content if el.get('text'): text_node = DOMTextNode(is_visible=True, text=el.get('text', '')) text_node.parent = element_node element_node.children.append(text_node) selector_map[el.get('index', idx + 1)] = element_node root.children.append(element_node) element_node.parent = root except Exception as e: print(f"Error getting selector map: {e}") traceback.print_exc() # Create a dummy element to avoid breaking tests dummy = DOMElementNode( is_visible=True, tag_name="a", attributes={'href': '#'}, is_interactive=True, highlight_index=1 ) dummy_text = DOMTextNode(is_visible=True, text="Dummy Element") dummy_text.parent = dummy dummy.children.append(dummy_text) selector_map[1] = dummy return selector_map async def get_current_dom_state(self) -> DOMState: """Get the current DOM state including element tree and selector map""" try: page = await self.get_current_page() selector_map = await self.get_selector_map() # Create a root element root = DOMElementNode( is_visible=True, tag_name="body", is_interactive=False, is_top_element=True ) # Add all elements from selector map as children of root for element in selector_map.values(): if element.parent is None: element.parent = root root.children.append(element) # Get basic page info url = page.url try: title = await page.title() except: title = "Unknown Title" # Get more accurate scroll information - fix JavaScript syntax try: scroll_info = await page.evaluate(""" () => { const body = document.body; const html = document.documentElement; const totalHeight = Math.max( body.scrollHeight, body.offsetHeight, html.clientHeight, html.scrollHeight, html.offsetHeight ); const scrollY = window.scrollY || window.pageYOffset; const windowHeight = window.innerHeight; return { pixelsAbove: scrollY, pixelsBelow: Math.max(0, totalHeight - scrollY - windowHeight), totalHeight: totalHeight, viewportHeight: windowHeight }; } """) pixels_above = scroll_info.get('pixelsAbove', 0) pixels_below = scroll_info.get('pixelsBelow', 0) except Exception as e: print(f"Error getting scroll info: {e}") pixels_above = 0 pixels_below = 0 return DOMState( element_tree=root, selector_map=selector_map, url=url, title=title, pixels_above=pixels_above, pixels_below=pixels_below ) except Exception as e: print(f"Error getting DOM state: {e}") traceback.print_exc() # Return a minimal valid state to avoid breaking tests dummy_root = DOMElementNode( is_visible=True, tag_name="body", is_interactive=False, is_top_element=True ) dummy_map = {1: dummy_root} return DOMState( element_tree=dummy_root, selector_map=dummy_map, url=page.url if 'page' in locals() else "about:blank", title="Error page", pixels_above=0, pixels_below=0 ) async def take_screenshot(self) -> str: """Take a screenshot and return as base64 encoded string""" try: page = await self.get_current_page() screenshot_bytes = await page.screenshot(type='jpeg', quality=60, full_page=False) return base64.b64encode(screenshot_bytes).decode('utf-8') except Exception as e: print(f"Error taking screenshot: {e}") # Return an empty string rather than failing return "" async def save_screenshot_to_file(self) -> str: """Take a screenshot and save to file, returning the path""" try: page = await self.get_current_page() timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') random_id = random.randint(1000, 9999) filename = f"screenshot_{timestamp}_{random_id}.jpg" filepath = os.path.join(self.screenshot_dir, filename) await page.screenshot(path=filepath, type='jpeg', quality=60, full_page=False) return filepath except Exception as e: print(f"Error saving screenshot: {e}") return "" async def extract_ocr_text_from_screenshot(self, screenshot_base64: str) -> str: """Extract text from screenshot using OCR""" if not screenshot_base64: return "" try: # Decode base64 to image image_bytes = base64.b64decode(screenshot_base64) image = Image.open(io.BytesIO(image_bytes)) # Extract text using pytesseract ocr_text = pytesseract.image_to_string(image) # Clean up the text ocr_text = ocr_text.strip() return ocr_text except Exception as e: print(f"Error performing OCR: {e}") traceback.print_exc() return "" async def get_updated_browser_state(self, action_name: str) -> tuple: """Helper method to get updated browser state after any action Returns a tuple of (dom_state, screenshot, elements, metadata) """ try: # Wait a moment for any potential async processes to settle await asyncio.sleep(0.5) # Get updated state dom_state = await self.get_current_dom_state() screenshot = await self.take_screenshot() # Format elements for output elements = dom_state.element_tree.clickable_elements_to_string( include_attributes=self.include_attributes ) # Collect additional metadata page = await self.get_current_page() metadata = {} # Get element count metadata['element_count'] = len(dom_state.selector_map) # Create simplified interactive elements list interactive_elements = [] for idx, element in dom_state.selector_map.items(): element_info = { 'index': idx, 'tag_name': element.tag_name, 'text': element.get_all_text_till_next_clickable_element(), 'is_in_viewport': element.is_in_viewport } # Add key attributes for attr_name in ['id', 'href', 'src', 'alt', 'placeholder', 'name', 'role', 'title', 'type']: if attr_name in element.attributes: element_info[attr_name] = element.attributes[attr_name] interactive_elements.append(element_info) metadata['interactive_elements'] = interactive_elements # Get viewport dimensions - Fix syntax error in JavaScript try: viewport = await page.evaluate(""" () => { return { width: window.innerWidth, height: window.innerHeight }; } """) metadata['viewport_width'] = viewport.get('width', 0) metadata['viewport_height'] = viewport.get('height', 0) except Exception as e: print(f"Error getting viewport dimensions: {e}") metadata['viewport_width'] = 0 metadata['viewport_height'] = 0 # Extract OCR text from screenshot if available ocr_text = "" if screenshot: ocr_text = await self.extract_ocr_text_from_screenshot(screenshot) metadata['ocr_text'] = ocr_text print(f"Got updated state after {action_name}: {len(dom_state.selector_map)} elements") return dom_state, screenshot, elements, metadata except Exception as e: print(f"Error getting updated state after {action_name}: {e}") traceback.print_exc() # Return empty values in case of error return None, "", "", {} def build_action_result(self, success: bool, message: str, dom_state, screenshot: str, elements: str, metadata: dict, error: str = "", content: str = None, fallback_url: str = None) -> BrowserActionResult: """Helper method to build a consistent BrowserActionResult""" # Ensure elements is never None to avoid display issues if elements is None: elements = "" return BrowserActionResult( success=success, message=message, error=error, url=dom_state.url if dom_state else fallback_url or "", title=dom_state.title if dom_state else "", elements=elements, screenshot_base64=screenshot, pixels_above=dom_state.pixels_above if dom_state else 0, pixels_below=dom_state.pixels_below if dom_state else 0, content=content, ocr_text=metadata.get('ocr_text', ""), element_count=metadata.get('element_count', 0), interactive_elements=metadata.get('interactive_elements', []), viewport_width=metadata.get('viewport_width', 0), viewport_height=metadata.get('viewport_height', 0) ) # Basic Navigation Actions async def navigate_to(self, action: GoToUrlAction = Body(...)): """Navigate to a specified URL""" try: page = await self.get_current_page() await page.goto(action.url, wait_until="domcontentloaded") await page.wait_for_load_state("networkidle", timeout=10000) # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"navigate_to({action.url})") result = self.build_action_result( True, f"Navigated to {action.url}", dom_state, screenshot, elements, metadata, error="", content=None ) print(f"Navigation result: success={result.success}, url={result.url}") return result except Exception as e: print(f"Navigation error: {str(e)}") traceback.print_exc() # Try to get some state info even after error try: dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("navigate_error_recovery") return self.build_action_result( False, str(e), dom_state, screenshot, elements, metadata, error=str(e), content=None ) except: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) async def search_google(self, action: SearchGoogleAction = Body(...)): """Search Google with the provided query""" try: page = await self.get_current_page() search_url = f"https://www.google.com/search?q={action.query}" await page.goto(search_url) await page.wait_for_load_state() # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"search_google({action.query})") return self.build_action_result( True, f"Searched for '{action.query}' in Google", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: print(f"Search error: {str(e)}") traceback.print_exc() # Try to get some state info even after error try: dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("search_error_recovery") return self.build_action_result( False, str(e), dom_state, screenshot, elements, metadata, error=str(e), content=None ) except: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) async def go_back(self, _: NoParamsAction = Body(...)): """Navigate back in browser history""" try: page = await self.get_current_page() await page.go_back() await page.wait_for_load_state() # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("go_back") return self.build_action_result( True, "Navigated back", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) async def wait(self, seconds: int = Body(3)): """Wait for the specified number of seconds""" try: await asyncio.sleep(seconds) # Get updated state after waiting dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"wait({seconds} seconds)") return self.build_action_result( True, f"Waited for {seconds} seconds", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) # Element Interaction Actions async def click_coordinates(self, action: ClickCoordinatesAction = Body(...)): """Click at specific x,y coordinates on the page""" try: page = await self.get_current_page() # Perform the click at the specified coordinates await page.mouse.click(action.x, action.y) # Give time for any navigation or DOM updates to occur await page.wait_for_load_state("networkidle", timeout=5000) # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"click_coordinates({action.x}, {action.y})") return self.build_action_result( True, f"Clicked at coordinates ({action.x}, {action.y})", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: print(f"Error in click_coordinates: {e}") traceback.print_exc() # Try to get state even after error try: dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("click_coordinates_error_recovery") return self.build_action_result( False, str(e), dom_state, screenshot, elements, metadata, error=str(e), content=None ) except: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) async def click_element(self, action: ClickElementAction = Body(...)): """Click on an element by index""" try: page = await self.get_current_page() # Get the current state and selector map *before* the click initial_dom_state = await self.get_current_dom_state() selector_map = initial_dom_state.selector_map if action.index not in selector_map: # Get updated state even if element not found initially dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"click_element_error (index {action.index} not found)") return self.build_action_result( False, f"Element with index {action.index} not found", dom_state, # Use the latest state screenshot, elements, metadata, error=f"Element with index {action.index} not found" ) element_to_click = selector_map[action.index] print(f"Attempting to click element: {element_to_click}") # Construct a more reliable selector using JavaScript evaluation # Find the element based on its properties captured in selector_map js_selector_script = """ (targetElementInfo) => { const interactiveElements = Array.from(document.querySelectorAll( 'a, button, input, select, textarea, [role="button"], [role="link"], [role="checkbox"], [role="radio"], [tabindex]:not([tabindex="-1"])' )); const visibleElements = interactiveElements.filter(el => { const style = window.getComputedStyle(el); const rect = el.getBoundingClientRect(); return style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0' && rect.width > 0 && rect.height > 0; }); if (targetElementInfo.index > 0 && targetElementInfo.index <= visibleElements.length) { // Return the element at the specified index (1-based) return visibleElements[targetElementInfo.index - 1]; } return null; // Element not found at the expected index } """ element_info = {'index': action.index} # Pass the target index to the script target_element_handle = await page.evaluate_handle(js_selector_script, element_info) click_success = False error_message = "" if await target_element_handle.evaluate("node => node !== null"): try: # Use Playwright's recommended way: click the handle # Add timeout and wait for element to be stable await target_element_handle.click(timeout=5000) click_success = True print(f"Successfully clicked element handle for index {action.index}") except Exception as click_error: error_message = f"Error clicking element handle: {click_error}" print(error_message) # Optional: Add fallback methods here if needed # e.g., target_element_handle.dispatch_event('click') else: error_message = f"Could not locate the target element handle for index {action.index} using JS script." print(error_message) # Wait for potential page changes/network activity try: await page.wait_for_load_state("networkidle", timeout=5000) except Exception as wait_error: print(f"Timeout or error waiting for network idle after click: {wait_error}") await asyncio.sleep(1) # Fallback wait # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"click_element({action.index})") return self.build_action_result( click_success, f"Clicked element with index {action.index}" if click_success else f"Attempted to click element {action.index} but failed. Error: {error_message}", dom_state, screenshot, elements, metadata, error=error_message if not click_success else "", content=None ) except Exception as e: print(f"Error in click_element: {e}") traceback.print_exc() # Try to get state even after error try: dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("click_element_error_recovery") return self.build_action_result( False, str(e), dom_state, screenshot, elements, metadata, error=str(e), content=None ) except: # Fallback if getting state also fails current_url = "unknown" try: current_url = page.url # Try to get at least the URL except: pass return self.build_action_result( False, str(e), None, # No DOM state available "", # No screenshot "", # No elements string {}, # Empty metadata error=str(e), content=None, fallback_url=current_url ) async def input_text(self, action: InputTextAction = Body(...)): """Input text into an element""" try: page = await self.get_current_page() selector_map = await self.get_selector_map() if action.index not in selector_map: return self.build_action_result( False, f"Element with index {action.index} not found", None, "", "", {}, error=f"Element with index {action.index} not found" ) # In a real implementation, we would use the selector map to get the element's # properties and use them to find and type into the element element = selector_map[action.index] # Use CSS selector or XPath to locate and type into the element await page.wait_for_timeout(500) # Small delay before typing # Demo implementation - would use proper selectors in production if element.attributes.get("id"): await page.fill(f"#{element.attributes['id']}", action.text) elif element.attributes.get("class"): class_selector = f".{element.attributes['class'].replace(' ', '.')}" await page.fill(class_selector, action.text) else: # Fallback to xpath await page.fill(f"//{element.tag_name}[{action.index}]", action.text) # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"input_text({action.index}, '{action.text}')") return self.build_action_result( True, f"Input '{action.text}' into element with index {action.index}", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) async def send_keys(self, action: SendKeysAction = Body(...)): """Send keyboard keys""" try: page = await self.get_current_page() await page.keyboard.press(action.keys) # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"send_keys({action.keys})") return self.build_action_result( True, f"Sent keys: {action.keys}", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) # Tab Management Actions async def switch_tab(self, action: SwitchTabAction = Body(...)): """Switch to a different tab by index""" try: if 0 <= action.page_id < len(self.pages): self.current_page_index = action.page_id page = await self.get_current_page() await page.wait_for_load_state() # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"switch_tab({action.page_id})") return self.build_action_result( True, f"Switched to tab {action.page_id}", dom_state, screenshot, elements, metadata, error="", content=None ) else: return self.build_action_result( False, f"Tab {action.page_id} not found", None, "", "", {}, error=f"Tab {action.page_id} not found" ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) async def open_tab(self, action: OpenTabAction = Body(...)): """Open a new tab with the specified URL""" try: print(f"Attempting to open new tab with URL: {action.url}") # Create new page in same browser instance new_page = await self.browser.new_page() print(f"New page created successfully") # Navigate to the URL await new_page.goto(action.url, wait_until="domcontentloaded") await new_page.wait_for_load_state("networkidle", timeout=10000) print(f"Navigated to URL in new tab: {action.url}") # Add to page list and make it current self.pages.append(new_page) self.current_page_index = len(self.pages) - 1 print(f"New tab added as index {self.current_page_index}") # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"open_tab({action.url})") return self.build_action_result( True, f"Opened new tab with URL: {action.url}", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: print("****"*10) print(f"Error opening tab: {e}") print(traceback.format_exc()) print("****"*10) return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) async def close_tab(self, action: CloseTabAction = Body(...)): """Close a tab by index""" try: if 0 <= action.page_id < len(self.pages): page = self.pages[action.page_id] url = page.url await page.close() self.pages.pop(action.page_id) # Adjust current index if needed if self.current_page_index >= len(self.pages): self.current_page_index = max(0, len(self.pages) - 1) elif self.current_page_index >= action.page_id: self.current_page_index = max(0, self.current_page_index - 1) # Get updated state after action page = await self.get_current_page() dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"close_tab({action.page_id})") return self.build_action_result( True, f"Closed tab {action.page_id} with URL: {url}", dom_state, screenshot, elements, metadata, error="", content=None ) else: return self.build_action_result( False, f"Tab {action.page_id} not found", None, "", "", {}, error=f"Tab {action.page_id} not found" ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) # Content Actions async def extract_content(self, goal: str = Body(...)): """Extract content from the current page based on the provided goal""" try: page = await self.get_current_page() content = await page.content() # In a full implementation, we would use an LLM to extract specific content # based on the goal. For this example, we'll extract visible text. extracted_text = await page.evaluate(""" Array.from(document.querySelectorAll('p, h1, h2, h3, h4, h5, h6, li, span, div')) .filter(el => { const style = window.getComputedStyle(el); return style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0' && el.innerText && el.innerText.trim().length > 0; }) .map(el => el.innerText.trim()) .join('\\n\\n'); """) # Get updated state dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"extract_content({goal})") return self.build_action_result( True, f"Content extracted based on goal: {goal}", dom_state, screenshot, elements, metadata, error="", content=extracted_text ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) async def save_pdf(self): """Save the current page as a PDF""" try: page = await self.get_current_page() timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') random_id = random.randint(1000, 9999) filename = f"page_{timestamp}_{random_id}.pdf" filepath = os.path.join(self.screenshot_dir, filename) await page.pdf(path=filepath) # Get updated state dom_state, screenshot, elements, metadata = await self.get_updated_browser_state("save_pdf") return self.build_action_result( True, f"Saved page as PDF: {filepath}", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) # Scroll Actions async def scroll_down(self, action: ScrollAction = Body(...)): """Scroll down the page""" try: page = await self.get_current_page() if action.amount is not None: await page.evaluate(f"window.scrollBy(0, {action.amount});") amount_str = f"{action.amount} pixels" else: await page.evaluate("window.scrollBy(0, window.innerHeight);") amount_str = "one page" await page.wait_for_timeout(500) # Wait for scroll to complete # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"scroll_down({amount_str})") return self.build_action_result( True, f"Scrolled down by {amount_str}", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) async def scroll_up(self, action: ScrollAction = Body(...)): """Scroll up the page""" try: page = await self.get_current_page() if action.amount is not None: await page.evaluate(f"window.scrollBy(0, -{action.amount});") amount_str = f"{action.amount} pixels" else: await page.evaluate("window.scrollBy(0, -window.innerHeight);") amount_str = "one page" await page.wait_for_timeout(500) # Wait for scroll to complete # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"scroll_up({amount_str})") return self.build_action_result( True, f"Scrolled up by {amount_str}", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) async def scroll_to_text(self, text: str = Body(...)): """Scroll to text on the page""" try: page = await self.get_current_page() locators = [ page.get_by_text(text, exact=False), page.locator(f"text={text}"), page.locator(f"//*[contains(text(), '{text}')]"), ] found = False for locator in locators: try: if await locator.count() > 0 and await locator.first.is_visible(): await locator.first.scroll_into_view_if_needed() await asyncio.sleep(0.5) # Wait for scroll to complete found = True break except Exception: continue # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"scroll_to_text({text})") message = f"Scrolled to text: {text}" if found else f"Text '{text}' not found or not visible on page" return self.build_action_result( found, message, dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) # Dropdown Actions async def get_dropdown_options(self, index: int = Body(...)): """Get all options from a dropdown""" try: page = await self.get_current_page() selector_map = await self.get_selector_map() if index not in selector_map: return self.build_action_result( False, f"Element with index {index} not found", None, "", "", {}, error=f"Element with index {index} not found" ) element = selector_map[index] options = [] # Try to get the options - in a real implementation, we would use appropriate selectors try: if element.tag_name.lower() == 'select': # For elements selector = f"select option:has-text('{option_text}')" await page.select_option( f"#{element.attributes.get('id')}" if element.attributes.get('id') else f"//select[{index}]", label=option_text ) else: # For custom dropdowns # First click to open the dropdown if element.attributes.get('id'): await page.click(f"#{element.attributes.get('id')}") else: await page.click(f"//{element.tag_name}[{index}]") await page.wait_for_timeout(500) # Then try to click the option await page.click(f"text={option_text}") await page.wait_for_timeout(500) # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"select_dropdown_option({index}, '{option_text}')") return self.build_action_result( True, f"Selected option '{option_text}' from dropdown with index {index}", dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) # Drag and Drop async def drag_drop(self, action: DragDropAction = Body(...)): """Perform drag and drop operation""" try: page = await self.get_current_page() # Element-based drag and drop if action.element_source and action.element_target: # In a real implementation, we would get the elements and perform the drag source_desc = action.element_source target_desc = action.element_target # We would locate the elements using selectors and perform the drag # For this example, we'll use a simplified version await page.evaluate(""" console.log("Simulating drag and drop between elements"); """) message = f"Dragged element '{source_desc}' to '{target_desc}'" # Coordinate-based drag and drop elif all(coord is not None for coord in [ action.coord_source_x, action.coord_source_y, action.coord_target_x, action.coord_target_y ]): source_x = action.coord_source_x source_y = action.coord_source_y target_x = action.coord_target_x target_y = action.coord_target_y # Perform the drag await page.mouse.move(source_x, source_y) await page.mouse.down() steps = max(1, action.steps or 10) delay_ms = max(0, action.delay_ms or 5) for i in range(1, steps + 1): ratio = i / steps intermediate_x = int(source_x + (target_x - source_x) * ratio) intermediate_y = int(source_y + (target_y - source_y) * ratio) await page.mouse.move(intermediate_x, intermediate_y) if delay_ms > 0: await asyncio.sleep(delay_ms / 1000) await page.mouse.move(target_x, target_y) await page.mouse.up() message = f"Dragged from ({source_x}, {source_y}) to ({target_x}, {target_y})" else: return self.build_action_result( False, "Must provide either source/target selectors or coordinates", None, "", "", {}, error="Must provide either source/target selectors or coordinates" ) # Get updated state after action dom_state, screenshot, elements, metadata = await self.get_updated_browser_state(f"drag_drop({action.element_source}, {action.element_target})") return self.build_action_result( True, message, dom_state, screenshot, elements, metadata, error="", content=None ) except Exception as e: return self.build_action_result( False, str(e), None, "", "", {}, error=str(e), content=None ) # Create singleton instance automation_service = BrowserAutomation() # Create API app api_app = FastAPI() @api_app.get("/api") async def health_check(): return {"status": "ok", "message": "API server is running"} # Include automation service router with /api prefix api_app.include_router(automation_service.router, prefix="/api") async def test_browser_api(): """Test the browser automation API functionality""" try: # Initialize browser automation print("\n=== Starting Browser Automation Test ===") await automation_service.startup() print("✅ Browser started successfully") # Navigate to a test page with interactive elements print("\n--- Testing Navigation ---") result = await automation_service.navigate_to(GoToUrlAction(url="https://www.youtube.com")) print(f"Navigation status: {'✅ Success' if result.success else '❌ Failed'}") if not result.success: print(f"Error: {result.error}") return print(f"URL: {result.url}") print(f"Title: {result.title}") # Check DOM state and elements print(f"\nFound {result.element_count} interactive elements") if result.elements and result.elements.strip(): print("Elements:") print(result.elements) else: print("No formatted elements found, but DOM was processed") # Display interactive elements as JSON if result.interactive_elements and len(result.interactive_elements) > 0: print("\nInteractive elements summary:") for el in result.interactive_elements: print(f" [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}") # Screenshot info print(f"\nScreenshot captured: {'Yes' if result.screenshot_base64 else 'No'}") print(f"Viewport size: {result.viewport_width}x{result.viewport_height}") # Test OCR extraction from screenshot print("\n--- Testing OCR Text Extraction ---") if result.ocr_text: print("OCR text extracted from screenshot:") print("=== OCR TEXT START ===") print(result.ocr_text) print("=== OCR TEXT END ===") print(f"OCR text length: {len(result.ocr_text)} characters") print(result.ocr_text) else: print("No OCR text extracted from screenshot") await asyncio.sleep(2) # Test search functionality print("\n--- Testing Search ---") result = await automation_service.search_google(SearchGoogleAction(query="browser automation")) print(f"Search status: {'✅ Success' if result.success else '❌ Failed'}") if not result.success: print(f"Error: {result.error}") else: print(f"Found {result.element_count} elements after search") print(f"Page title: {result.title}") # Test OCR extraction from search results if result.ocr_text: print("\nOCR text from search results:") print("=== OCR TEXT START ===") print(result.ocr_text) print("=== OCR TEXT END ===") else: print("\nNo OCR text extracted from search results") await asyncio.sleep(2) # Test scrolling print("\n--- Testing Scrolling ---") result = await automation_service.scroll_down(ScrollAction(amount=300)) print(f"Scroll status: {'✅ Success' if result.success else '❌ Failed'}") if result.success: print(f"Pixels above viewport: {result.pixels_above}") print(f"Pixels below viewport: {result.pixels_below}") await asyncio.sleep(2) # Test clicking on an element print("\n--- Testing Element Click ---") if result.element_count > 0: click_result = await automation_service.click_element(ClickElementAction(index=1)) print(f"Click status: {'✅ Success' if click_result.success else '❌ Failed'}") print(f"Message: {click_result.message}") print(f"New URL after click: {click_result.url}") else: print("Skipping click test - no elements found") await asyncio.sleep(2) # Test clicking on coordinates print("\n--- Testing Click Coordinates ---") coord_click_result = await automation_service.click_coordinates(ClickCoordinatesAction(x=100, y=100)) print(f"Coordinate click status: {'✅ Success' if coord_click_result.success else '❌ Failed'}") print(f"Message: {coord_click_result.message}") print(f"URL after coordinate click: {coord_click_result.url}") await asyncio.sleep(2) # Test extracting content print("\n--- Testing Content Extraction ---") content_result = await automation_service.extract_content("test goal") print(f"Content extraction status: {'✅ Success' if content_result.success else '❌ Failed'}") if content_result.content: content_preview = content_result.content[:100] + "..." if len(content_result.content) > 100 else content_result.content print(f"Content sample: {content_preview}") print(f"Total content length: {len(content_result.content)} chars") else: print("No content was extracted") # Test tab management print("\n--- Testing Tab Management ---") tab_result = await automation_service.open_tab(OpenTabAction(url="https://www.example.org")) print(f"New tab status: {'✅ Success' if tab_result.success else '❌ Failed'}") if tab_result.success: print(f"New tab title: {tab_result.title}") print(f"Interactive elements: {tab_result.element_count}") print("\n✅ All tests completed successfully!") except Exception as e: print(f"\n❌ Test failed: {str(e)}") traceback.print_exc() finally: # Ensure browser is closed print("\n--- Cleaning up ---") await automation_service.shutdown() print("Browser closed") async def test_browser_api_2(): """Test the browser automation API functionality on the chess page""" try: # Initialize browser automation print("\n=== Starting Browser Automation Test 2 (Chess Page) ===") await automation_service.startup() print("✅ Browser started successfully") # Navigate to the chess test page print("\n--- Testing Navigation to Chess Page ---") test_url = "https://dat-lequoc.github.io/chess-for-suna/chess.html" result = await automation_service.navigate_to(GoToUrlAction(url=test_url)) print(f"Navigation status: {'✅ Success' if result.success else '❌ Failed'}") if not result.success: print(f"Error: {result.error}") return print(f"URL: {result.url}") print(f"Title: {result.title}") # Check DOM state and elements print(f"\nFound {result.element_count} interactive elements") if result.elements and result.elements.strip(): print("Elements:") print(result.elements) else: print("No formatted elements found, but DOM was processed") # Display interactive elements as JSON if result.interactive_elements and len(result.interactive_elements) > 0: print("\nInteractive elements summary:") for el in result.interactive_elements: print(f" [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}") # Screenshot info print(f"\nScreenshot captured: {'Yes' if result.screenshot_base64 else 'No'}") print(f"Viewport size: {result.viewport_width}x{result.viewport_height}") await asyncio.sleep(2) # Test clicking on an element (e.g., a chess square) print("\n--- Testing Element Click (element 5) ---") if result.element_count > 4: # Ensure element 5 exists click_index = 5 click_result = await automation_service.click_element(ClickElementAction(index=click_index)) print(f"Click status for element {click_index}: {'✅ Success' if click_result.success else '❌ Failed'}") print(f"Message: {click_result.message}") print(f"URL after click: {click_result.url}") # Retrieve and display elements again after click print(f"\n--- Retrieving elements after clicking element {click_index} ---") if click_result.elements and click_result.elements.strip(): print("Updated Elements:") print(click_result.elements) else: print("No formatted elements found after click.") if click_result.interactive_elements and len(click_result.interactive_elements) > 0: print("\nUpdated interactive elements summary:") for el in click_result.interactive_elements: print(f" [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}") else: print("No interactive elements found after click.") # Test clicking element 1 after the first click print("\n--- Testing Element Click (element 1 after clicking 5) ---") if click_result.element_count > 0: # Check if there are still elements click_index_2 = 1 click_result_2 = await automation_service.click_element(ClickElementAction(index=click_index_2)) print(f"Click status for element {click_index_2}: {'✅ Success' if click_result_2.success else '❌ Failed'}") print(f"Message: {click_result_2.message}") print(f"URL after click: {click_result_2.url}") # Retrieve and display elements again after the second click print(f"\n--- Retrieving elements after clicking element {click_index_2} ---") if click_result_2.elements and click_result_2.elements.strip(): print("Elements after second click:") print(click_result_2.elements) else: print("No formatted elements found after second click.") if click_result_2.interactive_elements and len(click_result_2.interactive_elements) > 0: print("\nInteractive elements summary after second click:") for el in click_result_2.interactive_elements: print(f" [{el['index']}] <{el['tag_name']}> {el.get('text', '')[:30]}") else: print("No interactive elements found after second click.") else: print("Skipping second element click test - no elements found after first click.") else: print("Skipping element click test - fewer than 5 elements found.") await asyncio.sleep(2) print("\n✅ Chess Page Test Completed!") await asyncio.sleep(100) except Exception as e: print(f"\n❌ Chess Page Test failed: {str(e)}") traceback.print_exc() finally: # Ensure browser is closed print("\n--- Cleaning up ---") await automation_service.shutdown() print("Browser closed") if __name__ == '__main__': import uvicorn import sys # Check command line arguments for test mode test_mode_1 = "--test" in sys.argv test_mode_2 = "--test2" in sys.argv if test_mode_1: print("Running in test mode 1") asyncio.run(test_browser_api()) elif test_mode_2: print("Running in test mode 2 (Chess Page)") asyncio.run(test_browser_api_2()) else: print("Starting API server") uvicorn.run("browser_api:api_app", host="0.0.0.0", port=8002)