import asyncio
from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy, LLMExtractionStrategy
import json
from typing import List, Dict, Optional


class Crawl4AIScraper:
    """Web scraping service using Crawl4AI for better content extraction."""

    def __init__(self):
        self.crawler = None

    async def __aenter__(self):
        """Initialize the crawler when entering the async context."""
        self.crawler = AsyncWebCrawler(verbose=False)
        await self.crawler.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Clean up the crawler when exiting the async context."""
        if self.crawler:
            await self.crawler.__aexit__(exc_type, exc_val, exc_tb)

    async def scrape_url(self, url: str, max_chars: int = 4000) -> str:
        """
        Scrape a single URL and extract text content.

        Args:
            url: The URL to scrape
            max_chars: Maximum characters to return (default 4000)

        Returns:
            Extracted text content or an error message
        """
        try:
            # Perform the crawl
            result = await self.crawler.arun(
                url=url,
                bypass_cache=True,
                word_count_threshold=10,
                excluded_tags=['script', 'style', 'nav', 'header', 'footer'],
                remove_overlay_elements=True
            )

            if result.success:
                # Get cleaned text content - prefer markdown over cleaned_html
                content = result.markdown or result.cleaned_html or ""

                # Truncate if needed
                if len(content) > max_chars:
                    content = content[:max_chars] + "..."

                return content
            else:
                return f"Error fetching {url}: Failed to retrieve content"

        except Exception as e:
            return f"Error fetching {url}: {str(e)}"

    async def scrape_multiple_urls(self, urls: List[str], max_chars_per_url: int = 4000) -> Dict[str, str]:
        """
        Scrape multiple URLs concurrently.

        Args:
            urls: List of URLs to scrape
            max_chars_per_url: Maximum characters per URL

        Returns:
            Dictionary mapping URLs to their content
        """
        # Filter out blank entries first so each result pairs with the URL that produced it
        valid_urls = [url for url in urls if url and url.strip()]
        tasks = [self.scrape_url(url, max_chars_per_url) for url in valid_urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        url_content = {}
        for url, result in zip(valid_urls, results):
            if isinstance(result, Exception):
                url_content[url] = f"Error fetching {url}: {str(result)}"
            else:
                url_content[url] = result

        return url_content


def get_grounding_context_crawl4ai(urls: List[str]) -> str:
    """
    Synchronous wrapper to fetch grounding context using Crawl4AI.

    Args:
        urls: List of URLs to fetch context from

    Returns:
        Formatted grounding context string
    """
    if not urls:
        return ""

    # Filter valid URLs
    valid_urls = [url for url in urls if url and url.strip()]
    if not valid_urls:
        return ""

    async def fetch_all():
        async with Crawl4AIScraper() as scraper:
            return await scraper.scrape_multiple_urls(valid_urls)

    # Run the async function, handling the case where an event loop is already running
    try:
        asyncio.get_running_loop()
        # Already inside an async context: run the coroutine in a separate thread
        # with its own event loop
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(asyncio.run, fetch_all())
            url_content = future.result()
    except RuntimeError:
        # No event loop running, so asyncio.run can be used directly
        url_content = asyncio.run(fetch_all())
    except Exception as e:
        return f"Error initializing scraper: {str(e)}"

    # Format the context
    context_parts = []
    for i, (url, content) in enumerate(url_content.items(), 1):
        context_parts.append(f"Context from URL {i} ({url}):\n{content}")

    if context_parts:
        return "\n\n" + "\n\n".join(context_parts) + "\n\n"
    return ""


# Backwards compatibility function
def fetch_url_content_crawl4ai(url: str) -> str:
    """
    Synchronous wrapper for single URL scraping (backwards compatibility).

    Args:
        url: The URL to fetch

    Returns:
        Extracted content or an error message
    """
    async def fetch_one():
        async with Crawl4AIScraper() as scraper:
            return await scraper.scrape_url(url)

    try:
        return asyncio.run(fetch_one())
    except Exception as e:
        return f"Error fetching {url}: {str(e)}"
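

# Illustrative usage sketch (not part of the module's API): shows the async scraper
# used directly inside an event loop, and the synchronous wrappers used from plain
# synchronous code. The URLs below are placeholders, not endpoints this module depends on.
if __name__ == "__main__":
    async def _demo():
        async with Crawl4AIScraper() as scraper:
            # Single page
            single = await scraper.scrape_url("https://example.com")
            print(single[:200])

            # Several pages scraped concurrently
            batch = await scraper.scrape_multiple_urls([
                "https://example.com",
                "https://example.org",
            ])
            for demo_url, demo_content in batch.items():
                print(demo_url, len(demo_content))

    asyncio.run(_demo())

    # Synchronous entry points for callers without a running event loop
    print(fetch_url_content_crawl4ai("https://example.com")[:200])
    print(get_grounding_context_crawl4ai(["https://example.com"])[:200])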