import os, time
os.environ["TOKENIZERS_PARALLELISM"] = "false"
from pathlib import Path
from .models import UrlModel, CrawlResult
from .database import init_db, get_cached_url, cache_url, DB_PATH, flush_db
from .utils import *
from .chunking_strategy import *
from .extraction_strategy import *
from .crawler_strategy import *
from typing import List
from concurrent.futures import ThreadPoolExecutor
from .content_scraping_strategy import WebScrapingStrategy
from .config import *
import warnings
import json

warnings.filterwarnings("ignore", message='Field "model_name" has conflict with protected namespace "model_".')

class WebCrawler:
    def __init__(self, crawler_strategy: CrawlerStrategy = None, always_by_pass_cache: bool = False, verbose: bool = False):
        # Default to the local Selenium-based strategy when none is provided.
        self.crawler_strategy = crawler_strategy or LocalSeleniumCrawlerStrategy(verbose=verbose)
        self.always_by_pass_cache = always_by_pass_cache

        # Create the home folder (~/.crawl4ai by default) plus its cache directory,
        # then make sure the local SQLite cache database exists.
        self.crawl4ai_folder = os.path.join(os.getenv("CRAWL4_AI_BASE_DIRECTORY", Path.home()), ".crawl4ai")
        os.makedirs(self.crawl4ai_folder, exist_ok=True)
        os.makedirs(f"{self.crawl4ai_folder}/cache", exist_ok=True)
        init_db()
        self.ready = False
    def warmup(self):
        # Issue one lightweight crawl so lazy resources (driver, models) are initialized,
        # then mark the crawler as ready.
        print("[LOG] 🌤️ Warming up the WebCrawler")
        self.run(
            url='https://google.com/',
            word_count_threshold=5,
            extraction_strategy=NoExtractionStrategy(),
            bypass_cache=False,
            verbose=False
        )
        self.ready = True
        print("[LOG] 🌞 WebCrawler is ready to crawl")
    def fetch_page(
        self,
        url_model: UrlModel,
        provider: str = DEFAULT_PROVIDER,
        api_token: str = None,
        extract_blocks_flag: bool = True,
        word_count_threshold=MIN_WORD_THRESHOLD,
        css_selector: str = None,
        screenshot: bool = False,
        use_cached_html: bool = False,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        **kwargs,
    ) -> CrawlResult:
        # Thin wrapper around run() for a single UrlModel; forced URLs bypass the cache.
        return self.run(
            url_model.url,
            word_count_threshold,
            extraction_strategy or NoExtractionStrategy(),
            chunking_strategy,
            bypass_cache=url_model.forced,
            css_selector=css_selector,
            screenshot=screenshot,
            **kwargs,
        )
    def fetch_pages(
        self,
        url_models: List[UrlModel],
        provider: str = DEFAULT_PROVIDER,
        api_token: str = None,
        extract_blocks_flag: bool = True,
        word_count_threshold=MIN_WORD_THRESHOLD,
        use_cached_html: bool = False,
        css_selector: str = None,
        screenshot: bool = False,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        **kwargs,
    ) -> List[CrawlResult]:
        extraction_strategy = extraction_strategy or NoExtractionStrategy()

        # Close over **kwargs here: executor.map only accepts positional iterables,
        # so keyword arguments cannot be threaded through it per call.
        def fetch_page_wrapper(url_model, *args):
            return self.fetch_page(url_model, *args, **kwargs)

        with ThreadPoolExecutor() as executor:
            results = list(
                executor.map(
                    fetch_page_wrapper,
                    url_models,
                    [provider] * len(url_models),
                    [api_token] * len(url_models),
                    [extract_blocks_flag] * len(url_models),
                    [word_count_threshold] * len(url_models),
                    [css_selector] * len(url_models),
                    [screenshot] * len(url_models),
                    [use_cached_html] * len(url_models),
                    [extraction_strategy] * len(url_models),
                    [chunking_strategy] * len(url_models),
                )
            )

        return results
    def run(
        self,
        url: str,
        word_count_threshold=MIN_WORD_THRESHOLD,
        extraction_strategy: ExtractionStrategy = None,
        chunking_strategy: ChunkingStrategy = RegexChunking(),
        bypass_cache: bool = False,
        css_selector: str = None,
        screenshot: bool = False,
        user_agent: str = None,
        verbose=True,
        **kwargs,
    ) -> CrawlResult:
        try:
            extraction_strategy = extraction_strategy or NoExtractionStrategy()
            extraction_strategy.verbose = verbose
            if not isinstance(extraction_strategy, ExtractionStrategy):
                raise ValueError("Unsupported extraction strategy")
            if not isinstance(chunking_strategy, ChunkingStrategy):
                raise ValueError("Unsupported chunking strategy")

            word_count_threshold = max(word_count_threshold, MIN_WORD_THRESHOLD)

            cached = None
            screenshot_data = None
            extracted_content = None
            if not bypass_cache and not self.always_by_pass_cache:
                cached = get_cached_url(url)

            if kwargs.get("warmup", True) and not self.ready:
                return None

            if cached:
                # Cached row layout mirrors the cache_url() call in process_html():
                # index 1 is the raw HTML, index 4 the extracted content, index 9 the screenshot.
                html = sanitize_input_encode(cached[1])
                extracted_content = sanitize_input_encode(cached[4])
                if screenshot:
                    screenshot_data = cached[9]
                    if not screenshot_data:
                        cached = None

            if not cached or not html:
                if user_agent:
                    self.crawler_strategy.update_user_agent(user_agent)
                t1 = time.time()
                html = sanitize_input_encode(self.crawler_strategy.crawl(url, **kwargs))
                t2 = time.time()
                if verbose:
                    print(f"[LOG] 🚀 Crawling done for {url}, success: {bool(html)}, time taken: {t2 - t1:.2f} seconds")
                if screenshot:
                    screenshot_data = self.crawler_strategy.take_screenshot()

            crawl_result = self.process_html(url, html, extracted_content, word_count_threshold, extraction_strategy, chunking_strategy, css_selector, screenshot_data, verbose, bool(cached), **kwargs)
            crawl_result.success = bool(html)
            return crawl_result
        except Exception as e:
            if not hasattr(e, "msg"):
                e.msg = str(e)
            print(f"[ERROR] 🚫 Failed to crawl {url}, error: {e.msg}")
            return CrawlResult(url=url, html="", success=False, error_message=e.msg)
    def process_html(
        self,
        url: str,
        html: str,
        extracted_content: str,
        word_count_threshold: int,
        extraction_strategy: ExtractionStrategy,
        chunking_strategy: ChunkingStrategy,
        css_selector: str,
        screenshot: bool,
        verbose: bool,
        is_cached: bool,
        **kwargs,
    ) -> CrawlResult:
        t = time.time()
        # Extract content from HTML
        try:
            t1 = time.time()
            scraping_strategy = WebScrapingStrategy()
            extra_params = {k: v for k, v in kwargs.items() if k not in ["only_text", "image_description_min_word_threshold"]}
            result = scraping_strategy.scrap(
                url,
                html,
                word_count_threshold=word_count_threshold,
                css_selector=css_selector,
                only_text=kwargs.get("only_text", False),
                image_description_min_word_threshold=kwargs.get(
                    "image_description_min_word_threshold", IMAGE_DESCRIPTION_MIN_WORD_THRESHOLD
                ),
                **extra_params,
            )
            # result = get_content_of_website_optimized(url, html, word_count_threshold, css_selector=css_selector, only_text=kwargs.get("only_text", False))
            if verbose:
                print(f"[LOG] 🚀 Content extracted for {url}, success: True, time taken: {time.time() - t1:.2f} seconds")
            if result is None:
                raise ValueError(f"Failed to extract content from the website: {url}")
        except InvalidCSSSelectorError as e:
            raise ValueError(str(e))

        cleaned_html = sanitize_input_encode(result.get("cleaned_html", ""))
        markdown = sanitize_input_encode(result.get("markdown", ""))
        media = result.get("media", [])
        links = result.get("links", [])
        metadata = result.get("metadata", {})

        # Run the extraction strategy only when there is no cached extraction to reuse.
        if extracted_content is None:
            if verbose:
                print(f"[LOG] 🔥 Extracting semantic blocks for {url}, Strategy: {extraction_strategy.name}")
            sections = chunking_strategy.chunk(markdown)
            extracted_content = extraction_strategy.run(url, sections)
            extracted_content = json.dumps(extracted_content, indent=4, default=str, ensure_ascii=False)

        if verbose:
            print(f"[LOG] 🚀 Extraction done for {url}, time taken: {time.time() - t:.2f} seconds.")

        screenshot = None if not screenshot else screenshot
        if not is_cached:
            cache_url(
                url,
                html,
                cleaned_html,
                markdown,
                extracted_content,
                True,
                json.dumps(media),
                json.dumps(links),
                json.dumps(metadata),
                screenshot=screenshot,
            )

        return CrawlResult(
            url=url,
            html=html,
            cleaned_html=format_html(cleaned_html),
            markdown=markdown,
            media=media,
            links=links,
            metadata=metadata,
            screenshot=screenshot,
            extracted_content=extracted_content,
            success=True,
            error_message="",
        )
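

# --- Usage sketch (not part of the original module) --------------------------
# A minimal example of how this class is typically driven: construct a crawler,
# warm it up once, then call run() per URL. The target URLs are placeholders,
# and because this file uses relative imports, the sketch assumes it is executed
# as part of its package (e.g. `python -m <package>.web_crawler`). The strategy
# choices below are just illustrations; any ExtractionStrategy/ChunkingStrategy
# from this package can be substituted.
if __name__ == "__main__":
    crawler = WebCrawler(verbose=True)
    crawler.warmup()  # initializes the crawler strategy and marks the instance ready

    result = crawler.run(
        url="https://example.com/",              # hypothetical target URL
        word_count_threshold=MIN_WORD_THRESHOLD,
        extraction_strategy=NoExtractionStrategy(),
        chunking_strategy=RegexChunking(),
        bypass_cache=True,                       # skip the local SQLite cache for this call
        screenshot=False,
    )
    if result and result.success:
        print(result.markdown[:500])             # first 500 characters of the page as markdown
    else:
        print("Crawl failed:", result.error_message if result else "crawler not ready")

    # Batch variant: fetch_pages() fans the same settings out over a thread pool.
    # Constructing UrlModel with `url` and `forced` keyword fields is an assumption
    # based on how fetch_page() reads url_model.url and url_model.forced above.
    batch = crawler.fetch_pages(
        [
            UrlModel(url="https://example.com/a", forced=False),
            UrlModel(url="https://example.com/b", forced=False),
        ],
        word_count_threshold=MIN_WORD_THRESHOLD,
        extraction_strategy=NoExtractionStrategy(),
    )
    print([r.success for r in batch])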