# Spaces: Running
# app.py
import atexit
import datetime
import os
import re
import sys
import urllib.parse
from itertools import cycle

import gradio as gr
from bs4 import BeautifulSoup
from playwright.sync_api import sync_playwright, Error as PlaywrightError
# --- NEW: Credential Revolver Class ---
class CredentialRevolver:
    """Hand out proxy configurations in round-robin order.

    The input is a multi-line string with one proxy per non-empty line.
    A line may be a full URL (``http://user:pass@host:port``) or omit the
    scheme (``user:pass@host:port``), in which case ``http`` is assumed.
    """

    def __init__(self, proxy_string: str):
        """Parse *proxy_string* and prepare the rotation.

        Args:
            proxy_string: Newline-separated proxy definitions; may be empty.
        """
        self.proxies = self._parse_proxies(proxy_string)
        if self.proxies:
            self.proxy_cycler = cycle(self.proxies)
            print(f"✅ CredentialRevolver initialized with {len(self.proxies)} proxies.")
        else:
            self.proxy_cycler = None
            print("⚠️ CredentialRevolver initialized with no proxies. Using direct connection.")

    def _parse_proxies(self, proxy_string: str):
        """Parse a multi-line string of proxies into a list of dicts.

        Each returned dict has the keys ``server``, ``username`` and
        ``password`` (the latter two are ``None`` when the line carries no
        credentials). Lines that cannot be parsed are reported and skipped.
        """
        proxies = []
        for line_number, raw_line in enumerate((proxy_string or "").splitlines(), start=1):
            line = raw_line.strip()
            if not line:
                continue
            try:
                # urlparse only recognises the authority part after "//", so the
                # prefix is added for scheme-less lines only; adding it in front
                # of "http://..." would make "http" the host name.
                target = line if "://" in line else f"//{line}"
                parsed = urllib.parse.urlparse(target)
                host = parsed.hostname
                port = parsed.port  # raises ValueError for a non-numeric port
            except ValueError as e:
                # Report the line number, not the line: it may hold a password.
                print(f"Could not parse proxy line {line_number}. Error: {e}")
                continue
            if not host:
                print(f"Could not parse proxy line {line_number}. Error: missing host")
                continue
            if ":" in host:
                # hostname strips the brackets of an IPv6 literal; restore them.
                host = f"[{host}]"
            server = f"{parsed.scheme or 'http'}://{host}"
            if port is not None:
                server += f":{port}"
            proxies.append(
                {
                    "server": server,
                    "username": parsed.username,
                    "password": parsed.password,
                }
            )
        return proxies

    def get_next(self):
        """Return the next proxy dict in rotation, or None when none are loaded."""
        if self.proxy_cycler:
            return next(self.proxy_cycler)
        return None  # No proxy

    def count(self):
        """Return the number of proxies that were parsed successfully."""
        return len(self.proxies)
# --- GLOBAL PLAYWRIGHT AND REVOLVER SETUP ---
# Start one Playwright driver and one headless Firefox instance at import
# time; the classes below create their browser contexts from `browser`.
try:
    p = sync_playwright().start()
    browser = p.firefox.launch(headless=True, timeout=60000)
    print("✅ Playwright browser launched successfully.")
except Exception as e:
    print(f"❌ Could not launch Playwright browser: {e}")
    # Exit with a non-zero status so the failure is visible to whatever
    # started the process; a bare exit() reports success (status 0) and is
    # only available when the `site` module has been loaded.
    sys.exit(1)

# Load proxies from Hugging Face Secrets (environment variable)
proxy_list_str = os.getenv("PROXY_LIST", "")
revolver = CredentialRevolver(proxy_list_str)
def cleanup():
    """Close the shared browser and stop the Playwright driver at exit."""
    print("🧹 Cleaning up: Closing Playwright browser...")
    try:
        browser.close()
    finally:
        # Stop the driver even when closing the browser fails, so the
        # driver process is not left behind.
        p.stop()


atexit.register(cleanup)
# --- Core Browser Logic (Upgraded with Proxy Contexts) ---
class Tab:
    """Represents a single browser tab, tied to its own BrowserContext.

    Holds the Playwright handles plus the last parsed snapshot of the page
    (title, URL, visible text and links) that the UI renders.
    """

    def __init__(self, context, page, proxy_used):
        """Store the handles and initialise the snapshot to a blank tab.

        Args:
            context: The browser context this tab owns (carries the proxy).
            page: The Playwright page object created inside *context*.
            proxy_used: Human-readable proxy description, used for display.
        """
        self.context = context  # The isolated browser context (has the proxy)
        self.page = page  # The Playwright page object within the context
        self.proxy_used = proxy_used  # Info for logging
        self.title = "New Tab"
        self.url = "about:blank"
        self.parsed_text = "Welcome! Navigate to a URL or search to get started."
        self.links = []
        # Tracked locally: the documented BrowserContext API has close() but
        # no is_closed() (that method exists on Page only).
        self._closed = False

    def close(self):
        """Close the underlying BrowserContext, which also closes the page.

        Safe to call more than once; only the first call reaches Playwright.
        """
        if self._closed:
            return
        self._closed = True
        self.context.close()
class RealBrowser:
    """Manages multiple tabs, each potentially with its own proxy.

    Every public method returns a human-readable log string for the UI;
    Playwright navigation errors are reported in that string and recorded on
    the tab instead of being raised.
    """

    def __init__(self):
        self.tabs = []
        self.active_tab_index = -1
        self.new_tab()  # Start with one tab

    def _get_active_tab(self):
        """Return the active Tab, or None when the index points at no tab."""
        if 0 <= self.active_tab_index < len(self.tabs):
            return self.tabs[self.active_tab_index]
        return None

    def _parse_loaded_page(self, tab):
        """Snapshot the page currently shown in *tab* without navigating.

        Updates the tab's url, title, parsed_text and links, and returns the
        log lines describing the result (each prefixed with a newline).
        """
        tab.url = tab.page.url
        tab.title = tab.page.title() or "No Title"
        soup = BeautifulSoup(tab.page.content(), 'lxml')
        # Drop non-content elements before extracting the visible text.
        for element in soup(["script", "style", "nav", "footer"]):
            element.extract()
        tab.parsed_text = soup.get_text(separator='\n', strip=True)
        tab.links = []
        for anchor in soup.find_all('a', href=True):
            absolute_url = urllib.parse.urljoin(tab.url, anchor['href'])
            # Keeping only web URLs also excludes javascript: and mailto: links.
            if absolute_url.startswith(('http://', 'https://')):
                link_text = anchor.get_text(strip=True) or "[No Link Text]"
                tab.links.append({'text': link_text, 'url': absolute_url})
        return (
            f"\n✅ Arrived at: {tab.url}"
            f"\n📄 Title: {tab.title}"
            f"\n🔗 Found {len(tab.links)} links."
        )

    def _record_failure(self, tab, url, error):
        """Put *tab* into its error state and return the log line for *error*."""
        error_message = str(error)
        tab.title = "Error"
        tab.url = url
        tab.parsed_text = f"❌ Failed to load page.\n\nError: {error_message}"
        tab.links = []
        return f"\n❌ {error_message}"

    def _fetch_and_parse(self, tab, url):
        """Navigate *tab* to *url*, snapshot the result and return the log."""
        log = f"▶️ Navigating to {url}..."
        try:
            tab.page.goto(url, wait_until='domcontentloaded', timeout=30000)
            log += self._parse_loaded_page(tab)
        except PlaywrightError as e:
            log += self._record_failure(tab, url, e)
        return log

    def go(self, term_or_url):
        """Open *term_or_url* in the active tab.

        Input with both a scheme and a host is treated as a URL; anything
        else is sent to DuckDuckGo's HTML search.
        """
        tab = self._get_active_tab()
        if not tab:
            return "No active tab."
        term_or_url = (term_or_url or "").strip()
        if not term_or_url:
            return "Please enter a URL or search term."
        parsed_url = urllib.parse.urlparse(term_or_url)
        if parsed_url.scheme and parsed_url.netloc:
            url = term_or_url
        else:
            url = f"https://duckduckgo.com/html/?q={urllib.parse.quote_plus(term_or_url)}"
        return self._fetch_and_parse(tab, url)

    def new_tab(self):
        """Create a new tab using the next available proxy and make it active.

        Never raises: failures are described in the returned log.
        """
        proxy_config = revolver.get_next()
        context = None
        try:
            # Create a new context with the proxy settings
            context = browser.new_context(proxy=proxy_config)
            page = context.new_page()
        except Exception as e:
            if context is not None:
                # The context exists but is unusable; release it, and do not
                # let a failing close hide the original error.
                try:
                    context.close()
                except Exception as close_error:
                    print(f"⚠️ Could not close failed context: {close_error}")
            return f"\n❌ Failed to create new tab/context: {e}"

        proxy_info = proxy_config['server'] if proxy_config else "Direct Connection"
        log = f"✨ New tab opened.\n🔒 Using proxy: {proxy_info}"
        self.tabs.append(Tab(context, page, proxy_info))
        self.active_tab_index = len(self.tabs) - 1
        try:
            # Navigate to a default page
            log += "\n" + self.go("https://www.whatsmyip.org/")
        except Exception as e:
            # The tab itself is fine, so keep it registered and only report.
            log += f"\n❌ Failed to load the default page: {e}"
        return log

    def close_tab(self):
        """Close the active tab; the last remaining tab cannot be closed."""
        if len(self.tabs) <= 1:
            return "Cannot close the last tab."
        tab_to_close = self.tabs.pop(self.active_tab_index)
        note = ""
        try:
            tab_to_close.close()  # Closes the context and the page
        except PlaywrightError as e:
            note = f"\n⚠️ Error while closing: {e}"
        if self.active_tab_index >= len(self.tabs):
            self.active_tab_index = len(self.tabs) - 1
        return f"💣 Tab closed. Switched to Tab {self.active_tab_index}.{note}"

    def _step_history(self, tab, step, unavailable_message):
        """Run *step* (page.go_back or page.go_forward) and snapshot the page.

        The documented Page API has no can_go_back()/can_go_forward(); the
        step methods are documented to return None when there is no history
        entry, which together with an unchanged URL is treated as "nothing
        to do". The page is parsed in place rather than re-navigated, so the
        history stack is left untouched.
        """
        url_before = tab.page.url
        try:
            response = step(wait_until='domcontentloaded')
            if response is None and tab.page.url == url_before:
                return unavailable_message
            log = f"▶️ Navigating to {tab.page.url}..."
            log += self._parse_loaded_page(tab)
        except PlaywrightError as e:
            log = f"▶️ Navigating to {url_before}..."
            log += self._record_failure(tab, url_before, e)
        return log

    def back(self):
        """Go one entry back in the active tab's history."""
        tab = self._get_active_tab()
        if not tab:
            return "Cannot go back."
        return self._step_history(tab, tab.page.go_back, "Cannot go back.")

    def forward(self):
        """Go one entry forward in the active tab's history."""
        tab = self._get_active_tab()
        if not tab:
            return "Cannot go forward."
        return self._step_history(tab, tab.page.go_forward, "Cannot go forward.")

    def refresh(self):
        """Reload the active tab once and snapshot the result."""
        tab = self._get_active_tab()
        if not tab:
            return "No active tab."
        url = tab.page.url
        log = f"▶️ Navigating to {url}..."
        try:
            tab.page.reload(wait_until='domcontentloaded')
            log += self._parse_loaded_page(tab)
        except PlaywrightError as e:
            log += self._record_failure(tab, url, e)
        return log

    def switch_tab(self, tab_label):
        """Activate the tab named by a label of the form ``"Tab <n>: ..."``."""
        try:
            index = int(tab_label.split(":")[0].replace("Tab", "").strip())
        except (AttributeError, ValueError):
            # Not a string (e.g. None) or no integer after "Tab".
            return "Invalid tab format."
        if 0 <= index < len(self.tabs):
            self.active_tab_index = index
            return f"Switched to Tab {index}."
        return "Invalid tab index."
# --- Gradio UI and Event Handlers (mostly unchanged, but with proxy info) ---
def update_ui_components(browser_state: RealBrowser):
    """Build the Gradio updates that mirror the browser's current state.

    Returns a dict keyed by the module-level components (page_content,
    url_textbox, links_display, tab_selector) with their new values.
    """
    current = browser_state._get_active_tab()
    if not current:
        # Handle case where all tabs are closed
        return {
            page_content: gr.Markdown("No active tabs. Please create a new one."),
            url_textbox: "",
            links_display: "",
            tab_selector: gr.Radio(choices=[], label="Active Tabs"),
        }

    def _route(proxy_used):
        # Short proxy description for the tab label: host:port, or 'Direct'.
        if proxy_used == 'Direct Connection':
            return 'Direct'
        return proxy_used.split('//')[1].split('@')[-1]

    # Add proxy info to the tab selector for clarity
    tab_choices = []
    for position, tab in enumerate(browser_state.tabs):
        tab_choices.append(f"Tab {position}: {tab.title[:30]}... (via {_route(tab.proxy_used)})")
    selected_label = tab_choices[browser_state.active_tab_index]

    # Only the first 25 links are listed, each with its text cut to 80 chars.
    if active_links := current.links:
        link_lines = "".join(
            f"{number}. [{entry['text'][:80]}]({entry['url']})\n"
            for number, entry in enumerate(active_links[:25])
        )
    else:
        link_lines = "_No links found._"
    links_md = "### 🔗 Links on Page\n" + link_lines

    page_md = f"# {current.title}\n**URL:** {current.url}\n\n---\n\n{current.parsed_text[:2000]}..."
    return {
        page_content: gr.Markdown(page_md),
        url_textbox: gr.Textbox(value=current.url),
        links_display: gr.Markdown(links_md),
        tab_selector: gr.Radio(choices=tab_choices, value=selected_label, label="Active Tabs"),
    }
# The handle_action function remains the same as it's a generic dispatcher
def handle_action(browser_state, action, value=None):
    """Run one UI action against *browser_state* and return the UI updates.

    Args:
        browser_state: The RealBrowser instance held in the session state.
        action: One of "go", "click", "back", "forward", "refresh",
            "new_tab", "close_tab" or "switch_tab".
        value: Action argument: the URL/search term for "go", the link
            number for "click", the tab label for "switch_tab".

    Returns:
        The dict from update_ui_components() plus the status log entry.
    """
    if action == "go":
        log = browser_state.go(value)
    elif action == "click":
        tab = browser_state._get_active_tab()
        # Only the number conversion is guarded, so an error raised while
        # navigating is not misreported as a bad link number.
        try:
            link_index = int(value)
        except (TypeError, ValueError, OverflowError):
            link_index = None
        if link_index is None:
            log = "Please enter a valid number to click."
        elif tab and 0 <= link_index < len(tab.links):
            log = browser_state.go(tab.links[link_index]['url'])
        else:
            log = "Invalid link number."
    elif action == "back":
        log = browser_state.back()
    elif action == "forward":
        log = browser_state.forward()
    elif action == "refresh":
        log = browser_state.refresh()
    elif action == "new_tab":
        log = browser_state.new_tab()
    elif action == "close_tab":
        log = browser_state.close_tab()
    elif action == "switch_tab":
        log = browser_state.switch_tab(value)
    else:
        log = "Unknown action."
    return {**update_ui_components(browser_state), log_display: gr.Textbox(log)}
# The Gradio Blocks layout remains the same
# NOTE(review): the nesting below was reconstructed from a flattened copy of
# this file — confirm the layout against the running app.
with gr.Blocks(theme=gr.themes.Soft(), title="Real Browser Demo") as demo:
    # NOTE(review): Gradio's docs say gr.State deep-copies its initial value;
    # confirm a RealBrowser holding live Playwright handles survives that.
    browser_state = gr.State(RealBrowser())

    gr.Markdown("# 🛰️ Real Browser Demo (with Proxy Revolver)")
    gr.Markdown(f"Type a URL or search term. This demo runs a real headless browser with **{revolver.count()} proxies loaded**.")

    with gr.Row():
        # Left column: navigation controls, page text and the status log.
        with gr.Column(scale=3):
            with gr.Row():
                back_btn = gr.Button("◀ Back")
                forward_btn = gr.Button("▶ Forward")
                refresh_btn = gr.Button("🔄 Refresh")
            url_textbox = gr.Textbox(label="URL or Search Term", interactive=True)
            go_btn = gr.Button("Go", variant="primary")
            with gr.Accordion("Page Content (Text Only)", open=True):
                page_content = gr.Markdown("Loading...")
            log_display = gr.Textbox(label="Status Log", interactive=False)
        # Right column: tab management and the numbered link list.
        with gr.Column(scale=1):
            with gr.Row():
                new_tab_btn = gr.Button("➕ New Tab")
                close_tab_btn = gr.Button("❌ Close Tab")
            tab_selector = gr.Radio(choices=[], label="Active Tabs", interactive=True)
            with gr.Accordion("Clickable Links", open=True):
                links_display = gr.Markdown("...")
                with gr.Row():
                    click_num_box = gr.Number(label="Link #", scale=1, minimum=0, step=1)
                    click_btn = gr.Button("Click Link", scale=2)

    all_outputs = [page_content, url_textbox, links_display, tab_selector, log_display]

    # Handler factories; they return lambdas so each handler's __name__ is
    # the same as with the inline lambdas they replace.
    def _plain(action):
        return lambda s: handle_action(s, action)

    def _with_value(action):
        return lambda s, v: handle_action(s, action, v)

    state_only = [browser_state]
    state_and_url = [browser_state, url_textbox]

    demo.load(
        lambda s: {
            **update_ui_components(s),
            log_display: f"🚀 Browser Initialized! {revolver.count()} proxies loaded. A new tab has been opened to check the IP.",
        },
        inputs=[browser_state],
        outputs=all_outputs,
    )

    go_btn.click(_with_value("go"), inputs=state_and_url, outputs=all_outputs, show_progress="full")
    url_textbox.submit(_with_value("go"), inputs=state_and_url, outputs=all_outputs, show_progress="full")
    click_btn.click(_with_value("click"), inputs=[browser_state, click_num_box], outputs=all_outputs, show_progress="full")
    back_btn.click(_plain("back"), inputs=state_only, outputs=all_outputs, show_progress="full")
    forward_btn.click(_plain("forward"), inputs=state_only, outputs=all_outputs, show_progress="full")
    refresh_btn.click(_plain("refresh"), inputs=state_only, outputs=all_outputs, show_progress="full")
    new_tab_btn.click(_plain("new_tab"), inputs=state_only, outputs=all_outputs, show_progress="full")
    close_tab_btn.click(_plain("close_tab"), inputs=state_only, outputs=all_outputs)
    tab_selector.input(_with_value("switch_tab"), inputs=[browser_state, tab_selector], outputs=all_outputs)

demo.launch()