from fastapi import FastAPI, HTTPException, Query
from playwright.async_api import async_playwright
from urllib.parse import urlparse
from typing import List, Set
import re

app = FastAPI()

# Pages already crawled; cleared at the start of each request. As module-level
# state it is shared across requests, so concurrent calls can interfere.
visited_links: Set[str] = set()

# Regex patterns for emails, phone numbers, and common social-media profile URLs.
# Matching against raw HTML keeps things simple but can yield false positives
# (e.g. the email pattern also matches filenames such as "logo@2x.png").
email_pattern = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+")
phone_pattern = re.compile(r"\+?\d[\d\s().-]{7,}\d")
social_pattern = re.compile(r"https?://(?:www\.)?(?:facebook|linkedin|twitter|instagram)\.com/[^\s\"'<>]+")

def extract_matches(pattern, text):
    # Return de-duplicated matches, dropping empty or whitespace-only strings
    return list(set(filter(lambda x: x and x.strip(), pattern.findall(text))))

async def extract_internal_links(page, base_url: str) -> List[str]:
    # Collect absolute hrefs from the current page, then keep only same-domain
    # links that have not been visited yet
    anchors = await page.eval_on_selector_all('a[href]', 'els => els.map(el => el.href)')
    domain = urlparse(base_url).netloc
    internal_links = [
        link for link in anchors
        if urlparse(link).netloc == domain and link not in visited_links
    ]
    return list(set(internal_links))

async def scrape_contacts_from_page(page, url: str):
    contacts = {"emails": [], "phones": [], "socials": []}

    try:
        await page.goto(url, timeout=30000)
        # Short fixed wait so client-side rendering can settle before reading the HTML
        await page.wait_for_timeout(1500)

        content = await page.content()

        contacts["emails"] = extract_matches(email_pattern, content)
        contacts["phones"] = extract_matches(phone_pattern, content)
        contacts["socials"] = extract_matches(social_pattern, content)

    except Exception as e:
        print(f"[!] Failed at {url}: {e}")

    return contacts

@app.get("/scrape-contacts")
async def scrape_contacts(
    website: str = Query(..., description="Base website URL"),
    max_depth: int = Query(1, description="How deep to crawl (recommended: 1 or 2)")
):
    try:
        all_emails, all_phones, all_socials = set(), set(), set()
        visited_links.clear()

        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()

            # Breadth-first crawl: the queue holds (url, depth) pairs, starting at the base site
            queue = [(website, 0)]

            while queue:
                current_url, depth = queue.pop(0)
                if current_url in visited_links or depth > max_depth:
                    continue
                visited_links.add(current_url)

                print(f"[+] Crawling: {current_url}")
                data = await scrape_contacts_from_page(page, current_url)
                all_emails.update(data["emails"])
                all_phones.update(data["phones"])
                all_socials.update(data["socials"])

                if depth < max_depth:
                    try:
                        internal_links = await extract_internal_links(page, website)
                        for link in internal_links:
                            # Only follow links that look like contact/about/support pages
                            if any(x in link.lower() for x in ["contact", "about", "support"]):
                                queue.append((link, depth + 1))
                    except Exception as e:
                        print(f"[!] Link extraction failed at {current_url}: {e}")

            await browser.close()

        return {
            "website": website,
            "pages_visited": len(visited_links),
            "emails": list(all_emails),
            "phone_numbers": list(all_phones),
            "social_profiles": list(all_socials)
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Scraping failed: {str(e)}")
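
# A minimal sketch for running the API locally; it assumes uvicorn is installed,
# which the original source does not state.
if __name__ == "__main__":
    import uvicorn

    # Start the server, then query the endpoint, for example:
    #   curl "http://127.0.0.1:8000/scrape-contacts?website=https://example.com&max_depth=1"
    uvicorn.run(app, host="127.0.0.1", port=8000)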