from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, HttpUrl
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
from crawl4ai.content_filter_strategy import BM25ContentFilter, PruningContentFilter
import uvicorn
import asyncio
import nest_asyncio
import re
from typing import Optional, List, Dict
from bs4 import BeautifulSoup
from datetime import datetime

# Apply nest_asyncio to allow nested event loops
nest_asyncio.apply()

app = FastAPI(
    title="Crawl4AI API",
    description="A web API for Crawl4AI web scraping service",
    version="1.0.0"
)

class CrawlRequest(BaseModel):
    url: HttpUrl
    cache_mode: str = "DISABLED"
    excluded_tags: list[str] = ["nav", "footer", "aside", "header", "script", "style"]
    remove_overlay_elements: bool = True
    ignore_links: bool = True
    subject: Optional[str] = None  # Optional subject for content filtering

class Article(BaseModel):
    title: str
    url: str
    description: Optional[str] = None
    image_url: Optional[str] = None
    timestamp: Optional[str] = None
    category: Optional[str] = None
    source_url: Optional[str] = None  # Added to track original source

class CrawlResponse(BaseModel):
    url: str
    success: bool
    error: Optional[str] = None
    metadata: Dict = {}
    articles: List[Article] = []
    raw_markdown: Optional[str] = None
    stats: Dict = {}

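# Illustrative request body for the crawl endpoint defined below (the /crawl
# path is an assumption restored with the route decorator; the URL and subject
# values are placeholders):
#
#   {
#       "url": "https://example.com/news",
#       "subject": "technology",
#       "ignore_links": true
#   }
#
# Fields omitted from the body fall back to the defaults declared in CrawlRequest.
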
def clean_url(url: str) -> str:
    """Clean and normalize URLs"""
    # Remove angle brackets and spaces
    url = url.replace('<', '').replace('>', '').strip()
    # Extract domain from the first https:// occurrence
    if url.startswith('https://'):
        domain = url[8:].split('/')[0]
        # Remove any duplicate domains
        cleaned_url = url.replace(f'https://{domain}/{domain}', domain)
        cleaned_url = cleaned_url.replace(f'https://{domain}/https:/', '')
        cleaned_url = cleaned_url.replace(f'https://{domain}/https://{domain}', domain)
        # Ensure proper https:// prefix
        if not cleaned_url.startswith('https://'):
            cleaned_url = f'https://{cleaned_url}'
    else:
        cleaned_url = url
    # Remove any markdown formatting or extra parameters
    cleaned_url = cleaned_url.split(' ')[0].split(')')[0]
    # Remove any trailing slashes
    cleaned_url = cleaned_url.rstrip('/')
    return cleaned_url

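# Illustrative behaviour of clean_url above (the example URL is a placeholder):
#   clean_url('<https://example.com/news/>')  ->  'https://example.com/news'
# Angle brackets are stripped, the scheme is preserved, and the trailing slash
# is removed by the rstrip('/') step.
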
def is_valid_title(title: str) -> bool:
    """Check if the title is valid"""
    invalid_patterns = [
        '**_access_time_',
        'existing code',
        '...',
        'navigation',
        'menu',
        'logo'
    ]
    # Check for invalid patterns
    if any(pattern in title.lower() for pattern in invalid_patterns):
        return False
    # Check if it's likely a filename or URL
    if title.count('-') > 3 or title.count('_') > 2:
        return False
    # Check if title is too short
    if len(title.strip()) < 5:
        return False
    return True

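# Illustrative checks for is_valid_title above (example strings are placeholders):
#   is_valid_title('menu')                              -> False (boilerplate pattern)
#   is_valid_title('City council approves new budget')  -> True
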
def clean_description(description: str) -> Optional[str]:
    """Clean and normalize description text"""
    if not description:
        return None
    # Remove access_time markers
    if '_access_time_' in description:
        return None
    # Remove markdown links
    description = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', description)
    # Remove URLs
    description = re.sub(r'https?://\S+', '', description)
    # Remove special characters and extra whitespace
    description = description.replace('(', '').replace(')', '').replace('<', '').replace('>', '')
    description = ' '.join(description.split())
    return description if len(description) > 10 else None

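# Illustrative behaviour of clean_description above (example text is a placeholder):
#   clean_description('[Read more](https://example.com) about the new policy changes')
#   ->  'Read more about the new policy changes'
# The markdown link is reduced to its label, bare URLs are stripped, and the
# result is kept only because it is longer than 10 characters.
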
def extract_articles(markdown: str) -> List[Article]:
    """Extract article entries from crawled markdown."""
    articles = []
    seen_urls = set()  # Track unique URLs
    # Updated regex pattern
    article_pattern = r'(?:!\[([^\]]*)\])?\[([^\]]+)\]\(([^)]+)\)(?:\s*\(([^)]+)\))?\s*(?:\[(.*?)\])?\s*([^[\n]*)'
    matches = re.finditer(article_pattern, markdown, re.DOTALL)
    for match in matches:
        title = match.group(2)  # Article title
        url = match.group(3)  # Article URL
        description = match.group(6)  # Description text
        # Skip if title is invalid
        if not is_valid_title(title):
            continue
        # Clean and validate URL
        url = clean_url(url)
        # Skip if URL already processed or is an image
        if url in seen_urls or url.lower().endswith(('.jpg', '.png', '.gif', '.jpeg')):
            continue
        seen_urls.add(url)
        # Clean description
        clean_desc = clean_description(description)
        # Extract image URL if present
        image_url = None
        image_match = re.search(r'!\[([^\]]*)\]\(([^)]+)\)', description) if description else None
        if image_match:
            image_url = clean_url(image_match.group(2))
        article = Article(
            title=title.strip(),
            url=url,
            description=clean_desc,
            image_url=image_url,
            timestamp=None,
            category=None,
            source_url=None  # Will be set later
        )
        articles.append(article)
    return articles

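# Illustrative extraction for extract_articles above (the markdown line is a placeholder):
#   '[City opens new library](https://example.com/news/library) A new branch opens downtown this week.'
# would yield an Article with title 'City opens new library',
# url 'https://example.com/news/library', and the trailing sentence as its description.
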
def extract_metadata(markdown: str, html: str) -> Dict:
    """Extract page-level metadata (crawl timestamp and category headings)."""
    metadata = {
        "timestamp": datetime.now().isoformat(),
        "categories": [],
        "total_articles": 0
    }
    # Extract categories
    category_pattern = r'##\s+\[(.*?)\]'
    categories = re.findall(category_pattern, markdown)
    if categories:
        metadata["categories"] = [cat.strip() for cat in categories]
    return metadata

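# Illustrative category match for extract_metadata above (heading text is a placeholder):
#   '## [World News](https://example.com/world)' contributes 'World News'
# to metadata["categories"] via category_pattern.
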
# Route decorator restored so the endpoint is registered with FastAPI;
# the "/crawl" path and response_model are assumptions.
@app.post("/crawl", response_model=CrawlResponse)
async def crawl_url(request: CrawlRequest):
    """Crawl the requested URL and return extracted articles and metadata."""
    try:
        # Force cache mode to DISABLED
        cache_mode = CacheMode.DISABLED
        # Configure markdown generator
        if request.subject:
            content_filter = BM25ContentFilter(
                user_query=request.subject,
                bm25_threshold=1.2
            )
        else:
            content_filter = PruningContentFilter(
                threshold=0.48,
                threshold_type="fixed",
                min_word_threshold=50
            )
        # Create options dictionary with ignore_images
        options = {"ignore_images": True}
        # Add ignore_links if requested
        if request.ignore_links:
            options["ignore_links"] = True
        md_generator = DefaultMarkdownGenerator(
            content_filter=content_filter,
            options=options
        )
        # Create crawler with configuration
        async with AsyncWebCrawler() as crawler:
            config = CrawlerRunConfig(
                cache_mode=cache_mode,  # Always DISABLED
                excluded_tags=request.excluded_tags,
                remove_overlay_elements=request.remove_overlay_elements,
                markdown_generator=md_generator,
                exclude_external_links=True,
                exclude_social_media_links=True,
                exclude_external_images=True,
                exclude_domains=["facebook.com", "twitter.com", "instagram.com", "youtube.com", "tiktok.com", "pinterest.com"]
            )
            result = await crawler.arun(
                url=str(request.url),
                config=config
            )
            # Process results
            markdown = result.markdown_v2.raw_markdown
            html = result.html
            articles = extract_articles(markdown)
            metadata = extract_metadata(markdown, html)
            metadata["subject"] = request.subject
            for article in articles:
                article.source_url = str(request.url)
            return CrawlResponse(
                url=str(request.url),
                success=result.success,
                metadata=metadata,
                articles=articles,
                raw_markdown=markdown if result.success else None,
                stats={
                    "total_links": len(result.links) if result.links else 0,
                    "processing_time": result.processing_time if hasattr(result, 'processing_time') else None
                }
            )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Root endpoint decorator restored (missing in the source); the "/" path is assumed.
@app.get("/")
def read_root():
    return {
        "message": "Welcome to Crawl4AI API",
        "docs": "/docs",
        "redoc": "/redoc"
    }

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
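
# Illustrative client call once the server is running (the /crawl path is the
# assumed route restored above; the URL and subject values are placeholders):
#   import requests
#   resp = requests.post(
#       "http://localhost:7860/crawl",
#       json={"url": "https://example.com/news", "subject": "technology"},
#   )
#   print(resp.json()["articles"])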