from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
import asyncio
import base64
import logging
from typing import List, Optional
from urllib.parse import urlparse, parse_qs

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Query-Based Web Scraper",
    description="Scrape websites based on search queries"
)

# ... (Keep all your Pydantic models unchanged) ...

@app.get("/")
async def root():
    return {
        "message": "🚀 Query-Based Web Scraper API",
        "tagline": "Search and scrape websites based on queries",
        "endpoints": {
            "/scrape": "Search Google for the query and scrape the top result",
            "/docs": "API documentation"
        },
        "example": "/scrape?query=plumbers+near+me&lead_generation=true&screenshot=true",
        "note": "Now accepts search queries instead of direct URLs"
    }

async def get_top_search_result(query: str):
    """Perform a Google search and return the top result URL, with CAPTCHA handling."""
    user_agent = (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    )

    async with async_playwright() as p:
        # Use a proxy to reduce the chance of CAPTCHAs
        proxy_server = "us.proxyrack.net:10000"
        browser = await p.chromium.launch(
            headless=True,
            proxy={
                "server": f"http://{proxy_server}",
                "username": "your-proxy-username",  # Replace with actual credentials
                "password": "your-proxy-password"
            },
            args=[
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu'
            ]
        )
        context = await browser.new_context(
            user_agent=user_agent,
            locale='en-US',
            viewport={'width': 1920, 'height': 1080},
            # Reduce automation detection
            java_script_enabled=True,
            bypass_csp=True
        )
        page = await context.new_page()

        try:
            logger.info(f"Searching Google for: {query}")
            await page.goto("https://www.google.com", timeout=60000)

            # Handle the consent form if it appears
            try:
                consent_button = await page.wait_for_selector(
                    'button:has-text("Accept all"), button:has-text("I agree")',
                    timeout=5000
                )
                if consent_button:
                    await consent_button.click()
                    logger.info("Accepted Google consent form")
                    await asyncio.sleep(1)  # Small delay for consent to apply
            except PlaywrightTimeoutError:
                pass  # Consent form didn't appear

            # Perform the search
            search_box = await page.wait_for_selector('textarea[name="q"]', timeout=10000)
            await search_box.fill(query)
            await page.keyboard.press("Enter")

            # Bail out early if Google served a CAPTCHA instead of results
            # (checked outside the wait below so the error isn't swallowed)
            captcha = await page.query_selector('form#captcha-form, div#recaptcha')
            if captcha:
                logger.error("CAPTCHA encountered during search")
                raise Exception("Google CAPTCHA encountered. Cannot proceed with search.")

            # Wait for search results, falling back to an alternative container
            try:
                await page.wait_for_selector('.g, .tF2Cxc', timeout=30000)
            except PlaywrightTimeoutError:
                try:
                    await page.wait_for_selector('#search', timeout=10000)
                except PlaywrightTimeoutError:
                    logger.error("Search results not found")
                    raise Exception("Search results not found")

            # Extract the top results
            results = await page.query_selector_all('.g, .tF2Cxc')
            if not results:
                results = await page.query_selector_all('div[data-snf]')
            if not results:
                raise Exception("No search results found")

            urls = []
            for result in results[:3]:  # Check the top 3 results
                try:
                    link = await result.query_selector('a')
                    if not link:
                        continue

                    # Extract both data-href and href attributes
                    data_href = await link.get_attribute('data-href')
                    href = await link.get_attribute('href')
                    target_url = data_href or href

                    # Resolve Google redirect URLs to the real destination
                    if target_url and target_url.startswith('/url?q='):
                        target_url = f"https://www.google.com{target_url}"
                    if target_url and target_url.startswith('https://www.google.com/url?'):
                        parsed = urlparse(target_url)
                        qs = parse_qs(parsed.query)
                        target_url = qs.get('q', [target_url])[0]

                    if target_url and target_url.startswith('http'):
                        urls.append(target_url)
                        logger.info(f"Found search result: {target_url}")
                except Exception as e:
                    logger.warning(f"Error processing result: {str(e)}")

            if not urls:
                raise Exception("No valid URLs found in search results")

            await browser.close()
            return urls[0]  # Return the top result

        except Exception as e:
            logger.error(f"Search failed: {str(e)}")
            await page.screenshot(path="search_error.png")
            await browser.close()
            raise

@app.get("/scrape")
async def scrape_page(
    query: str = Query(..., description="Search query to find a website"),
    lead_generation: bool = Query(True, description="Extract lead generation data"),
    screenshot: bool = Query(True, description="Take a full page screenshot"),
    get_links: bool = Query(True, description="Extract all links from the page"),
    get_body: bool = Query(False, description="Extract body tag content")
):
    logger.info(f"Starting scrape for query: {query}")

    try:
        # Get the top search result URL
        target_url = await get_top_search_result(query)
        logger.info(f"Scraping top result: {target_url}")
    except Exception as e:
        logger.error(f"Search error: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")

    # ... (keep the rest of the scraping function unchanged) ...
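
# --- Usage sketch ---
# A minimal way to run and exercise this API, assuming the file is saved as
# main.py (the filename is an assumption, not part of the original) and that
# the dependencies are installed:
#
#   pip install fastapi uvicorn playwright
#   playwright install chromium
#
# Start the server:
#
#   uvicorn main:app --host 0.0.0.0 --port 8000
#
# Then try the example query advertised by the root endpoint:
#
#   curl "http://localhost:8000/scrape?query=plumbers+near+me&lead_generation=true&screenshot=true"
#
# Note: the proxy credentials above are placeholders; without a working proxy,
# Google is likely to respond with a CAPTCHA and the request will fail with a 500.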