from fastapi import FastAPI, HTTPException, Query
from playwright.async_api import async_playwright
from urllib.parse import urlparse
from typing import List, Set
import re

app = FastAPI()

# Pages already crawled during the current request; cleared at the start of each request.
# Note: this is module-level state, so concurrent requests share it.
visited_links: Set[str] = set()

# Regex patterns for emails, phone numbers, and social profile links
email_pattern = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+")
phone_pattern = re.compile(r"\+?\d[\d\s().-]{7,}\d")
social_pattern = re.compile(r"https?://(?:www\.)?(?:facebook|linkedin|twitter|instagram)\.com/[^\s\"'<>]+")


def extract_matches(pattern, text):
    # Deduplicate matches and drop empty or whitespace-only strings.
    return list(set(filter(lambda x: x and x.strip(), pattern.findall(text))))

async def extract_internal_links(page, base_url: str) -> List[str]:
    # Collect absolute hrefs from every anchor on the page, then keep only links
    # on the same domain as the base URL that have not been visited yet.
    anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)')
    domain = urlparse(base_url).netloc
    internal_links = [
        link for link in anchors
        if urlparse(link).netloc == domain and link not in visited_links
    ]
    return list(set(internal_links))

async def scrape_contacts_from_page(page, url: str):
    contacts = {"emails": [], "phones": [], "socials": []}
    try:
        await page.goto(url, timeout=30000)
        await page.wait_for_timeout(1500)  # give client-side scripts a moment to render
        content = await page.content()
        contacts["emails"] = extract_matches(email_pattern, content)
        contacts["phones"] = extract_matches(phone_pattern, content)
        contacts["socials"] = extract_matches(social_pattern, content)
    except Exception as e:
        print(f"[!] Failed at {url}: {e}")
    return contacts

@app.get("/scrape-contacts")
async def scrape_contacts(
    website: str = Query(..., description="Base website URL"),
    max_depth: int = Query(1, description="How deep to crawl (recommended: 1 or 2)")
):
    try:
        all_emails, all_phones, all_socials = set(), set(), set()
        visited_links.clear()
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()
            # Breadth-first crawl starting from the base URL.
            queue = [(website, 0)]
            while queue:
                current_url, depth = queue.pop(0)
                if current_url in visited_links or depth > max_depth:
                    continue
                visited_links.add(current_url)
                print(f"[+] Crawling: {current_url}")
                data = await scrape_contacts_from_page(page, current_url)
                all_emails.update(data["emails"])
                all_phones.update(data["phones"])
                all_socials.update(data["socials"])
                if depth < max_depth:
                    try:
                        # Follow only contact-related internal pages to keep the crawl small.
                        internal_links = await extract_internal_links(page, website)
                        for link in internal_links:
                            if any(x in link.lower() for x in ["contact", "about", "support"]):
                                queue.append((link, depth + 1))
                    except Exception as e:
                        print(f"[!] Link extraction failed at {current_url}: {e}")
            await browser.close()
        return {
            "website": website,
            "pages_visited": len(visited_links),
            "emails": list(all_emails),
            "phone_numbers": list(all_phones),
            "social_profiles": list(all_socials)
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
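

# A minimal local entry point, assuming the usual FastAPI + uvicorn setup
# (uvicorn is an assumed dependency, not imported elsewhere in this file;
# the host and port below are illustrative defaults):
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

# Example request (hypothetical local URL and target site):
#   curl "http://127.0.0.1:8000/scrape-contacts?website=https://example.com&max_depth=1"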