from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from playwright.async_api import async_playwright
import asyncio
import base64
import logging
from typing import List, Optional
from urllib.parse import urlparse, parse_qs

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Query-Based Web Scraper", description="Scrape websites based on search queries")

# ... (Keep all your Pydantic models unchanged) ...

@app.get("/")
async def root():
    return {
        "message": "🚀 Query-Based Web Scraper API",
        "tagline": "Search and scrape websites based on queries",
        "endpoints": {
            "/scrape": "Search Google for the query and scrape the top result",
            "/docs": "API documentation"
        },
        "example": "/scrape?query=plumbers+near+me&lead_generation=true&screenshot=true",
        "note": "Now accepts search queries instead of direct URLs"
    }


async def get_top_search_result(query: str):
    """Perform a Google search and return the URL of the top result, with basic CAPTCHA handling."""
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    async with async_playwright() as p:
        # Route traffic through a proxy to reduce the chance of CAPTCHAs
        proxy_server = "us.proxyrack.net:10000"
        browser = await p.chromium.launch(
            headless=True,
            proxy={
                "server": f"http://{proxy_server}",
                "username": "your-proxy-username",  # Replace with actual credentials
                "password": "your-proxy-password"
            },
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu'
            ]
        )
        context = await browser.new_context(
            user_agent=user_agent,
            locale='en-US',
            viewport={'width': 1920, 'height': 1080},
            # Reduce obvious automation fingerprints
            java_script_enabled=True,
            bypass_csp=True
        )
        page = await context.new_page()
        try:
            logger.info(f"Searching Google for: {query}")
            await page.goto("https://www.google.com", timeout=60000)

            # Handle the consent form if it appears
            try:
                consent_button = await page.wait_for_selector(
                    'button:has-text("Accept all"), button:has-text("I agree")',
                    timeout=5000
                )
                if consent_button:
                    await consent_button.click()
                    logger.info("Accepted Google consent form")
                    await asyncio.sleep(1)  # Small delay for the consent to apply
            except Exception:
                pass  # Consent form didn't appear

            # Perform the search
            search_box = await page.wait_for_selector('textarea[name="q"]', timeout=10000)
            await search_box.fill(query)
            await page.keyboard.press("Enter")

            # Abort early if Google served a CAPTCHA instead of results
            captcha = await page.query_selector('form#captcha-form, div#recaptcha')
            if captcha:
                logger.error("CAPTCHA encountered during search")
                raise Exception("Google CAPTCHA encountered. Cannot proceed with search.")

            # Wait for search results, falling back to the generic results container
            try:
                await page.wait_for_selector('.g, .tF2Cxc', timeout=30000)
            except Exception:
                try:
                    await page.wait_for_selector('#search', timeout=10000)
                except Exception:
                    logger.error("Search results not found")
                    raise Exception("Search results not found")

            # Extract the top results
            results = await page.query_selector_all('.g, .tF2Cxc')
            if not results:
                results = await page.query_selector_all('div[data-snf]')
            if not results:
                raise Exception("No search results found")

            urls = []
            for result in results[:3]:  # Check the top 3 results
                try:
                    link = await result.query_selector('a')
                    if not link:
                        continue
                    # Prefer data-href, falling back to href
                    data_href = await link.get_attribute('data-href')
                    href = await link.get_attribute('href')
                    target_url = data_href or href
                    if target_url and target_url.startswith('/url?q='):
                        target_url = f"https://www.google.com{target_url}"
                    if target_url and target_url.startswith('https://www.google.com/url?'):
                        # Unwrap Google's redirect URL to get the real destination
                        parsed = urlparse(target_url)
                        qs = parse_qs(parsed.query)
                        target_url = qs.get('q', [target_url])[0]
                    if target_url and target_url.startswith('http'):
                        urls.append(target_url)
                        logger.info(f"Found search result: {target_url}")
                except Exception as e:
                    logger.warning(f"Error processing result: {str(e)}")

            if not urls:
                raise Exception("No valid URLs found in search results")

            await browser.close()
            return urls[0]  # Return the top result
        except Exception as e:
            logger.error(f"Search failed: {str(e)}")
            await page.screenshot(path="search_error.png")
            await browser.close()
            raise
@app.get("/scrape")
async def scrape_page(
query: str = Query(..., description="Search query to find a website"),
lead_generation: bool = Query(True, description="Extract lead generation data"),
screenshot: bool = Query(True, description="Take a full page screenshot"),
get_links: bool = Query(True, description="Extract all links from the page"),
get_body: bool = Query(False, description="Extract body tag content")
):
logger.info(f"Starting scrape for query: {query}")
try:
# Get top search result URL
target_url = await get_top_search_result(query)
logger.info(f"Scraping top result: {target_url}")
except Exception as e:
logger.error(f"Search error: {str(e)}")
raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
# ... (keep the rest of the scraping function unchanged) ... |
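

# --- Optional local-run helper (not part of the original snippet) ---
# A minimal sketch, assuming the app is served with uvicorn (the ASGI server
# commonly used with FastAPI) and that this module is the entry point; the
# host/port values are placeholders. Alternatively, start it from the command
# line, e.g. `uvicorn <module_name>:app --reload`.
if __name__ == "__main__":
    import uvicorn  # assumed dependency

    # Bind to all interfaces on port 8000; adjust for your deployment.
    uvicorn.run(app, host="0.0.0.0", port=8000)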