""" | |
URL Frontier implementation for web crawler | |
The URL Frontier maintains URLs to be crawled with two main goals: | |
1. Prioritization - Important URLs are crawled first | |
2. Politeness - Avoid overloading web servers with too many requests | |
""" | |
import time
import logging
import heapq
import pickle
import threading
import random
from typing import Dict, List, Tuple, Optional, Any, Set
from collections import deque
import redis
from redis.exceptions import RedisError
import mmh3
import os
import json

from models import URL, Priority, URLStatus
import config
# Import local configuration if available
try:
    import local_config

    # Override config settings with local settings
    for key in dir(local_config):
        if key.isupper():
            setattr(config, key, getattr(local_config, key))
    logging.info("Loaded local configuration")
except ImportError:
    pass
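
# A local_config.py (optional, typically untracked) simply redefines any
# UPPERCASE setting from config.py. The values below are placeholders, not
# the project's actual defaults; the names are settings this module reads.
#
#     # local_config.py
#     REDIS_URI = "redis://localhost:6379/0"
#     USE_SIMPLE_URL_SEEN = True        # plain Redis set instead of a Bloom filter
#     STORAGE_PATH = "./data/frontier"  # checkpoint directory
#     LOG_LEVEL = "DEBUG"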

# Configure logging
logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format=config.LOG_FORMAT
)
logger = logging.getLogger(__name__)


class URLFrontier:
    """
    URL Frontier with prioritization and politeness.

    Architecture:
    - Front queues: priority-based queues
    - Back queues: host-based queues for politeness

    Redis is used as persistent storage so the frontier can handle a large
    number of URLs and support distributed crawling. With use_memory=True it
    falls back to in-memory storage (useful for tests and small deployments).
    """

    def __init__(self, redis_client: Optional[redis.Redis] = None, use_memory: bool = False):
        """Initialize the URL Frontier"""
        self.use_memory = use_memory
        if use_memory:
            # Initialize in-memory storage
            self.memory_storage = {
                'seen_urls': set(),
                'priority_queues': [deque() for _ in range(config.PRIORITY_QUEUE_NUM)],
                'host_queues': [deque() for _ in range(config.HOST_QUEUE_NUM)]
            }
        else:
            # Use Redis
            self.redis = redis_client or redis.from_url(config.REDIS_URI)

        self.priority_count = config.PRIORITY_QUEUE_NUM  # Number of priority queues
        self.host_count = config.HOST_QUEUE_NUM  # Number of host queues
        self.url_seen_key = "webcrawler:url_seen"  # Bloom filter / set for seen URLs
        self.priority_queue_key_prefix = "webcrawler:priority_queue:"
        self.host_queue_key_prefix = "webcrawler:host_queue:"
        self.lock = threading.RLock()  # Thread-safe operations

        # Simple mode uses a Redis set instead of a Bloom filter
        self.use_simple_mode = getattr(config, 'USE_SIMPLE_URL_SEEN', False)
        logger.info(f"URLFrontier using simple mode: {self.use_simple_mode}")

        # Ensure the directory for checkpoints exists
        if not os.path.exists(config.STORAGE_PATH):
            os.makedirs(config.STORAGE_PATH)

        # Initialize URL seen storage
        if not self.use_memory:
            self._init_url_seen()

    def _init_url_seen(self):
        """Initialize URL seen storage based on configuration"""
        try:
            # If using simple mode, just use a Redis set
            if self.use_simple_mode:
                if not self.redis.exists(self.url_seen_key):
                    logger.info("Initializing URL seen set")
                    self.redis.sadd(self.url_seen_key, "initialized")
                return

            # Try to use a Bloom filter (requires the RedisBloom module)
            if not self.redis.exists(self.url_seen_key):
                logger.info("Initializing URL seen bloom filter")
                try:
                    # Reserve a Bloom filter for 100 million items at a 0.01
                    # false positive rate; this requires roughly 119.5 MB of memory
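                    # Sizing sanity check (standard Bloom filter formulas, not
                    # project-specific): bits m = -n * ln(p) / (ln 2)^2
                    #   = -1e8 * ln(0.01) / 0.4805 ≈ 9.6e8 bits ≈ 120 MB,
                    # with k = (m / n) * ln 2 ≈ 7 hash functions.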
                    self.redis.execute_command("BF.RESERVE", self.url_seen_key, 0.01, 100000000)
                except RedisError as e:
                    logger.error(f"Failed to initialize bloom filter: {e}")
                    logger.info("Falling back to simple set for URL seen detection")
                    self.use_simple_mode = True
                    # Initialize a set instead
                    if not self.redis.exists(self.url_seen_key):
                        self.redis.sadd(self.url_seen_key, "initialized")
        except RedisError as e:
            logger.error(f"Error initializing URL seen: {e}")
            # Fall back to a set if the Bloom filter is not available
            self.use_simple_mode = True
            if not self.redis.exists(self.url_seen_key):
                self.redis.sadd(self.url_seen_key, "initialized")

    def add_url(self, url_obj: URL) -> bool:
        """Add a URL to the frontier"""
        with self.lock:
            url = url_obj.url

            # Check if the URL has already been seen
            if self.use_memory:
                if url in self.memory_storage['seen_urls']:
                    return False
                self.memory_storage['seen_urls'].add(url)
            else:
                if self.use_simple_mode:
                    if self.redis.sismember(self.url_seen_key, url):
                        return False
                    self.redis.sadd(self.url_seen_key, url)
                else:
                    if self._check_url_seen(url):
                        return False
                    self._mark_url_seen(url)

            # Add to the priority queue for this URL's priority level
            priority_index = url_obj.priority.value % self.priority_count
            if self.use_memory:
                self.memory_storage['priority_queues'][priority_index].append(url_obj)
            else:
                priority_key = f"{self.priority_queue_key_prefix}{priority_index}"
                self.redis.rpush(priority_key, url_obj.json())
            return True

    def get_next_url(self) -> Optional[URL]:
        """Get the next URL to crawl"""
        with self.lock:
            # Try each priority queue
            for i in range(self.priority_count):
                if self.use_memory:
                    queue = self.memory_storage['priority_queues'][i]
                    if queue:
                        return queue.popleft()
                else:
                    priority_key = f"{self.priority_queue_key_prefix}{i}"
                    url_data = self.redis.lpop(priority_key)
                    if url_data:
                        return URL.parse_raw(url_data)
            return None

    def _check_url_seen(self, url: str) -> bool:
        """Check if the URL has been seen"""
        if self.use_memory:
            return url in self.memory_storage['seen_urls']
        elif self.use_simple_mode:
            return bool(self.redis.sismember(self.url_seen_key, url))
        else:
            # Query the RedisBloom filter reserved in _init_url_seen
            return bool(self.redis.execute_command("BF.EXISTS", self.url_seen_key, url))

    def _mark_url_seen(self, url: str) -> None:
        """Mark the URL as seen"""
        if self.use_memory:
            self.memory_storage['seen_urls'].add(url)
        elif self.use_simple_mode:
            self.redis.sadd(self.url_seen_key, url)
        else:
            # Add to the RedisBloom filter reserved in _init_url_seen
            self.redis.execute_command("BF.ADD", self.url_seen_key, url)

    def _hash_url(self, url: str) -> int:
        """Deterministic 32-bit hash of a URL"""
        # mmh3 is stable across processes, unlike Python's built-in hash(),
        # which is randomized per interpreter run
        return mmh3.hash(url, signed=False)
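
    # The class docstring mentions host-based back queues for politeness, but
    # nothing in this file routes URLs into them yet. The helper below is an
    # illustrative sketch of how a hostname could be mapped to one of the
    # HOST_QUEUE_NUM back queues (the method name is an assumption, not an
    # existing API).
    def _host_queue_index(self, host: str) -> int:
        """Map a hostname to a back-queue index so all URLs for the same host
        share one queue and can be rate-limited together."""
        return mmh3.hash(host, signed=False) % self.host_count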

    def size(self) -> int:
        """Get the total size of all priority queues"""
        if self.use_memory:
            return sum(len(q) for q in self.memory_storage['priority_queues'])
        else:
            total = 0
            for i in range(self.priority_count):
                priority_key = f"{self.priority_queue_key_prefix}{i}"
                total += self.redis.llen(priority_key)
            return total

    def get_stats(self) -> Dict[str, Any]:
        """Get frontier statistics"""
        stats = {
            "size": self.size(),
            "priority_queues": {},
            "host_queues": {},
        }
        if self.use_memory:
            for i, queue in enumerate(self.memory_storage['priority_queues']):
                stats["priority_queues"][f"priority_{i}"] = len(queue)
            stats["urls_seen"] = len(self.memory_storage['seen_urls'])
            return stats
        try:
            # Get priority queue stats (queues are indexed 0..priority_count-1)
            for priority in range(self.priority_count):
                queue_key = f"{self.priority_queue_key_prefix}{priority}"
                stats["priority_queues"][f"priority_{priority}"] = self.redis.llen(queue_key)

            # Get host queue stats (just count host queues with items)
            host_queue_count = 0
            for host_id in range(self.host_count):
                queue_key = f"{self.host_queue_key_prefix}{host_id}"
                if self.redis.llen(queue_key) > 0:
                    host_queue_count += 1
            stats["host_queues"]["count_with_items"] = host_queue_count

            # Add URLs seen count if using simple mode
            if self.use_simple_mode:
                stats["urls_seen"] = self.redis.scard(self.url_seen_key)
            return stats
        except RedisError as e:
            logger.error(f"Error getting frontier stats: {e}")
            return stats

    def checkpoint(self) -> bool:
        """Save frontier state to disk"""
        if self.use_memory:
            # No need to checkpoint in-memory storage
            return True
        try:
            # Save priority queues
            for i in range(self.priority_count):
                priority_key = f"{self.priority_queue_key_prefix}{i}"
                queue_data = []
                while True:
                    url_data = self.redis.lpop(priority_key)
                    if not url_data:
                        break
                    # Redis returns bytes; decode so the data is JSON-serializable
                    if isinstance(url_data, bytes):
                        url_data = url_data.decode('utf-8')
                    queue_data.append(url_data)

                # Save to file
                checkpoint_file = os.path.join(config.STORAGE_PATH, f"priority_queue_{i}.json")
                with open(checkpoint_file, 'w') as f:
                    json.dump(queue_data, f)

                # Restore the queue in its original order (items were popped
                # from the head, so rpush them back in the same order)
                for url_data in queue_data:
                    self.redis.rpush(priority_key, url_data)
            return True
        except Exception as e:
            logger.error(f"Error creating frontier checkpoint: {e}")
            return False

    def restore(self) -> bool:
        """Restore frontier state from disk"""
        if self.use_memory:
            # No need to restore in-memory storage
            return True
        try:
            # Restore priority queues
            for i in range(self.priority_count):
                checkpoint_file = os.path.join(config.STORAGE_PATH, f"priority_queue_{i}.json")
                if os.path.exists(checkpoint_file):
                    with open(checkpoint_file, 'r') as f:
                        queue_data = json.load(f)

                    # Clear the existing queue
                    priority_key = f"{self.priority_queue_key_prefix}{i}"
                    self.redis.delete(priority_key)

                    # Restore the queue
                    for url_data in queue_data:
                        self.redis.rpush(priority_key, url_data)
            return True
        except Exception as e:
            logger.error(f"Error restoring frontier checkpoint: {e}")
            return False

    def clear(self) -> bool:
        """
        Clear all queues in the frontier

        Returns:
            bool: True if successful, False otherwise
        """
        if self.use_memory:
            self.memory_storage['seen_urls'].clear()
            for queue in self.memory_storage['priority_queues']:
                queue.clear()
            for queue in self.memory_storage['host_queues']:
                queue.clear()
            logger.info("Frontier cleared")
            return True
        try:
            # Delete all queue keys (queues are indexed 0..count-1)
            keys = []
            for priority in range(self.priority_count):
                keys.append(f"{self.priority_queue_key_prefix}{priority}")
            for host_id in range(self.host_count):
                keys.append(f"{self.host_queue_key_prefix}{host_id}")
            if keys:
                self.redis.delete(*keys)

            # Reset the URL seen filter as well
            self.redis.delete(self.url_seen_key)
            logger.info("Frontier cleared")
            return True
        except Exception as e:
            logger.error(f"Error clearing frontier: {e}")
            return False
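

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). It assumes the URL model accepts
# `url` and `priority` keyword arguments and that Priority defines a HIGH
# member; check models.py for the real field names before running.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    frontier = URLFrontier(use_memory=True)  # in-memory mode, no Redis required

    # Duplicate URLs are rejected by the seen-URL check, so the second add
    # is expected to return False.
    frontier.add_url(URL(url="https://example.com/", priority=Priority.HIGH))
    frontier.add_url(URL(url="https://example.com/", priority=Priority.HIGH))

    print(f"Frontier size: {frontier.size()}")  # expected: 1

    # Drain the frontier in priority order
    while (url_obj := frontier.get_next_url()) is not None:
        print(f"Next URL to crawl: {url_obj.url}")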