Spaces:
Sleeping
Sleeping
File size: 4,302 Bytes
de68d43 a9ef651 de68d43 a9ef651 de68d43 a9ef651 75d501d de68d43 c16aac6 8beead4 c16aac6 a9429bd de68d43 5b0d5e5 a9ef651 de68d43 a9ef651 de68d43 a9ef651 de68d43 5b0d5e5 a9ef651 de68d43 a9ef651 5006d83 a9429bd c16aac6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
from __future__ import annotations

from typing import TYPE_CHECKING, AsyncContextManager

from playwright.async_api import (  # noqa: F401
    Browser,
    BrowserContext,
    Page,
    Playwright,
    TimeoutError,
    async_playwright,
)

from .models import GetContentModel, PageModel, ScreenshotModel

if TYPE_CHECKING:
    from types import TracebackType


class AsyncPlaywrightContext:
    """Async context manager that owns one shared Playwright Firefox browser.

    Entering the context starts Playwright and launches Firefox the first time
    only; later entries reuse the running browser. Leaving the context does
    NOT shut anything down -- call :meth:`close_instance` to release the
    browser and the Playwright driver.
    """

    # Despite the name this is a single user-agent string; it is injected into
    # Firefox through the ``general.useragent.override`` preference.
    HEADERS = "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36"  # noqa: E501

    def __init__(self) -> None:
        """Create an idle wrapper; nothing is started until ``__aenter__``."""
        self.playwright: Playwright | None = None
        self.browser: Browser | None = None
        self.default_context: BrowserContext | None = None

    async def __aenter__(self) -> AsyncPlaywrightContext:
        """Start Playwright/Firefox if they are not running and return ``self``.

        Each resource is checked on its own so that a launch which failed
        half-way on a previous entry is retried instead of leaving
        ``self.browser`` permanently ``None``.
        """
        if self.playwright is None:
            self.playwright = await async_playwright().start()
        if self.browser is None:
            self.browser = await self.playwright.firefox.launch(
                firefox_user_prefs={
                    "extensions.enabledScopes": 1,
                    "extensions.autoDisableScopes": 1,
                    "dom.webdriver.enabled": False,
                    "useAutomationExtension": False,
                    "general.useragent.override": self.HEADERS,
                },
            )
        if self.default_context is None:
            self.default_context = await self.browser.new_context()
        return self

    async def new_context_page(
        self,
        screenshot_model: GetContentModel | ScreenshotModel,
        browser: Browser,
        page_model: PageModel,
    ) -> Page:
        """Open a new page configured from ``page_model``.

        Args:
            screenshot_model: Request model; only its ``new_browser`` flag is
                read here.
            browser: Browser in which the page is created.
            page_model: Source of the context options (colour scheme,
                JavaScript switch, viewport, proxy).

        Returns:
            A freshly opened page. When ``new_browser`` is set the page lives
            in an explicitly created browser context, otherwise it comes from
            ``browser.new_page``.
        """
        params = {
            'color_scheme': page_model.color_scheme,
            'java_script_enabled': page_model.java_script_enabled,
            'no_viewport': page_model.no_viewport,
            'proxy': page_model.proxy.model_dump() if page_model.proxy else None,
            'viewport': page_model.viewport.model_dump() if page_model.viewport else None,
        }
        if not screenshot_model.new_browser:
            # Use the browser that was passed in rather than silently
            # ignoring the argument in favour of self.browser.
            return await browser.new_page(**params)
        new_context = await browser.new_context(**params)
        return await new_context.new_page()

    @staticmethod
    async def _dispose_page(page: Page) -> None:
        """Close ``page`` together with the browser context that holds it.

        Closing only the page would leak the context created explicitly on
        the ``new_browser`` path. Per the Playwright docs, closing a context
        also closes every page in it, so this covers both code paths.
        """
        await page.context.close()

    async def screenshot(
        self,
        screenshot_model: ScreenshotModel,
        page_model: PageModel,
    ) -> bytes:
        """Navigate to ``screenshot_model.url`` and capture an image.

        If ``query_selector`` is set, only that element is captured
        (optionally waiting for it first when ``wait_selector`` is true);
        otherwise the page itself is captured, honouring ``full_page``.

        Returns:
            The encoded image bytes produced by Playwright.
        """
        page = await self.new_context_page(
            screenshot_model=screenshot_model,
            browser=self.browser,
            page_model=page_model,
        )
        try:
            await page.goto(str(screenshot_model.url))
            await page.wait_for_timeout(screenshot_model.ms_delay)
            if not screenshot_model.query_selector:
                return await page.screenshot(full_page=screenshot_model.full_page)
            target = page.locator(screenshot_model.query_selector)
            if screenshot_model.wait_selector:
                await target.wait_for()
            return await target.screenshot()
        finally:
            # Release the page even when navigation or capture fails.
            await self._dispose_page(page)

    async def get_content(
        self,
        get_content_model: GetContentModel,
        page_model: PageModel,
    ) -> str:
        """Navigate to ``get_content_model.url`` and return the page HTML.

        If ``query_selector`` is set, the matching element is awaited before
        the HTML is read.

        Returns:
            The serialized HTML of the page.
        """
        page = await self.new_context_page(
            # Pass the request instance, not the GetContentModel class.
            screenshot_model=get_content_model,
            browser=self.browser,
            page_model=page_model,
        )
        try:
            await page.goto(str(get_content_model.url))
            await page.wait_for_timeout(get_content_model.ms_delay)
            if get_content_model.query_selector:
                # Page.locator() is synchronous; only wait_for() is awaited.
                await page.locator(get_content_model.query_selector).wait_for()
            return await page.content()
        finally:
            # Release the page even when navigation or waiting fails.
            await self._dispose_page(page)

    async def close_instance(self) -> None:
        """Shut down the browser and the Playwright driver, if running.

        Safe to call when nothing was started, or when Playwright started but
        the browser launch failed. The driver is stopped even if closing the
        browser raises.
        """
        if self.playwright is None:
            return
        try:
            if self.browser is not None:
                await self.browser.close()
        finally:
            await self.playwright.stop()
            self.browser = None
            self.default_context = None
            self.playwright = None

    async def __aexit__(
        self,
        typ: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Leave the context without tearing the browser down.

        The browser and driver are kept alive so the next ``async with`` can
        reuse them; use :meth:`close_instance` for the real shutdown.
        Returning ``None`` lets any in-flight exception propagate.
        """
        # NOTE(review): looks like a leftover debug message -- confirm nobody
        # relies on this output, then drop it or move it to logging.
        print('speed test')