import asyncio
from typing import Dict, List

from crawl4ai import AsyncWebCrawler
class Crawl4AIScraper:
    """Web scraping service using Crawl4AI for better content extraction."""

    def __init__(self):
        self.crawler = None

    async def __aenter__(self):
        """Initialize the crawler when entering the async context."""
        self.crawler = AsyncWebCrawler(verbose=False)
        await self.crawler.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Clean up the crawler when exiting the async context."""
        if self.crawler:
            await self.crawler.__aexit__(exc_type, exc_val, exc_tb)

    async def scrape_url(self, url: str, max_chars: int = 4000) -> str:
        """
        Scrape a single URL and extract text content.

        Args:
            url: The URL to scrape
            max_chars: Maximum characters to return (default 4000)

        Returns:
            Extracted text content or an error message
        """
        try:
            # Perform the crawl
            result = await self.crawler.arun(
                url=url,
                bypass_cache=True,
                word_count_threshold=10,
                excluded_tags=['script', 'style', 'nav', 'header', 'footer'],
                remove_overlay_elements=True
            )
            if result.success:
                # Get cleaned text content - prefer markdown over cleaned_html
                content = result.markdown or result.cleaned_html or ""
                # Truncate if needed
                if len(content) > max_chars:
                    content = content[:max_chars] + "..."
                return content
            else:
                return f"Error fetching {url}: Failed to retrieve content"
        except Exception as e:
            return f"Error fetching {url}: {str(e)}"
    async def scrape_multiple_urls(self, urls: List[str], max_chars_per_url: int = 4000) -> Dict[str, str]:
        """
        Scrape multiple URLs concurrently.

        Args:
            urls: List of URLs to scrape
            max_chars_per_url: Maximum characters per URL

        Returns:
            Dictionary mapping URLs to their content
        """
        # Filter out empty URLs first so results stay aligned with the URLs they came from
        valid_urls = [url for url in urls if url and url.strip()]
        tasks = [self.scrape_url(url, max_chars_per_url) for url in valid_urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        url_content = {}
        for url, result in zip(valid_urls, results):
            if isinstance(result, Exception):
                url_content[url] = f"Error fetching {url}: {str(result)}"
            else:
                url_content[url] = result
        return url_content
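

# --- Illustrative usage sketch ---
# A minimal example of how Crawl4AIScraper might be used directly from async
# code (e.g. inside an existing event loop, where the synchronous wrappers
# below are unnecessary). The function name and URLs are placeholders, not
# part of the original module's API.
async def example_direct_usage() -> None:
    async with Crawl4AIScraper() as scraper:
        # Scrape a single page
        page_text = await scraper.scrape_url("https://example.com", max_chars=2000)
        print(page_text[:200])

        # Scrape several pages concurrently
        pages = await scraper.scrape_multiple_urls([
            "https://example.com",
            "https://example.org",
        ])
        for url, content in pages.items():
            print(url, len(content))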


def get_grounding_context_crawl4ai(urls: List[str]) -> str:
    """
    Synchronous wrapper to fetch grounding context using Crawl4AI.

    Args:
        urls: List of URLs to fetch context from

    Returns:
        Formatted grounding context string
    """
    if not urls:
        return ""

    # Filter valid URLs
    valid_urls = [url for url in urls if url and url.strip()]
    if not valid_urls:
        return ""

    async def fetch_all():
        async with Crawl4AIScraper() as scraper:
            return await scraper.scrape_multiple_urls(valid_urls)

    # Run the async function - handle the case where an event loop is already running
    try:
        asyncio.get_running_loop()
        # We're already in an async context; run the coroutine in a new event loop on a worker thread
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(asyncio.run, fetch_all())
            url_content = future.result()
    except RuntimeError:
        # No event loop running, we can use asyncio.run directly
        url_content = asyncio.run(fetch_all())
    except Exception as e:
        return f"Error initializing scraper: {str(e)}"

    # Format the context
    context_parts = []
    for i, (url, content) in enumerate(url_content.items(), 1):
        context_parts.append(f"Context from URL {i} ({url}):\n{content}")
    if context_parts:
        return "\n\n" + "\n\n".join(context_parts) + "\n\n"
    return ""


# Backwards compatibility function
def fetch_url_content_crawl4ai(url: str) -> str:
    """
    Synchronous wrapper for single URL scraping (backwards compatibility).

    Args:
        url: The URL to fetch

    Returns:
        Extracted content or error message
    """
    async def fetch_one():
        async with Crawl4AIScraper() as scraper:
            return await scraper.scrape_url(url)

    try:
        return asyncio.run(fetch_one())
    except Exception as e:
        return f"Error fetching {url}: {str(e)}"