# web-scraper-new/app.py
from fastapi import FastAPI
from pydantic import BaseModel
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from bs4 import BeautifulSoup
import logging
import re

app = FastAPI()
logging.basicConfig(level=logging.INFO)


class RedirectRequest(BaseModel):
    url: str
@app.post("/resolve")
async def resolve_redirect(data: RedirectRequest):
try:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
# Step 1: Start navigation to the RSS link
await page.goto(data.url, wait_until="domcontentloaded", timeout=15000)
# Step 2: Wait for navigation to a non-Google domain
try:
await page.wait_for_url(re.compile(r"^(?!.*news\.google\.com).*"), timeout=10000)
except:
pass # fallback if no hard redirect happened
final_url = page.url
await browser.close()
return {"final_url": final_url}
except Exception as e:
logging.error("Redirect resolution failed", exc_info=True)
return {"error": str(e)}


class ScrapeRequest(BaseModel):
    url: str
@app.post("/scrape")
async def scrape_page(data: ScrapeRequest):
try:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
await page.goto(data.url, wait_until="domcontentloaded", timeout=40000)
# Extract visible text using JS walker for generalized coverage
text = await page.evaluate("""
() => {
const walker = document.createTreeWalker(document.body, NodeFilter.SHOW_TEXT, {
acceptNode: node => {
const style = window.getComputedStyle(node.parentElement || {});
return style && style.display !== 'none' && style.visibility !== 'hidden' ? NodeFilter.FILTER_ACCEPT : NodeFilter.FILTER_REJECT;
}
});
let text = '';
while (walker.nextNode()) {
text += walker.currentNode.textContent + '\\n';
}
return text.trim();
}
""")
# Get links
links = await page.eval_on_selector_all(
"a[href]",
"""els => els.map(el => ({
text: el.innerText.trim(),
href: el.href
}))"""
)
await browser.close()
return {
"final_url": page.url,
"text": text if text else "No visible content found.",
"links": links
}
except Exception as e:
logging.error("Scraping failed", exc_info=True)
return {"error": str(e)}