from fastapi import FastAPI, HTTPException, Query from playwright.async_api import async_playwright from urllib.parse import urlparse from typing import List, Dict, Set import re app = FastAPI() # In-memory cache for JS file URLs js_cache: Set[str] = set() # Extract URLs from any text def extract_possible_endpoints(text: str) -> List[str]: pattern = re.compile(r'https?://[^\s"\'<>]+') urls = pattern.findall(text) return list(set([ url for url in urls if '/api/' in url or re.search(r'\.(json|php|xml|ajax|aspx|jsp)', url) ])) # Identify internal links async def extract_internal_links(page, base_url: str) -> List[str]: domain = urlparse(base_url).netloc anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)') internal_links = [ link for link in anchors if urlparse(link).netloc == domain ] return list(set(internal_links)) # Classify whether an endpoint is internal or third-party def classify_endpoint(endpoint: str, domain: str) -> str: return "first-party" if urlparse(endpoint).netloc.endswith(domain) else "third-party" # Simulate scroll and click interactions async def simulate_interactions(page): try: for _ in range(5): # Scroll await page.mouse.wheel(0, 800) await page.wait_for_timeout(1000) selectors_to_click = ['button', 'a', '[role="button"]', '.load-more', '.show-more'] for selector in selectors_to_click: elements = await page.query_selector_all(selector) for el in elements[:5]: # Avoid overclicking try: await el.click() await page.wait_for_timeout(1000) except: continue except Exception as e: print(f"[!] Interaction simulation failed: {e}") # Main scraper per page async def scrape_page_for_endpoints(page, url: str, base_domain: str) -> List[Dict]: endpoints_data = [] captured_urls = [] async def capture_request(req): captured_urls.append({ "url": req.url, "method": req.method }) page.on("request", capture_request) try: await page.goto(url, timeout=60000) await page.wait_for_timeout(3000) await simulate_interactions(page) # Extract and parse JS files js_urls = await page.eval_on_selector_all('script[src]', 'els => els.map(el => el.src)') for js_url in js_urls: if js_url not in js_cache: js_cache.add(js_url) try: response = await page.request.get(js_url) if response.ok: body = await response.text() for ep in extract_possible_endpoints(body): endpoints_data.append({ "url": ep, "source": "js", "type": classify_endpoint(ep, base_domain), "method": "UNKNOWN" }) except: continue # Network-based endpoints for item in captured_urls: ep = item["url"] if any(x in ep for x in ["/api/", ".json", ".php", ".xml", ".ajax"]): endpoints_data.append({ "url": ep, "source": "network", "type": classify_endpoint(ep, base_domain), "method": item["method"] }) except Exception as e: print(f"[!] Failed to scrape {url}: {e}") return endpoints_data @app.get("/scrape-api-endpoints") async def scrape_api_endpoints( website: str = Query(..., description="Website URL to scrape"), max_depth: int = Query(1, description="How deep to crawl internal links") ): try: all_endpoints: List[Dict] = [] visited: Set[str] = set() async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context() page = await context.new_page() queue = [(website, 0)] base_domain = urlparse(website).netloc while queue: current_url, depth = queue.pop(0) if current_url in visited or depth > max_depth: continue visited.add(current_url) print(f"[+] Scraping {current_url}") endpoints = await scrape_page_for_endpoints(page, current_url, base_domain) all_endpoints.extend(endpoints) if depth < max_depth: links = await extract_internal_links(page, website) for link in links: if link not in visited: queue.append((link, depth + 1)) await browser.close() # Deduplicate by URL unique_by_url = {} for item in all_endpoints: unique_by_url[item["url"]] = item return { "website": website, "pages_visited": len(visited), "total_endpoints_found": len(unique_by_url), "api_endpoints": list(unique_by_url.values()) } except Exception as e: raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")