import os import gradio as gr from smolagents import CodeAgent, LiteLLMModel, tool from smolagents.agents import ActionStep import helium from selenium import webdriver from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from io import BytesIO from PIL import Image from datetime import datetime from dotenv import load_dotenv from huggingface_hub import login import tempfile import logging # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() hf_token = os.getenv("HF_TOKEN") gemini_api_key = os.getenv("GOOGLE_API_KEY") if not hf_token: raise ValueError("HF_TOKEN environment variable not set.") if not gemini_api_key: raise ValueError("GEMINI_API_KEY environment variable not set.") login(hf_token, add_to_git_credential=False) # Debug ChromeDriver path chromedriver_path = '/usr/bin/chromedriver' logger.info(f"Checking ChromeDriver at: {chromedriver_path}") logger.info(f"ChromeDriver exists: {os.path.exists(chromedriver_path)}") logger.info(f"ChromeDriver executable: {os.access(chromedriver_path, os.X_OK)}") logger.info(f"System PATH: {os.environ.get('PATH')}") # Define tools @tool def search_item_ctrl_f(text: str, nth_result: int = 1) -> str: """ Searches for text on the current page via Ctrl + F and jumps to the nth occurrence. Args: text: The text to search for nth_result: Which occurrence to jump to (default: 1) """ elements = driver.find_elements(By.XPATH, f"//*[contains(text(), '{text}')]") if nth_result > len(elements): raise Exception(f"Match n°{nth_result} not found (only {len(elements)} matches found)") result = f"Found {len(elements)} matches for '{text}'." elem = elements[nth_result - 1] driver.execute_script("arguments[0].scrollIntoView(true);", elem) result += f"Focused on element {nth_result} of {len(elements)}" return result @tool def go_back() -> None: """Goes back to previous page.""" driver.back() @tool def close_popups() -> str: """ Closes any visible modal or pop-up on the page. Use this to dismiss pop-up windows! This does not work on cookie consent banners. """ webdriver.ActionChains(driver).send_keys(Keys.ESCAPE).perform() # Initialize Chrome driver try: chrome_options = webdriver.ChromeOptions() chrome_options.add_argument("--force-device-scale-factor=1") chrome_options.add_argument("--window-size=1000,1350") chrome_options.add_argument("--disable-pdf-viewer") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--window-position=0,0") chrome_options.add_argument("--headless=new") driver = webdriver.Chrome(options=chrome_options) helium.set_driver(driver) logger.info("Chrome driver initialized successfully.") except Exception as e: logger.error(f"Failed to initialize Chrome driver: {str(e)}") raise # Screenshot callback def save_screenshot(memory_step: ActionStep, agent: CodeAgent) -> str: from time import sleep sleep(1.0) driver = helium.get_driver() current_step = memory_step.step_number if driver is not None: # Clear old screenshots from earlier steps for previous_memory_step in agent.memory.steps: if isinstance(previous_memory_step, ActionStep) and previous_memory_step.step_number < current_step: previous_memory_step.observations_images = None # Save new screenshot png_bytes = driver.get_screenshot_as_png() image = Image.open(BytesIO(png_bytes)) screenshot_dir = os.path.join(tempfile.gettempdir(), "web_agent_screenshots") os.makedirs(screenshot_dir, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") screenshot_filename = f"screenshot_step_{current_step}_{timestamp}.png" screenshot_path = os.path.join(screenshot_dir, screenshot_filename) image.save(screenshot_path) logger.info(f"Saved screenshot to: {screenshot_path}") # Update observations url_info = f"Current url: {driver.current_url}\nScreenshot saved at: {screenshot_path}" memory_step.observations = ( url_info if memory_step.observations is None else memory_step.observations + "\n" + url_info ) return screenshot_path # Initialize model and agent model = LiteLLMModel("gemini/gemini-2.0-flash") agent = CodeAgent( tools=[go_back, close_popups, search_item_ctrl_f], model=model, additional_authorized_imports=["helium"], step_callbacks=[save_screenshot], max_steps=20, verbosity_level=2, ) agent.python_executor("from helium import *") # Helium instructions helium_instructions = """ You can use helium to access websites. Don't bother about the helium driver, it's already managed. We've already ran "from helium import *" Then you can go to pages! Code: go_to('github.com/trending') ``` You can directly click clickable elements by inputting the text that appears on them. Code: click("Top products") ``` If it's a link: Code: click(Link("Top products")) ``` If you try to interact with an element and it's not found, you'll get a LookupError. In general stop your action after each button click to see what happens on your screenshot. Never try to login in a page. To scroll up or down, use scroll_down or scroll_up with as an argument the number of pixels to scroll from. Code: scroll_down(num_pixels=1200) ``` When you have pop-ups with a cross icon to close, don't try to click the close icon by finding its element or targeting an 'X' element. Just use your built-in tool `close_popups` to close them: Code: close_popups() ``` You can use .exists() to check for the existence of an element. For example: Code: if Text('Accept cookies?').exists(): click('I accept') ``` """ # Chatbot interface function def run_agent_chat(user_input: str, history: list): try: # Extract URL and request from user input or history if "http" in user_input: url = user_input.split()[0] if user_input.startswith("http") else next((w for w in user_input.split() if w.startswith("http")), "") request = user_input.replace(url, "").strip() or "Navigate to the URL and describe the page." else: url = "[invalid url, do not cite]" request = user_input search_request = f"Please go to {url}. {request}" agent_output = agent.run(search_request + helium_instructions) # Collect the latest screenshot path from observations latest_screenshot = None for step in reversed(agent.memory.steps): if isinstance(step, ActionStep) and step.observations: # Extract screenshot path from observations for line in step.observations.split("\n"): if line.startswith("Screenshot saved at:"): latest_screenshot = line.replace("Screenshot saved at: ", "").strip() break if latest_screenshot: break # Format output for chatbot output = f"**Agent Output:**\n{agent_output}" if latest_screenshot: output += f"\n\n**Latest Screenshot:**" return output, latest_screenshot except Exception as e: logger.error(f"Agent execution failed: {str(e)}") return f"Error: {str(e)}", None # Custom Gradio interface def process_input(user_input, history): if not user_input.strip(): return history, None output, latest_screenshot = run_agent_chat(user_input, history) new_history = history + [[user_input, output]] return new_history, latest_screenshot if __name__ == "__main__": with gr.Blocks() as demo: gr.Markdown("# Web Navigation Agent") chatbot = gr.Chatbot(label="Chat") msg = gr.Textbox(placeholder="Enter URL and request (e.g., [invalid url, do not cite] Click on Developers)") btn = gr.Button("Send") image = gr.Image(label="Latest Screenshot") btn.click(process_input, inputs=[msg, chatbot], outputs=[chatbot, image]) msg.submit(process_input, inputs=[msg, chatbot], outputs=[chatbot, image]) try: demo.launch() except KeyboardInterrupt: driver.quit() logger.info("Chrome driver closed on exit.")