from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from playwright.async_api import async_playwright
import asyncio
import base64
import logging
from typing import List, Optional
from urllib.parse import urlparse, parse_qs

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Query-Based Web Scraper", description="Scrape websites based on search queries")

# ... (Keep all your Pydantic models unchanged) ...
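# Illustrative only: the real Pydantic models are elided above. A minimal,
# hypothetical response model might look like the sketch below (the name and
# fields are assumptions, not the original definitions); this is the kind of
# declaration the BaseModel, List, and Optional imports would typically serve:
#
#     class ScrapeResult(BaseModel):
#         url: str
#         links: List[str] = []
#         screenshot_base64: Optional[str] = None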
@app.get("/")
async def root():
return {
"message": "🚀 Query-Based Web Scraper API",
"tagline": "Search and scrape websites based on queries",
"endpoints": {
"/scrape": "Search Google for the query and scrape the top result",
"/docs": "API documentation"
},
"example": "/scrape?query=plumbers+near+me&lead_generation=true&screenshot=true",
"note": "Now accepts search queries instead of direct URLs"
}
async def get_top_search_result(query: str):
    """Perform a Google search and return the top result URL, with CAPTCHA handling."""
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"

    async with async_playwright() as p:
        # Use a proxy to avoid CAPTCHAs
        proxy_server = "us.proxyrack.net:10000"
        browser = await p.chromium.launch(
            headless=True,
            proxy={
                "server": f"http://{proxy_server}",
                "username": "your-proxy-username",  # Replace with actual credentials
                "password": "your-proxy-password"
            },
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu'
            ]
        )
        context = await browser.new_context(
            user_agent=user_agent,
            locale='en-US',
            viewport={'width': 1920, 'height': 1080},
            # Bypass automation detection
            java_script_enabled=True,
            bypass_csp=True
        )
        page = await context.new_page()
        try:
            logger.info(f"Searching Google for: {query}")
            await page.goto("https://www.google.com", timeout=60000)

            # Handle the consent form if it appears
            try:
                consent_button = await page.wait_for_selector(
                    'button:has-text("Accept all"), button:has-text("I agree")',
                    timeout=5000
                )
                if consent_button:
                    await consent_button.click()
                    logger.info("Accepted Google consent form")
                    await asyncio.sleep(1)  # Small delay for the consent to apply
            except Exception:
                pass  # Consent form didn't appear

            # Perform the search
            search_box = await page.wait_for_selector('textarea[name="q"]', timeout=10000)
            await search_box.fill(query)
            await page.keyboard.press("Enter")

            # Fail fast if a CAPTCHA appeared
            captcha = await page.query_selector('form#captcha-form, div#recaptcha')
            if captcha:
                logger.error("CAPTCHA encountered during search")
                raise Exception("Google CAPTCHA encountered. Cannot proceed with search.")

            # Wait for search results - use a more reliable method
            try:
                await page.wait_for_selector('.g, .tF2Cxc', timeout=30000)
            except Exception:
                # Try an alternative search result container
                try:
                    await page.wait_for_selector('#search', timeout=10000)
                except Exception:
                    logger.error("Search results not found")
                    raise Exception("Search results not found")

            # Extract the top results
            results = await page.query_selector_all('.g, .tF2Cxc')
            if not results:
                results = await page.query_selector_all('div[data-snf]')
            if not results:
                raise Exception("No search results found")

            urls = []
            for result in results[:3]:  # Check the top 3 results
                try:
                    link = await result.query_selector('a')
                    if not link:
                        continue
                    # Prefer data-href, then fall back to href
                    data_href = await link.get_attribute('data-href')
                    href = await link.get_attribute('href')
                    target_url = data_href or href
                    if target_url and target_url.startswith('/url?q='):
                        target_url = f"https://www.google.com{target_url}"
                    if target_url and target_url.startswith('https://www.google.com/url?'):
                        # Unwrap Google's redirect URL to recover the real destination from the q= parameter
                        parsed = urlparse(target_url)
                        qs = parse_qs(parsed.query)
                        target_url = qs.get('q', [target_url])[0]
                    if target_url and target_url.startswith('http'):
                        urls.append(target_url)
                        logger.info(f"Found search result: {target_url}")
                except Exception as e:
                    logger.warning(f"Error processing result: {str(e)}")

            if not urls:
                raise Exception("No valid URLs found in search results")

            await browser.close()
            return urls[0]  # Return the top result

        except Exception as e:
            logger.error(f"Search failed: {str(e)}")
            await page.screenshot(path="search_error.png")
            await browser.close()
            raise
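
# A minimal sketch for exercising get_top_search_result on its own, outside FastAPI
# (hypothetical ad-hoc check; fill in real proxy credentials above before running):
#
#     if __name__ == "__main__":
#         top_url = asyncio.run(get_top_search_result("plumbers near me"))
#         print(top_url)
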
@app.get("/scrape")
async def scrape_page(
query: str = Query(..., description="Search query to find a website"),
lead_generation: bool = Query(True, description="Extract lead generation data"),
screenshot: bool = Query(True, description="Take a full page screenshot"),
get_links: bool = Query(True, description="Extract all links from the page"),
get_body: bool = Query(False, description="Extract body tag content")
):
logger.info(f"Starting scrape for query: {query}")
try:
# Get top search result URL
target_url = await get_top_search_result(query)
logger.info(f"Scraping top result: {target_url}")
except Exception as e:
logger.error(f"Search error: {str(e)}")
raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
# ... (keep the rest of the scraping function unchanged) ...
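
# A minimal sketch of running the service locally (assumes uvicorn is installed and
# this module is saved as main.py; adjust the module name to match your project):
#
#     uvicorn main:app --host 0.0.0.0 --port 8000
#
# Example request against the running server:
#
#     curl "http://localhost:8000/scrape?query=plumbers+near+me&lead_generation=true&screenshot=true"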