from fastapi import FastAPI, HTTPException, Query
from playwright.async_api import async_playwright
from urllib.parse import urlparse
from typing import List, Dict, Set
import re
app = FastAPI()
# In-memory cache for JS file URLs
js_cache: Set[str] = set()
# Extract URLs from any text
def extract_possible_endpoints(text: str) -> List[str]:
    pattern = re.compile(r'https?://[^\s"\'<>]+')
    urls = pattern.findall(text)
    return list({
        url for url in urls
        if '/api/' in url or re.search(r'\.(json|php|xml|ajax|aspx|jsp)', url)
    })
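
# Illustrative example (hypothetical URLs): given JS source such as
#   'fetch("https://shop.example.com/api/v1/cart"); const s = "https://cdn.example.com/app.js";'
# extract_possible_endpoints() returns ['https://shop.example.com/api/v1/cart'];
# the bare .js asset has neither '/api/' nor a dynamic extension, so it is dropped.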
# Identify internal links
async def extract_internal_links(page, base_url: str) -> List[str]:
    domain = urlparse(base_url).netloc
    anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)')
    internal_links = [
        link for link in anchors
        if urlparse(link).netloc == domain
    ]
    return list(set(internal_links))

# Classify whether an endpoint is internal or third-party
def classify_endpoint(endpoint: str, domain: str) -> str:
return "first-party" if urlparse(endpoint).netloc.endswith(domain) else "third-party"
# Simulate scroll and click interactions
async def simulate_interactions(page):
    try:
        # Scroll a few screens to trigger lazy-loaded content
        for _ in range(5):
            await page.mouse.wheel(0, 800)
            await page.wait_for_timeout(1000)
        # Click common interactive elements (note: clicking bare <a> tags may navigate away)
        selectors_to_click = ['button', 'a', '[role="button"]', '.load-more', '.show-more']
        for selector in selectors_to_click:
            elements = await page.query_selector_all(selector)
            for el in elements[:5]:  # Avoid overclicking
                try:
                    await el.click()
                    await page.wait_for_timeout(1000)
                except Exception:
                    continue
    except Exception as e:
        print(f"[!] Interaction simulation failed: {e}")

# Main scraper per page
async def scrape_page_for_endpoints(page, url: str, base_domain: str) -> List[Dict]:
    endpoints_data = []
    captured_urls = []

    # Record every network request issued while the page is open
    def capture_request(req):
        captured_urls.append({
            "url": req.url,
            "method": req.method
        })

    page.on("request", capture_request)
    try:
        await page.goto(url, timeout=60000)
        await page.wait_for_timeout(3000)
        await simulate_interactions(page)

        # Extract and parse linked JS files for hard-coded endpoint URLs
        js_urls = await page.eval_on_selector_all('script[src]', 'els => els.map(el => el.src)')
        for js_url in js_urls:
            if js_url not in js_cache:
                js_cache.add(js_url)
                try:
                    response = await page.request.get(js_url)
                    if response.ok:
                        body = await response.text()
                        for ep in extract_possible_endpoints(body):
                            endpoints_data.append({
                                "url": ep,
                                "source": "js",
                                "type": classify_endpoint(ep, base_domain),
                                "method": "UNKNOWN"
                            })
                except Exception:
                    continue

        # Network-based endpoints captured during page load and interactions
        for item in captured_urls:
            ep = item["url"]
            if any(x in ep for x in ["/api/", ".json", ".php", ".xml", ".ajax"]):
                endpoints_data.append({
                    "url": ep,
                    "source": "network",
                    "type": classify_endpoint(ep, base_domain),
                    "method": item["method"]
                })
    except Exception as e:
        print(f"[!] Failed to scrape {url}: {e}")
    finally:
        # Detach the listener so handlers don't pile up as the same page is reused
        page.remove_listener("request", capture_request)
    return endpoints_data

@app.get("/scrape-api-endpoints")
async def scrape_api_endpoints(
    website: str = Query(..., description="Website URL to scrape"),
    max_depth: int = Query(1, description="How deep to crawl internal links")
):
    try:
        all_endpoints: List[Dict] = []
        visited: Set[str] = set()
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()
            queue = [(website, 0)]
            base_domain = urlparse(website).netloc
            # Breadth-first crawl of internal links up to max_depth
            while queue:
                current_url, depth = queue.pop(0)
                if current_url in visited or depth > max_depth:
                    continue
                visited.add(current_url)
                print(f"[+] Scraping {current_url}")
                endpoints = await scrape_page_for_endpoints(page, current_url, base_domain)
                all_endpoints.extend(endpoints)
                if depth < max_depth:
                    links = await extract_internal_links(page, website)
                    for link in links:
                        if link not in visited:
                            queue.append((link, depth + 1))
            await browser.close()
        # Deduplicate by URL
        unique_by_url = {}
        for item in all_endpoints:
            unique_by_url[item["url"]] = item
        return {
            "website": website,
            "pages_visited": len(visited),
            "total_endpoints_found": len(unique_by_url),
            "api_endpoints": list(unique_by_url.values())
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
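
# Local run helper: a minimal sketch assuming uvicorn is installed alongside fastapi and
# playwright, and that Chromium has been fetched via `playwright install chromium`.
# Example request once the server is up:
#   curl "http://127.0.0.1:8000/scrape-api-endpoints?website=https://example.com&max_depth=1"
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=8000)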