import requests
from bs4 import BeautifulSoup, Tag
from urllib.parse import urljoin, urlparse
import re
from typing import List, Set, Optional, Callable, Tuple
import xml.etree.ElementTree as ET

from ankigen_core.models import CrawledPage
from ankigen_core.utils import RateLimiter, get_logger
from ankigen_core.logging import logger

class WebCrawler:
    def __init__(
        self,
        start_url: str,
        max_depth: int = 2,
        requests_per_second: float = 1.0,
        user_agent: str = "AnkiGenBot/1.0",
        include_patterns: Optional[List[str]] = None,
        exclude_patterns: Optional[List[str]] = None,
        sitemap_url: Optional[str] = None,
        use_sitemap: bool = False,
    ):
        self.start_url = start_url
        self.parsed_start_url = urlparse(start_url)
        self.base_domain = self.parsed_start_url.netloc
        self.max_depth = max_depth
        self.requests_per_second = requests_per_second
        self.delay = 1.0 / requests_per_second if requests_per_second > 0 else 0
        self.user_agent = user_agent
        self.visited_urls: Set[str] = set()
        self.include_patterns = (
            [re.compile(p) for p in include_patterns] if include_patterns else []
        )
        self.exclude_patterns = (
            [re.compile(p) for p in exclude_patterns] if exclude_patterns else []
        )
        self.sitemap_url = sitemap_url
        self.use_sitemap = use_sitemap
        self.logger = get_logger()
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": self.user_agent})
        self.rate_limiter = RateLimiter(self.requests_per_second)
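
    # Illustrative configuration sketch (not part of the original module); the
    # URL, sitemap location, and patterns below are placeholders:
    #
    #   crawler = WebCrawler(
    #       start_url="https://docs.example.com/",
    #       max_depth=1,
    #       requests_per_second=0.5,
    #       include_patterns=[r"/guide/"],
    #       exclude_patterns=[r"\.pdf$"],
    #       sitemap_url="https://docs.example.com/sitemap.xml",
    #       use_sitemap=True,
    #   )
    #   pages = crawler.crawl()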

    def _is_valid_url(self, url: str) -> bool:
        """
        Checks if the URL is valid for crawling (same domain, scheme, matches patterns).
        """
        try:
            parsed_url = urlparse(url)
            if not parsed_url.scheme or parsed_url.scheme.lower() not in [
                "http",
                "https",
            ]:
                logger.debug(f"Invalid scheme for URL: {url}")
                return False
            if parsed_url.netloc != self.base_domain:
                logger.debug(f"URL {url} not in base domain {self.base_domain}")
                return False

            if self.include_patterns and not any(
                p.search(url) for p in self.include_patterns
            ):
                logger.debug(f"URL {url} did not match any include patterns.")
                return False

            if self.exclude_patterns and any(
                p.search(url) for p in self.exclude_patterns
            ):
                logger.debug(f"URL {url} matched an exclude pattern.")
                return False

        except ValueError:
            logger.warning(f"ValueError when parsing URL: {url}", exc_info=True)
            return False
        return True
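
    # For example (illustrative values only): with include_patterns=[r"/docs/"]
    # and exclude_patterns=[r"\.pdf$"], "https://<base_domain>/docs/intro.html"
    # would pass, while "https://<base_domain>/docs/manual.pdf" would be rejected.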

    def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """
        Extracts, normalizes, and validates links from a BeautifulSoup object.
        """
        found_links: Set[str] = set()
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            if not href:
                continue

            href = href.strip()
            if (
                not href
                or href.startswith("#")
                or href.lower().startswith(("javascript:", "mailto:", "tel:"))
            ):
                continue

            try:
                absolute_url = urljoin(base_url, href)

                parsed_absolute_url = urlparse(absolute_url)
                normalized_url = parsed_absolute_url._replace(fragment="").geturl()

                final_parsed_url = urlparse(normalized_url)
                if not final_parsed_url.scheme:
                    base_parsed_url = urlparse(self.start_url)
                    normalized_url = final_parsed_url._replace(
                        scheme=base_parsed_url.scheme
                    ).geturl()

                if self._is_valid_url(normalized_url):
                    found_links.add(normalized_url)
            except ValueError as e:
                logger.warning(
                    f"Skipping malformed link {href} from base {base_url}: {e}",
                    exc_info=False,
                )
                continue

        return list(found_links)
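
    # Normalization example (hypothetical page): on "https://<base_domain>/a/b.html",
    # the href "../c.html#section" resolves to "https://<base_domain>/c.html" with
    # the fragment stripped before validation.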

    def _extract_text(self, soup: BeautifulSoup) -> str:
        """
        Extracts and cleans text content from a BeautifulSoup object.
        """
        for script_or_style in soup(["script", "style"]):
            script_or_style.decompose()
        text = soup.get_text(separator=" ", strip=True)
        return text
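
    # For instance, "<p>Hello <b>world</b></p>" yields "Hello world". Note that
    # decompose() mutates the soup in place, so callers that need the original
    # markup should keep response.text (as crawl() does) rather than re-serialize.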

    def _fetch_sitemap_content(self, sitemap_url: str) -> Optional[str]:
        """Fetches the content of a given sitemap URL."""
        self.logger.info(f"Fetching sitemap content from: {sitemap_url}")
        try:
            response = self.session.get(sitemap_url, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            self.logger.error(f"Error fetching sitemap {sitemap_url}: {e}")
            return None

    def _parse_sitemap(self, sitemap_content: str) -> List[str]:
        """Parses XML sitemap content and extracts URLs. Handles sitemap indexes."""
        urls: List[str] = []
        try:
            root = ET.fromstring(sitemap_content)

            if root.tag.endswith("sitemapindex"):
                self.logger.info("Sitemap index detected. Processing sub-sitemaps.")
                for sitemap_element in root.findall(".//{*}sitemap"):
                    loc_element = sitemap_element.find("{*}loc")
                    if loc_element is not None and loc_element.text:
                        sub_sitemap_url = loc_element.text.strip()
                        self.logger.info(f"Found sub-sitemap: {sub_sitemap_url}")
                        sub_sitemap_content = self._fetch_sitemap_content(
                            sub_sitemap_url
                        )
                        if sub_sitemap_content:
                            urls.extend(self._parse_sitemap(sub_sitemap_content))

            elif root.tag.endswith("urlset"):
                for url_element in root.findall(".//{*}url"):
                    loc_element = url_element.find("{*}loc")
                    if loc_element is not None and loc_element.text:
                        urls.append(loc_element.text.strip())
            else:
                self.logger.warning(f"Unknown root tag in sitemap: {root.tag}")

        except ET.ParseError as e:
            self.logger.error(f"Error parsing sitemap XML: {e}")
        return list(set(urls))
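
    # The two document shapes handled above look roughly like this (namespaces
    # omitted for brevity; real sitemaps use the sitemaps.org schema):
    #
    #   <urlset><url><loc>https://example.com/page</loc></url></urlset>
    #   <sitemapindex><sitemap><loc>https://example.com/sub.xml</loc></sitemap></sitemapindex>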

    def _get_urls_from_sitemap(self) -> List[str]:
        """Fetches and parses the sitemap to get a list of URLs."""
        if not self.sitemap_url:
            self.logger.warning(
                "Sitemap URL is not provided. Cannot fetch URLs from sitemap."
            )
            return []

        sitemap_content = self._fetch_sitemap_content(self.sitemap_url)
        if not sitemap_content:
            return []

        sitemap_urls = self._parse_sitemap(sitemap_content)
        self.logger.info(f"Extracted {len(sitemap_urls)} unique URLs from sitemap(s).")
        return sitemap_urls

    def crawl(
        self, progress_callback: Optional[Callable[[int, int, str], None]] = None
    ) -> List[CrawledPage]:
        """
        Crawls from the sitemap URLs (if enabled) or from start_url, up to max_depth,
        and returns the successfully fetched pages as CrawledPage objects.
        """
        urls_to_visit: List[Tuple[str, int, Optional[str]]] = []
        crawled_pages: List[CrawledPage] = []
        initial_total_for_progress = 0

        if self.use_sitemap and self.sitemap_url:
            self.logger.info(f"Attempting to use sitemap: {self.sitemap_url}")
            sitemap_extracted_urls = self._get_urls_from_sitemap()
            if sitemap_extracted_urls:
                for url in sitemap_extracted_urls:
                    if self._is_valid_url(url):
                        urls_to_visit.append((url, 0, None))
                self.logger.info(
                    f"Initialized {len(urls_to_visit)} URLs to visit from sitemap after validation."
                )
                initial_total_for_progress = len(urls_to_visit)
            else:
                self.logger.warning(
                    "Sitemap processing yielded no URLs. Falling back to start_url."
                )
                if self._is_valid_url(self.start_url):
                    urls_to_visit.append((self.start_url, 0, None))
                initial_total_for_progress = len(urls_to_visit)
        else:
            if self._is_valid_url(self.start_url):
                urls_to_visit.append((self.start_url, 0, None))
            initial_total_for_progress = len(urls_to_visit)
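        # urls_to_visit is consumed FIFO (pop(0)), so link-discovered URLs are
        # crawled breadth-first; sitemap-seeded URLs all start at depth 0.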
        processed_count = 0
        while urls_to_visit:
            current_url, current_depth, current_parent_url = urls_to_visit.pop(0)

            current_total_for_progress = (
                initial_total_for_progress
                if self.use_sitemap
                else processed_count + len(urls_to_visit) + 1
            )

            if progress_callback:
                progress_callback(
                    processed_count,
                    current_total_for_progress,
                    current_url,
                )

            if current_url in self.visited_urls:
                self.logger.debug(f"URL already visited: {current_url}. Skipping.")
                if progress_callback:
                    dynamic_total = (
                        initial_total_for_progress
                        if self.use_sitemap
                        else processed_count + len(urls_to_visit) + 1
                    )
                    progress_callback(
                        processed_count,
                        dynamic_total,
                        f"Skipped (visited): {current_url}",
                    )
                continue

            if current_depth > self.max_depth:
                logger.debug(
                    f"Skipping URL {current_url} due to depth {current_depth} > max_depth {self.max_depth}"
                )
                continue

            self.logger.info(
                f"Crawling (Depth {current_depth}): {current_url} ({processed_count + 1}/{current_total_for_progress})"
            )

            if progress_callback:
                progress_callback(
                    processed_count, current_total_for_progress, current_url
                )

            self.visited_urls.add(current_url)

            self.rate_limiter.wait()
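            # The fetch/parse step below stores the raw HTML alongside the cleaned
            # text, title, and meta tags; failures are logged and the crawl moves on.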
            try:
                response = self.session.get(current_url, timeout=10)
                response.raise_for_status()
                html_content = response.text
                soup = BeautifulSoup(html_content, "html.parser")

                page_title_tag = soup.find("title")
                page_title: Optional[str] = None
                if isinstance(page_title_tag, Tag) and page_title_tag.string:
                    page_title = page_title_tag.string.strip()
                else:
                    self.logger.debug(f"No title tag found for {current_url}")

                meta_desc_tag = soup.find("meta", attrs={"name": "description"})
                meta_description: Optional[str] = None
                if isinstance(meta_desc_tag, Tag):
                    content = meta_desc_tag.get("content")
                    if isinstance(content, str):
                        meta_description = content.strip()
                    elif isinstance(content, list):
                        meta_description = " ".join(
                            str(item) for item in content
                        ).strip()
                        self.logger.debug(
                            f"Meta description for {current_url} was a list, joined: {meta_description}"
                        )
                else:
                    self.logger.debug(f"No meta description found for {current_url}")

                meta_keywords_tag = soup.find("meta", attrs={"name": "keywords"})
                meta_keywords: List[str] = []
                if isinstance(meta_keywords_tag, Tag):
                    content = meta_keywords_tag.get("content")
                    raw_keywords_content: str = ""
                    if isinstance(content, str):
                        raw_keywords_content = content
                    elif isinstance(content, list):
                        raw_keywords_content = " ".join(str(item) for item in content)
                        self.logger.debug(
                            f"Meta keywords for {current_url} was a list, joined: {raw_keywords_content}"
                        )

                    if raw_keywords_content:
                        meta_keywords = [
                            k.strip()
                            for k in raw_keywords_content.split(",")
                            if k.strip()
                        ]
                else:
                    self.logger.debug(f"No meta keywords found for {current_url}")

                text_content = self._extract_text(soup)

                page_data = CrawledPage(
                    url=current_url,
                    html_content=html_content,
                    text_content=text_content,
                    title=page_title,
                    meta_description=meta_description,
                    meta_keywords=meta_keywords,
                    crawl_depth=current_depth,
                    parent_url=current_parent_url,
                )
                crawled_pages.append(page_data)
                self.logger.info(f"Successfully processed and stored: {current_url}")

                if current_depth < self.max_depth:
                    found_links = self._extract_links(soup, current_url)
                    self.logger.debug(
                        f"Found {len(found_links)} links on {current_url}"
                    )
                    for link in found_links:
                        if link not in self.visited_urls:
                            urls_to_visit.append((link, current_depth + 1, current_url))
            except requests.exceptions.HTTPError as e:
                self.logger.error(
                    f"HTTPError for {current_url}: {e.response.status_code} - {e.response.reason}. Response: {e.response.text[:200]}...",
                    exc_info=False,
                )
            except requests.exceptions.ConnectionError as e:
                self.logger.error(
                    f"ConnectionError for {current_url}: {e}", exc_info=False
                )
            except requests.exceptions.Timeout as e:
                self.logger.error(f"Timeout for {current_url}: {e}", exc_info=False)
            except requests.exceptions.RequestException as e:
                self.logger.error(
                    f"RequestException for {current_url}: {e}", exc_info=True
                )
            except Exception as e:
                self.logger.error(
                    f"An unexpected error occurred while processing {current_url}: {e}",
                    exc_info=True,
                )
            finally:
                # Count the page whether it succeeded or failed, so the progress
                # totals reported to progress_callback stay accurate.
                processed_count += 1

        self.logger.info(
            f"Crawl completed. Total pages processed/attempted: {processed_count}. Successfully crawled pages: {len(crawled_pages)}"
        )
        if progress_callback:
            progress_callback(processed_count, processed_count, "Crawling complete.")

        return crawled_pages
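
# Minimal usage sketch (illustrative only, not part of the original module): the
# URL below is a placeholder, and CrawledPage is assumed to expose its constructor
# fields (url, title, ...) as attributes.
if __name__ == "__main__":
    def _report(done: int, total: int, url: str) -> None:
        # Simple progress callback: prints "[done/total] url" for each step.
        print(f"[{done}/{total}] {url}")

    example_crawler = WebCrawler("https://docs.example.com/", max_depth=1)
    for crawled in example_crawler.crawl(progress_callback=_report):
        print(crawled.url, crawled.title)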