from fastapi import FastAPI, HTTPException, Query
from playwright.async_api import async_playwright
from urllib.parse import urlparse
from typing import List, Dict, Set
import re
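
# Dependencies (assumed setup, not pinned by this file):
#   pip install fastapi uvicorn playwright
#   playwright install chromium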

app = FastAPI()

# In-memory cache for JS file URLs
js_cache: Set[str] = set()


# Extract candidate API endpoint URLs from arbitrary text (e.g. JS source)
def extract_possible_endpoints(text: str) -> List[str]:
    pattern = re.compile(r'https?://[^\s"\'<>]+')
    urls = pattern.findall(text)
    return list({
        url for url in urls
        if '/api/' in url or re.search(r'\.(json|php|xml|ajax|aspx|jsp)\b', url)
    })


# Collect same-domain links from the current page (fragments stripped to reduce duplicates)
async def extract_internal_links(page, base_url: str) -> List[str]:
    domain = urlparse(base_url).netloc
    anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)')
    internal_links = [
        link.split('#')[0] for link in anchors
        if urlparse(link).netloc == domain
    ]
    return list(set(internal_links))


# Classify an endpoint as first-party or third-party. Require an exact domain
# match or a dot-separated subdomain so e.g. "evil-example.com" is not
# mistaken for a subdomain of "example.com".
def classify_endpoint(endpoint: str, domain: str) -> str:
    netloc = urlparse(endpoint).netloc
    return "first-party" if netloc == domain or netloc.endswith("." + domain) else "third-party"


# Simulate scrolling and clicking to trigger lazy-loaded content and XHR calls
async def simulate_interactions(page):
    try:
        for _ in range(5):  # Scroll to trigger lazy loading / infinite scroll
            await page.mouse.wheel(0, 800)
            await page.wait_for_timeout(1000)

        selectors_to_click = ['button', 'a', '[role="button"]', '.load-more', '.show-more']
        for selector in selectors_to_click:
            elements = await page.query_selector_all(selector)
            for el in elements[:5]:  # Avoid overclicking
                try:
                    # Short timeout so hidden or detached elements don't stall the crawl
                    await el.click(timeout=2000)
                    await page.wait_for_timeout(1000)
                except Exception:
                    continue
    except Exception as e:
        print(f"[!] Interaction simulation failed: {e}")


# Scrape a single page: capture its network traffic, scan external JS files,
# and collect anything that looks like an API endpoint
async def scrape_page_for_endpoints(page, url: str, base_domain: str) -> List[Dict]:
    endpoints_data = []
    captured_urls = []

    # Record every request the page issues while loading and during interactions
    async def capture_request(req):
        captured_urls.append({
            "url": req.url,
            "method": req.method
        })

    page.on("request", capture_request)

    try:
        await page.goto(url, timeout=60000)
        await page.wait_for_timeout(3000)

        await simulate_interactions(page)

        # Extract and parse JS files
        js_urls = await page.eval_on_selector_all('script[src]', 'els => els.map(el => el.src)')
        for js_url in js_urls:
            if js_url not in js_cache:
                js_cache.add(js_url)
                try:
                    response = await page.request.get(js_url)
                    if response.ok:
                        body = await response.text()
                        for ep in extract_possible_endpoints(body):
                            endpoints_data.append({
                                "url": ep,
                                "source": "js",
                                "type": classify_endpoint(ep, base_domain),
                                "method": "UNKNOWN"
                            })
                except Exception:
                    continue

        # Network-captured endpoints (same heuristic as the JS scan above)
        for item in captured_urls:
            ep = item["url"]
            if '/api/' in ep or re.search(r'\.(json|php|xml|ajax|aspx|jsp)\b', ep):
                endpoints_data.append({
                    "url": ep,
                    "source": "network",
                    "type": classify_endpoint(ep, base_domain),
                    "method": item["method"]
                })

    except Exception as e:
        print(f"[!] Failed to scrape {url}: {e}")
    finally:
        # Detach the listener so handlers don't pile up as the page is reused
        page.remove_listener("request", capture_request)

    return endpoints_data


@app.get("/scrape-api-endpoints")
async def scrape_api_endpoints(
    website: str = Query(..., description="Absolute URL of the website to scrape, e.g. https://example.com"),
    max_depth: int = Query(1, description="How many levels of internal links to follow (0 = only the given page)")
):
    parsed = urlparse(website)
    if not parsed.scheme or not parsed.netloc:
        # A relative URL would leave base_domain empty and break first-party classification
        raise HTTPException(status_code=400, detail="website must be an absolute URL, e.g. https://example.com")

    try:
        all_endpoints: List[Dict] = []
        visited: Set[str] = set()

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()

            queue = [(website, 0)]
            base_domain = urlparse(website).netloc

            while queue:
                current_url, depth = queue.pop(0)
                if current_url in visited or depth > max_depth:
                    continue

                visited.add(current_url)
                print(f"[+] Scraping {current_url}")

                endpoints = await scrape_page_for_endpoints(page, current_url, base_domain)
                all_endpoints.extend(endpoints)

                if depth < max_depth:
                    links = await extract_internal_links(page, website)
                    for link in links:
                        if link not in visited:
                            queue.append((link, depth + 1))

            await browser.close()

        # Deduplicate by URL
        unique_by_url = {}
        for item in all_endpoints:
            unique_by_url[item["url"]] = item

        return {
            "website": website,
            "pages_visited": len(visited),
            "total_endpoints_found": len(unique_by_url),
            "api_endpoints": list(unique_by_url.values())
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
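

# Example usage (a minimal sketch; it assumes this file is saved as main.py and
# that the dependencies listed above, including Playwright's Chromium build, are installed):
#
#   uvicorn main:app --reload
#   curl "http://localhost:8000/scrape-api-endpoints?website=https://example.com&max_depth=1"
#
# The response lists deduplicated candidate endpoints with their source
# ("js" or "network"), HTTP method, and first-/third-party classification.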