from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
import asyncio
import base64
import logging
import os
from typing import List, Optional
from urllib.parse import urlparse, parse_qs

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Query-Based Web Scraper", description="Scrape websites based on search queries")

# ... (Keep all your Pydantic models unchanged) ...

@app.get("/")
async def root():
    return {
        "message": "🚀 Query-Based Web Scraper API",
        "tagline": "Search and scrape websites based on queries",
        "endpoints": {
            "/scrape": "Search Google for the query and scrape the top result",
            "/docs": "API documentation"
        },
        "example": "/scrape?query=plumbers+near+me&lead_generation=true&screenshot=true",
        "note": "Now accepts search queries instead of direct URLs"
    }
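
# Usage sketch (assumption: this module is saved as main.py):
#   uvicorn main:app --host 0.0.0.0 --port 8000
#   curl "http://localhost:8000/scrape?query=plumbers+near+me&screenshot=true"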

async def get_top_search_result(query: str):
    """Perform Google search and return top result URL with CAPTCHA handling"""
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    async with async_playwright() as p:
        # Use a proxy to reduce the chance of CAPTCHAs. Credentials are read
        # from the environment rather than hardcoded; PROXY_USERNAME and
        # PROXY_PASSWORD are assumed variable names -- set them accordingly.
        proxy_server = "us.proxyrack.net:10000"
        browser = await p.chromium.launch(
            headless=True,
            proxy={
                "server": f"http://{proxy_server}",
                "username": os.environ.get("PROXY_USERNAME", ""),
                "password": os.environ.get("PROXY_PASSWORD", "")
            },
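            # Flags commonly needed to run headless Chromium in containers/CI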
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu'
            ]
        )
        context = await browser.new_context(
            user_agent=user_agent,
            locale='en-US',
            viewport={'width': 1920, 'height': 1080},
            # Present a regular desktop browsing profile; note these options
            # alone do not defeat dedicated automation detection
            java_script_enabled=True,
            bypass_csp=True
        )
        page = await context.new_page()
        
        try:
            logger.info(f"Searching Google for: {query}")
            await page.goto("https://www.google.com", timeout=60000)
            
            # Handle the consent form if it appears (common for EU regions)
            try:
                consent_button = await page.wait_for_selector(
                    'button:has-text("Accept all"), button:has-text("I agree")',
                    timeout=5000
                )
                await consent_button.click()
                logger.info("Accepted Google consent form")
                await asyncio.sleep(1)  # Small delay for consent to apply
            except PlaywrightTimeoutError:
                pass  # Consent form did not appear
            
            # Perform search
            search_box = await page.wait_for_selector('textarea[name="q"]', timeout=10000)
            await search_box.fill(query)
            await page.keyboard.press("Enter")
            
            # Check for a CAPTCHA challenge first, outside the try/except
            # below so the raised error is not swallowed by the fallback
            captcha = await page.query_selector('form#captcha-form, div#recaptcha')
            if captcha:
                logger.error("CAPTCHA encountered during search")
                raise Exception("Google CAPTCHA encountered. Cannot proceed with search.")
            
            # Wait for search results; fall back to the generic results
            # container if the per-result selectors time out
            try:
                await page.wait_for_selector('.g, .tF2Cxc', timeout=30000)
            except PlaywrightTimeoutError:
                try:
                    await page.wait_for_selector('#search', timeout=10000)
                except PlaywrightTimeoutError:
                    logger.error("Search results not found")
                    raise Exception("Search results not found")
            
            # Extract top results
            results = await page.query_selector_all('.g, .tF2Cxc')
            if not results:
                results = await page.query_selector_all('div[data-snf]')
            
            if not results:
                raise Exception("No search results found")
            
            urls = []
            for result in results[:3]:  # Check top 3 results
                try:
                    link = await result.query_selector('a')
                    if not link:
                        continue
                    
                    # Extract both data-href and href attributes
                    data_href = await link.get_attribute('data-href')
                    href = await link.get_attribute('href')
                    target_url = data_href or href
                    
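                    # Google often wraps result links in a /url?q=<target>
                    # redirect; unwrap it to recover the real destination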
                    if target_url and target_url.startswith('/url?q='):
                        target_url = f"https://www.google.com{target_url}"
                    
                    if target_url and target_url.startswith('https://www.google.com/url?'):
                        parsed = urlparse(target_url)
                        qs = parse_qs(parsed.query)
                        target_url = qs.get('q', [target_url])[0]
                    
                    if target_url and target_url.startswith('http'):
                        urls.append(target_url)
                        logger.info(f"Found search result: {target_url}")
                except Exception as e:
                    logger.warning(f"Error processing result: {str(e)}")
            
            if not urls:
                raise Exception("No valid URLs found in search results")
            
            await browser.close()
            return urls[0]  # Return top result
        
        except Exception as e:
            logger.error(f"Search failed: {str(e)}")
            try:
                await page.screenshot(path="search_error.png")  # Debug evidence
            except Exception:
                pass  # The page may already be unusable
            await browser.close()
            raise

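# Quick sanity check (a sketch, not part of the service): the search helper
# can be exercised on its own before wiring it into the endpoint, e.g.:
#   asyncio.run(get_top_search_result("plumbers near me"))
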
@app.get("/scrape")
async def scrape_page(
    query: str = Query(..., description="Search query to find a website"),
    lead_generation: bool = Query(True, description="Extract lead generation data"),
    screenshot: bool = Query(True, description="Take a full page screenshot"),
    get_links: bool = Query(True, description="Extract all links from the page"),
    get_body: bool = Query(False, description="Extract body tag content")
):
    logger.info(f"Starting scrape for query: {query}")
    
    try:
        # Get top search result URL
        target_url = await get_top_search_result(query)
        logger.info(f"Scraping top result: {target_url}")
    except Exception as e:
        logger.error(f"Search error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")

    # ... (keep the rest of the scraping function unchanged) ...