"""
Robots.txt handler for web crawler
"""
import time
import logging
import requests
from urllib.parse import urlparse, urljoin
from typing import Dict, Optional, Tuple
import tldextract
from datetime import datetime, timedelta
from cachetools import TTLCache
import robotexclusionrulesparser
from models import RobotsInfo
import config
# Import local configuration if available
try:
import local_config
# Override config settings with local settings
for key in dir(local_config):
if key.isupper():
setattr(config, key, getattr(local_config, key))
logging.info("Loaded local configuration")
except ImportError:
pass
# Configure logging
logging.basicConfig(
level=getattr(logging, config.LOG_LEVEL),
format=config.LOG_FORMAT
)
logger = logging.getLogger(__name__)

class RobotsHandler:
    """Handles robots.txt fetching and parsing"""

    def __init__(self, user_agent: Optional[str] = None, cache_size: int = 1000, cache_ttl: int = 3600):
        """
        Initialize robots handler

        Args:
            user_agent: User agent to use when fetching robots.txt
            cache_size: Maximum number of robots.txt rules to cache
            cache_ttl: Time to live for cache entries in seconds
        """
        self.user_agent = user_agent or config.USER_AGENT
        # Fallback parser; an unparsed parser allows everything, which matches
        # the "assume allowed on error" policy used below
        self.parser = robotexclusionrulesparser.RobotExclusionRulesParser()
        # Cache of robots.txt info per domain
        self.robots_cache = TTLCache(maxsize=cache_size, ttl=cache_ttl)
        # Cache of parsed rules per domain, so a cache hit does not reuse
        # rules that were last parsed for a different domain
        self.parser_cache = TTLCache(maxsize=cache_size, ttl=cache_ttl)
        # Create request session
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': self.user_agent})

    def can_fetch(self, url: str) -> Tuple[bool, Optional[float]]:
        """
        Check whether a URL can be fetched according to robots.txt

        Args:
            url: URL to check

        Returns:
            Tuple of (can_fetch, crawl_delay), where crawl_delay is in seconds
        """
        try:
            parsed = urlparse(url)
            base_url = f"{parsed.scheme}://{parsed.netloc}"
            domain = self._get_domain(url)

            # Get robots info for the domain (cached or freshly fetched)
            robots_info = self._get_robots_info(base_url, domain)
            parser = self.parser_cache.get(domain, self.parser)

            # Check whether the path is allowed for our user agent
            path = parsed.path or "/"
            allowed = robots_info.allowed
            if allowed:
                allowed = parser.is_allowed(self.user_agent, path)

            # Get crawl delay
            crawl_delay = robots_info.crawl_delay
            if not crawl_delay and hasattr(parser, 'get_crawl_delay'):
                try:
                    crawl_delay = float(parser.get_crawl_delay(self.user_agent) or 0)
                except Exception:
                    crawl_delay = 0

            return allowed, crawl_delay
        except Exception as e:
            logger.warning(f"Error checking robots.txt for {url}: {e}")
            # In case of error, assume allowed
            return True, None

    def _get_robots_info(self, base_url: str, domain: str) -> RobotsInfo:
        """
        Get robots.txt info for a domain

        Args:
            base_url: Base URL of the domain
            domain: Domain name

        Returns:
            RobotsInfo object
        """
        # Check if in cache
        if domain in self.robots_cache:
            return self.robots_cache[domain]

        # Fetch robots.txt with a fresh parser for this domain
        robots_url = urljoin(base_url, "/robots.txt")
        parser = robotexclusionrulesparser.RobotExclusionRulesParser()
        try:
            response = self.session.get(
                robots_url,
                timeout=config.CRAWL_TIMEOUT,
                allow_redirects=True
            )
            status_code = response.status_code

            # If robots.txt exists
            if status_code == 200:
                # Parse robots.txt
                parser.parse(response.text)
                # Store crawl-delay info only for our specific user agent
                user_agents = {}
                crawl_delay = None
                if hasattr(parser, 'get_crawl_delay'):
                    try:
                        crawl_delay = parser.get_crawl_delay(self.user_agent)
                    except Exception:
                        crawl_delay = None
                user_agents[self.user_agent] = {
                    'crawl_delay': crawl_delay
                }
                # Create robots info
                robots_info = RobotsInfo(
                    domain=domain,
                    allowed=True,
                    crawl_delay=crawl_delay,
                    last_fetched=datetime.now(),
                    user_agents=user_agents,
                    status_code=status_code
                )
            else:
                # If there is no robots.txt or an error status, assume allowed
                parser.parse("")  # Parse empty robots.txt
                robots_info = RobotsInfo(
                    domain=domain,
                    allowed=True,
                    crawl_delay=None,
                    last_fetched=datetime.now(),
                    user_agents={},
                    status_code=status_code
                )

            # Cache robots info and the parsed rules
            self.robots_cache[domain] = robots_info
            self.parser_cache[domain] = parser
            return robots_info
        except requests.RequestException as e:
            logger.warning(f"Error fetching robots.txt from {robots_url}: {e}")
            # In case of error, assume allowed
            parser.parse("")  # Parse empty robots.txt
            robots_info = RobotsInfo(
                domain=domain,
                allowed=True,
                crawl_delay=None,
                last_fetched=datetime.now(),
                user_agents={},
                status_code=None
            )
            # Cache robots info and the (empty) parsed rules
            self.robots_cache[domain] = robots_info
            self.parser_cache[domain] = parser
            return robots_info

    def _get_domain(self, url: str) -> str:
        """Extract domain from URL"""
        parsed = tldextract.extract(url)
        return f"{parsed.domain}.{parsed.suffix}" if parsed.suffix else parsed.domain

    def clear_cache(self) -> None:
        """Clear the robots.txt caches"""
        self.robots_cache.clear()
        self.parser_cache.clear()

    def update_cache(self, domain: str) -> None:
        """
        Force update of a domain's robots.txt in the cache

        Args:
            domain: Domain to update
        """
        if domain in self.robots_cache:
            del self.robots_cache[domain]
        if domain in self.parser_cache:
            del self.parser_cache[domain]
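
# Illustrative usage sketch, not part of the crawler itself. Assumptions:
# config provides USER_AGENT, CRAWL_TIMEOUT, LOG_LEVEL and LOG_FORMAT, and
# models.RobotsInfo accepts the fields used above; the URL below is only a
# placeholder.
if __name__ == "__main__":
    handler = RobotsHandler()
    test_url = "https://example.com/some/page"
    allowed, delay = handler.can_fetch(test_url)
    print(f"Allowed: {allowed}, crawl delay: {delay}")
    if allowed and delay:
        # Respect the advertised crawl-delay before fetching the page itself
        time.sleep(delay)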