# Final_Assignment_Template/web_utilities.py
import asyncio
import logging
import re
import requests
from bs4 import BeautifulSoup
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from markdownify import markdownify as md
from playwright.async_api import async_playwright
from typing import Any, AsyncIterator, Dict, List, Iterator, Optional, Sequence, Union
logger = logging.getLogger(__name__)
UNWANTED_SECTIONS = {
"references",
"external links",
"further reading",
"see also",
"notes",
}
def build_metadata(soup: Any, url: str) -> dict:
"""Build metadata from BeautifulSoup output."""
metadata = {"source": url}
if title := soup.find("title"):
metadata["title"] = title.get_text()
if description := soup.find("meta", attrs={"name": "description"}):
metadata["description"] = description.get("content", "No description found.")
if html := soup.find("html"):
metadata["language"] = html.get("lang", "No language found.")
return metadata
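
# A minimal sketch of what build_metadata extracts; the HTML snippet and URL
# are illustrative:
#
#   from bs4 import BeautifulSoup
#   snippet = (
#       '<html lang="en"><head><title>Demo</title>'
#       '<meta name="description" content="A demo page."></head></html>'
#   )
#   build_metadata(BeautifulSoup(snippet, "html.parser"), "https://example.com")
#   # -> {"source": "https://example.com", "title": "Demo",
#   #     "description": "A demo page.", "language": "en"}
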
class MarkdownWebBaseLoader(WebBaseLoader):
"""
A WebBaseLoader subclass that uses Playwright to render JS, then
strips boilerplate and converts structured pieces to Markdown.
"""
def __init__(
self,
web_path: Union[str, Sequence[str]] = "",
header_template: Optional[dict] = None,
verify_ssl: bool = True,
proxies: Optional[dict] = None,
continue_on_failure: bool = False,
autoset_encoding: bool = True,
encoding: Optional[str] = None,
web_paths: Sequence[str] = (),
requests_per_second: int = 2,
default_parser: str = "html.parser",
requests_kwargs: Optional[Dict[str, Any]] = None,
raise_for_status: bool = False,
bs_get_text_kwargs: Optional[Dict[str, Any]] = None,
bs_kwargs: Optional[Dict[str, Any]] = None,
session: Any = None,
markdown_kwargs: Optional[Dict[str, Any]] = None,
unwanted_css: Optional[List[str]] = None,
unwanted_headings: Optional[List[str]] = None,
render_wait: float = 1.0,
*,
show_progress: bool = True,
trust_env: bool = False,
) -> None:
"""Initialize loader.
Args:
markdown_kwargs: Optional[Dict[str, Any]]: Arguments for markdownify.
unwanted_css: Optional[List[str]]: CSS selectors to remove from the page.
unwanted_headings: Optional[List[str]]: Headings to remove from the page.
render_wait: float: Time to wait for JS rendering (default: 2.0 seconds).
"""
super().__init__(
web_path=web_path,
header_template=header_template,
verify_ssl=verify_ssl,
proxies=proxies,
continue_on_failure=continue_on_failure,
autoset_encoding=autoset_encoding,
encoding=encoding,
web_paths=web_paths,
requests_per_second=requests_per_second,
default_parser=default_parser,
requests_kwargs=requests_kwargs,
raise_for_status=raise_for_status,
bs_get_text_kwargs=bs_get_text_kwargs,
bs_kwargs=bs_kwargs,
session=session,
show_progress=show_progress,
trust_env=trust_env,
)
self.markdown_kwargs = markdown_kwargs or {
"heading_style": "ATX",
"bullets": "*+-",
"strip": ["a", "span"],
"table_infer_header": True
}
self.unwanted_css = unwanted_css or [
".toc", ".navbox", ".sidebar", ".advertisement", ".cookie-banner", ".vertical-navbox",
".hatnote", ".reflist", ".mw-references-wrap"
]
self.unwanted_headings = [h.lower() for h in (unwanted_headings or UNWANTED_SECTIONS)]
self.render_wait = render_wait
@staticmethod
def _should_render(html: str, soup: Any) -> bool:
low_text = len(soup.get_text(strip=True)) < 100
has_noscript = bool(soup.find("noscript"))
cf_challenge = "just a moment" in html.lower() or "enable javascript" in html.lower()
many_scripts = len(soup.find_all("script")) > 20
return has_noscript or cf_challenge or low_text or many_scripts
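
    # The heuristic in action (a sketch; the thresholds above are tunable
    # assumptions, not hard rules):
    #
    #   html = "<html><body><noscript>Enable JavaScript</noscript></body></html>"
    #   soup = BeautifulSoup(html, "html.parser")
    #   MarkdownWebBaseLoader._should_render(html, soup)  # -> True
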
    async def _fetch_with_playwright(self, url: str) -> str:
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(headless=True)
            try:
                page = await browser.new_page()
                # If you need cookies/auth, you can do:
                # await page.set_extra_http_headers(self.session.headers)
                await page.goto(url)
                await asyncio.sleep(self.render_wait)  # allow JS to finish
                return await page.content()
            finally:
                # Always close the browser, even if navigation fails
                await browser.close()
def _scrape(
self,
url: str,
parser: Union[str, None] = None,
bs_kwargs: Optional[dict] = None,
) -> Any:
if parser is None:
parser = "xml" if url.endswith(".xml") else self.default_parser
self._check_parser(parser)
resp = self.session.get(url, **self.requests_kwargs)
if self.raise_for_status:
resp.raise_for_status()
if self.encoding is not None:
resp.encoding = self.encoding
elif self.autoset_encoding:
resp.encoding = resp.apparent_encoding
html = resp.text
soup = BeautifulSoup(html, parser, **(bs_kwargs or {}))
# If the html looks JS-heavy, re-render with Playwright
if not url.endswith(".xml") and self._should_render(html, soup):
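            # Note: asyncio.run() raises RuntimeError if an event loop is
            # already running; the except below then keeps the requests soup.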
try:
rendered = asyncio.run(self._fetch_with_playwright(url))
soup = BeautifulSoup(rendered, parser, **(bs_kwargs or {}))
except Exception as e:
logger.warning("Playwright rendering failed for %s: %s. Falling back to requests.", url, e)
return soup
    @staticmethod
    def normalize_whitespace(text: str) -> str:
        """
        Collapse runs of spaces, tabs, etc. down to single spaces, but skip
        inside fenced code blocks ```…``` and inline code `…`.
        """
        # Replace non-breaking spaces with regular spaces
        text = text.replace("\u00A0", " ")
        # Strip zero-width spaces and BOM characters
        text = re.sub(r"[\u200B\u200C\u200D\uFEFF]", "", text)
        # Split out fenced code -> keep code blocks intact while normalizing other text
        parts = re.split(r"(```.*?```)", text, flags=re.S)
        for i, part in enumerate(parts):
            if not part.startswith("```"):
                # Further split out inline code spans
                subparts = re.split(r"(`[^`\n]+`)", part)
                for j, sp in enumerate(subparts):
                    if not sp.startswith("`"):
                        # Collapse horizontal whitespace only; keeping newlines
                        # preserves paragraph breaks, and not stripping segment
                        # edges preserves the spacing around inline code
                        subparts[j] = re.sub(r"[ \t\r\f\v]+", " ", sp)
                parts[i] = "".join(subparts)
        # Rejoin, then make sure paragraphs are separated by a single blank line
        normalized = "".join(parts)
        normalized = re.sub(r"\n{3,}", "\n\n", normalized)
        return normalized.strip()
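
    # Expected behavior (a small sketch; note that inline and fenced code
    # spans keep their internal spacing):
    #
    #   MarkdownWebBaseLoader.normalize_whitespace("some\u00A0text   with  `raw   code`")
    #   # -> "some text with `raw   code`"
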
def _convert_soup_to_text(self, soup: Any) -> str:
# Strip scripts & styles
for tag in soup(["script", "style"]):
tag.decompose()
# Drop blocks whose first heading matches unwanted
for sec in soup.find_all(["section", "div", "aside"]):
h = sec.find(["h1", "h2", "h3", "h4", "h5", "h6"])
if h and any(h.get_text(strip=True).lower().startswith(u) for u in self.unwanted_headings):
sec.decompose()
# Drop by CSS selector
for sel in self.unwanted_css:
for el in soup.select(sel):
el.decompose()
# Isolate the main content container if present
soup = soup.find("div", class_="mw-parser-output") or soup.find("main") or soup.find("article") or soup
# Convert to Markdown text with markdownify
markdown = md(str(soup), **self.markdown_kwargs)
markdown = self.normalize_whitespace(markdown)
return markdown
def lazy_load(self) -> Iterator[Document]:
"""Lazy load text from the url(s) in web_path."""
for path in self.web_paths:
soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
text = self._convert_soup_to_text(soup)
metadata = build_metadata(soup, path)
yield Document(page_content=text, metadata=metadata)
async def alazy_load(self) -> AsyncIterator[Document]:
"""Async lazy load text from the url(s) in web_path."""
results = await self.ascrape_all(self.web_paths)
for path, soup in zip(self.web_paths, results):
text = self._convert_soup_to_text(soup)
metadata = build_metadata(soup, path)
yield Document(page_content=text, metadata=metadata)
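
# Example usage (a minimal sketch; the URL is illustrative):
#
#   loader = MarkdownWebBaseLoader("https://en.wikipedia.org/wiki/Python_(programming_language)")
#   docs = loader.load()
#   print(docs[0].metadata["title"])
#   print(docs[0].page_content[:500])
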
def fetch_wikipedia_page(page_key: str, lang: str = "en") -> Dict[str, str]:
"""Fetches a Wikipedia page by its key and returns its content in Markdown format.
Args:
page_key (str): The unique key of the Wikipedia page.
lang (str): The language code for the Wikipedia edition to fetch (default: "en").
"""
page_key = page_key.replace(" ", "_") # Ensure the page key is URL-safe
page_url = f"https://api.wikimedia.org/core/v1/wikipedia/{lang}/page/{page_key}/html"
    loader = MarkdownWebBaseLoader(page_url)
    markdown = loader.load()[0].page_content
return {
"page_key": page_key,
"markdown": markdown,
}
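
# Example usage (a sketch; performs a live Wikimedia API call):
#
#   page = fetch_wikipedia_page("Python (programming language)")
#   print(page["markdown"][:300])
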
def get_wikipedia_article(query: str, lang: str = "en") -> Dict[str, str]:
"""Searches and fetches a Wikipedia article for a given query and returns its content in Markdown format.
Args:
query (str): The search query.
lang (str): The language code for the Wikipedia edition to search (default: "en").
"""
headers = {
'User-Agent': 'MyLLMAgent ([email protected])'
}
search_url = f"https://api.wikimedia.org/core/v1/wikipedia/en/search/page"
search_params = {'q': query, 'limit': 1}
search_response = requests.get(search_url, headers=headers, params=search_params, timeout=15)
if search_response.status_code != 200:
raise Exception(f"Search error: {search_response.status_code} - {search_response.text}")
results = search_response.json().get("pages", [])
if not results:
raise Exception(f"No results found for query: {query}")
page = results[0]
page_key = page["key"]
return fetch_wikipedia_page(page_key, lang)
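
# Example usage (a sketch; performs live search and fetch calls):
#
#   article = get_wikipedia_article("Python programming language")
#   print(article["page_key"])        # e.g. "Python_(programming_language)"
#   print(article["markdown"][:300])
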
def parse_sections(markdown_text: str) -> Dict[str, Dict]:
"""
Parses markdown into a nested dict:
{ section_title: {
"full": full_section_md,
"subsections": { sub_title: sub_md, ... }
}, ... }
"""
    # Pre-compile patterns for top-level (##) and nested (###) headings
    top_pat = re.compile(r"^##\s+(.*)$", re.MULTILINE)
    sub_pat = re.compile(r"^###\s+(.*)$", re.MULTILINE)
    top_matches = list(top_pat.finditer(markdown_text))
    sections: Dict[str, Dict] = {}
for i, m in enumerate(top_matches):
sec_title = m.group(1).strip()
start = m.start()
end = top_matches[i+1].start() if i+1 < len(top_matches) else len(markdown_text)
sec_md = markdown_text[start:end].strip()
        # Now split subsections within this block
subs: Dict[str, str] = {}
sub_matches = list(sub_pat.finditer(sec_md))
for j, sm in enumerate(sub_matches):
sub_title = sm.group(1).strip()
sub_start = sm.start()
sub_end = sub_matches[j+1].start() if j+1 < len(sub_matches) else len(sec_md)
subs[sub_title] = sec_md[sub_start:sub_end].strip()
sections[sec_title] = {"full": sec_md, "subsections": subs}
return sections
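
# Example usage (a minimal sketch with illustrative markdown):
#
#   text = "## History\nIntro text.\n### Early years\nDetails.\n## Syntax\nMore."
#   sections = parse_sections(text)
#   sorted(sections)                          # -> ["History", "Syntax"]
#   list(sections["History"]["subsections"])  # -> ["Early years"]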