import asyncio
from contextlib import AsyncExitStack
from typing import List, Literal, Optional, Any
from urllib.parse import quote_plus

from pydantic import BaseModel, Field

from proxy_lite.browser.browser import BrowserSession
from proxy_lite.logger import logger

from .tool_base import Tool, ToolExecutionResponse, attach_param_schema

SELF_CONTAINED_TAGS = [
    # many of these are non-interactive but keeping them anyway
    "area",
    "base",
    "br",
    "col",
    "embed",
    "hr",
    "img",
    "input",
    "link",
    "meta",
    "param",
    "source",
    "track",
    "wbr",
]

# Maximum number of characters kept for an attribute value or element text
# before it is cut and terminated with an ellipsis.
_MAX_TEXT_LENGTH = 2500


def _truncate(value: str, limit: int = _MAX_TEXT_LENGTH) -> str:
    # Cut `value` to at most `limit` characters, marking the cut with "…".
    if len(value) > limit:
        return value[: limit - 1] + "…"
    return value


def element_as_text(
    mark_id: int,
    tag: Optional[str] = None,
    text: Optional[str] = None,
    **raw_attributes,
) -> str:
    """Return an HTML-like text representation of a single page element.

    Args:
        mark_id: Mark ID rendered as the element's ``id``.
        tag: Tag name; lower-cased before use. ``None`` renders an empty tag name.
        text: Inner text; ``None`` is treated as empty. Long text is truncated.
        **raw_attributes: Extra attributes. ``None`` values are skipped, ``True``
            renders the bare attribute name, ``False`` is dropped, and anything
            else is stringified, truncated and rendered as ``key="value"``.

    Returns:
        ``<tag id=N attrs/>`` for a self-contained tag without text, otherwise
        ``<tag id=N attrs>text`` (no closing tag is emitted).
    """
    rendered = []
    for key, value in raw_attributes.items():
        if value is None:
            continue
        if isinstance(value, bool):
            if value:
                rendered.append(key)
            # we ignore False bool attributes
        else:
            rendered.append(f'{key}="{_truncate(str(value))}"')
    # A leading space separates the attributes from the id; rstrip removes it
    # again when there are no attributes at all.
    attributes = (" " + " ".join(rendered)).rstrip()

    tag = tag.lower() if tag else ""
    text = _truncate(text if text is not None else "")

    if tag in SELF_CONTAINED_TAGS:
        if text:
            # Unexpected, but keep the text: fall through to the open-tag form.
            logger.warning(
                f"Got self-contained element '{tag}' which contained text '{text}'.",
            )
        else:
            return f"<{tag} id={mark_id}{attributes}/>"
    return f"<{tag} id={mark_id}{attributes}>{text}"


# NOTE: the parameter models below deliberately carry no class docstrings;
# pydantic copies a model docstring into the generated JSON schema description.


class GotoParams(BaseModel):
    url: str = Field(..., description="The web address to visit. Must be a valid URL.")


class GoogleSearchParams(BaseModel):
    query_plan: str = Field(
        ...,
        description="Plan out the query you will make. Re-write queries in a way that will yield the best results.",
    )
    query: str = Field(..., description="The Google search to perform.")


class ClickParams(BaseModel):
    mark_id: int = Field(..., description="Element Mark ID.")


class TypeEntry(BaseModel):
    mark_id: int = Field(..., description="Element Mark ID.")
    content: str = Field(..., description="The text to type into the element.")


class TypeParams(BaseModel):
    entries: List[TypeEntry] = Field(
        ...,
        description="A list of elements and contents to type.",
    )
    submit: bool = Field(
        ...,
        description='Whether to press the "Enter" key after typing in the last entry.',
    )


class ScrollParams(BaseModel):
    direction: Literal["up", "down", "left", "right"] = Field(
        ...,
        description='Direction to scroll. Must be one of "up", "down", "left" or "right".',
    )
    mark_id: int = Field(
        ...,
        description="What to scroll. Use -1 to scroll the whole page otherwise give the mark ID of an element that is `scrollable`.",  # noqa: E501
    )


class BackParams(BaseModel):
    pass


class WaitParams(BaseModel):
    pass


class ReloadParams(BaseModel):
    pass


class DoNothingParams(BaseModel):
    pass


# --- NEW: Parameters for open_new_tab_and_go_to tool ---
class OpenNewTabAndGoToParams(BaseModel):
    url: str = Field(..., description="The URL to navigate to in the new tab.")


# --- NEW: Parameters for select_option_by_text tool ---
class SelectOptionByTextParams(BaseModel):
    mark_id: int = Field(..., description="The mark ID of the select element.")
    option_text: str = Field(..., description="The text content of the option to select.")


def _peek_mark_id(raw_entry: Any) -> Any:
    # Best-effort mark ID of a not-yet-validated type entry, used only so that
    # error messages can name the element even when validation itself fails.
    if isinstance(raw_entry, dict):
        return raw_entry.get("mark_id")
    return getattr(raw_entry, "mark_id", None)


async def _sample_option_texts(frames, per_select: int = 5) -> List[str]:
    # Collect the stripped text of the first `per_select` options of every
    # <select> in `frames`; frames that cannot be queried are skipped.
    samples: List[str] = []
    for frame in frames:
        try:
            for select_elem in await frame.query_selector_all('select'):
                options = await select_elem.query_selector_all('option')
                for option in options[:per_select]:
                    text = await option.text_content()
                    if text:
                        samples.append(text.strip())
        except Exception:
            # Best-effort diagnostics only. `Exception` (not a bare except)
            # so task cancellation and KeyboardInterrupt still propagate.
            continue
    return samples


# Browser actions exposed as tools on top of a BrowserSession. Every tool
# method returns a ToolExecutionResponse; most catch browser errors and report
# them in the response content instead of raising.
# NOTE(review): tool-method docstrings are presumably surfaced as tool
# descriptions by `attach_param_schema` / `Tool` - confirm before rewording.
class BrowserTool(Tool):
    def __init__(self, session: BrowserSession) -> None:
        super().__init__()
        self.browser = session

    async def __aenter__(self):
        # The exit stack owns the browser session's async context.
        self._exit_stack = AsyncExitStack()
        await self._exit_stack.enter_async_context(self.browser)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._exit_stack.aclose()

    @property
    def poi_text(self) -> str:
        # Get all points of interest on the page as text; the mark ID of each
        # element is its index in `browser.poi_elements`.
        texts = [element_as_text(mark_id=i, **element) for i, element in enumerate(self.browser.poi_elements)]
        # Return formatted text of points of interest on page
        return "\n".join([txt for txt in texts if txt])

    @attach_param_schema(GotoParams)
    async def goto(self, url: str) -> ToolExecutionResponse:
        """Go directly to a specific web url. Specify the exact URL."""
        await self.browser.goto(url)
        return ToolExecutionResponse(content=f"Successfully navigated to URL: {url}")

    @attach_param_schema(GoogleSearchParams)
    async def google_search(self, query_plan: str, query: str) -> ToolExecutionResponse:
        """Perform a generic web search using Google.
        Results may not be relevant. If you see poor results, you can try another query.
        """
        # `query_plan` is part of the tool schema only; it is not used here.
        # URL-encode the query so characters such as '&', '#', '+' and '%'
        # stay part of the search terms instead of altering the URL.
        url = f"https://www.google.com/search?q={quote_plus(query)}"
        await self.browser.goto(url)
        return ToolExecutionResponse(content=f"Performed Google search for: {query}")

    @attach_param_schema(ClickParams)
    async def click(self, mark_id: int) -> ToolExecutionResponse:
        """Click on an element of the page."""
        try:
            await self.browser.click(mark_id=mark_id)
            return ToolExecutionResponse(content=f"Clicked element with mark ID: {mark_id}")
        except IndexError as e:
            # This happens if mark_id is out of bounds for browser.poi_centroids
            logger.error(f"Click failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
            return ToolExecutionResponse(content=f"Failed to click element with mark ID {mark_id}. Element not found or POI list invalid.")
        except Exception as e:
            logger.error(f"Click failed with unexpected error for mark ID {mark_id}: {e}")
            return ToolExecutionResponse(content=f"An unexpected error occurred while trying to click element {mark_id}: {e}")

    @attach_param_schema(TypeParams)
    async def type(self, entries: List[dict], submit: bool) -> ToolExecutionResponse:
        """Type text. You can type into one or more elements.
        Note that the text inside an element is cleared before typing.
        """
        # Entries are typed in order; "Enter" is pressed only after the last
        # one, and only when `submit` is true. If the page's POI centroids
        # move after an entry, the remaining entries are skipped because their
        # mark IDs may no longer refer to the same elements.
        typed_ids = []
        last_index = len(entries) - 1
        for i, entry_dict in enumerate(entries):
            # Tracked separately from `entry` so the error handlers below never
            # touch an unbound or stale `entry` when validation itself fails.
            mark_id = _peek_mark_id(entry_dict)
            try:
                entry = entry_dict if isinstance(entry_dict, TypeEntry) else TypeEntry(**entry_dict)
                mark_id = entry.mark_id
                last_entry = i == last_index
                old_poi_positions = [tuple(point) for point in self.browser.poi_centroids]
                await self.browser.enter_text(
                    mark_id=entry.mark_id,
                    text=entry.content,
                    submit=submit and last_entry,
                )
                typed_ids.append(entry.mark_id)
                await self.browser.update_poi()
                new_poi_positions = [tuple(point) for point in self.browser.poi_centroids]
                if not last_entry and old_poi_positions != new_poi_positions:
                    logger.error(
                        "POI positions changed mid-typing, cancelling future type entries.",
                    )
                    break
            except IndexError as e:
                logger.error(f"Type failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
                return ToolExecutionResponse(content=f"Failed to type into element with mark ID {mark_id}. Element not found or POI list invalid. Typed into: {typed_ids if typed_ids else 'none'}.")
            except Exception as e:
                logger.error(f"Type failed with unexpected error for mark ID {mark_id}: {e}")
                return ToolExecutionResponse(content=f"An unexpected error occurred while trying to type into element {mark_id}: {e}. Typed into: {typed_ids if typed_ids else 'none'}.")
        return ToolExecutionResponse(
            content=f"Typed text into elements with mark IDs: {typed_ids}",
        )

    @attach_param_schema(ScrollParams)
    async def scroll(self, direction: Literal["up", "down", "left", "right"], mark_id: int) -> ToolExecutionResponse:
        """Scroll the page (or a scrollable element) up, down, left or right."""
        try:
            # -1 means "whole page"; browser.scroll receives None in that case.
            mark_id_for_browser = None if mark_id == -1 else mark_id
            await self.browser.scroll(direction=direction, mark_id=mark_id_for_browser)
            return ToolExecutionResponse(content=f"Scrolled {direction} on element with mark ID: {mark_id if mark_id != -1 else 'page'}")
        except IndexError as e:
            logger.error(f"Scroll failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
            return ToolExecutionResponse(content=f"Failed to scroll element with mark ID {mark_id}. Element not found or POI list invalid.")
        except Exception as e:
            logger.error(f"Scroll failed with unexpected error for mark ID {mark_id}: {e}")
            return ToolExecutionResponse(content=f"An unexpected error occurred while trying to scroll element {mark_id}: {e}")

    @attach_param_schema(BackParams)
    async def back(self) -> ToolExecutionResponse:
        """Go back to the previous page."""
        try:
            await self.browser.go_back()
            return ToolExecutionResponse(content="Went back to the previous page.")
        except Exception as e:
            logger.error(f"Go back failed: {e}")
            return ToolExecutionResponse(content=f"Failed to go back: {e}")

    @attach_param_schema(WaitParams)
    async def wait(self) -> ToolExecutionResponse:
        """Wait three seconds. Useful when the page appears to still be loading, or if there are any unfinished webpage processes."""  # noqa: E501
        await asyncio.sleep(3)
        return ToolExecutionResponse(content="Waited for a few seconds.")

    @attach_param_schema(ReloadParams)
    async def reload(self) -> ToolExecutionResponse:
        """Reload the current page. Useful when the page seems unresponsive, broken, outdated, or if you want to reset the page to its initial state."""  # noqa: E501
        try:
            await self.browser.reload()
            return ToolExecutionResponse(content="Reloaded the current page.")
        except Exception as e:
            logger.error(f"Reload failed: {e}")
            return ToolExecutionResponse(content=f"Failed to reload the page: {e}")

    @attach_param_schema(DoNothingParams)
    async def do_nothing_tool(self) -> ToolExecutionResponse:
        """Do nothing. Use this if you have no need for the browser at this time."""
        return ToolExecutionResponse(content="Did nothing in the browser.")

    # --- NEW: Expose the open_new_tab_and_go_to method as a tool ---
    @attach_param_schema(OpenNewTabAndGoToParams)
    async def open_new_tab_and_go_to(self, url: str) -> ToolExecutionResponse:
        """
        Opens a new browser tab/page and navigates to the specified URL.
        Closes the old page if it's not the last one remaining.
        Use this to bypass loading issues by forcing a new navigation.
        """
        try:
            await self.browser.open_new_tab_and_go_to(url)
            return ToolExecutionResponse(
                content=f"Successfully opened new tab and navigated to: {url}",
            )
        except Exception as e:
            logger.error(f"Error opening new tab and navigating to {url}: {e}")
            return ToolExecutionResponse(content=f"Failed to open new tab and navigate to {url}: {e}")

    # --- NEW: Select option by text from select element ---
    @attach_param_schema(SelectOptionByTextParams)
    async def select_option_by_text(self, mark_id: int, option_text: str) -> ToolExecutionResponse:
        """
        Selects an option from a select element (including dual select picklists) by finding the option with matching text.
        This is especially useful for Salesforce dual select picklists where you need to find and select a specific option.
        Uses Playwright's native iframe handling to bypass CORS restrictions.
        """
        # NOTE: `mark_id` is only used for the initial focusing click. The
        # search itself covers every <select> in the main frame and its direct
        # child frames, and matches option text case-insensitively after
        # stripping surrounding whitespace.
        try:
            logger.info(f"Attempting to select option '{option_text}' from element {mark_id}")

            # First, try to click the select element to ensure it's focused
            await self.browser.click(mark_id=mark_id)
            await asyncio.sleep(0.5)  # Wait for click to register

            # Use Playwright's native frame handling instead of JavaScript evaluation
            # This bypasses CORS restrictions that prevent JavaScript access

            # Find all frames on the page
            if not self.browser.current_page:
                return ToolExecutionResponse(content=f"No active page found. Cannot select option '{option_text}'.")

            main_frame = self.browser.current_page.main_frame
            all_frames = [main_frame] + main_frame.child_frames
            logger.info(f"Searching for element {mark_id} across {len(all_frames)} frames")

            wanted = option_text.lower().strip()
            for frame_idx, frame in enumerate(all_frames):
                try:
                    # Look for select elements in this frame
                    select_elements = await frame.query_selector_all('select')
                    logger.info(f"Frame {frame_idx}: Found {len(select_elements)} select elements")

                    for select_elem in select_elements:
                        # Get all options for this select
                        options = await select_elem.query_selector_all('option')

                        # Check if any option contains our target text
                        for opt_idx, option in enumerate(options):
                            option_text_content = await option.text_content()
                            option_value = await option.get_attribute('value')
                            logger.info(f"Frame {frame_idx}, Select {select_elem}, Option {opt_idx}: text='{option_text_content}', value='{option_value}'")

                            if option_text_content and wanted == option_text_content.lower().strip():
                                # Found the option! Click it directly instead of using select_option
                                try:
                                    # Direct click with force=True to bypass visibility checks and short timeout
                                    await option.click(force=True, timeout=5000)
                                    logger.info(f"Successfully clicked option '{option_text_content.strip()}' in frame {frame_idx}")
                                    return ToolExecutionResponse(
                                        content=f"[ACTION COMPLETED] Successfully selected '{option_text_content.strip()}' from dual select picklist"
                                    )
                                except Exception as select_error:
                                    logger.info(f"Click timed out in frame {frame_idx}, but option may have been selected: {select_error}")
                                    # Continue to next frame/option instead of failing completely
                                    continue
                except Exception as frame_error:
                    logger.info(f"Could not access frame {frame_idx}: {frame_error}")
                    continue

            # If we get here, the option wasn't found in any frame
            # Try to get available options for debugging
            # (first 5 options per select, first 10 overall)
            all_options = await _sample_option_texts(all_frames, per_select=5)
            available_options_str = ', '.join(all_options[:10]) if all_options else 'None found'
            return ToolExecutionResponse(
                content=f"Failed to find option '{option_text}' in any select element. Available options (first 10): {available_options_str}"
            )
        except Exception as e:
            logger.error(f"Error selecting option '{option_text}' from element {mark_id}: {e}")
            return ToolExecutionResponse(content=f"An unexpected error occurred while selecting option '{option_text}': {e}")