Spaces:
Running
Running
File size: 11,360 Bytes
4c3fe29 3819331 31d5b37 3819331 31d5b37 15fdb32 7f2bf6a 15fdb32 31d5b37 15fdb32 31d5b37 7f2bf6a 15fdb32 7f2bf6a 31d5b37 7f2bf6a 31d5b37 7f2bf6a 31d5b37 7f2bf6a 31d5b37 15fdb32 7f2bf6a 31d5b37 7f2bf6a 3819331 7f2bf6a 3819331 31d5b37 7f2bf6a 31d5b37 7f2bf6a 3819331 7f2bf6a 3819331 7f2bf6a 15fdb32 7f2bf6a 15fdb32 7f2bf6a 15fdb32 7f2bf6a 15fdb32 7f2bf6a 15fdb32 7f2bf6a 31d5b37 7f2bf6a 3819331 7f2bf6a 31d5b37 7f2bf6a 31d5b37 15fdb32 7f2bf6a 31d5b37 15fdb32 31d5b37 3819331 31d5b37 7f2bf6a 3819331 7f2bf6a 31d5b37 7f2bf6a 3819331 15fdb32 31d5b37 15fdb32 31d5b37 3819331 15fdb32 3819331 31d5b37 15fdb32 7f2bf6a 31d5b37 7f2bf6a 3819331 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
import os
# Run Playwright's installer at startup so browser binaries are available
# before a browser is launched further down in this file.
_install_status = os.system("playwright install")
if _install_status != 0:
    # Best-effort: the binaries may already be present, and a genuine problem
    # will surface when the browser is launched. Report instead of ignoring.
    print(f"⚠️ 'playwright install' exited with status {_install_status}.")
import gradio as gr
from playwright.sync_api import sync_playwright, Error as PlaywrightError
from bs4 import BeautifulSoup
import urllib.parse
import datetime
import atexit
import re
from itertools import cycle
import uuid # For generating unique tab IDs
# Start one Playwright driver and one headless Firefox shared by every tab.
# NOTE(review): Playwright documents its API as not thread-safe; confirm that
# the UI framework invokes the handlers on a thread where these objects are
# usable.
p = None
try:
    p = sync_playwright().start()
    browser = p.firefox.launch(headless=True, timeout=60000)
    print("✅ Playwright browser launched successfully.")
except Exception as e:
    print(f"❌ Could not launch Playwright browser. Original error: {e}")
    if p is not None:
        # The driver started but the browser did not: stop the driver so it
        # is not left running behind a dead app.
        try:
            p.stop()
        except Exception as stop_error:
            print(f"⚠️ Could not stop Playwright driver: {stop_error}")
    # Exit with a failure status (the `exit()` helper would report success
    # and is only present when the `site` module is loaded).
    raise SystemExit(1) from e

# This dictionary is the key to the solution. It maps a tab's unique ID
# to its live, non-copyable Playwright Page and Context object.
LIVE_CONTEXTS = {}  # { tab_id: { "context": PlaywrightContext, "page": PlaywrightPage } }
def cleanup():
    """Release every browser resource when the app shuts down.

    Closes each context registered in ``LIVE_CONTEXTS``, empties the registry,
    then closes the shared browser and stops the Playwright driver. Every step
    is best-effort: a failure is printed and the remaining resources are still
    released, so one broken context cannot keep the browser or driver alive.
    """
    print(f"🧹 Cleaning up: Closing {len(LIVE_CONTEXTS)} browser contexts...")
    # Iterate over a snapshot so the registry can be cleared afterwards.
    for resources in list(LIVE_CONTEXTS.values()):
        try:
            # close() is called directly; no is_closed() probe is made on the
            # context (that check is a Page method in Playwright's Python API).
            resources["context"].close()
        except Exception as exc:
            print(f"⚠️ Failed to close a browser context: {exc}")
    LIVE_CONTEXTS.clear()

    for label, release in (("browser", browser.close), ("Playwright driver", p.stop)):
        try:
            release()
        except Exception as exc:
            print(f"⚠️ Failed to shut down the {label}: {exc}")


atexit.register(cleanup)
class TabState:
    """Copyable snapshot of one tab; holds plain values only."""

    def __init__(self, tab_id, proxy_used="Direct Connection"):
        # Identity first, then the placeholder page shown before any navigation.
        self.id, self.url, self.title = tab_id, "about:blank", "New Tab"
        self.parsed_text = (
            "Welcome! Navigate to a URL or search to get started."
        )
        # A fresh list per instance; entries are added after a page is parsed.
        self.links = list()
        # Human-readable description of the route this tab's traffic takes.
        self.proxy_used = proxy_used
class BrowserState:
    """Copyable description of the whole browser: its tabs and the active one."""

    def __init__(self):
        # Ordered collection of TabState objects; empty until a tab is created.
        self.tabs = list()
        # ID of the tab currently shown, or None when nothing is selected.
        self.active_tab_id = None
        # Add bookmarks, history etc. here if needed

    def get_active_tab(self) -> TabState | None:
        """Return the tab whose id equals ``active_tab_id``, or None if absent."""
        if not self.active_tab_id:
            return None
        for candidate in self.tabs:
            if candidate.id == self.active_tab_id:
                return candidate
        return None
class CredentialRevolver:
    """Round-robin source of proxy settings parsed from a multi-line string.

    Each non-blank line has the form ``[scheme://][user:pass@]host[:port]``.
    Lines that cannot be parsed, or that carry no host, are skipped.
    """

    # Recognises an explicit URL scheme prefix such as "http://" or "socks5://".
    _SCHEME_RE = re.compile(r"[A-Za-z][A-Za-z0-9+.\-]*://")

    def __init__(self, proxy_string: str):
        self.proxies = self._parse_proxies(proxy_string)
        # No cycler at all when the list is empty, so get_next() can return None.
        self.proxy_cycler = cycle(self.proxies) if self.proxies else None

    def _parse_proxies(self, proxy_string: str):
        """Parse *proxy_string* into a list of proxy dicts.

        Returns:
            A list of ``{"server", "username", "password"}`` dicts, one per
            valid line, in input order. ``username`` / ``password`` are None
            when the line carries no credentials.
        """
        proxies = []
        for raw_line in (proxy_string or "").splitlines():
            entry = raw_line.strip()
            if not entry:
                # Blank lines used to become a bogus "http://None:None" proxy.
                continue
            # urlparse only recognises the host part after "//"; add it unless
            # the line already carries its own scheme.
            target = entry if self._SCHEME_RE.match(entry) else f"//{entry}"
            try:
                parsed = urllib.parse.urlparse(target)
                host, port = parsed.hostname, parsed.port
            except ValueError:
                # Malformed host or non-numeric / out-of-range port.
                continue
            if not host:
                continue
            if ":" in host:
                # IPv6 literals need their brackets back inside a URL.
                host = f"[{host}]"
            server = f"{parsed.scheme or 'http'}://{host}"
            if port is not None:
                server += f":{port}"
            proxies.append({
                "server": server,
                "username": parsed.username,
                "password": parsed.password,
            })
        return proxies

    def get_next(self):
        """Return the next proxy dict in rotation, or None if none are loaded."""
        return next(self.proxy_cycler) if self.proxy_cycler else None

    def count(self):
        """Return how many proxies were successfully parsed."""
        return len(self.proxies)
# The proxy list is read from the PROXY_LIST environment variable; when the
# variable is unset the empty string is used, which parses to zero proxies.
proxy_list_str = os.environ.get("PROXY_LIST", "")
revolver = CredentialRevolver(proxy_list_str)
def _mark_tab_failed(tab_state, url, error_message):
    """Put *tab_state* into its error presentation for a failed load of *url*."""
    tab_state.title = "Error"
    tab_state.url = url
    tab_state.parsed_text = f"❌ Failed to load page.\n\nError: {error_message}"
    tab_state.links = []


def _fetch_and_update_tab_state(tab_state: TabState, url: str):
    """Navigate the live page behind *tab_state* to *url* and refresh the state.

    The live Playwright page is looked up in ``LIVE_CONTEXTS`` by the tab's id.
    On success the tab's url, title, visible text and http(s) links are
    replaced with those of the loaded page. On failure (no live page for the
    tab, or a Playwright error) the tab is switched to an error presentation.

    Args:
        tab_state: The copyable tab record to update in place.
        url: The address to load.

    Returns:
        A multi-line, human-readable log of what happened.
    """
    log = f"▶️ Navigating to {url}..."

    resources = LIVE_CONTEXTS.get(tab_state.id)
    if resources is None:
        # The tab has no registered live page; report it instead of raising
        # KeyError out of the event handler.
        error_message = "No live browser page exists for this tab."
        _mark_tab_failed(tab_state, url, error_message)
        return f"{log}\n❌ {error_message}"
    live_page = resources["page"]

    try:
        live_page.goto(url, wait_until='domcontentloaded', timeout=30000)
        tab_state.url = live_page.url
        tab_state.title = live_page.title() or "No Title"
        html_content = live_page.content()
    except PlaywrightError as e:
        error_message = str(e)
        _mark_tab_failed(tab_state, url, error_message)
        return f"{log}\n❌ {error_message}"
    log += f"\n✅ Arrived at: {tab_state.url}"

    soup = BeautifulSoup(html_content, 'lxml')
    # Remove non-content elements before extracting text and links.
    for unwanted in soup(["script", "style", "nav", "footer"]):
        unwanted.extract()
    tab_state.parsed_text = soup.get_text(separator='\n', strip=True)

    links = []
    for anchor in soup.find_all('a', href=True):
        try:
            absolute_url = urllib.parse.urljoin(tab_state.url, anchor['href'])
        except ValueError:
            # A malformed href must not abort the whole page load.
            continue
        # Only web links are kept; javascript:, mailto: and similar are dropped
        # by this prefix test.
        if not absolute_url.startswith('http'):
            continue
        link_text = anchor.get_text(strip=True) or "[No Link Text]"
        links.append({'text': link_text, 'url': absolute_url})
    tab_state.links = links
    log += f"\n🔗 Found {len(tab_state.links)} links."
    return log
def handle_action(browser_state: BrowserState, action: str, value=None):
    """Apply one user action to *browser_state* and the live browser resources.

    Args:
        browser_state: The copyable browser state; modified in place.
        action: One of "new_tab", "go", "click", "close_tab", "switch_tab".
        value: Action argument: URL or search text for "go", link index for
            "click", tab id for "switch_tab"; unused otherwise.

    Returns:
        ``(browser_state, log)`` where ``log`` is a status message for the UI.
    """
    log = ""
    active_tab_state = browser_state.get_active_tab()

    if action == "new_tab":
        tab_id = str(uuid.uuid4())
        proxy_config = revolver.get_next()
        context = None
        try:
            context = browser.new_context(proxy=proxy_config)
            page = context.new_page()
        except PlaywrightError as e:
            # Do not leave a half-created context behind.
            if context is not None:
                try:
                    context.close()
                except PlaywrightError as close_error:
                    print(f"⚠️ Could not close failed context: {close_error}")
            return browser_state, f"❌ Could not open a new tab: {e}"
        LIVE_CONTEXTS[tab_id] = {"context": context, "page": page}
        new_tab = TabState(tab_id, proxy_used=proxy_config['server'] if proxy_config else "Direct")
        browser_state.tabs.append(new_tab)
        browser_state.active_tab_id = tab_id
        # Now navigate the new tab
        log = _fetch_and_update_tab_state(new_tab, "https://www.whatsmyip.org/")

    elif action == "go" and active_tab_state:
        query = (value or "").strip()
        if not query:
            log = "Please enter a URL or search term."
        else:
            try:
                parsed = urllib.parse.urlparse(query)
                looks_like_url = bool(parsed.scheme and parsed.netloc)
            except ValueError:
                looks_like_url = False
            # Anything that is not a full URL is sent to DuckDuckGo as a search.
            if looks_like_url:
                url = query
            else:
                url = f"https://duckduckgo.com/html/?q={urllib.parse.quote_plus(query)}"
            log = _fetch_and_update_tab_state(active_tab_state, url)

    elif action == "click" and active_tab_state:
        # Only the number conversion is guarded, so navigation problems are
        # not misreported as "enter a valid number".
        try:
            link_index = int(value)
        except (TypeError, ValueError, OverflowError):
            log = "Please enter a valid number to click."
        else:
            if 0 <= link_index < len(active_tab_state.links):
                link_url = active_tab_state.links[link_index]['url']
                log = _fetch_and_update_tab_state(active_tab_state, link_url)
            else:
                log = "Invalid link number."

    elif action == "close_tab" and active_tab_state:
        if len(browser_state.tabs) <= 1:
            log = "Cannot close the last tab."
        else:
            tab_to_close_id = browser_state.active_tab_id
            # Find and remove tab from state
            tab_index = browser_state.tabs.index(active_tab_state)
            browser_state.tabs.pop(tab_index)
            # Set new active tab: the previous neighbour, or the new first tab.
            new_index = tab_index - 1 if tab_index > 0 else 0
            browser_state.active_tab_id = browser_state.tabs[new_index].id
            # Close and remove live resources. close() is called directly,
            # without an is_closed() probe on the context.
            resources = LIVE_CONTEXTS.pop(tab_to_close_id, None)
            if resources is not None:
                try:
                    resources['context'].close()
                except PlaywrightError as e:
                    print(f"⚠️ Could not close browser context: {e}")
            log = "💣 Tab closed."

    elif action == "switch_tab":
        # The value from the radio button is the tab_id itself; ignore ids
        # that do not belong to this state so the active tab stays valid.
        if any(tab.id == value for tab in browser_state.tabs):
            browser_state.active_tab_id = value
            log = "Switched to tab."
        else:
            log = "Invalid tab format."

    elif action in ("go", "click", "close_tab"):
        log = "No active tab. Please create a new one."

    # Return the modified state object together with the status message.
    return browser_state, log
def _ellipsize(text, limit):
    """Return *text* cut to *limit* characters, adding "..." only if it was cut."""
    return text if len(text) <= limit else f"{text[:limit]}..."


def update_ui_components(browser_state: BrowserState):
    """Build the UI component values that reflect *browser_state*.

    Returns:
        A dict keyed by the page content, URL box, links panel and tab selector
        components. With no active tab, the dict holds placeholder values.
    """
    max_links_shown = 25

    active_tab = browser_state.get_active_tab()
    if not active_tab:
        return {
            page_content: gr.Markdown("No active tabs. Please create a new one."),
            url_textbox: "", links_display: "",
            tab_selector: gr.Radio(choices=[], label="Active Tabs"),
        }

    # Use the tab ID as the value for the radio button
    tab_choices = [
        (f"Tab {i}: {_ellipsize(t.title, 25)} (via {t.proxy_used})", t.id)
        for i, t in enumerate(browser_state.tabs)
    ]

    links_md = "### 🔗 Links on Page\n"
    if active_tab.links:
        shown = active_tab.links[:max_links_shown]
        links_md += "".join(
            f"{i}. [{_ellipsize(link['text'], 80)}]({link['url']})\n"
            for i, link in enumerate(shown)
        )
        if len(active_tab.links) > len(shown):
            # Make the display cap visible instead of silently dropping links.
            links_md += f"\n_Showing first {len(shown)} of {len(active_tab.links)} links._"
    else:
        links_md += "_No links found._"

    body = _ellipsize(active_tab.parsed_text, 2000)
    return {
        page_content: gr.Markdown(f"# {active_tab.title}\n**URL:** {active_tab.url}\n\n---\n\n{body}"),
        url_textbox: gr.Textbox(value=active_tab.url),
        links_display: gr.Markdown(links_md),
        tab_selector: gr.Radio(choices=tab_choices, value=active_tab.id, label="Active Tabs"),
    }
# --- Gradio UI Layout ---
with gr.Blocks(theme=gr.themes.Soft(), title="Real Browser Demo") as demo:
    # Per-session state: a plain, copyable BrowserState (no Playwright objects).
    browser_state = gr.State(BrowserState())
    gr.Markdown("# 🛰️ Real Browser Demo (with Proxies & State Fix)")
    gr.Markdown(f"This demo runs a real headless browser. **{revolver.count()} proxies loaded**.")

    # --- Gradio Interface Layout ---
    with gr.Row():
        with gr.Column(scale=3):
            url_textbox = gr.Textbox(label="URL or Search Term", interactive=True)
            go_btn = gr.Button("Go", variant="primary")
            with gr.Accordion("Page Content (Text Only)", open=True):
                page_content = gr.Markdown("Loading...")
            log_display = gr.Textbox(label="Status Log", interactive=False)
        with gr.Column(scale=1):
            with gr.Row():
                new_tab_btn = gr.Button("➕ New Tab")
                close_tab_btn = gr.Button("❌ Close Tab")
            tab_selector = gr.Radio(choices=[], label="Active Tabs", interactive=True)
            with gr.Accordion("Clickable Links", open=True):
                links_display = gr.Markdown("...")
                with gr.Row():
                    click_num_box = gr.Number(label="Link #", scale=1, minimum=0, step=1)
                    click_btn = gr.Button("Click Link", scale=2)

    all_outputs = [page_content, url_textbox, links_display, tab_selector, log_display]
    # Every listener writes the state plus all visible components.
    state_and_outputs = [browser_state, *all_outputs]

    def master_handler(current_state, action, value):
        """Run one action and return a single dict of component updates.

        The dict is keyed by output component and includes the state component
        itself, so every listener below can return it unchanged. Returning a
        (state, dict) pair would supply only two values for six outputs.
        """
        new_state, log = handle_action(current_state, action, value)
        ui_updates = update_ui_components(new_state)
        ui_updates[log_display] = log
        ui_updates[browser_state] = new_state
        return ui_updates

    # Initial load: create the first tab
    demo.load(
        lambda s: master_handler(s, "new_tab", None),
        inputs=[browser_state],
        outputs=state_and_outputs,
    )

    # Event listeners all go through master_handler
    go_btn.click(lambda s, v: master_handler(s, "go", v), [browser_state, url_textbox], state_and_outputs, show_progress="full")
    url_textbox.submit(lambda s, v: master_handler(s, "go", v), [browser_state, url_textbox], state_and_outputs, show_progress="full")
    click_btn.click(lambda s, v: master_handler(s, "click", v), [browser_state, click_num_box], state_and_outputs, show_progress="full")
    new_tab_btn.click(lambda s: master_handler(s, "new_tab", None), [browser_state], state_and_outputs, show_progress="full")
    close_tab_btn.click(lambda s: master_handler(s, "close_tab", None), [browser_state], state_and_outputs)
    tab_selector.input(lambda s, v: master_handler(s, "switch_tab", v), [browser_state, tab_selector], state_and_outputs)

demo.launch()