Spaces:
Running
Running
import os | |
import gradio as gr | |
from smolagents import CodeAgent, LiteLLMModel, tool | |
from smolagents.agents import ActionStep | |
import helium | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.common.keys import Keys | |
from io import BytesIO | |
from PIL import Image | |
from datetime import datetime | |
from dotenv import load_dotenv | |
from huggingface_hub import login | |
import tempfile | |
import logging | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Load environment variables | |
load_dotenv() | |
hf_token = os.getenv("HF_TOKEN") | |
gemini_api_key = os.getenv("GOOGLE_API_KEY") | |
if not hf_token: | |
raise ValueError("HF_TOKEN environment variable not set.") | |
if not gemini_api_key: | |
raise ValueError("GEMINI_API_KEY environment variable not set.") | |
login(hf_token, add_to_git_credential=False) | |
# Debug ChromeDriver path | |
chromedriver_path = '/usr/bin/chromedriver' | |
logger.info(f"Checking ChromeDriver at: {chromedriver_path}") | |
logger.info(f"ChromeDriver exists: {os.path.exists(chromedriver_path)}") | |
logger.info(f"ChromeDriver executable: {os.access(chromedriver_path, os.X_OK)}") | |
logger.info(f"System PATH: {os.environ.get('PATH')}") | |
# Define tools | |
def search_item_ctrl_f(text: str, nth_result: int = 1) -> str: | |
""" | |
Searches for text on the current page via Ctrl + F and jumps to the nth occurrence. | |
Args: | |
text: The text to search for | |
nth_result: Which occurrence to jump to (default: 1) | |
""" | |
elements = driver.find_elements(By.XPATH, f"//*[contains(text(), '{text}')]") | |
if nth_result > len(elements): | |
raise Exception(f"Match n°{nth_result} not found (only {len(elements)} matches found)") | |
result = f"Found {len(elements)} matches for '{text}'." | |
elem = elements[nth_result - 1] | |
driver.execute_script("arguments[0].scrollIntoView(true);", elem) | |
result += f"Focused on element {nth_result} of {len(elements)}" | |
return result | |
def go_back() -> None: | |
"""Goes back to previous page.""" | |
driver.back() | |
def close_popups() -> str: | |
""" | |
Closes any visible modal or pop-up on the page. Use this to dismiss pop-up windows! | |
This does not work on cookie consent banners. | |
""" | |
webdriver.ActionChains(driver).send_keys(Keys.ESCAPE).perform() | |
# Initialize Chrome driver | |
try: | |
chrome_options = webdriver.ChromeOptions() | |
chrome_options.add_argument("--force-device-scale-factor=1") | |
chrome_options.add_argument("--window-size=1000,1350") | |
chrome_options.add_argument("--disable-pdf-viewer") | |
chrome_options.add_argument("--no-sandbox") | |
chrome_options.add_argument("--disable-dev-shm-usage") | |
chrome_options.add_argument("--window-position=0,0") | |
chrome_options.add_argument("--headless=new") | |
driver = webdriver.Chrome(options=chrome_options) | |
helium.set_driver(driver) | |
logger.info("Chrome driver initialized successfully.") | |
except Exception as e: | |
logger.error(f"Failed to initialize Chrome driver: {str(e)}") | |
raise | |
# Screenshot callback | |
def save_screenshot(memory_step: ActionStep, agent: CodeAgent) -> str: | |
from time import sleep | |
sleep(1.0) | |
driver = helium.get_driver() | |
current_step = memory_step.step_number | |
if driver is not None: | |
# Clear old screenshots from earlier steps | |
for previous_memory_step in agent.memory.steps: | |
if isinstance(previous_memory_step, ActionStep) and previous_memory_step.step_number < current_step: | |
previous_memory_step.observations_images = None | |
# Save new screenshot | |
png_bytes = driver.get_screenshot_as_png() | |
image = Image.open(BytesIO(png_bytes)) | |
screenshot_dir = os.path.join(tempfile.gettempdir(), "web_agent_screenshots") | |
os.makedirs(screenshot_dir, exist_ok=True) | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
screenshot_filename = f"screenshot_step_{current_step}_{timestamp}.png" | |
screenshot_path = os.path.join(screenshot_dir, screenshot_filename) | |
image.save(screenshot_path) | |
logger.info(f"Saved screenshot to: {screenshot_path}") | |
# Update observations | |
url_info = f"Current url: {driver.current_url}\nScreenshot saved at: {screenshot_path}" | |
memory_step.observations = ( | |
url_info if memory_step.observations is None else memory_step.observations + "\n" + url_info | |
) | |
return screenshot_path | |
# Initialize model and agent | |
model = LiteLLMModel("gemini/gemini-2.0-flash") | |
agent = CodeAgent( | |
tools=[go_back, close_popups, search_item_ctrl_f], | |
model=model, | |
additional_authorized_imports=["helium"], | |
step_callbacks=[save_screenshot], | |
max_steps=20, | |
verbosity_level=2, | |
) | |
agent.python_executor("from helium import *") | |
# Helium instructions | |
helium_instructions = """ | |
You can use helium to access websites. Don't bother about the helium driver, it's already managed. | |
We've already ran "from helium import *" | |
Then you can go to pages! | |
Code: | |
go_to('github.com/trending') | |
```<end_code> | |
You can directly click clickable elements by inputting the text that appears on them. | |
Code: | |
click("Top products") | |
```<end_code> | |
If it's a link: | |
Code: | |
click(Link("Top products")) | |
```<end_code> | |
If you try to interact with an element and it's not found, you'll get a LookupError. | |
In general stop your action after each button click to see what happens on your screenshot. | |
Never try to login in a page. | |
To scroll up or down, use scroll_down or scroll_up with as an argument the number of pixels to scroll from. | |
Code: | |
scroll_down(num_pixels=1200) | |
```<end_code> | |
When you have pop-ups with a cross icon to close, don't try to click the close icon by finding its element or targeting an 'X' element. | |
Just use your built-in tool `close_popups` to close them: | |
Code: | |
close_popups() | |
```<end_code> | |
You can use .exists() to check for the existence of an element. For example: | |
Code: | |
if Text('Accept cookies?').exists(): | |
click('I accept') | |
```<end_code> | |
""" | |
# Chatbot interface function | |
def run_agent_chat(user_input: str, history: list): | |
try: | |
# Extract URL and request from user input or history | |
if "http" in user_input: | |
url = user_input.split()[0] if user_input.startswith("http") else next((w for w in user_input.split() if w.startswith("http")), "") | |
request = user_input.replace(url, "").strip() or "Navigate to the URL and describe the page." | |
else: | |
url = "[invalid url, do not cite]" | |
request = user_input | |
search_request = f"Please go to {url}. {request}" | |
agent_output = agent.run(search_request + helium_instructions) | |
# Collect the latest screenshot path from observations | |
latest_screenshot = None | |
for step in reversed(agent.memory.steps): | |
if isinstance(step, ActionStep) and step.observations: | |
# Extract screenshot path from observations | |
for line in step.observations.split("\n"): | |
if line.startswith("Screenshot saved at:"): | |
latest_screenshot = line.replace("Screenshot saved at: ", "").strip() | |
break | |
if latest_screenshot: | |
break | |
# Format output for chatbot | |
output = f"**Agent Output:**\n{agent_output}" | |
if latest_screenshot: | |
output += f"\n\n**Latest Screenshot:**" | |
return output, latest_screenshot | |
except Exception as e: | |
logger.error(f"Agent execution failed: {str(e)}") | |
return f"Error: {str(e)}", None | |
# Custom Gradio interface | |
def process_input(user_input, history): | |
if not user_input.strip(): | |
return history, None | |
output, latest_screenshot = run_agent_chat(user_input, history) | |
new_history = history + [[user_input, output]] | |
return new_history, latest_screenshot | |
if __name__ == "__main__": | |
with gr.Blocks() as demo: | |
gr.Markdown("# Web Navigation Agent") | |
chatbot = gr.Chatbot(label="Chat") | |
msg = gr.Textbox(placeholder="Enter URL and request (e.g., [invalid url, do not cite] Click on Developers)") | |
btn = gr.Button("Send") | |
image = gr.Image(label="Latest Screenshot") | |
btn.click(process_input, inputs=[msg, chatbot], outputs=[chatbot, image]) | |
msg.submit(process_input, inputs=[msg, chatbot], outputs=[chatbot, image]) | |
try: | |
demo.launch() | |
except KeyboardInterrupt: | |
driver.quit() | |
logger.info("Chrome driver closed on exit.") |