File size: 3,016 Bytes
6f8bc75
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from __future__ import annotations

import logging
from typing import IO, TYPE_CHECKING, AsyncContextManager, Literal

from fastapi import APIRouter, HTTPException
from fastapi.responses import Response
from playwright.async_api import BrowserContext, TimeoutError, async_playwright
from pydantic import BaseModel, HttpUrl

if TYPE_CHECKING:
    from types import TracebackType


# Router on which this module's POST /screenshot endpoint is registered.
# NOTE(review): presumably included into the FastAPI app elsewhere — confirm.
router = APIRouter()


class ViewPort(BaseModel):
    """Viewport size handed to the browser context used for a screenshot."""

    # Viewport width (pixels, per Playwright's ``viewport`` option); defaults to 1280.
    width: int = 1280
    # Viewport height (pixels, per Playwright's ``viewport`` option); defaults to 720.
    height: int = 720


class ScreenshotItems(BaseModel):
    """Request body of the screenshot endpoint.

    ``url``, ``full_page`` and ``query_selector`` decide what is captured; the
    remaining fields are forwarded as options to the browser context that is
    created for the request.
    """

    # Page that is opened before the screenshot is taken.
    # NOTE(review): any URL accepted by ``HttpUrl`` is fetched by the server's
    # browser — confirm callers are trusted or restrict targets (SSRF).
    url: HttpUrl
    # Forwarded as ``full_page`` to the screenshot call.
    full_page: bool | None = False
    # Optional selector; when set, the matching locator is captured instead of
    # the page itself.
    query_selector: str | None = None

    # --- browser-context options -------------------------------------------
    # Viewport size; ``None`` leaves the choice to the browser context.
    viewport: ViewPort | None = None
    # Emulated ``prefers-color-scheme`` value.
    color_scheme: Literal["light", "dark", "no-preference"] | None = "no-preference"
    # Forwarded as ``bypass_csp`` to the browser context.
    bypass_csp: bool | None = False
    # Forwarded as ``java_script_enabled`` to the browser context.
    java_script_enabled: bool | None = True
    # Proxy settings for the browser context.
    # NOTE(review): presumably follows Playwright's ``proxy`` option shape
    # (e.g. a ``server`` key) — confirm; the model does not validate the keys.
    proxy: dict | None = None
    # Forwarded as ``is_mobile`` to the browser context.
    is_mobile: bool | None = False
    # Forwarded as ``no_viewport`` to the browser context.
    no_viewport: bool | None = False


class ScreenShot:
    """Async context manager that owns a Chromium instance for screenshots.

    Entering the context starts the Playwright driver and launches Chromium;
    leaving it closes the browser and stops the driver again. Each call to
    :meth:`capture` runs in its own, short-lived browser context.

    Usage::

        async with ScreenShot() as sc:
            png = await sc.capture(items)
    """

    def __init__(self) -> None:
        # Filled in by ``__aenter__``. Starting as ``None`` keeps the
        # truthiness checks in ``__aexit__`` meaningful on an instance that
        # was never (fully) started.
        self.playwright = None
        self.browser = None

    async def __aenter__(self) -> ScreenShot:
        """Start Playwright, launch Chromium and return this instance.

        Raises:
            Exception: whatever Playwright raises when the driver or the
                browser cannot be started; the driver is stopped first.
        """
        self.playwright = await async_playwright().start()
        try:
            self.browser = await self.playwright.chromium.launch(
                args=["--disable-extensions"],
                chromium_sandbox=True,
            )
        except BaseException:
            # ``__aexit__`` is not invoked when ``__aenter__`` raises, so the
            # already-started driver has to be stopped here or it would leak.
            await self.playwright.stop()
            self.playwright = None
            raise
        return self

    async def browser_context(self, items: ScreenshotItems) -> BrowserContext:
        """Create a browser context configured from the request options.

        Args:
            items: Request options; ``viewport``, ``color_scheme``,
                ``bypass_csp``, ``java_script_enabled``, ``proxy``,
                ``is_mobile`` and ``no_viewport`` are read.

        Returns:
            The newly created context. The caller is responsible for closing it.
        """
        viewport = items.viewport.model_dump() if items.viewport else None
        return await self.browser.new_context(
            viewport=viewport,
            color_scheme=items.color_scheme,
            bypass_csp=items.bypass_csp,
            java_script_enabled=items.java_script_enabled,
            # ``proxy`` is declared as a plain ``dict`` on the request model,
            # so it has no ``model_dump()``; pass it through unchanged. An
            # empty dict is treated like "no proxy".
            proxy=items.proxy or None,
            is_mobile=items.is_mobile,
            no_viewport=items.no_viewport,
        )

    async def capture(self, items: ScreenshotItems) -> bytes:
        """Open ``items.url`` and return a screenshot as image bytes.

        When ``items.query_selector`` is set only the matching element is
        captured; otherwise the page is captured, honouring
        ``items.full_page``.

        Args:
            items: Request options describing the page and what to capture.

        Returns:
            The encoded screenshot as returned by Playwright.

        Raises:
            playwright.async_api.TimeoutError: if navigation or the screenshot
                exceeds Playwright's timeout.
        """
        context = await self.browser_context(items)
        try:
            page = await context.new_page()
            await page.goto(str(items.url))

            if items.query_selector:
                # Element screenshots are always cropped to the element;
                # ``Locator.screenshot`` does not accept ``full_page``.
                return await page.locator(items.query_selector).screenshot()
            return await page.screenshot(full_page=items.full_page)
        finally:
            # Close the context even when navigation or capture fails so
            # pages do not pile up for the lifetime of the browser.
            await context.close()

    async def __aexit__(
        self,
        typ: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Close the browser and stop the Playwright driver.

        Exceptions raised inside the ``async with`` body are not suppressed.
        """
        try:
            if self.browser:
                await self.browser.close()
        finally:
            # Stop the driver even if closing the browser failed.
            if self.playwright:
                await self.playwright.stop()


@router.post("/screenshot")
async def screenshot(data: ScreenshotItems) -> Response:
    """Render ``data.url`` in a fresh browser and return the screenshot as PNG.

    Args:
        data: Validated request body describing the page and capture options.

    Returns:
        A ``Response`` with media type ``image/png`` carrying the screenshot.

    Raises:
        HTTPException: 504 when Playwright times out while loading or
            capturing the page; 500 for any other failure during capture.
    """
    async with ScreenShot() as sc:
        try:
            image = await sc.capture(items=data)
        except TimeoutError as e:
            raise HTTPException(
                status_code=504,
                detail=f"An error occurred while generating the screenshot: {e}",
            ) from e
        except Exception as e:
            logging.exception("screenshot unhandled error")
            # Without an explicit error the handler would fall through and
            # return ``None``, which FastAPI sends as ``200`` with a ``null``
            # body. Details stay in the log rather than in the response.
            raise HTTPException(
                status_code=500,
                detail="An unexpected error occurred while generating the screenshot.",
            ) from e
        return Response(content=image, media_type="image/png")