import asyncio
import logging
import re
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Sequence, Union

import requests
from bs4 import BeautifulSoup
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from markdownify import markdownify as md
from playwright.async_api import async_playwright


logger = logging.getLogger(__name__)

UNWANTED_SECTIONS = {
    "references",
    "external links",
    "further reading",
    "see also",
    "notes",
}


def build_metadata(soup: Any, url: str) -> dict:
    """Build metadata from BeautifulSoup output."""
    metadata = {"source": url}
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", "No description found.")
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", "No language found.")
    return metadata


class MarkdownWebBaseLoader(WebBaseLoader):
    """
    A WebBaseLoader subclass that uses Playwright to render JS, then
    strips boilerplate and converts structured pieces to Markdown.
    """
    def __init__(
        self,
        web_path: Union[str, Sequence[str]] = "",
        header_template: Optional[dict] = None,
        verify_ssl: bool = True,
        proxies: Optional[dict] = None,
        continue_on_failure: bool = False,
        autoset_encoding: bool = True,
        encoding: Optional[str] = None,
        web_paths: Sequence[str] = (),
        requests_per_second: int = 2,
        default_parser: str = "html.parser",
        requests_kwargs: Optional[Dict[str, Any]] = None,
        raise_for_status: bool = False,
        bs_get_text_kwargs: Optional[Dict[str, Any]] = None,
        bs_kwargs: Optional[Dict[str, Any]] = None,
        session: Any = None,
        markdown_kwargs: Optional[Dict[str, Any]] = None,
        unwanted_css: Optional[List[str]] = None,
        unwanted_headings: Optional[List[str]] = None,
        render_wait: float = 1.0,
        *,
        show_progress: bool = True,
        trust_env: bool = False,
    ) -> None:
        """Initialize loader.

        Args:
            markdown_kwargs: Optional[Dict[str, Any]]: Arguments for markdownify.
            unwanted_css: Optional[List[str]]: CSS selectors to remove from the page.
            unwanted_headings: Optional[List[str]]: Headings to remove from the page.
            render_wait: float: Time to wait for JS rendering (default: 1.0 second).
        """
        super().__init__(
            web_path=web_path,
            header_template=header_template,
            verify_ssl=verify_ssl,
            proxies=proxies,
            continue_on_failure=continue_on_failure,
            autoset_encoding=autoset_encoding,
            encoding=encoding,
            web_paths=web_paths,
            requests_per_second=requests_per_second,
            default_parser=default_parser,
            requests_kwargs=requests_kwargs,
            raise_for_status=raise_for_status,
            bs_get_text_kwargs=bs_get_text_kwargs,
            bs_kwargs=bs_kwargs,
            session=session,
            show_progress=show_progress,
            trust_env=trust_env,
        )
        self.markdown_kwargs = markdown_kwargs or {
            "heading_style": "ATX",
            "bullets": "*+-",
            "strip": ["a", "span"],
            "table_infer_header": True
        }
        self.unwanted_css = unwanted_css or [
            ".toc", ".navbox", ".sidebar", ".advertisement", ".cookie-banner", ".vertical-navbox",
            ".hatnote", ".reflist", ".mw-references-wrap"
        ]
        self.unwanted_headings = [h.lower() for h in (unwanted_headings or UNWANTED_SECTIONS)]
        self.render_wait = render_wait

    @staticmethod
    def _should_render(html: str, soup: Any) -> bool:
        """Heuristically decide whether the fetched HTML likely needs a JS render pass."""
        low_text = len(soup.get_text(strip=True)) < 100
        has_noscript = bool(soup.find("noscript"))
        cf_challenge = "just a moment" in html.lower() or "enable javascript" in html.lower()
        many_scripts = len(soup.find_all("script")) > 20
        return has_noscript or cf_challenge or low_text or many_scripts

    async def _fetch_with_playwright(self, url: str) -> str:
        """Render the page in headless Chromium and return the post-JS HTML."""
        async with async_playwright() as pw:
            browser = await pw.chromium.launch(headless=True)
            page = await browser.new_page()
            # If you need cookies/auth, you can do:
            # await page.set_extra_http_headers(self.session.headers)
            await page.goto(url)
            await asyncio.sleep(self.render_wait)  # allow JS to finish
            content = await page.content()
            await browser.close()
            return content

    def _scrape(
            self,
            url: str,
            parser: Union[str, None] = None,
            bs_kwargs: Optional[dict] = None,
    ) -> Any:
        """Fetch a URL with requests; re-render with Playwright if the page looks JS-heavy."""
        if parser is None:
            parser = "xml" if url.endswith(".xml") else self.default_parser
        self._check_parser(parser)

        resp = self.session.get(url, **self.requests_kwargs)
        if self.raise_for_status:
            resp.raise_for_status()
        if self.encoding is not None:
            resp.encoding = self.encoding
        elif self.autoset_encoding:
            resp.encoding = resp.apparent_encoding
        html = resp.text

        soup = BeautifulSoup(html, parser, **(bs_kwargs or {}))

        # If the html looks JS-heavy, re-render with Playwright
        if not url.endswith(".xml") and self._should_render(html, soup):
            try:
                rendered = asyncio.run(self._fetch_with_playwright(url))
                soup = BeautifulSoup(rendered, parser, **(bs_kwargs or {}))
            except Exception as e:
                logger.warning("Playwright rendering failed for %s: %s. Falling back to requests.", url, e)

        return soup

    @staticmethod
    def normalize_whitespace(text: str) -> str:
        """
        Collapse runs of spaces, tabs, etc. down to single spaces—but skip
        inside fenced code blocks ```…``` or inline code `…`.
        """
        # Replace non-breaking and invisible spaces with regular spaces
        text = text.replace("\u00A0", " ")
        # Strip zero-width spaces:
        text = re.sub(r"[\u200B\u200C\u200D\uFEFF]", "", text)

        # Split out fenced code -> keep code blocks intact while normalizing other text
        parts = re.split(r'(```.*?```)', text, flags=re.S)
        for i, part in enumerate(parts):
            if not part.startswith("```"):
                # further split out inline code
                subparts = re.split(r'(`[^`\n]+`)', part)
                for j, sp in enumerate(subparts):
                    if not sp.startswith("`"):
                        # collapse whitespace, strip edges of each segment
                        subparts[j] = re.sub(r'[ \t\r\f\v]+', ' ', sp).strip()
                parts[i] = "".join(subparts)
        # Rejoin and ensure paragraphs are separated by a single blank line
        normalized = "\n\n".join(p for p in parts if p.strip() != "")
        return normalized

    def _convert_soup_to_text(self, soup: Any) -> str:
        """Strip boilerplate from the parsed soup and convert the remainder to Markdown."""
        # Strip scripts & styles
        for tag in soup(["script", "style"]):
            tag.decompose()
        # Drop blocks whose first heading matches unwanted
        for sec in soup.find_all(["section", "div", "aside"]):
            h = sec.find(["h1", "h2", "h3", "h4", "h5", "h6"])
            if h and any(h.get_text(strip=True).lower().startswith(u) for u in self.unwanted_headings):
                sec.decompose()
        # Drop by CSS selector
        for sel in self.unwanted_css:
            for el in soup.select(sel):
                el.decompose()
        # Isolate the main content container if present
        soup = soup.find("div", class_="mw-parser-output") or soup.find("main") or soup.find("article") or soup

        # Convert to Markdown text with markdownify
        markdown = md(str(soup), **self.markdown_kwargs)
        markdown = self.normalize_whitespace(markdown)
        return markdown

    def lazy_load(self) -> Iterator[Document]:
        """Lazy load text from the url(s) in web_path."""
        for path in self.web_paths:
            soup = self._scrape(path, bs_kwargs=self.bs_kwargs)
            text = self._convert_soup_to_text(soup)
            metadata = build_metadata(soup, path)
            yield Document(page_content=text, metadata=metadata)

    async def alazy_load(self) -> AsyncIterator[Document]:
        """Async lazy load text from the url(s) in web_path."""
        results = await self.ascrape_all(self.web_paths)
        for path, soup in zip(self.web_paths, results):
            text = self._convert_soup_to_text(soup)
            metadata = build_metadata(soup, path)
            yield Document(page_content=text, metadata=metadata)
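

# Usage sketch for MarkdownWebBaseLoader (illustrative only: the URL is an arbitrary
# example, and network access plus an installed Playwright Chromium are assumed):
#
#     loader = MarkdownWebBaseLoader("https://en.wikipedia.org/wiki/Web_scraping")
#     docs = loader.load()                 # one Document per configured URL
#     print(docs[0].metadata.get("title"))
#     print(docs[0].page_content[:500])    # cleaned Markdown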


def fetch_wikipedia_page(page_key: str, lang: str = "en") -> Dict[str, str]:
    """Fetches a Wikipedia page by its key and returns its content in Markdown format.

    Args:
        page_key (str): The unique key of the Wikipedia page.
        lang (str): The language code for the Wikipedia edition to fetch (default: "en").
    """
    page_key = page_key.replace(" ", "_")  # Wikipedia page keys use underscores in place of spaces
    page_url = f"https://api.wikimedia.org/core/v1/wikipedia/{lang}/page/{page_key}/html"
    loader = MarkdownWebBaseLoader(page_url)
    markdown = loader.load()[0].page_content
    return {
        "page_key": page_key,
        "markdown": markdown,
    }


def get_wikipedia_article(query: str, lang: str = "en") -> Dict[str, str]:
    """Searches and fetches a Wikipedia article for a given query and returns its content in Markdown format.

    Args:
        query (str): The search query.
        lang (str): The language code for the Wikipedia edition to search (default: "en").
    """
    headers = {
        'User-Agent': 'MyLLMAgent ([email protected])'
    }

    search_url = f"https://api.wikimedia.org/core/v1/wikipedia/{lang}/search/page"
    search_params = {'q': query, 'limit': 1}
    search_response = requests.get(search_url, headers=headers, params=search_params, timeout=15)

    if search_response.status_code != 200:
        raise Exception(f"Search error: {search_response.status_code} - {search_response.text}")

    results = search_response.json().get("pages", [])
    if not results:
        raise Exception(f"No results found for query: {query}")

    page = results[0]
    page_key = page["key"]

    return fetch_wikipedia_page(page_key, lang)


def parse_sections(markdown_text: str) -> Dict[str, Dict]:
    """
    Parses markdown into a nested dict:
    { section_title: {
         "full": full_section_md,
         "subsections": { sub_title: sub_md, ... }
      }, ... }
    """
    # First split top-level sections
    top_pat = re.compile(r"^##\s+(.*)$", re.MULTILINE)
    top_matches = list(top_pat.finditer(markdown_text))
    sections: Dict[str, Dict] = {}
    for i, m in enumerate(top_matches):
        sec_title = m.group(1).strip()
        start = m.start()
        end = top_matches[i+1].start() if i+1 < len(top_matches) else len(markdown_text)
        sec_md = markdown_text[start:end].strip()

        # Now split subsections within this block
        sub_pat = re.compile(r"^###\s+(.*)$", re.MULTILINE)
        subs: Dict[str, str] = {}
        sub_matches = list(sub_pat.finditer(sec_md))
        for j, sm in enumerate(sub_matches):
            sub_title = sm.group(1).strip()
            sub_start = sm.start()
            sub_end = sub_matches[j+1].start() if j+1 < len(sub_matches) else len(sec_md)
            subs[sub_title] = sec_md[sub_start:sub_end].strip()

        sections[sec_title] = {"full": sec_md, "subsections": subs}
    return sections
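

# Minimal end-to-end sketch (assumptions: network access, the Wikimedia REST API
# responding as expected, and "Alan Turing" standing in for any real query):
if __name__ == "__main__":
    article = get_wikipedia_article("Alan Turing")
    sections = parse_sections(article["markdown"])
    for title, body in sections.items():
        # Show each top-level section with the titles of its subsections.
        print(f"{title}: {list(body['subsections'].keys())}")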