from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from playwright.async_api import async_playwright
import asyncio
import base64
import logging
from typing import List, Optional
from urllib.parse import urlparse, parse_qs

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Query-Based Web Scraper", description="Scrape websites based on search queries")


@app.get("/")
async def root():
    return {
        "message": "🚀 Query-Based Web Scraper API",
        "tagline": "Search and scrape websites based on queries",
        "endpoints": {
            "/scrape": "Search Google for the query and scrape the top result",
            "/docs": "API documentation"
        },
        "example": "/scrape?query=plumbers+near+me&lead_generation=true&screenshot=true",
        "note": "Now accepts search queries instead of direct URLs"
    }


async def get_top_search_result(query: str):
    """Perform a Google search and return the top result URL, with CAPTCHA handling."""
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    async with async_playwright() as p:
        # Route the search through a proxy; direct requests from datacenter IPs are far
        # more likely to be blocked or challenged by Google.
        proxy_server = "us.proxyrack.net:10000"
        browser = await p.chromium.launch(
            headless=True,
            proxy={
                "server": f"http://{proxy_server}",
                "username": "your-proxy-username",
                "password": "your-proxy-password"
            },
            # Flags commonly needed to run headless Chromium inside containers.
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu'
            ]
        )
        context = await browser.new_context(
            user_agent=user_agent,
            locale='en-US',
            viewport={'width': 1920, 'height': 1080},
            java_script_enabled=True,
            bypass_csp=True
        )
        page = await context.new_page()

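        # Load Google, run the query, and pull the top organic result; any failure below
        # is logged and screenshotted in the except handler at the bottom.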
        try:
            logger.info(f"Searching Google for: {query}")
            await page.goto("https://www.google.com", timeout=60000)

            # Dismiss Google's cookie-consent dialog if it appears.
            try:
                consent_button = await page.wait_for_selector(
                    'button:has-text("Accept all"), button:has-text("I agree")', timeout=5000
                )
                if consent_button:
                    await consent_button.click()
                    logger.info("Accepted Google consent form")
                    await asyncio.sleep(1)
            except Exception:
                pass

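            # Type the query into the search box and submit it.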
            search_box = await page.wait_for_selector('textarea[name="q"]', timeout=10000)
            await search_box.fill(query)
            await page.keyboard.press("Enter")

            # Bail out immediately if Google served a CAPTCHA challenge instead of results.
            captcha = await page.query_selector('form#captcha-form, div#recaptcha')
            if captcha:
                logger.error("CAPTCHA encountered during search")
                raise Exception("Google CAPTCHA encountered. Cannot proceed with search.")

            # Wait for result cards; fall back to the generic results container if the
            # class-based selectors are not present.
            try:
                await page.wait_for_selector('.g, .tF2Cxc', timeout=30000)
            except Exception:
                try:
                    await page.wait_for_selector('#search', timeout=10000)
                except Exception:
                    logger.error("Search results not found")
                    raise Exception("Search results not found")

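            # Grab the result elements, trying an alternative selector if the primary
            # classes return nothing.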
            results = await page.query_selector_all('.g, .tF2Cxc')
            if not results:
                results = await page.query_selector_all('div[data-snf]')

            if not results:
                raise Exception("No search results found")

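            # Walk the top three results and normalize each link into an absolute URL.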
            urls = []
            for result in results[:3]:
                try:
                    link = await result.query_selector('a')
                    if not link:
                        continue

                    # Prefer data-href when present, otherwise the plain href.
                    data_href = await link.get_attribute('data-href')
                    href = await link.get_attribute('href')
                    target_url = data_href or href

                    # Google sometimes wraps results in a /url?q=... redirect; unwrap it
                    # to recover the destination URL.
                    if target_url and target_url.startswith('/url?q='):
                        target_url = f"https://www.google.com{target_url}"

                    if target_url and target_url.startswith('https://www.google.com/url?'):
                        parsed = urlparse(target_url)
                        qs = parse_qs(parsed.query)
                        target_url = qs.get('q', [target_url])[0]

                    if target_url and target_url.startswith('http'):
                        urls.append(target_url)
                        logger.info(f"Found search result: {target_url}")
                except Exception as e:
                    logger.warning(f"Error processing result: {str(e)}")

            if not urls:
                raise Exception("No valid URLs found in search results")

            await browser.close()
            return urls[0]

        except Exception as e:
            logger.error(f"Search failed: {str(e)}")
            # Capture the page state to help diagnose why the search failed.
            await page.screenshot(path="search_error.png")
            await browser.close()
            raise


@app.get("/scrape")
async def scrape_page(
    query: str = Query(..., description="Search query to find a website"),
    lead_generation: bool = Query(True, description="Extract lead generation data"),
    screenshot: bool = Query(True, description="Take a full page screenshot"),
    get_links: bool = Query(True, description="Extract all links from the page"),
    get_body: bool = Query(False, description="Extract body tag content")
):
    logger.info(f"Starting scrape for query: {query}")

    try:
        # Resolve the search query to a concrete URL before scraping it.
        target_url = await get_top_search_result(query)
        logger.info(f"Scraping top result: {target_url}")
    except Exception as e:
        logger.error(f"Search error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")