apexherbert200 committed
Commit 3c655d3 · 1 Parent(s): f66080c

Tool for scraping contacts

Files changed (2)
  1. contacts.py +0 -0
  2. scrapeAPI2.py +39 -119
contacts.py ADDED
File without changes
scrapeAPI2.py CHANGED
@@ -1,162 +1,82 @@
 from fastapi import FastAPI, HTTPException, Query
 from playwright.async_api import async_playwright
 from urllib.parse import urlparse
-from typing import List, Dict, Set
+from typing import List, Set, Dict
 import re
 
 app = FastAPI()
 
-# In-memory cache for JS file URLs
-js_cache: Set[str] = set()
+contact_info_pattern = {
+    "email": re.compile(r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+'),
+    "phone": re.compile(r'(\+\d{1,3}[- ]?)?\(?\d{2,4}\)?[-.\s]?\d{3}[-.\s]?\d{4,6}'),
+    "social": re.compile(r'https?://(www\.)?(twitter|linkedin|facebook|instagram)\.com/[^\s"\'<>]+')
+}
 
-
-# Extract URLs from any text
-def extract_possible_endpoints(text: str) -> List[str]:
-    pattern = re.compile(r'https?://[^\s"\'<>]+')
-    urls = pattern.findall(text)
-    return list(set([
-        url for url in urls
-        if '/api/' in url or re.search(r'\.(json|php|xml|ajax|aspx|jsp)', url)
-    ]))
-
-
-# Identify internal links
 async def extract_internal_links(page, base_url: str) -> List[str]:
-    domain = urlparse(base_url).netloc
     anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)')
-    internal_links = [
-        link for link in anchors
-        if urlparse(link).netloc == domain
-    ]
-    return list(set(internal_links))
-
-
-# Classify whether an endpoint is internal or third-party
-def classify_endpoint(endpoint: str, domain: str) -> str:
-    return "first-party" if urlparse(endpoint).netloc.endswith(domain) else "third-party"
-
-
-# Simulate scroll and click interactions
-async def simulate_interactions(page):
-    try:
-        for _ in range(5):  # Scroll
-            await page.mouse.wheel(0, 800)
-            await page.wait_for_timeout(1000)
-
-        selectors_to_click = ['button', 'a', '[role="button"]', '.load-more', '.show-more']
-        for selector in selectors_to_click:
-            elements = await page.query_selector_all(selector)
-            for el in elements[:5]:  # Avoid overclicking
-                try:
-                    await el.click()
-                    await page.wait_for_timeout(1000)
-                except:
-                    continue
-    except Exception as e:
-        print(f"[!] Interaction simulation failed: {e}")
-
-
-# Main scraper per page
-async def scrape_page_for_endpoints(page, url: str, base_domain: str) -> List[Dict]:
-    endpoints_data = []
-    captured_urls = []
-
-    async def capture_request(req):
-        captured_urls.append({
-            "url": req.url,
-            "method": req.method
-        })
+    domain = urlparse(base_url).netloc
+    return list(set([
+        link for link in anchors if urlparse(link).netloc == domain
+    ]))
 
-    page.on("request", capture_request)
+def extract_contact_info(text: str) -> Dict[str, List[str]]:
+    return {
+        key: list(set(pattern.findall(text))) for key, pattern in contact_info_pattern.items()
+    }
 
+async def scrape_page_for_contacts(page, url: str) -> Dict[str, List[str]]:
     try:
         await page.goto(url, timeout=60000)
-        await page.wait_for_timeout(3000)
-
-        await simulate_interactions(page)
-
-        # Extract and parse JS files
-        js_urls = await page.eval_on_selector_all('script[src]', 'els => els.map(el => el.src)')
-        for js_url in js_urls:
-            if js_url not in js_cache:
-                js_cache.add(js_url)
-                try:
-                    response = await page.request.get(js_url)
-                    if response.ok:
-                        body = await response.text()
-                        for ep in extract_possible_endpoints(body):
-                            endpoints_data.append({
-                                "url": ep,
-                                "source": "js",
-                                "type": classify_endpoint(ep, base_domain),
-                                "method": "UNKNOWN"
-                            })
-                except:
-                    continue
-
-        # Network-based endpoints
-        for item in captured_urls:
-            ep = item["url"]
-            if any(x in ep for x in ["/api/", ".json", ".php", ".xml", ".ajax"]):
-                endpoints_data.append({
-                    "url": ep,
-                    "source": "network",
-                    "type": classify_endpoint(ep, base_domain),
-                    "method": item["method"]
-                })
-
+        await page.wait_for_timeout(2000)
+        content = await page.content()
+        return extract_contact_info(content)
     except Exception as e:
         print(f"[!] Failed to scrape {url}: {e}")
+        return {key: [] for key in contact_info_pattern.keys()}
 
-    return endpoints_data
-
-
-@app.get("/scrape-api-endpoints")
-async def scrape_api_endpoints(
-    website: str = Query(..., description="Website URL to scrape"),
-    max_depth: int = Query(1, description="How deep to crawl internal links")
+@app.get("/scrape-contact-info")
+async def scrape_contact_info(
+    website: str = Query(..., description="Website URL to crawl for contact info"),
+    max_depth: int = Query(1, description="Max link crawl depth (1 = homepage only)")
 ):
     try:
-        all_endpoints: List[Dict] = []
-        visited: Set[str] = set()
+        visited = set()
+        combined_info = {key: set() for key in contact_info_pattern.keys()}
 
         async with async_playwright() as p:
             browser = await p.chromium.launch(headless=True)
             context = await browser.new_context()
             page = await context.new_page()
-
             queue = [(website, 0)]
-            base_domain = urlparse(website).netloc
 
             while queue:
                 current_url, depth = queue.pop(0)
                 if current_url in visited or depth > max_depth:
                     continue
-
                 visited.add(current_url)
-                print(f"[+] Scraping {current_url}")
 
-                endpoints = await scrape_page_for_endpoints(page, current_url, base_domain)
-                all_endpoints.extend(endpoints)
+                print(f"[+] Visiting: {current_url}")
+                info = await scrape_page_for_contacts(page, current_url)
+                for key in combined_info:
+                    combined_info[key].update(info.get(key, []))
 
                 if depth < max_depth:
-                    links = await extract_internal_links(page, website)
-                    for link in links:
-                        if link not in visited:
-                            queue.append((link, depth + 1))
+                    try:
+                        links = await extract_internal_links(page, website)
+                        for link in links:
+                            if link not in visited:
+                                queue.append((link, depth + 1))
+                    except Exception as e:
+                        print(f"[!] Failed to get links: {e}")
 
             await browser.close()
 
-        # Deduplicate by URL
-        unique_by_url = {}
-        for item in all_endpoints:
-            unique_by_url[item["url"]] = item
-
         return {
             "website": website,
             "pages_visited": len(visited),
-            "total_endpoints_found": len(unique_by_url),
-            "api_endpoints": list(unique_by_url.values())
+            "emails": list(combined_info["email"]),
+            "phone_numbers": list(combined_info["phone"]),
+            "social_profiles": list(combined_info["social"])
        }
 
     except Exception as e:
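A quick way to exercise the new route after this commit is a plain HTTP call. The sketch below is not part of the commit: it assumes the app is served locally with uvicorn on port 8000 (module name, host, and port are assumptions). Note that with the queue logic above, max_depth=1 crawls the starting page (depth 0) plus one level of internal links, since only depth > max_depth is skipped.

# Minimal local check of the new /scrape-contact-info endpoint.
# Assumes the app is running via `uvicorn scrapeAPI2:app --port 8000`
# (hypothetical launch command, not part of this commit).
import requests

resp = requests.get(
    "http://localhost:8000/scrape-contact-info",
    params={"website": "https://example.com", "max_depth": 1},
    timeout=120,  # Playwright-driven crawling can be slow
)
resp.raise_for_status()
data = resp.json()
print(data["pages_visited"])
print(data["emails"], data["phone_numbers"], data["social_profiles"])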