AI_SEO_Crawler / dns_resolver.py
sagarnildass's picture
Upload folder using huggingface_hub
6f509ec verified
"""
DNS resolver with caching for web crawler
"""
import socket
import logging
import time
from typing import Dict, Optional, Tuple
from urllib.parse import urlparse
from datetime import datetime, timedelta
from cachetools import TTLCache
import threading
import dns
import dns.resolver
import config
# Import local configuration if available
try:
import local_config
# Override config settings with local settings
for key in dir(local_config):
if key.isupper():
setattr(config, key, getattr(local_config, key))
logging.info("Loaded local configuration")
except ImportError:
pass
# Configure logging
logging.basicConfig(
level=getattr(logging, config.LOG_LEVEL),
format=config.LOG_FORMAT
)
logger = logging.getLogger(__name__)
class DNSResolver:
"""
DNS resolver with caching to improve performance
DNS resolution can be a bottleneck for crawlers due to the synchronous
nature of many DNS interfaces. This class provides a cached resolver
to reduce the number of DNS lookups.
"""
def __init__(self, cache_size: int = 10000, cache_ttl: int = 3600):
"""
Initialize DNS resolver
Args:
cache_size: Maximum number of DNS records to cache
cache_ttl: Time to live for cache entries in seconds
"""
self.cache = TTLCache(maxsize=cache_size, ttl=cache_ttl)
self.lock = threading.RLock() # Thread-safe operations
self.resolver = dns.resolver.Resolver()
self.resolver.timeout = 3.0 # Timeout for DNS requests in seconds
self.resolver.lifetime = 5.0 # Total timeout for all DNS requests
# Stats tracking
self.hit_count = 0
self.miss_count = 0
def resolve(self, url: str) -> Optional[str]:
"""
Resolve a URL to an IP address
Args:
url: URL to resolve
Returns:
IP address or None if resolution fails
"""
try:
parsed = urlparse(url)
hostname = parsed.netloc.split(':')[0] # Remove port if present
# Check cache first
with self.lock:
if hostname in self.cache:
logger.debug(f"DNS cache hit for {hostname}")
self.hit_count += 1
return self.cache[hostname]
# Cache miss - resolve hostname
ip_address = self._resolve_hostname(hostname)
# Update cache
if ip_address:
with self.lock:
self.cache[hostname] = ip_address
self.miss_count += 1
return ip_address
except Exception as e:
logger.warning(f"Error resolving DNS for {url}: {e}")
return None
def _resolve_hostname(self, hostname: str) -> Optional[str]:
"""
Resolve hostname to IP address
Args:
hostname: Hostname to resolve
Returns:
IP address or None if resolution fails
"""
try:
# First try using dnspython for more control
answers = self.resolver.resolve(hostname, 'A')
if answers:
# Return first IP address
return str(answers[0])
except dns.exception.DNSException as e:
logger.debug(f"dnspython DNS resolution failed for {hostname}: {e}")
# Fall back to socket.gethostbyname
try:
return socket.gethostbyname(hostname)
except socket.gaierror as e:
logger.warning(f"Socket DNS resolution failed for {hostname}: {e}")
return None
def bulk_resolve(self, urls: list) -> Dict[str, Optional[str]]:
"""
Resolve multiple URLs to IP addresses
Args:
urls: List of URLs to resolve
Returns:
Dictionary mapping URLs to IP addresses
"""
results = {}
for url in urls:
results[url] = self.resolve(url)
return results
def clear_cache(self) -> None:
"""Clear the DNS cache"""
with self.lock:
self.cache.clear()
def get_stats(self) -> Dict[str, int]:
"""
Get statistics about the DNS cache
Returns:
Dictionary with cache statistics
"""
with self.lock:
return {
'size': len(self.cache),
'max_size': self.cache.maxsize,
'ttl': self.cache.ttl,
'hit_count': self.hit_count,
'miss_count': self.miss_count,
'hit_ratio': self.hit_count / (self.hit_count + self.miss_count) if (self.hit_count + self.miss_count) > 0 else 0
}