"""
Web API for the web crawler.
This module provides a FastAPI-based web API for controlling and monitoring the web crawler.
"""
import os
import sys
import time
import json
import logging
import datetime
from typing import List, Dict, Any, Optional
from fastapi import FastAPI, HTTPException, Query, Path, BackgroundTasks, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, HttpUrl, Field
import uvicorn
from crawler import Crawler
from models import URL, URLStatus, Priority
import config
# Configure logging
logging.basicConfig(
level=getattr(logging, config.LOG_LEVEL),
format=config.LOG_FORMAT
)
logger = logging.getLogger(__name__)
# Create FastAPI app
app = FastAPI(
title="Web Crawler API",
description="API for controlling and monitoring the web crawler",
version="1.0.0"
)
# Enable CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global crawler instance
crawler = None
def get_crawler() -> Crawler:
"""Get or initialize the crawler instance"""
global crawler
if crawler is None:
crawler = Crawler()
return crawler
# API Models
class SeedURL(BaseModel):
url: HttpUrl
priority: Optional[str] = Field(
default="NORMAL",
description="URL priority (VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW)"
)
class SeedURLs(BaseModel):
urls: List[SeedURL]
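# Example (illustrative): a POST /seed body such as
#   {"urls": [{"url": "https://example.com", "priority": "HIGH"}]}
# is validated into SeedURLs(urls=[SeedURL(url="https://example.com", priority="HIGH")]).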
class CrawlerStatus(BaseModel):
running: bool
paused: bool
start_time: Optional[float] = None
uptime_seconds: Optional[float] = None
pages_crawled: int
pages_failed: int
urls_discovered: int
urls_filtered: int
domains_crawled: int
frontier_size: int
class CrawlerConfig(BaseModel):
max_depth: int = Field(..., description="Maximum crawl depth")
max_workers: int = Field(..., description="Maximum number of worker threads")
delay_between_requests: float = Field(..., description="Delay between requests to the same domain (seconds)")
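# Example PUT /config body (illustrative values):
#   {"max_depth": 3, "max_workers": 8, "delay_between_requests": 1.0}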
class PageDetail(BaseModel):
url: str
domain: str
status_code: int
content_type: str
content_length: int
crawled_at: str
is_seed: bool
depth: int
    title: Optional[str] = None
    description: Optional[str] = None
    content: Optional[str] = None  # populated only when include_content=true is requested
class URLDetail(BaseModel):
url: str
normalized_url: str
domain: str
status: str
priority: str
depth: int
parent_url: Optional[str] = None
last_crawled: Optional[str] = None
error: Optional[str] = None
retries: int
class DomainStats(BaseModel):
domain: str
pages_count: int
successful_requests: int
failed_requests: int
avg_page_size: float
content_types: Dict[str, int]
status_codes: Dict[str, int]
# API Routes
@app.get("/")
async def read_root():
"""Root endpoint"""
return {
"name": "Web Crawler API",
"version": "1.0.0",
"description": "API for controlling and monitoring the web crawler",
"endpoints": {
"GET /": "This help message",
"GET /status": "Get crawler status",
"GET /stats": "Get crawler statistics",
"GET /config": "Get crawler configuration",
"PUT /config": "Update crawler configuration",
"POST /start": "Start the crawler",
"POST /stop": "Stop the crawler",
"POST /pause": "Pause the crawler",
"POST /resume": "Resume the crawler",
"GET /pages": "List crawled pages",
"GET /pages/{url}": "Get page details",
"GET /urls": "List discovered URLs",
"GET /urls/{url}": "Get URL details",
"POST /seed": "Add seed URLs",
"GET /domains": "Get domain statistics",
"GET /domains/{domain}": "Get statistics for a specific domain",
}
}
@app.get("/status", response_model=CrawlerStatus)
async def get_status(crawler: Crawler = Depends(get_crawler)):
"""Get crawler status"""
status = {
"running": crawler.running,
"paused": crawler.paused,
"start_time": crawler.stats.get('start_time'),
"uptime_seconds": time.time() - crawler.stats.get('start_time', time.time()) if crawler.running else None,
"pages_crawled": crawler.stats.get('pages_crawled', 0),
"pages_failed": crawler.stats.get('pages_failed', 0),
"urls_discovered": crawler.stats.get('urls_discovered', 0),
"urls_filtered": crawler.stats.get('urls_filtered', 0),
"domains_crawled": len(crawler.stats.get('domains_crawled', set())),
"frontier_size": crawler.frontier.size()
}
return status
@app.get("/stats")
async def get_stats(crawler: Crawler = Depends(get_crawler)):
"""Get detailed crawler statistics"""
stats = crawler.stats.copy()
# Convert sets to lists for JSON serialization
for key, value in stats.items():
if isinstance(value, set):
stats[key] = list(value)
# Add uptime
if stats.get('start_time'):
stats['uptime_seconds'] = time.time() - stats['start_time']
stats['uptime_formatted'] = str(datetime.timedelta(seconds=int(stats['uptime_seconds'])))
# Add DNS cache statistics if available
try:
dns_stats = crawler.dns_resolver.get_stats()
stats['dns_cache'] = dns_stats
    except Exception as e:
logger.warning(f"Failed to get DNS stats: {e}")
stats['dns_cache'] = {'error': 'Stats not available'}
# Add frontier statistics if available
try:
stats['frontier_size'] = crawler.frontier.size()
if hasattr(crawler.frontier, 'get_stats'):
frontier_stats = crawler.frontier.get_stats()
stats['frontier'] = frontier_stats
else:
stats['frontier'] = {'size': crawler.frontier.size()}
except Exception as e:
logger.warning(f"Failed to get frontier stats: {e}")
stats['frontier'] = {'error': 'Stats not available'}
return stats
@app.get("/config", response_model=CrawlerConfig)
async def get_config():
"""Get crawler configuration"""
return {
"max_depth": config.MAX_DEPTH,
"max_workers": config.MAX_WORKERS,
"delay_between_requests": config.DELAY_BETWEEN_REQUESTS
}
@app.put("/config", response_model=CrawlerConfig)
async def update_config(
crawler_config: CrawlerConfig,
crawler: Crawler = Depends(get_crawler)
):
"""Update crawler configuration"""
# Update configuration
config.MAX_DEPTH = crawler_config.max_depth
config.MAX_WORKERS = crawler_config.max_workers
config.DELAY_BETWEEN_REQUESTS = crawler_config.delay_between_requests
return crawler_config
@app.post("/start")
async def start_crawler(
background_tasks: BackgroundTasks,
    num_workers: Optional[int] = Query(None, description="Number of worker threads"),
async_mode: bool = Query(False, description="Whether to use async mode"),
crawler: Crawler = Depends(get_crawler)
):
"""Start the crawler"""
if crawler.running:
return {"status": "Crawler is already running"}
# Start crawler in background
def start_crawler_task():
try:
crawler.start(num_workers=num_workers, async_mode=async_mode)
except Exception as e:
logger.error(f"Error starting crawler: {e}")
background_tasks.add_task(start_crawler_task)
return {"status": "Crawler starting in background"}
@app.post("/stop")
async def stop_crawler(crawler: Crawler = Depends(get_crawler)):
"""Stop the crawler"""
if not crawler.running:
return {"status": "Crawler is not running"}
crawler.stop()
return {"status": "Crawler stopped"}
@app.post("/pause")
async def pause_crawler(crawler: Crawler = Depends(get_crawler)):
"""Pause the crawler"""
if not crawler.running:
return {"status": "Crawler is not running"}
if crawler.paused:
return {"status": "Crawler is already paused"}
crawler.pause()
return {"status": "Crawler paused"}
@app.post("/resume")
async def resume_crawler(crawler: Crawler = Depends(get_crawler)):
"""Resume the crawler"""
if not crawler.running:
return {"status": "Crawler is not running"}
if not crawler.paused:
return {"status": "Crawler is not paused"}
crawler.resume()
return {"status": "Crawler resumed"}
@app.get("/pages")
async def list_pages(
limit: int = Query(10, ge=1, le=100, description="Number of pages to return"),
offset: int = Query(0, ge=0, description="Offset for pagination"),
domain: Optional[str] = Query(None, description="Filter by domain"),
status_code: Optional[int] = Query(None, description="Filter by HTTP status code"),
crawler: Crawler = Depends(get_crawler)
):
"""List crawled pages"""
# Build query
query = {}
if domain:
query['domain'] = domain
if status_code:
query['status_code'] = status_code
# Execute query
try:
pages = list(crawler.db.pages_collection.find(
query,
{'_id': 0}
).skip(offset).limit(limit))
# Count total pages matching query
total_count = crawler.db.pages_collection.count_documents(query)
return {
"pages": pages,
"total": total_count,
"limit": limit,
"offset": offset
}
except Exception as e:
logger.error(f"Error listing pages: {e}")
raise HTTPException(status_code=500, detail=str(e))
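# Note: the detail endpoints below expect "/" in the URL path parameter to be
# encoded as "___". For example (illustrative), GET /pages/https:______example.com___about
# resolves to https://example.com/about; the same scheme applies to /urls/{url}.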
@app.get("/pages/{url:path}", response_model=PageDetail)
async def get_page(
url: str,
include_content: bool = Query(False, description="Include page content"),
crawler: Crawler = Depends(get_crawler)
):
"""Get page details"""
try:
# Decode URL from path parameter
url = url.replace("___", "/")
# Find page in database
page = crawler.db.pages_collection.find_one({'url': url}, {'_id': 0})
if not page:
raise HTTPException(status_code=404, detail="Page not found")
# Load content if requested
if include_content:
try:
if crawler.use_s3:
content = crawler._load_content_s3(url)
else:
content = crawler._load_content_disk(url)
if content:
page['content'] = content
except Exception as e:
logger.error(f"Error loading content for {url}: {e}")
page['content'] = None
return page
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting page {url}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/urls")
async def list_urls(
limit: int = Query(10, ge=1, le=100, description="Number of URLs to return"),
offset: int = Query(0, ge=0, description="Offset for pagination"),
status: Optional[str] = Query(None, description="Filter by URL status"),
domain: Optional[str] = Query(None, description="Filter by domain"),
priority: Optional[str] = Query(None, description="Filter by priority"),
crawler: Crawler = Depends(get_crawler)
):
"""List discovered URLs"""
# Build query
query = {}
if status:
query['status'] = status
if domain:
query['domain'] = domain
if priority:
query['priority'] = priority
# Execute query
try:
urls = list(crawler.db.urls_collection.find(
query,
{'_id': 0}
).skip(offset).limit(limit))
# Count total URLs matching query
total_count = crawler.db.urls_collection.count_documents(query)
return {
"urls": urls,
"total": total_count,
"limit": limit,
"offset": offset
}
except Exception as e:
logger.error(f"Error listing URLs: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/urls/{url:path}", response_model=URLDetail)
async def get_url(
url: str,
crawler: Crawler = Depends(get_crawler)
):
"""Get URL details"""
try:
# Decode URL from path parameter
url = url.replace("___", "/")
# Find URL in database
url_obj = crawler.db.urls_collection.find_one({'url': url}, {'_id': 0})
if not url_obj:
raise HTTPException(status_code=404, detail="URL not found")
return url_obj
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting URL {url}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/seed")
async def add_seed_urls(
seed_urls: SeedURLs,
crawler: Crawler = Depends(get_crawler)
):
"""Add seed URLs to the frontier"""
try:
urls_added = 0
for seed in seed_urls.urls:
url = str(seed.url)
priority = getattr(Priority, seed.priority, Priority.NORMAL)
# Create URL object
url_obj = URL(
url=url,
status=URLStatus.PENDING,
priority=priority,
depth=0 # Seed URLs are at depth 0
)
# Add to frontier
if crawler.frontier.add_url(url_obj):
# Save URL to database
                crawler.db.urls_collection.update_one(
{'url': url},
{'$set': url_obj.dict()},
upsert=True
)
urls_added += 1
logger.info(f"Added seed URL: {url}")
return {"status": "success", "urls_added": urls_added}
except Exception as e:
logger.error(f"Error adding seed URLs: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/domains")
async def list_domains(
limit: int = Query(10, ge=1, le=100, description="Number of domains to return"),
offset: int = Query(0, ge=0, description="Offset for pagination"),
crawler: Crawler = Depends(get_crawler)
):
"""Get domain statistics"""
try:
# Get domains with counts
domain_counts = crawler.db.pages_collection.aggregate([
{"$group": {
"_id": "$domain",
"pages_count": {"$sum": 1},
"avg_page_size": {"$avg": "$content_length"}
}},
{"$sort": {"pages_count": -1}},
{"$skip": offset},
{"$limit": limit}
])
        # Get total number of distinct domains from the database so the total
        # matches the paginated listing above
        total_domains = len(crawler.db.pages_collection.distinct("domain"))
# Format result
domains = []
for domain in domain_counts:
domains.append({
"domain": domain["_id"],
"pages_count": domain["pages_count"],
"avg_page_size": domain["avg_page_size"]
})
return {
"domains": domains,
"total": total_domains,
"limit": limit,
"offset": offset
}
except Exception as e:
logger.error(f"Error listing domains: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/domains/{domain}", response_model=DomainStats)
async def get_domain_stats(
domain: str,
crawler: Crawler = Depends(get_crawler)
):
"""Get statistics for a specific domain"""
try:
# Get basic domain stats
domain_stats = crawler.db.pages_collection.aggregate([
{"$match": {"domain": domain}},
{"$group": {
"_id": "$domain",
"pages_count": {"$sum": 1},
"successful_requests": {"$sum": {"$cond": [{"$lt": ["$status_code", 400]}, 1, 0]}},
"failed_requests": {"$sum": {"$cond": [{"$gte": ["$status_code", 400]}, 1, 0]}},
"avg_page_size": {"$avg": "$content_length"}
}}
]).next()
# Get content type distribution
content_types = crawler.db.pages_collection.aggregate([
{"$match": {"domain": domain}},
{"$group": {
"_id": "$content_type",
"count": {"$sum": 1}
}}
])
content_type_map = {}
        for ct in content_types:
            # Treat missing content_type values as "unknown" so the response model validates
            content_type_map[ct["_id"] or "unknown"] = ct["count"]
# Get status code distribution
status_codes = crawler.db.pages_collection.aggregate([
{"$match": {"domain": domain}},
{"$group": {
"_id": "$status_code",
"count": {"$sum": 1}
}}
])
status_code_map = {}
for sc in status_codes:
status_code_map[str(sc["_id"])] = sc["count"]
# Format result
result = {
"domain": domain,
"pages_count": domain_stats["pages_count"],
"successful_requests": domain_stats["successful_requests"],
"failed_requests": domain_stats["failed_requests"],
"avg_page_size": domain_stats["avg_page_size"],
"content_types": content_type_map,
"status_codes": status_code_map
}
return result
except StopIteration:
# Domain not found
raise HTTPException(status_code=404, detail=f"Domain '{domain}' not found")
except Exception as e:
logger.error(f"Error getting domain stats for {domain}: {e}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
# Run the API server
uvicorn.run(
"api:app",
host="0.0.0.0",
port=8000,
reload=True
)