# chatui-helper / scraping_service.py
# Integrate Crawl4AI for enhanced web scraping
import asyncio
from crawl4ai import AsyncWebCrawler
from crawl4ai.extraction_strategy import JsonCssExtractionStrategy, LLMExtractionStrategy
import json
from typing import List, Dict, Optional


class Crawl4AIScraper:
    """Web scraping service using Crawl4AI for better content extraction"""

    def __init__(self):
        self.crawler = None

    async def __aenter__(self):
        """Initialize the crawler when entering async context"""
        self.crawler = AsyncWebCrawler(verbose=False)
        await self.crawler.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Clean up the crawler when exiting async context"""
        if self.crawler:
            await self.crawler.__aexit__(exc_type, exc_val, exc_tb)

    async def scrape_url(self, url: str, max_chars: int = 4000) -> str:
        """
        Scrape a single URL and extract text content

        Args:
            url: The URL to scrape
            max_chars: Maximum characters to return (default 4000)

        Returns:
            Extracted text content or error message
        """
        try:
            # Perform the crawl
            result = await self.crawler.arun(
                url=url,
                bypass_cache=True,
                word_count_threshold=10,
                excluded_tags=['script', 'style', 'nav', 'header', 'footer'],
                remove_overlay_elements=True
            )
            if result.success:
                # Get cleaned text content - prefer markdown over cleaned_html
                content = result.markdown or result.cleaned_html or ""
                # Truncate if needed
                if len(content) > max_chars:
                    content = content[:max_chars] + "..."
                return content
            else:
                return f"Error fetching {url}: Failed to retrieve content"
        except Exception as e:
            return f"Error fetching {url}: {str(e)}"

    async def scrape_multiple_urls(self, urls: List[str], max_chars_per_url: int = 4000) -> Dict[str, str]:
        """
        Scrape multiple URLs concurrently

        Args:
            urls: List of URLs to scrape
            max_chars_per_url: Maximum characters per URL

        Returns:
            Dictionary mapping URLs to their content
        """
        # Filter out empty entries first so results line up with the URLs actually scraped
        valid_urls = [url for url in urls if url and url.strip()]
        tasks = [self.scrape_url(url, max_chars_per_url) for url in valid_urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        url_content = {}
        for url, result in zip(valid_urls, results):
            if isinstance(result, Exception):
                url_content[url] = f"Error fetching {url}: {str(result)}"
            else:
                url_content[url] = result
        return url_content
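

# Minimal usage sketch (illustrative only; not invoked anywhere in this module).
# The URL and the `_demo_scrape` name below are placeholders.
#
#   async def _demo_scrape():
#       async with Crawl4AIScraper() as scraper:
#           pages = await scraper.scrape_multiple_urls(["https://example.com"])
#           for url, text in pages.items():
#               print(url, len(text))
#
#   asyncio.run(_demo_scrape())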


def get_grounding_context_crawl4ai(urls: List[str]) -> str:
    """
    Synchronous wrapper to fetch grounding context using Crawl4AI

    Args:
        urls: List of URLs to fetch context from

    Returns:
        Formatted grounding context string
    """
    if not urls:
        return ""

    # Filter valid URLs
    valid_urls = [url for url in urls if url and url.strip()]
    if not valid_urls:
        return ""

    async def fetch_all():
        async with Crawl4AIScraper() as scraper:
            return await scraper.scrape_multiple_urls(valid_urls)

    # Run the async function - handle an existing event loop
    try:
        asyncio.get_running_loop()
        # We're already in an async context, so run the coroutine in a fresh
        # event loop on a worker thread
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(asyncio.run, fetch_all())
            url_content = future.result()
    except RuntimeError:
        # No event loop running, we can use asyncio.run directly
        url_content = asyncio.run(fetch_all())
    except Exception as e:
        return f"Error initializing scraper: {str(e)}"

    # Format the context
    context_parts = []
    for i, (url, content) in enumerate(url_content.items(), 1):
        context_parts.append(f"Context from URL {i} ({url}):\n{content}")

    if context_parts:
        return "\n\n" + "\n\n".join(context_parts) + "\n\n"
    return ""


# Backwards compatibility function
def fetch_url_content_crawl4ai(url: str) -> str:
    """
    Synchronous wrapper for single URL scraping (backwards compatibility)

    Args:
        url: The URL to fetch

    Returns:
        Extracted content or error message
    """
    async def fetch_one():
        async with Crawl4AIScraper() as scraper:
            return await scraper.scrape_url(url)

    try:
        return asyncio.run(fetch_one())
    except Exception as e:
        return f"Error fetching {url}: {str(e)}"