# Trisha Tomy
# fixes+permset
# 60c7a7f
import asyncio
from contextlib import AsyncExitStack
from typing import List, Literal, Optional, Any
from urllib.parse import quote_plus

from pydantic import BaseModel, Field

from proxy_lite.browser.browser import BrowserSession
from proxy_lite.logger import logger

from .tool_base import Tool, ToolExecutionResponse, attach_param_schema
# Tags that element_as_text renders in the self-closing form `<tag .../>`
# when they carry no text.
# many of these are non-interactive but keeping them anyway
SELF_CONTAINED_TAGS = [
    "area", "base", "br", "col", "embed", "hr", "img",
    "input", "link", "meta", "param", "source", "track", "wbr",
]
def element_as_text(
    mark_id: int,
    tag: Optional[str] = None,
    text: Optional[str] = None,
    **raw_attributes,
) -> str:
    """Render a single marked page element as an HTML-like string.

    Args:
        mark_id: Numeric mark written into the output as ``id=<mark_id>``.
        tag: Element tag name; lower-cased before use, ``None`` becomes "".
        text: Inner text of the element; ``None`` becomes "".
        **raw_attributes: Extra attributes. ``None`` values are skipped,
            ``True`` renders as a bare attribute name, ``False`` is dropped,
            anything else is rendered as ``name="value"``.

    Returns:
        ``<tag id=N attrs/>`` for a text-less tag listed in
        ``SELF_CONTAINED_TAGS``, otherwise ``<tag id=N attrs>text</tag>``.
        Attribute values and text longer than 2500 characters are cut to
        2499 characters followed by an ellipsis.
    """
    limit = 2500

    def clip(value: str) -> str:
        # Keep the result at most `limit` characters, ellipsis included.
        return value[: limit - 1] + "…" if len(value) > limit else value

    parts = []
    for name, value in raw_attributes.items():
        if value is None:
            continue
        if isinstance(value, bool):
            # True -> bare attribute name; False -> omitted entirely.
            if value:
                parts.append(name)
            continue
        parts.append(f'{name}="{clip(str(value))}"')
    # Leading space separates the attributes from `id=...`; rstrip() removes
    # it again when there are no attributes at all.
    attr_text = (" " + " ".join(parts)).rstrip()

    tag_name = tag.lower() if tag else ""
    body = clip("" if text is None else text)

    if tag_name in SELF_CONTAINED_TAGS:
        if not body:
            return f"<{tag_name} id={mark_id}{attr_text}/>"
        # A void element that unexpectedly has text: warn, then fall through
        # to the open/close form so the text is not lost.
        logger.warning(
            f"Got self-contained element '{tag_name}' which contained text '{body}'.",
        )
    return f"<{tag_name} id={mark_id}{attr_text}>{body}</{tag_name}>"
# Parameter schema attached to BrowserTool.goto.
class GotoParams(BaseModel):
    url: str = Field(
        ...,
        description="The web address to visit. Must be a valid URL.",
    )
# Parameter schema attached to BrowserTool.google_search.
class GoogleSearchParams(BaseModel):
    # Free-form planning text; google_search receives it but only uses `query`.
    query_plan: str = Field(
        ...,
        description="Plan out the query you will make. Re-write queries in a way that will yield the best results.",
    )
    query: str = Field(
        ...,
        description="The Google search to perform.",
    )
# Parameter schema attached to BrowserTool.click.
class ClickParams(BaseModel):
    mark_id: int = Field(
        ...,
        description="Element Mark ID.",
    )
# One (element, text) pair for BrowserTool.type; TypeParams holds a list of these.
class TypeEntry(BaseModel):
    mark_id: int = Field(
        ...,
        description="Element Mark ID.",
    )
    content: str = Field(
        ...,
        description="The text to type into the element.",
    )
# Parameter schema attached to BrowserTool.type.
class TypeParams(BaseModel):
    entries: List[TypeEntry] = Field(..., description="A list of elements and contents to type.")
    # BrowserTool.type only applies `submit` to the final entry.
    submit: bool = Field(..., description='Whether to press the "Enter" key after typing in the last entry.')
# Parameter schema attached to BrowserTool.scroll.
class ScrollParams(BaseModel):
    direction: Literal["up", "down", "left", "right"] = Field(..., description='Direction to scroll. Must be one of "up", "down", "left" or "right".')  # noqa: E501
    # -1 is the sentinel for "whole page"; BrowserTool.scroll maps it to None.
    mark_id: int = Field(..., description="What to scroll. Use -1 to scroll the whole page otherwise give the mark ID of an element that is `scrollable`.")  # noqa: E501
# Parameter schema attached to BrowserTool.back; the tool takes no arguments.
class BackParams(BaseModel):
    pass
# Parameter schema attached to BrowserTool.wait; the tool takes no arguments.
class WaitParams(BaseModel):
    pass
# Parameter schema attached to BrowserTool.reload; the tool takes no arguments.
class ReloadParams(BaseModel):
    pass
# Parameter schema attached to BrowserTool.do_nothing_tool; the tool takes no arguments.
class DoNothingParams(BaseModel):
    pass
# --- NEW: Parameters for open_new_tab_and_go_to tool ---
# Parameter schema attached to BrowserTool.open_new_tab_and_go_to.
class OpenNewTabAndGoToParams(BaseModel):
    url: str = Field(
        ...,
        description="The URL to navigate to in the new tab.",
    )
# --- NEW: Parameters for select_option_by_text tool ---
# Parameter schema attached to BrowserTool.select_option_by_text.
class SelectOptionByTextParams(BaseModel):
    mark_id: int = Field(
        ...,
        description="The mark ID of the select element.",
    )
    option_text: str = Field(
        ...,
        description="The text content of the option to select.",
    )
class BrowserTool(Tool):
    # Browser actions exposed as tools on top of a BrowserSession.
    #
    # Each coroutine decorated with @attach_param_schema is one action and
    # returns a ToolExecutionResponse whose `content` describes the outcome.
    # Most actions catch their own failures and report them in that content
    # instead of raising; `goto` and `google_search` let errors propagate.
    #
    # NOTE(review): the docstrings of the decorated methods are presumably
    # surfaced as tool descriptions by attach_param_schema, so they are kept
    # verbatim and explanations live in `#` comments -- confirm before
    # rewording any of them.

    def __init__(self, session: BrowserSession) -> None:
        """Store the browser session that every action operates on."""
        super().__init__()
        self.browser = session

    async def __aenter__(self):
        """Enter the browser session's async context and return this tool."""
        self._exit_stack = AsyncExitStack()
        await self._exit_stack.enter_async_context(self.browser)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the exit stack, which exits the browser session context."""
        await self._exit_stack.aclose()

    @property
    def poi_text(self) -> str:
        """Return the page's points of interest, one rendered element per line.

        The position of an element in ``browser.poi_elements`` is its mark ID.
        """
        # Get all points of interest on the page as text
        texts = [element_as_text(mark_id=i, **element) for i, element in enumerate(self.browser.poi_elements)]
        # Return formatted text of points of interest on page
        return "\n".join([txt for txt in texts if txt])

    # Navigates the session to `url`; navigation errors propagate to the caller.
    @attach_param_schema(GotoParams)
    async def goto(self, url: str) -> ToolExecutionResponse:
        """Go directly to a specific web url. Specify the exact URL."""
        await self.browser.goto(url)
        return ToolExecutionResponse(content=f"Successfully navigated to URL: {url}")

    # Opens a Google results page for `query`. `query_plan` is accepted only
    # because it is part of the parameter schema; it is not used here.
    @attach_param_schema(GoogleSearchParams)
    async def google_search(self, query_plan: str, query: str) -> ToolExecutionResponse:
        """Perform a generic web search using Google.
        Results may not be relevant. If you see poor results, you can try another query.
        """
        # Percent-encode the query so characters such as '&', '#', '+' and
        # spaces stay inside the `q` parameter instead of corrupting the URL.
        url = f"https://www.google.com/search?q={quote_plus(query)}"
        await self.browser.goto(url)
        return ToolExecutionResponse(content=f"Performed Google search for: {query}")

    # Clicks the element with the given mark ID; failures are reported in the
    # response content rather than raised.
    @attach_param_schema(ClickParams)
    async def click(self, mark_id: int) -> ToolExecutionResponse:
        """Click on an element of the page."""
        try:
            await self.browser.click(mark_id=mark_id)
            return ToolExecutionResponse(content=f"Clicked element with mark ID: {mark_id}")
        except IndexError as e:
            # This happens if mark_id is out of bounds for browser.poi_centroids
            logger.error(f"Click failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
            return ToolExecutionResponse(content=f"Failed to click element with mark ID {mark_id}. Element not found or POI list invalid.")
        except Exception as e:
            logger.error(f"Click failed with unexpected error for mark ID {mark_id}: {e}")
            return ToolExecutionResponse(content=f"An unexpected error occurred while trying to click element {mark_id}: {e}")

    # Types each entry in order, pressing Enter only after the last one when
    # `submit` is set. After every entry the POI list is refreshed; if the
    # centroids moved and entries remain, typing stops because the remaining
    # mark IDs may now point at different elements.
    @attach_param_schema(TypeParams)
    async def type(self, entries: List[dict], submit: bool) -> ToolExecutionResponse:
        """Type text.
        You can type into one or more elements.
        Note that the text inside an element is cleared before typing.
        """
        typed_ids = []
        total = len(entries)
        skipped = 0
        for i, entry_dict in enumerate(entries):
            # Validate outside the action's try block: the handlers below
            # report entry.mark_id, which would be unbound (first entry) or
            # stale (later entries) if validation itself were what failed.
            try:
                entry = entry_dict if isinstance(entry_dict, TypeEntry) else TypeEntry(**entry_dict)
            except (TypeError, ValueError) as e:
                logger.error(f"Type failed: entry {i} is not a valid type entry: {e}")
                return ToolExecutionResponse(content=f"Failed to type: entry {i} is invalid: {e}. Typed into: {typed_ids if typed_ids else 'none'}.")
            last_entry = i == total - 1
            try:
                old_poi_positions = [tuple(point) for point in self.browser.poi_centroids]
                await self.browser.enter_text(
                    mark_id=entry.mark_id,
                    text=entry.content,
                    submit=submit and last_entry,
                )
                typed_ids.append(entry.mark_id)
                await self.browser.update_poi()
                new_poi_positions = [tuple(point) for point in self.browser.poi_centroids]
            except IndexError as e:
                logger.error(f"Type failed: Mark ID {entry.mark_id} not found or POI list empty. Error: {e}")
                return ToolExecutionResponse(content=f"Failed to type into element with mark ID {entry.mark_id}. Element not found or POI list invalid. Typed into: {typed_ids if typed_ids else 'none'}.")
            except Exception as e:
                logger.error(f"Type failed with unexpected error for mark ID {entry.mark_id}: {e}")
                return ToolExecutionResponse(content=f"An unexpected error occurred while trying to type into element {entry.mark_id}: {e}. Typed into: {typed_ids if typed_ids else 'none'}.")
            if not last_entry and old_poi_positions != new_poi_positions:
                logger.error(
                    "POI positions changed mid-typing, cancelling future type entries.",
                )
                skipped = total - i - 1
                break
        content = f"Typed text into elements with mark IDs: {typed_ids}"
        if skipped:
            # Tell the caller about the early stop so it can re-issue the
            # remaining entries against the refreshed mark IDs.
            content += f". Stopped early because the page layout changed; {skipped} remaining entries were not typed."
        return ToolExecutionResponse(content=content)

    # Scrolls the page (mark_id == -1) or a single element; failures are
    # reported in the response content rather than raised.
    @attach_param_schema(ScrollParams)
    async def scroll(self, direction: Literal["up", "down", "left", "right"], mark_id: int) -> ToolExecutionResponse:
        """Scroll the page (or a scrollable element) up, down, left or right."""
        try:
            if mark_id == -1:
                mark_id_for_browser = None  # Pass None to browser.scroll for page scroll
            else:
                mark_id_for_browser = mark_id
            await self.browser.scroll(direction=direction, mark_id=mark_id_for_browser)
            return ToolExecutionResponse(content=f"Scrolled {direction} on element with mark ID: {mark_id if mark_id != -1 else 'page'}")
        except IndexError as e:
            logger.error(f"Scroll failed: Mark ID {mark_id} not found or POI list empty. Error: {e}")
            return ToolExecutionResponse(content=f"Failed to scroll element with mark ID {mark_id}. Element not found or POI list invalid.")
        except Exception as e:
            logger.error(f"Scroll failed with unexpected error for mark ID {mark_id}: {e}")
            return ToolExecutionResponse(content=f"An unexpected error occurred while trying to scroll element {mark_id}: {e}")

    # Navigates back in history; failures are reported in the response content.
    @attach_param_schema(BackParams)
    async def back(self) -> ToolExecutionResponse:
        """Go back to the previous page."""
        try:
            await self.browser.go_back()
            return ToolExecutionResponse(content="Went back to the previous page.")
        except Exception as e:
            logger.error(f"Go back failed: {e}")
            return ToolExecutionResponse(content=f"Failed to go back: {e}")

    # Sleeps for three seconds without touching the browser.
    @attach_param_schema(WaitParams)
    async def wait(self) -> ToolExecutionResponse:
        """Wait three seconds. Useful when the page appears to still be loading, or if there are any unfinished webpage processes."""  # noqa: E501
        await asyncio.sleep(3)
        return ToolExecutionResponse(content="Waited for a few seconds.")

    # Reloads the current page; failures are reported in the response content.
    @attach_param_schema(ReloadParams)
    async def reload(self) -> ToolExecutionResponse:
        """Reload the current page. Useful when the page seems unresponsive, broken, outdated, or if you want to reset the page to its initial state."""  # noqa: E501
        try:
            await self.browser.reload()
            return ToolExecutionResponse(content="Reloaded the current page.")
        except Exception as e:
            logger.error(f"Reload failed: {e}")
            return ToolExecutionResponse(content=f"Failed to reload the page: {e}")

    # No-op action: returns a fixed response without touching the browser.
    @attach_param_schema(DoNothingParams)
    async def do_nothing_tool(self) -> ToolExecutionResponse:
        """Do nothing. Use this if you have no need for the browser at this time."""
        return ToolExecutionResponse(content="Did nothing in the browser.")

    # --- NEW: Expose the open_new_tab_and_go_to method as a tool ---
    # Delegates to the session's open_new_tab_and_go_to; failures are reported
    # in the response content rather than raised.
    @attach_param_schema(OpenNewTabAndGoToParams)
    async def open_new_tab_and_go_to(self, url: str) -> ToolExecutionResponse:
        """
        Opens a new browser tab/page and navigates to the specified URL.
        Closes the old page if it's not the last one remaining.
        Use this to bypass loading issues by forcing a new navigation.
        """
        try:
            await self.browser.open_new_tab_and_go_to(url)
            return ToolExecutionResponse(
                content=f"Successfully opened new tab and navigated to: {url}",
            )
        except Exception as e:
            logger.error(f"Error opening new tab and navigating to {url}: {e}")
            return ToolExecutionResponse(content=f"Failed to open new tab and navigate to {url}: {e}")

    # --- NEW: Select option by text from select element ---
    # Clicks the element at `mark_id` to focus it, then looks through every
    # <select> in every frame of the current page (nested iframes included)
    # for an <option> whose text equals `option_text`, ignoring case and
    # surrounding whitespace, and clicks the first one that can be clicked.
    # `mark_id` is used only for the initial focus click; the search is not
    # restricted to that element. When nothing matches, the response lists a
    # sample of the option texts that were seen.
    @attach_param_schema(SelectOptionByTextParams)
    async def select_option_by_text(self, mark_id: int, option_text: str) -> ToolExecutionResponse:
        """
        Selects an option from a select element (including dual select picklists) by finding the option with matching text.
        This is especially useful for Salesforce dual select picklists where you need to find and select a specific option.
        Uses Playwright's native iframe handling to bypass CORS restrictions.
        """
        try:
            logger.info(f"Attempting to select option '{option_text}' from element {mark_id}")
            # First, try to click the select element to ensure it's focused
            await self.browser.click(mark_id=mark_id)
            await asyncio.sleep(0.5)  # Wait for click to register
            # Use Playwright's native frame handling instead of JavaScript evaluation
            # This bypasses CORS restrictions that prevent JavaScript access
            if not self.browser.current_page:
                return ToolExecutionResponse(content=f"No active page found. Cannot select option '{option_text}'.")
            # Walk the whole frame tree: the target select may live in an
            # iframe nested more than one level below the main frame.
            all_frames = _collect_frames(self.browser.current_page.main_frame)
            logger.info(f"Searching for element {mark_id} across {len(all_frames)} frames")
            wanted = option_text.lower().strip()
            for frame_idx, frame in enumerate(all_frames):
                try:
                    # Look for select elements in this frame
                    select_elements = await frame.query_selector_all('select')
                    logger.info(f"Frame {frame_idx}: Found {len(select_elements)} select elements")
                    for select_elem in select_elements:
                        # Get all options for this select
                        options = await select_elem.query_selector_all('option')
                        # Check if any option contains our target text
                        for opt_idx, option in enumerate(options):
                            option_text_content = await option.text_content()
                            option_value = await option.get_attribute('value')
                            logger.info(f"Frame {frame_idx}, Select {select_elem}, Option {opt_idx}: text='{option_text_content}', value='{option_value}'")
                            if not option_text_content or wanted != option_text_content.lower().strip():
                                continue
                            # Found the option! Click it directly instead of using select_option
                            try:
                                # Direct click with force=True to bypass visibility checks and short timeout
                                await option.click(force=True, timeout=5000)
                            except Exception as select_error:
                                logger.info(f"Click timed out in frame {frame_idx}, but option may have been selected: {select_error}")
                                # Continue to next frame/option instead of failing completely
                                continue
                            logger.info(f"Successfully clicked option '{option_text_content.strip()}' in frame {frame_idx}")
                            return ToolExecutionResponse(
                                content=f"[ACTION COMPLETED] Successfully selected '{option_text_content.strip()}' from dual select picklist"
                            )
                except Exception as frame_error:
                    logger.info(f"Could not access frame {frame_idx}: {frame_error}")
                    continue
            # If we get here, the option wasn't found in any frame
            # Try to get available options for debugging
            all_options = await _sample_option_texts(all_frames)
            available_options_str = ', '.join(all_options[:10]) if all_options else 'None found'
            return ToolExecutionResponse(
                content=f"Failed to find option '{option_text}' in any select element. Available options (first 10): {available_options_str}"
            )
        except Exception as e:
            logger.error(f"Error selecting option '{option_text}' from element {mark_id}: {e}")
            return ToolExecutionResponse(content=f"An unexpected error occurred while selecting option '{option_text}': {e}")


def _collect_frames(root_frame) -> list:
    """Return ``root_frame`` followed by all of its descendant frames, depth-first."""
    frames = []
    pending = [root_frame]
    while pending:
        frame = pending.pop()
        frames.append(frame)
        # Reversed so that siblings are visited in their original order.
        pending.extend(reversed(frame.child_frames))
    return frames


async def _sample_option_texts(frames) -> List[str]:
    """Collect the stripped text of up to five options per <select> across ``frames``."""
    texts = []
    for frame in frames:
        try:
            for select_elem in await frame.query_selector_all('select'):
                options = await select_elem.query_selector_all('option')
                for option in options[:5]:  # Limit to first 5 options per select
                    text = await option.text_content()
                    if text:
                        texts.append(text.strip())
        except Exception:
            # Best effort: an inaccessible frame only shortens the hint list.
            # `Exception` (not a bare except) lets task cancellation through.
            continue
    return texts