apexherbert200 committed
Commit 9a5d887 · 1 Parent(s): 3c655d3

Tool for scraping contacts

Files changed (1)
  1. scrapeAPI2.py +50 -39
scrapeAPI2.py CHANGED
@@ -1,82 +1,93 @@
  from fastapi import FastAPI, HTTPException, Query
  from playwright.async_api import async_playwright
  from urllib.parse import urlparse
- from typing import List, Set, Dict
+ from typing import List, Set
  import re

  app = FastAPI()

- contact_info_pattern = {
-     "email": re.compile(r'[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+'),
-     "phone": re.compile(r'(\+\d{1,3}[- ]?)?\(?\d{2,4}\)?[-.\s]?\d{3}[-.\s]?\d{4,6}'),
-     "social": re.compile(r'https?://(www\.)?(twitter|linkedin|facebook|instagram)\.com/[^\s"\'<>]+')
- }
+ visited_links: Set[str] = set()
+
+ # Improved regex patterns
+ email_pattern = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+")
+ phone_pattern = re.compile(r"\+?\d[\d\s().-]{7,}\d")
+ social_pattern = re.compile(r"https?://(?:www\.)?(?:facebook|linkedin|twitter|instagram)\.com/[^\s\"'<>]+")
+
+ def extract_matches(pattern, text):
+     return list(set(filter(lambda x: x and x.strip(), pattern.findall(text))))

  async def extract_internal_links(page, base_url: str) -> List[str]:
      anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)')
      domain = urlparse(base_url).netloc
-     return list(set([
-         link for link in anchors if urlparse(link).netloc == domain
-     ]))
+     internal_links = [
+         link for link in anchors
+         if urlparse(link).netloc == domain and link not in visited_links
+     ]
+     return list(set(internal_links))

- def extract_contact_info(text: str) -> Dict[str, List[str]]:
-     return {
-         key: list(set(pattern.findall(text))) for key, pattern in contact_info_pattern.items()
-     }
+ async def scrape_contacts_from_page(page, url: str):
+     contacts = {"emails": [], "phones": [], "socials": []}

- async def scrape_page_for_contacts(page, url: str) -> Dict[str, List[str]]:
      try:
-         await page.goto(url, timeout=60000)
-         await page.wait_for_timeout(2000)
+         await page.goto(url, timeout=30000)
+         await page.wait_for_timeout(1500)
+
          content = await page.content()
-         return extract_contact_info(content)
+
+         contacts["emails"] = extract_matches(email_pattern, content)
+         contacts["phones"] = extract_matches(phone_pattern, content)
+         contacts["socials"] = extract_matches(social_pattern, content)
+
      except Exception as e:
-         print(f"[!] Failed to scrape {url}: {e}")
-         return {key: [] for key in contact_info_pattern.keys()}
+         print(f"[!] Failed at {url}: {e}")
+
+     return contacts

- @app.get("/scrape-contact-info")
- async def scrape_contact_info(
-     website: str = Query(..., description="Website URL to crawl for contact info"),
-     max_depth: int = Query(1, description="Max link crawl depth (1 = homepage only)")
+ @app.get("/scrape-contacts")
+ async def scrape_contacts(
+     website: str = Query(..., description="Base website URL"),
+     max_depth: int = Query(1, description="How deep to crawl (recommended: 1 or 2)")
  ):
      try:
-         visited = set()
-         combined_info = {key: set() for key in contact_info_pattern.keys()}
+         all_emails, all_phones, all_socials = set(), set(), set()
+         visited_links.clear()

          async with async_playwright() as p:
              browser = await p.chromium.launch(headless=True)
              context = await browser.new_context()
              page = await context.new_page()
+
              queue = [(website, 0)]

              while queue:
                  current_url, depth = queue.pop(0)
-                 if current_url in visited or depth > max_depth:
+                 if current_url in visited_links or depth > max_depth:
                      continue
-                 visited.add(current_url)
+                 visited_links.add(current_url)

-                 print(f"[+] Visiting: {current_url}")
-                 info = await scrape_page_for_contacts(page, current_url)
-                 for key in combined_info:
-                     combined_info[key].update(info.get(key, []))
+                 print(f"[+] Crawling: {current_url}")
+                 data = await scrape_contacts_from_page(page, current_url)
+                 all_emails.update(data["emails"])
+                 all_phones.update(data["phones"])
+                 all_socials.update(data["socials"])

                  if depth < max_depth:
                      try:
-                         links = await extract_internal_links(page, website)
-                         for link in links:
-                             if link not in visited:
+                         internal_links = await extract_internal_links(page, website)
+                         for link in internal_links:
+                             if any(x in link.lower() for x in ["contact", "about", "support"]):
                                  queue.append((link, depth + 1))
                      except Exception as e:
-                         print(f"[!] Failed to get links: {e}")
+                         print(f"[!] Link extraction failed at {current_url}: {e}")

              await browser.close()

          return {
              "website": website,
-             "pages_visited": len(visited),
-             "emails": list(combined_info["email"]),
-             "phone_numbers": list(combined_info["phone"]),
-             "social_profiles": list(combined_info["social"])
+             "pages_visited": len(visited_links),
+             "emails": list(all_emails),
+             "phone_numbers": list(all_phones),
+             "social_profiles": list(all_socials)
          }

      except Exception as e:
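
For reference, a minimal client sketch for exercising the /scrape-contacts endpoint introduced in this commit. It assumes (not shown in the commit) that the module is saved as scrapeAPI2.py and served locally with uvicorn on its default port (e.g. `uvicorn scrapeAPI2:app`), that Playwright's Chromium build is installed, and that the `requests` package is available; the target URL is only a placeholder.

# Client sketch: calls the /scrape-contacts endpoint added above.
# Assumes the API is running at http://127.0.0.1:8000 (uvicorn's default);
# "https://example.com" is a placeholder target, not part of the commit.
import requests

resp = requests.get(
    "http://127.0.0.1:8000/scrape-contacts",
    params={"website": "https://example.com", "max_depth": 1},
    timeout=120,  # the crawl drives a headless browser, so allow plenty of time
)
resp.raise_for_status()
data = resp.json()
print(data["pages_visited"], data["emails"], data["phone_numbers"], data["social_profiles"])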