"""FastAPI service that crawls a site with Playwright and collects likely API endpoints."""

import re
from typing import Dict, List, Set
from urllib.parse import urlparse

from fastapi import FastAPI, HTTPException, Query
from playwright.async_api import async_playwright

app = FastAPI()

# Process-wide cache of JavaScript URLs that have already been fetched and scanned,
# so the same script is not downloaded and re-parsed for every page or request.
js_cache: Set[str] = set()


def extract_possible_endpoints(text: str) -> List[str]:
    """Pull URLs that look like API endpoints out of raw text (HTML or JavaScript)."""
    pattern = re.compile(r'https?://[^\s"\'<>]+')
    urls = pattern.findall(text)
    return list({
        url for url in urls
        if '/api/' in url or re.search(r'\.(json|php|xml|ajax|aspx|jsp)', url)
    })
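
# Example (hypothetical input): for the text
#   'fetch("https://example.com/api/v1/users"); logo = "https://cdn.example.com/logo.png";'
# the function returns ["https://example.com/api/v1/users"]; the image URL is dropped
# because it contains neither "/api/" nor one of the listed extensions.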


async def extract_internal_links(page, base_url: str) -> List[str]:
    """Collect unique links on the current page that stay on the same domain as base_url."""
    domain = urlparse(base_url).netloc
    anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)')
    internal_links = [
        link for link in anchors
        if urlparse(link).netloc == domain
    ]
    return list(set(internal_links))


def classify_endpoint(endpoint: str, domain: str) -> str:
    """Label an endpoint as first-party (same domain or a subdomain) or third-party."""
    netloc = urlparse(endpoint).netloc
    # Exact match or subdomain match; a bare endswith() would wrongly accept
    # unrelated hosts such as "notexample.com" for "example.com".
    is_first_party = netloc == domain or netloc.endswith("." + domain)
    return "first-party" if is_first_party else "third-party"


async def simulate_interactions(page):
    """Scroll and click around the page to trigger lazy-loaded content and extra XHR traffic."""
    try:
        # Scroll down in steps so infinite-scroll pages fire their background requests.
        for _ in range(5):
            await page.mouse.wheel(0, 800)
            await page.wait_for_timeout(1000)

        # Click a handful of common interactive elements; individual clicks may fail
        # (detached nodes, overlays, navigation), so those failures are ignored.
        selectors_to_click = ['button', 'a', '[role="button"]', '.load-more', '.show-more']
        for selector in selectors_to_click:
            elements = await page.query_selector_all(selector)
            for el in elements[:5]:
                try:
                    await el.click()
                    await page.wait_for_timeout(1000)
                except Exception:
                    continue
    except Exception as e:
        print(f"[!] Interaction simulation failed: {e}")


async def scrape_page_for_endpoints(page, url: str, base_domain: str) -> List[Dict]:
    """Visit a page, watch its network traffic, and mine its scripts for API-looking URLs."""
    endpoints_data = []
    captured_urls = []

    def capture_request(req):
        captured_urls.append({
            "url": req.url,
            "method": req.method
        })

    # Record every request the page makes while we are on it.
    page.on("request", capture_request)

    try:
        await page.goto(url, timeout=60000)
        await page.wait_for_timeout(3000)

        await simulate_interactions(page)

        # Download external scripts (once per URL, via js_cache) and scan them for endpoints.
        js_urls = await page.eval_on_selector_all('script[src]', 'els => els.map(el => el.src)')
        for js_url in js_urls:
            if js_url not in js_cache:
                js_cache.add(js_url)
                try:
                    response = await page.request.get(js_url)
                    if response.ok:
                        body = await response.text()
                        for ep in extract_possible_endpoints(body):
                            endpoints_data.append({
                                "url": ep,
                                "source": "js",
                                "type": classify_endpoint(ep, base_domain),
                                "method": "UNKNOWN"
                            })
                except Exception:
                    continue

        # Keep only the captured requests that look like API calls.
        for item in captured_urls:
            ep = item["url"]
            if any(x in ep for x in ["/api/", ".json", ".php", ".xml", ".ajax"]):
                endpoints_data.append({
                    "url": ep,
                    "source": "network",
                    "type": classify_endpoint(ep, base_domain),
                    "method": item["method"]
                })

    except Exception as e:
        print(f"[!] Failed to scrape {url}: {e}")
    finally:
        # Detach the handler so listeners do not pile up when the same page object
        # is reused for the next URL in the crawl.
        page.remove_listener("request", capture_request)

    return endpoints_data
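
# Each collected item is a plain dict, e.g. (hypothetical values):
#   {"url": "https://example.com/api/v1/items", "source": "network",
#    "type": "first-party", "method": "GET"}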


@app.get("/scrape-api-endpoints")
async def scrape_api_endpoints(
    website: str = Query(..., description="Website URL to scrape"),
    max_depth: int = Query(1, description="How deep to crawl internal links")
):
    """Crawl a site up to max_depth and return the de-duplicated API endpoints found."""
    try:
        all_endpoints: List[Dict] = []
        visited: Set[str] = set()

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()

            # Simple breadth-first crawl starting from the given URL.
            queue = [(website, 0)]
            base_domain = urlparse(website).netloc

            while queue:
                current_url, depth = queue.pop(0)
                if current_url in visited or depth > max_depth:
                    continue

                visited.add(current_url)
                print(f"[+] Scraping {current_url}")

                endpoints = await scrape_page_for_endpoints(page, current_url, base_domain)
                all_endpoints.extend(endpoints)

                if depth < max_depth:
                    # Links are gathered from the page's final state, which may have
                    # changed during simulate_interactions.
                    links = await extract_internal_links(page, website)
                    for link in links:
                        if link not in visited:
                            queue.append((link, depth + 1))

            await browser.close()

        # De-duplicate endpoints by URL, keeping the last occurrence.
        unique_by_url = {}
        for item in all_endpoints:
            unique_by_url[item["url"]] = item

        return {
            "website": website,
            "pages_visited": len(visited),
            "total_endpoints_found": len(unique_by_url),
            "api_endpoints": list(unique_by_url.values())
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
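

# Minimal local entry point, sketched on the assumption that uvicorn is installed and this
# module is saved as main.py; adjust the module name to match your project layout.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)

# Example request once the server is running (hypothetical target URL):
#   curl "http://127.0.0.1:8000/scrape-api-endpoints?website=https://example.com&max_depth=1"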