import requests
from bs4 import BeautifulSoup, Tag
from urllib.parse import urljoin, urlparse
import re
from typing import List, Set, Optional, Callable, Tuple
import xml.etree.ElementTree as ET  # Added for Sitemap parsing

from ankigen_core.models import CrawledPage
from ankigen_core.utils import RateLimiter, get_logger
from ankigen_core.logging import logger  # Added


class WebCrawler:
    def __init__(
        self,
        start_url: str,
        max_depth: int = 2,
        requests_per_second: float = 1.0,
        user_agent: str = "AnkiGenBot/1.0",
        include_patterns: Optional[List[str]] = None,
        exclude_patterns: Optional[List[str]] = None,
        sitemap_url: Optional[str] = None,  # Added for Sitemap (Task 14.1)
        use_sitemap: bool = False,  # Added for Sitemap (Task 14.1)
    ):
        self.start_url = start_url
        self.parsed_start_url = urlparse(start_url)
        self.base_domain = self.parsed_start_url.netloc
        self.max_depth = max_depth
        self.requests_per_second = requests_per_second
        self.delay = 1.0 / requests_per_second if requests_per_second > 0 else 0
        self.user_agent = user_agent
        self.visited_urls: Set[str] = set()
        self.include_patterns = (
            [re.compile(p) for p in include_patterns] if include_patterns else []
        )
        self.exclude_patterns = (
            [re.compile(p) for p in exclude_patterns] if exclude_patterns else []
        )
        self.sitemap_url = sitemap_url  # Added for Sitemap (Task 14.1)
        self.use_sitemap = use_sitemap  # Added for Sitemap (Task 14.1)
        self.logger = get_logger()
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": self.user_agent})
        self.rate_limiter = RateLimiter(self.requests_per_second)

    def _is_valid_url(self, url: str) -> bool:
        """
        Checks if the URL is valid for crawling (same domain, scheme, matches patterns).
        """
        try:
            parsed_url = urlparse(url)
            if not parsed_url.scheme or parsed_url.scheme.lower() not in [
                "http",
                "https",
            ]:
                logger.debug(f"Invalid scheme for URL: {url}")
                return False
            if parsed_url.netloc != self.base_domain:
                logger.debug(f"URL {url} not in base domain {self.base_domain}")
                return False
            # Check include patterns
            if self.include_patterns and not any(
                p.search(url) for p in self.include_patterns
            ):
                logger.debug(f"URL {url} did not match any include patterns.")
                return False
            # Check exclude patterns
            if self.exclude_patterns and any(
                p.search(url) for p in self.exclude_patterns
            ):
                logger.debug(f"URL {url} matched an exclude pattern.")
                return False
        except ValueError:
            # Handle potential errors from urlparse on malformed URLs
            logger.warning(f"ValueError when parsing URL: {url}", exc_info=True)
            return False
        return True

    def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """
        Extracts, normalizes, and validates links from a BeautifulSoup object.
        """
        found_links: Set[str] = set()
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            if not href:  # Skip if href is empty
                continue
            href = href.strip()
            if (
                not href
                or href.startswith("#")
                or href.lower().startswith(("javascript:", "mailto:", "tel:"))
            ):
                continue
            try:
                # Construct absolute URL
                absolute_url = urljoin(base_url, href)
                # Normalize: remove fragment and ensure scheme
                parsed_absolute_url = urlparse(absolute_url)
                normalized_url = parsed_absolute_url._replace(fragment="").geturl()
                # Re-parse to check scheme after normalization; urljoin might produce
                # a schemeless URL if the base had none and the href was absolute-path-relative.
                final_parsed_url = urlparse(normalized_url)
                if not final_parsed_url.scheme:
                    base_parsed_url = urlparse(self.start_url)
                    normalized_url = final_parsed_url._replace(
                        scheme=base_parsed_url.scheme
                    ).geturl()
                if self._is_valid_url(normalized_url):
                    found_links.add(normalized_url)
            except ValueError as e:
                logger.warning(
                    f"Skipping malformed link {href} from base {base_url}: {e}",
                    exc_info=False,
                )
                continue
        return list(found_links)

    def _extract_text(self, soup: BeautifulSoup) -> str:
        """
        Extracts and cleans text content from a BeautifulSoup object.
        """
        for script_or_style in soup(["script", "style"]):
            script_or_style.decompose()
        text = soup.get_text(separator=" ", strip=True)
        return text

    # --- Sitemap Processing Methods (Task 14.1) ---
    def _fetch_sitemap_content(self, sitemap_url: str) -> Optional[str]:
        """Fetches the content of a given sitemap URL."""
        self.logger.info(f"Fetching sitemap content from: {sitemap_url}")
        try:
            response = self.session.get(sitemap_url, timeout=10)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            self.logger.error(f"Error fetching sitemap {sitemap_url}: {e}")
            return None

    def _parse_sitemap(self, sitemap_content: str) -> List[str]:
        """Parses XML sitemap content and extracts URLs. Handles sitemap indexes."""
        urls: List[str] = []
        try:
            root = ET.fromstring(sitemap_content)
            # Check for sitemap index
            if root.tag.endswith("sitemapindex"):
                self.logger.info("Sitemap index detected. Processing sub-sitemaps.")
                for sitemap_element in root.findall(".//{*}sitemap"):
                    loc_element = sitemap_element.find("{*}loc")
                    if loc_element is not None and loc_element.text:
                        sub_sitemap_url = loc_element.text.strip()
                        self.logger.info(f"Found sub-sitemap: {sub_sitemap_url}")
                        sub_sitemap_content = self._fetch_sitemap_content(
                            sub_sitemap_url
                        )
                        if sub_sitemap_content:
                            urls.extend(self._parse_sitemap(sub_sitemap_content))
            # Process regular sitemap
            elif root.tag.endswith("urlset"):
                for url_element in root.findall(".//{*}url"):
                    loc_element = url_element.find("{*}loc")
                    if loc_element is not None and loc_element.text:
                        urls.append(loc_element.text.strip())
            else:
                self.logger.warning(f"Unknown root tag in sitemap: {root.tag}")
        except ET.ParseError as e:
            self.logger.error(f"Error parsing sitemap XML: {e}")
        return list(set(urls))  # Return unique URLs

    def _get_urls_from_sitemap(self) -> List[str]:
        """Fetches and parses the sitemap to get a list of URLs."""
        if not self.sitemap_url:
            self.logger.warning(
                "Sitemap URL is not provided. Cannot fetch URLs from sitemap."
            )
            return []
        sitemap_content = self._fetch_sitemap_content(self.sitemap_url)
        if not sitemap_content:
            return []
        sitemap_urls = self._parse_sitemap(sitemap_content)
        self.logger.info(f"Extracted {len(sitemap_urls)} unique URLs from sitemap(s).")
        return sitemap_urls

    # --- End Sitemap Processing Methods ---

    def crawl(
        self, progress_callback: Optional[Callable[[int, int, str], None]] = None
    ) -> List[CrawledPage]:
        urls_to_visit: List[Tuple[str, int, Optional[str]]] = []
        crawled_pages: List[CrawledPage] = []
        initial_total_for_progress = 0

        if self.use_sitemap and self.sitemap_url:
            self.logger.info(f"Attempting to use sitemap: {self.sitemap_url}")
            sitemap_extracted_urls = self._get_urls_from_sitemap()
            if sitemap_extracted_urls:
                for url in sitemap_extracted_urls:
                    if self._is_valid_url(url):  # Checks domain, include/exclude patterns
                        urls_to_visit.append((url, 0, None))  # Add with depth 0 and None parent
                self.logger.info(
                    f"Initialized {len(urls_to_visit)} URLs to visit from sitemap after validation."
                )
                initial_total_for_progress = len(urls_to_visit)
            else:
                self.logger.warning(
                    "Sitemap processing yielded no URLs. Falling back to start_url."
                )
                # Fallback to start_url if sitemap is empty or fails
                if self._is_valid_url(self.start_url):
                    urls_to_visit.append((self.start_url, 0, None))  # None parent
                    initial_total_for_progress = len(urls_to_visit)
        else:
            if self._is_valid_url(self.start_url):
                urls_to_visit.append((self.start_url, 0, None))  # None parent
                initial_total_for_progress = len(urls_to_visit)

        processed_count = 0

        while urls_to_visit:
            current_url, current_depth, current_parent_url = urls_to_visit.pop(0)

            current_total_for_progress = (
                initial_total_for_progress
                if self.use_sitemap
                else processed_count + len(urls_to_visit) + 1
            )

            if progress_callback:
                progress_callback(
                    processed_count,
                    current_total_for_progress,
                    current_url,
                )

            if current_url in self.visited_urls:
                self.logger.debug(f"URL already visited: {current_url}. Skipping.")
                if progress_callback:
                    # When skipping, processed_count doesn't increment, but one item is
                    # removed from the effective queue for this iteration, so the total
                    # is recomputed; with a sitemap it stays at initial_total_for_progress.
                    dynamic_total = (
                        initial_total_for_progress
                        if self.use_sitemap
                        else processed_count + len(urls_to_visit) + 1
                    )
                    progress_callback(
                        processed_count,
                        dynamic_total,
                        f"Skipped (visited): {current_url}",
                    )
                continue

            if current_depth > self.max_depth:
                logger.debug(
                    f"Skipping URL {current_url} due to depth {current_depth} > max_depth {self.max_depth}"
                )
                continue

            self.logger.info(
                f"Crawling (Depth {current_depth}): {current_url} "
                f"({processed_count + 1}/{current_total_for_progress})"
            )
            if progress_callback:
                progress_callback(
                    processed_count, current_total_for_progress, current_url
                )

            self.visited_urls.add(current_url)
            self.rate_limiter.wait()

            try:
                response = self.session.get(current_url, timeout=10)
                response.raise_for_status()
                html_content = response.text
                soup = BeautifulSoup(html_content, "html.parser")

                # Extract title, meta description, and meta keywords.
                page_title_tag = soup.find("title")
                page_title: Optional[str] = None
                if isinstance(page_title_tag, Tag) and page_title_tag.string:
                    page_title = page_title_tag.string.strip()
                else:
                    self.logger.debug(f"No title tag found for {current_url}")

                meta_desc_tag = soup.find("meta", attrs={"name": "description"})
                meta_description: Optional[str] = None
                if isinstance(meta_desc_tag, Tag):
                    content = meta_desc_tag.get("content")
                    if isinstance(content, str):
                        meta_description = content.strip()
                    elif isinstance(content, list):
                        meta_description = " ".join(
                            str(item) for item in content
                        ).strip()
                        self.logger.debug(
                            f"Meta description for {current_url} was a list, joined: {meta_description}"
                        )
                else:
                    self.logger.debug(f"No meta description found for {current_url}")

                meta_keywords_tag = soup.find("meta", attrs={"name": "keywords"})
                meta_keywords: List[str] = []
                if isinstance(meta_keywords_tag, Tag):
                    content = meta_keywords_tag.get("content")
                    raw_keywords_content: str = ""
                    if isinstance(content, str):
                        raw_keywords_content = content
                    elif isinstance(content, list):
                        raw_keywords_content = " ".join(str(item) for item in content)
                        self.logger.debug(
                            f"Meta keywords for {current_url} was a list, joined: {raw_keywords_content}"
                        )
                    if raw_keywords_content:
                        meta_keywords = [
                            k.strip()
                            for k in raw_keywords_content.split(",")
                            if k.strip()
                        ]
                else:
                    self.logger.debug(f"No meta keywords found for {current_url}")

                text_content = self._extract_text(soup)

                page_data = CrawledPage(
                    url=current_url,
                    html_content=html_content,
                    text_content=text_content,
                    title=page_title,
                    meta_description=meta_description,
                    meta_keywords=meta_keywords,
                    crawl_depth=current_depth,
                    parent_url=current_parent_url,
                )
                crawled_pages.append(page_data)
                self.logger.info(f"Successfully processed and stored: {current_url}")

                if current_depth < self.max_depth:
                    found_links = self._extract_links(soup, current_url)
                    self.logger.debug(
                        f"Found {len(found_links)} links on {current_url}"
                    )
                    for link in found_links:
                        if link not in self.visited_urls:
                            urls_to_visit.append((link, current_depth + 1, current_url))

                # Count this page as processed; the error branches below do the same.
                processed_count += 1
            except requests.exceptions.HTTPError as e:
                self.logger.error(
                    f"HTTPError for {current_url}: "
                    f"{e.response.status_code} - {e.response.reason}. "
                    f"Response: {e.response.text[:200]}...",
                    exc_info=False,
                )
                processed_count += 1
            except requests.exceptions.ConnectionError as e:
                self.logger.error(
                    f"ConnectionError for {current_url}: {e}", exc_info=False
                )
                processed_count += 1
            except requests.exceptions.Timeout as e:
                self.logger.error(f"Timeout for {current_url}: {e}", exc_info=False)
                processed_count += 1
            except requests.exceptions.RequestException as e:
                self.logger.error(
                    f"RequestException for {current_url}: {e}", exc_info=True
                )
                processed_count += 1
            except Exception as e:
                self.logger.error(
                    f"An unexpected error occurred while processing {current_url}: {e}",
                    exc_info=True,
                )
                processed_count += 1

        self.logger.info(
            f"Crawl completed. Total pages processed/attempted: {processed_count}. "
            f"Successfully crawled pages: {len(crawled_pages)}"
        )
        if progress_callback:
            progress_callback(processed_count, processed_count, "Crawling complete.")
        return crawled_pages
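

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only, not part of the library): the start URL
    # and regex patterns are hypothetical, and printing page.crawl_depth / page.url /
    # page.title assumes CrawledPage exposes the same attribute names it is constructed
    # with above.
    def print_progress(done: int, total: int, current: str) -> None:
        # Progress callback matching the (processed, total, current_url) signature
        # expected by WebCrawler.crawl().
        print(f"[{done}/{total}] {current}")

    crawler = WebCrawler(
        start_url="https://example.com/docs/",  # hypothetical starting point
        max_depth=1,
        requests_per_second=0.5,
        include_patterns=[r"/docs/"],  # only follow documentation pages
        exclude_patterns=[r"\.pdf$"],  # skip binary assets
    )
    pages = crawler.crawl(progress_callback=print_progress)
    for page in pages:
        print(page.crawl_depth, page.url, page.title)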