apexherbert200 committed
Commit f66080c · 1 Parent(s): 0883890

Tool for scraping URL endpoints

Files changed (2)
  1. Dockerfile +1 -1
  2. scrapeAPI2.py +163 -0
Dockerfile CHANGED
@@ -53,4 +53,4 @@ RUN python -m playwright install chromium
 EXPOSE 7860
 
 # Run the FastAPI application
-CMD ["python", "-m", "uvicorn", "scrapeAPI:app", "--host", "0.0.0.0", "--port", "7860"]
+CMD ["python", "-m", "uvicorn", "scrapeAPI2:app", "--host", "0.0.0.0", "--port", "7860"]
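
The only Dockerfile change points uvicorn at the new scrapeAPI2:app module, still served on port 7860. As a quick smoke test once the container is running, the route added in scrapeAPI2.py below can be called with a short client script; this is only a sketch that assumes port 7860 is published on localhost and that a slow crawl needs a generous timeout, neither of which is part of the commit.

# Hypothetical client for the /scrape-api-endpoints route added in this commit.
# Assumes the container's port 7860 is published on localhost.
import requests

resp = requests.get(
    "http://localhost:7860/scrape-api-endpoints",
    params={"website": "https://example.com", "max_depth": 1},
    timeout=600,  # crawling several pages with Playwright can take minutes
)
resp.raise_for_status()
data = resp.json()
print(data["pages_visited"], data["total_endpoints_found"])
for ep in data["api_endpoints"]:
    print(ep["method"], ep["type"], ep["url"])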
scrapeAPI2.py ADDED
@@ -0,0 +1,163 @@
+from fastapi import FastAPI, HTTPException, Query
+from playwright.async_api import async_playwright
+from urllib.parse import urlparse
+from typing import List, Dict, Set
+import re
+
+app = FastAPI()
+
+# In-memory cache for JS file URLs
+js_cache: Set[str] = set()
+
+
+# Extract URLs from any text
+def extract_possible_endpoints(text: str) -> List[str]:
+    pattern = re.compile(r'https?://[^\s"\'<>]+')
+    urls = pattern.findall(text)
+    return list(set([
+        url for url in urls
+        if '/api/' in url or re.search(r'\.(json|php|xml|ajax|aspx|jsp)', url)
+    ]))
+
+
+# Identify internal links
+async def extract_internal_links(page, base_url: str) -> List[str]:
+    domain = urlparse(base_url).netloc
+    anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)')
+    internal_links = [
+        link for link in anchors
+        if urlparse(link).netloc == domain
+    ]
+    return list(set(internal_links))
+
+
+# Classify whether an endpoint is internal or third-party
+def classify_endpoint(endpoint: str, domain: str) -> str:
+    return "first-party" if urlparse(endpoint).netloc.endswith(domain) else "third-party"
+
+
+# Simulate scroll and click interactions
+async def simulate_interactions(page):
+    try:
+        for _ in range(5):  # Scroll
+            await page.mouse.wheel(0, 800)
+            await page.wait_for_timeout(1000)
+
+        selectors_to_click = ['button', 'a', '[role="button"]', '.load-more', '.show-more']
+        for selector in selectors_to_click:
+            elements = await page.query_selector_all(selector)
+            for el in elements[:5]:  # Avoid overclicking
+                try:
+                    await el.click()
+                    await page.wait_for_timeout(1000)
+                except:
+                    continue
+    except Exception as e:
+        print(f"[!] Interaction simulation failed: {e}")
+
+
+# Main scraper per page
+async def scrape_page_for_endpoints(page, url: str, base_domain: str) -> List[Dict]:
+    endpoints_data = []
+    captured_urls = []
+
+    async def capture_request(req):
+        captured_urls.append({
+            "url": req.url,
+            "method": req.method
+        })
+
+    page.on("request", capture_request)
+
+    try:
+        await page.goto(url, timeout=60000)
+        await page.wait_for_timeout(3000)
+
+        await simulate_interactions(page)
+
+        # Extract and parse JS files
+        js_urls = await page.eval_on_selector_all('script[src]', 'els => els.map(el => el.src)')
+        for js_url in js_urls:
+            if js_url not in js_cache:
+                js_cache.add(js_url)
+                try:
+                    response = await page.request.get(js_url)
+                    if response.ok:
+                        body = await response.text()
+                        for ep in extract_possible_endpoints(body):
+                            endpoints_data.append({
+                                "url": ep,
+                                "source": "js",
+                                "type": classify_endpoint(ep, base_domain),
+                                "method": "UNKNOWN"
+                            })
+                except:
+                    continue
+
+        # Network-based endpoints
+        for item in captured_urls:
+            ep = item["url"]
+            if any(x in ep for x in ["/api/", ".json", ".php", ".xml", ".ajax"]):
+                endpoints_data.append({
+                    "url": ep,
+                    "source": "network",
+                    "type": classify_endpoint(ep, base_domain),
+                    "method": item["method"]
+                })
+
+    except Exception as e:
+        print(f"[!] Failed to scrape {url}: {e}")
+
+    return endpoints_data
+
+
+@app.get("/scrape-api-endpoints")
+async def scrape_api_endpoints(
+    website: str = Query(..., description="Website URL to scrape"),
+    max_depth: int = Query(1, description="How deep to crawl internal links")
+):
+    try:
+        all_endpoints: List[Dict] = []
+        visited: Set[str] = set()
+
+        async with async_playwright() as p:
+            browser = await p.chromium.launch(headless=True)
+            context = await browser.new_context()
+            page = await context.new_page()
+
+            queue = [(website, 0)]
+            base_domain = urlparse(website).netloc
+
+            while queue:
+                current_url, depth = queue.pop(0)
+                if current_url in visited or depth > max_depth:
+                    continue
+
+                visited.add(current_url)
+                print(f"[+] Scraping {current_url}")
+
+                endpoints = await scrape_page_for_endpoints(page, current_url, base_domain)
+                all_endpoints.extend(endpoints)
+
+                if depth < max_depth:
+                    links = await extract_internal_links(page, website)
+                    for link in links:
+                        if link not in visited:
+                            queue.append((link, depth + 1))
+
+            await browser.close()
+
+        # Deduplicate by URL
+        unique_by_url = {}
+        for item in all_endpoints:
+            unique_by_url[item["url"]] = item
+
+        return {
+            "website": website,
+            "pages_visited": len(visited),
+            "total_endpoints_found": len(unique_by_url),
+            "api_endpoints": list(unique_by_url.values())
+        }
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
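
For a sense of what the URL filters in scrapeAPI2.py keep, here is a minimal sketch that imports the two pure helpers directly; the sample JavaScript text and the example.com domain are invented for illustration, and the script assumes scrapeAPI2.py is importable from the working directory.

# Illustrative check of extract_possible_endpoints and classify_endpoint.
# The sample text below is made up; only the helper behaviour comes from the commit.
from scrapeAPI2 import extract_possible_endpoints, classify_endpoint

sample_js = '''
fetch("https://shop.example.com/api/v1/products");
const cdn = "https://cdn.thirdparty.net/lib.min.js";
const feed = "https://example.com/data/feed.json";
'''

for ep in extract_possible_endpoints(sample_js):
    # The /api/ and .json URLs survive the filter; the plain .js asset does not.
    print(classify_endpoint(ep, "example.com"), ep)
# Both surviving hosts end with "example.com", so each is tagged "first-party".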