# AI_SEO_Crawler/models.py
"""
Data models for the web crawler
"""
import hashlib
import logging
from urllib.parse import urlparse
from datetime import datetime
from typing import Any, Dict, List, Optional
from enum import Enum

import tldextract
from pydantic import BaseModel, Field, ValidationInfo, field_validator

logger = logging.getLogger(__name__)


class URLStatus(str, Enum):
    """Status of a URL in the crawl process"""
    PENDING = "pending"                        # Not yet processed
    IN_PROGRESS = "in_progress"                # Currently being processed
    COMPLETED = "completed"                    # Successfully processed
    FAILED = "failed"                          # Failed to process
    FILTERED = "filtered"                      # Filtered out based on rules
    ROBOTSTXT_EXCLUDED = "robotstxt_excluded"  # Excluded by robots.txt


class Priority(int, Enum):
    """Priority levels for URLs"""
    VERY_HIGH = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4
    VERY_LOW = 5
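
# Because Priority subclasses int, entries can be ordered directly in a
# priority queue: lower values sort first, so VERY_HIGH is popped before LOW.
# A minimal sketch (the `frontier` heap is illustrative, not part of this
# module):
#
#   import heapq
#   frontier = []
#   heapq.heappush(frontier, (Priority.LOW, "https://example.com/archive"))
#   heapq.heappush(frontier, (Priority.VERY_HIGH, "https://example.com/"))
#   priority, next_url = heapq.heappop(frontier)  # the VERY_HIGH entry comes out first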


class URL(BaseModel):
    """URL model with metadata for crawling"""
    url: str
    # validate_default=True makes the validators below run even when these
    # fields are omitted, so they are derived from `url` automatically.
    normalized_url: str = Field(default="", validate_default=True)  # Normalized version of the URL
    domain: str = Field(default="", validate_default=True)          # Domain extracted from the URL
    depth: int = 0                              # Depth from seed URL
    discovered_at: datetime = Field(default_factory=datetime.now)
    last_crawled: Optional[datetime] = None
    completed_at: Optional[datetime] = None     # When the URL was completed/failed
    status: URLStatus = URLStatus.PENDING
    priority: Priority = Priority.MEDIUM
    parent_url: Optional[str] = None            # URL that led to this URL
    retries: int = 0                            # Number of times retried
    error: Optional[str] = None                 # Error message if failed
    metadata: Dict[str, Any] = Field(default_factory=dict)  # Additional metadata

    @field_validator("normalized_url", mode="before")
    @classmethod
    def set_normalized_url(cls, v: str, info: ValidationInfo) -> str:
        """Normalize the URL if not already set"""
        if not v and "url" in info.data:
            return normalize_url(info.data["url"])
        return v

    @field_validator("domain", mode="before")
    @classmethod
    def set_domain(cls, v: str, info: ValidationInfo) -> str:
        """Extract domain from URL if not already set"""
        if not v and "url" in info.data:
            parsed = tldextract.extract(info.data["url"])
            return f"{parsed.domain}.{parsed.suffix}" if parsed.suffix else parsed.domain
        return v

    class Config:
        arbitrary_types_allowed = True
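
# Illustrative construction: with validate_default enabled above, the two
# derived fields are filled in automatically from `url`:
#
#   >>> u = URL(url="HTTPS://Example.com/Docs/?b=2&a=1#top", depth=1)
#   >>> u.normalized_url
#   'https://example.com/Docs?a=1&b=2'
#   >>> u.domain
#   'example.com'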


class RobotsInfo(BaseModel):
    """Information from robots.txt for a domain"""
    domain: str
    allowed: bool = True                 # Whether crawling is allowed
    crawl_delay: Optional[float] = None  # Crawl delay in seconds
    last_fetched: datetime = Field(default_factory=datetime.now)
    user_agents: Dict[str, Dict[str, Any]] = Field(default_factory=dict)  # Info per user agent
    status_code: Optional[int] = None    # HTTP status code when fetching robots.txt

    class Config:
        arbitrary_types_allowed = True
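
# The per-agent dict is free-form; one plausible shape (illustrative, not
# enforced by this model):
#
#   robots = RobotsInfo(
#       domain="example.com",
#       crawl_delay=1.0,
#       user_agents={"*": {"allow": ["/"], "disallow": ["/admin"]}},
#   )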


class Page(BaseModel):
    """Web page model with content and metadata"""
    url: str
    status_code: int
    content: str                        # HTML content
    content_type: str
    content_length: int
    content_hash: str                   # Hash of the content for duplicate detection
    headers: Dict[str, str] = Field(default_factory=dict)
    links: List[str] = Field(default_factory=list)  # Links extracted from the page
    crawled_at: datetime = Field(default_factory=datetime.now)
    redirect_url: Optional[str] = None  # URL after redirects
    elapsed_time: float = 0.0           # Time taken to fetch the page
    is_duplicate: bool = False          # Whether this is duplicate content
    metadata: Dict[str, Any] = Field(default_factory=dict)  # Additional metadata

    class Config:
        arbitrary_types_allowed = True
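
# Sketch of turning a fetch result into a Page (`html` is an illustrative
# stand-in for a response body; calculate_content_hash is defined below):
#
#   html = "<html><body>hello</body></html>"
#   page = Page(
#       url="https://example.com/",
#       status_code=200,
#       content=html,
#       content_type="text/html",
#       content_length=len(html),
#       content_hash=calculate_content_hash(html),
#   )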


class DomainStats(BaseModel):
    """Statistics for a domain"""
    domain: str
    pages_crawled: int = 0
    successful_crawls: int = 0
    failed_crawls: int = 0
    last_crawled: Optional[datetime] = None
    robots_info: Optional[RobotsInfo] = None
    crawl_times: List[float] = Field(default_factory=list)  # Recent crawl times
    errors: Dict[int, int] = Field(default_factory=dict)    # Status code counts for errors

    class Config:
        arbitrary_types_allowed = True
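
# Sketch: deriving a politeness delay from these stats, preferring the
# robots.txt value when present (the 2x multiplier and 10-sample window are
# illustrative choices, not part of this module):
#
#   recent = stats.crawl_times[-10:]
#   avg = sum(recent) / len(recent) if recent else 1.0
#   delay = (stats.robots_info and stats.robots_info.crawl_delay) or 2 * avg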


def normalize_url(url: str) -> str:
    """
    Normalize a URL by:
    1. Lowercasing the host (path and query keep their case)
    2. Removing fragments
    3. Removing default ports (80 for http, 443 for https)
    4. Sorting query parameters
    5. Removing trailing slashes from the path
    6. Adding an http:// scheme if missing
    """
    try:
        parsed = urlparse(url)

        # Add a scheme if missing, then re-parse so the netloc is populated
        if not parsed.scheme:
            url = 'http://' + url
            parsed = urlparse(url)

        # Lowercase the host; leave the path untouched
        domain = parsed.netloc.lower()
        path = parsed.path

        # Remove default ports
        if ':' in domain:
            domain_parts = domain.split(':')
            if (parsed.scheme == 'http' and domain_parts[1] == '80') or \
               (parsed.scheme == 'https' and domain_parts[1] == '443'):
                domain = domain_parts[0]

        # Sort query parameters so parameter order does not create distinct URLs
        query = parsed.query
        if query:
            query_params = sorted(query.split('&'))
            query = '&'.join(query_params)

        # Remove trailing slashes from the path (but keep a bare "/")
        while path.endswith('/') and len(path) > 1:
            path = path[:-1]

        # Use the root path if none is present
        if not path:
            path = '/'

        # Reconstruct the URL; the fragment is intentionally dropped
        normalized = f"{parsed.scheme}://{domain}{path}"
        if query:
            normalized += f"?{query}"

        logger.debug(f"Normalized URL: {url} -> {normalized}")
        return normalized
    except Exception as e:
        logger.error(f"Error normalizing URL {url}: {e}")
        return url
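
# For example (illustrative inputs):
#
#   >>> normalize_url("EXAMPLE.com/a/b/")
#   'http://example.com/a/b'
#   >>> normalize_url("https://Example.com:443/shop/?b=2&a=1#reviews")
#   'https://example.com/shop?a=1&b=2'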


def calculate_content_hash(content: str) -> str:
    """Calculate an MD5 hash of content for duplicate detection.

    MD5 is chosen for speed, not security: the hash only flags
    likely-duplicate pages, so collision resistance is not required.
    """
    return hashlib.md5(content.encode('utf-8')).hexdigest()
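

# A small self-check, runnable as `python models.py`; the sample values are
# illustrative and nothing is fetched over the network.
if __name__ == "__main__":
    sample = URL(url="HTTPS://Example.com/Docs/?b=2&a=1#top", depth=1)
    print(sample.normalized_url)                    # https://example.com/Docs?a=1&b=2
    print(sample.domain)                            # example.com
    print(calculate_content_hash("<html></html>"))  # stable 32-char hex digest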