from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, HttpUrl
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode
from crawl4ai.markdown_generation_strategy import DefaultMarkdownGenerator
from crawl4ai.content_filter_strategy import BM25ContentFilter, PruningContentFilter
import uvicorn
import asyncio
import nest_asyncio
import re
from typing import Optional, List, Dict
from bs4 import BeautifulSoup
from datetime import datetime

# Apply nest_asyncio to allow nested event loops
nest_asyncio.apply()

app = FastAPI(
    title="Crawl4AI API",
    description="A web API for Crawl4AI web scraping service",
    version="1.0.0"
)

class CrawlRequest(BaseModel):
    url: HttpUrl
    cache_mode: str = "DISABLED"
    excluded_tags: list[str] = ["nav", "footer", "aside", "header", "script", "style"]
    remove_overlay_elements: bool = True
    ignore_links: bool = True
    subject: Optional[str] = None  # Optional subject for content filtering

class Article(BaseModel):
    title: str
    url: str
    description: Optional[str] = None
    image_url: Optional[str] = None
    timestamp: Optional[str] = None
    category: Optional[str] = None
    source_url: Optional[str] = None  # Added to track original source

class CrawlResponse(BaseModel):
    url: str
    success: bool
    error: Optional[str] = None
    metadata: Dict = {}
    articles: List[Article] = []
    raw_markdown: Optional[str] = None
    stats: Dict = {}

def clean_url(url: str) -> str:
    """Clean and normalize URLs"""
    # Remove angle brackets and spaces
    url = url.replace('<', '').replace('>', '').strip()

    # Extract domain from the first https:// occurrence
    if url.startswith('https://'):
        domain = url[8:].split('/')[0]
        # Remove any duplicate domains
        cleaned_url = url.replace(f'https://{domain}/{domain}', domain)
        cleaned_url = cleaned_url.replace(f'https://{domain}/https:/', '')
        cleaned_url = cleaned_url.replace(f'https://{domain}/https://{domain}', domain)
        # Ensure proper https:// prefix
        if not cleaned_url.startswith('https://'):
            cleaned_url = f'https://{cleaned_url}'
    else:
        cleaned_url = url

    # Remove any markdown formatting or extra parameters
    cleaned_url = cleaned_url.split(' ')[0].split(')')[0]
    # Remove any trailing slashes
    cleaned_url = cleaned_url.rstrip('/')

    return cleaned_url

def is_valid_title(title: str) -> bool:
    """Check if the title is valid"""
    invalid_patterns = [
        '**_access_time_',
        'existing code',
        '...',
        'navigation',
        'menu',
        'logo'
    ]

    # Check for invalid patterns
    if any(pattern in title.lower() for pattern in invalid_patterns):
        return False

    # Check if it's likely a filename or URL
    if title.count('-') > 3 or title.count('_') > 2:
        return False

    # Check if title is too short
    if len(title.strip()) < 5:
        return False

    return True

def clean_description(description: str) -> Optional[str]:
    """Clean and normalize description text"""
    if not description:
        return None

    # Remove access_time markers
    if '_access_time_' in description:
        return None

    # Remove markdown links
    description = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', description)
    # Remove URLs
    description = re.sub(r'https?://\S+', '', description)
    # Remove special characters and extra whitespace
    description = description.replace('(', '').replace(')', '').replace('<', '').replace('>', '')
    description = ' '.join(description.split())

    return description if len(description) > 10 else None

def extract_articles(markdown: str) -> List[Article]:
    articles = []
    seen_urls = set()  # Track unique URLs

    # Updated regex pattern
    article_pattern = r'(?:!\[([^\]]*)\])?\[([^\]]+)\]\(([^)]+)\)(?:\s*\(([^)]+)\))?\s*(?:\[(.*?)\])?\s*([^[\n]*)'
    matches = re.finditer(article_pattern, markdown, re.DOTALL)

    for match in matches:
        title = match.group(2)        # Article title
        url = match.group(3)          # Article URL
        description = match.group(6)  # Description text

        # Skip if title is invalid
        if not is_valid_title(title):
            continue

        # Clean and validate URL
        url = clean_url(url)

        # Skip if URL already processed or is an image
        if url in seen_urls or url.lower().endswith(('.jpg', '.png', '.gif', '.jpeg')):
            continue

        seen_urls.add(url)

        # Clean description
        clean_desc = clean_description(description)

        # Extract image URL if present
        image_url = None
        image_match = re.search(r'!\[([^\]]*)\]\(([^)]+)\)', description) if description else None
        if image_match:
            image_url = clean_url(image_match.group(2))

        article = Article(
            title=title.strip(),
            url=url,
            description=clean_desc,
            image_url=image_url,
            timestamp=None,
            category=None,
            source_url=None  # Will be set later
        )
        articles.append(article)

    return articles

def extract_metadata(markdown: str, html: str) -> Dict:
    metadata = {
        "timestamp": datetime.now().isoformat(),
        "categories": [],
        "total_articles": 0
    }

    # Extract categories
    category_pattern = r'##\s+\[(.*?)\]'
    categories = re.findall(category_pattern, markdown)
    if categories:
        metadata["categories"] = [cat.strip() for cat in categories]

    return metadata

@app.post("/crawl", response_model=CrawlResponse)
async def crawl_url(request: CrawlRequest):
    try:
        # Force cache mode to DISABLED
        cache_mode = CacheMode.DISABLED

        # Configure markdown generator
        if request.subject:
            content_filter = BM25ContentFilter(
                user_query=request.subject,
                bm25_threshold=1.2
            )
        else:
            content_filter = PruningContentFilter(
                threshold=0.48,
                threshold_type="fixed",
                min_word_threshold=50
            )

        # Create options dictionary with ignore_images
        options = {"ignore_images": True}

        # Add ignore_links if requested
        if request.ignore_links:
            options["ignore_links"] = True

        md_generator = DefaultMarkdownGenerator(
            content_filter=content_filter,
            options=options
        )

        # Create crawler with configuration
        async with AsyncWebCrawler() as crawler:
            config = CrawlerRunConfig(
                cache_mode=cache_mode,  # Always DISABLED
                excluded_tags=request.excluded_tags,
                remove_overlay_elements=request.remove_overlay_elements,
                markdown_generator=md_generator,
                exclude_external_links=True,
                exclude_social_media_links=True,
                exclude_external_images=True,
                exclude_domains=["facebook.com", "twitter.com", "instagram.com",
                                 "youtube.com", "tiktok.com", "pinterest.com"]
            )

            result = await crawler.arun(
                url=str(request.url),
                config=config
            )

            # Process results
            markdown = result.markdown_v2.raw_markdown
            html = result.html

            articles = extract_articles(markdown)
            metadata = extract_metadata(markdown, html)
            metadata["subject"] = request.subject
            # Populate the placeholder count set in extract_metadata
            metadata["total_articles"] = len(articles)

            for article in articles:
                article.source_url = str(request.url)

            return CrawlResponse(
                url=str(request.url),
                success=result.success,
                metadata=metadata,
                articles=articles,
                raw_markdown=markdown if result.success else None,
                stats={
                    "total_links": len(result.links) if result.links else 0,
                    "processing_time": result.processing_time if hasattr(result, 'processing_time') else None
                }
            )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/")
def read_root():
    return {
        "message": "Welcome to Crawl4AI API",
        "docs": "/docs",
        "redoc": "/redoc"
    }

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)
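
# --- Example client call (illustrative sketch, not part of the service) ---
# The snippet below assumes the server is running locally on port 7860 and that
# the `requests` package is installed; the target URL and subject are placeholder
# values chosen for illustration only.
#
#   import requests
#
#   resp = requests.post(
#       "http://localhost:7860/crawl",
#       json={"url": "https://example.com/news", "subject": "technology"},
#       timeout=120,
#   )
#   resp.raise_for_status()
#   data = resp.json()
#   print(data["metadata"]["total_articles"], "articles extracted")
#   for article in data["articles"]:
#       print(article["title"], "->", article["url"])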