File size: 12,211 Bytes
6f509ec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
"""
URL Frontier implementation for web crawler

The URL Frontier maintains URLs to be crawled with two main goals:
1. Prioritization - Important URLs are crawled first
2. Politeness - Avoid overloading web servers with too many requests
"""

import time
import logging
import heapq
import pickle
import threading
import random
from typing import Dict, List, Tuple, Optional, Any, Set
from collections import deque
import redis
from redis.exceptions import RedisError
import mmh3
import os
import json

from models import URL, Priority, URLStatus
import config

# Import local configuration if available
try:
    import local_config
    # Override config settings with local settings
    for key in dir(local_config):
        if key.isupper():
            setattr(config, key, getattr(local_config, key))
    logging.info("Loaded local configuration")
except ImportError:
    pass

# Configure logging
logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format=config.LOG_FORMAT
)
logger = logging.getLogger(__name__)


class URLFrontier:
    """
    URL Frontier implementation with prioritization and politeness
    
    Architecture:
    - Front queues: Priority-based queues
    - Back queues: Host-based queues for politeness
    
    This uses Redis for persistent storage to handle large number of URLs
    and enable distributed crawling. In deployment mode, it can also use
    in-memory storage.
    """
    
    def __init__(self, redis_client: Optional[redis.Redis] = None, use_memory: bool = False):
        """Initialize the URL Frontier"""
        self.use_memory = use_memory
        if use_memory:
            # Initialize in-memory storage
            self.memory_storage = {
                'seen_urls': set(),
                'priority_queues': [[] for _ in range(config.PRIORITY_QUEUE_NUM)],
                'host_queues': [[] for _ in range(config.HOST_QUEUE_NUM)]
            }
        else:
            # Use Redis
            self.redis = redis_client or redis.from_url(config.REDIS_URI)
            
        self.priority_count = config.PRIORITY_QUEUE_NUM  # Number of priority queues
        self.host_count = config.HOST_QUEUE_NUM  # Number of host queues
        self.url_seen_key = "webcrawler:url_seen"  # Bloom filter for seen URLs
        self.priority_queue_key_prefix = "webcrawler:priority_queue:"
        self.host_queue_key_prefix = "webcrawler:host_queue:"
        self.lock = threading.RLock()  # Thread-safe operations
        
        # Simple mode uses Redis Set instead of Bloom filter
        self.use_simple_mode = getattr(config, 'USE_SIMPLE_URL_SEEN', False)
        logger.info(f"URLFrontier using simple mode: {self.use_simple_mode}")
        
        # Ensure directory for checkpoint exists
        if not os.path.exists(config.STORAGE_PATH):
            os.makedirs(config.STORAGE_PATH)
        
        # Initialize URL seen storage
        if not self.use_memory:
            self._init_url_seen()
    
    def _init_url_seen(self):
        """Initialize URL seen storage based on configuration"""
        try:
            # If using simple mode, just use a Redis set
            if self.use_simple_mode:
                if not self.redis.exists(self.url_seen_key):
                    logger.info("Initializing URL seen set")
                    self.redis.sadd(self.url_seen_key, "initialized")
                return
                
            # Try to use Bloom filter
            if not self.redis.exists(self.url_seen_key):
                logger.info("Initializing URL seen bloom filter")
                try:
                    # Use a bloom filter with 100 million items and 0.01 false positive rate
                    # This requires approximately 119.5 MB of memory
                    self.redis.execute_command("BF.RESERVE", self.url_seen_key, 0.01, 100000000)
                except RedisError as e:
                    logger.error(f"Failed to initialize bloom filter: {e}")
                    logger.info("Falling back to simple set for URL seen detection")
                    self.use_simple_mode = True
                    # Initialize a set instead
                    if not self.redis.exists(self.url_seen_key):
                        self.redis.sadd(self.url_seen_key, "initialized")
        except RedisError as e:
            logger.error(f"Error initializing URL seen: {e}")
            # Fallback to set if bloom filter is not available
            self.use_simple_mode = True
            if not self.redis.exists(self.url_seen_key):
                self.redis.sadd(self.url_seen_key, "initialized")
    
    def add_url(self, url_obj: URL) -> bool:
        """Add a URL to the frontier"""
        with self.lock:
            url = url_obj.url
            
            # Check if URL has been seen
            if self.use_memory:
                if url in self.memory_storage['seen_urls']:
                    return False
                self.memory_storage['seen_urls'].add(url)
            else:
                if self.use_simple_mode:
                    if self.redis.sismember(self.url_seen_key, url):
                        return False
                    self.redis.sadd(self.url_seen_key, url)
                else:
                    if self._check_url_seen(url):
                        return False
                    self._mark_url_seen(url)
            
            # Add to priority queue
            priority_index = url_obj.priority.value % self.priority_count
            if self.use_memory:
                self.memory_storage['priority_queues'][priority_index].append(url_obj)
            else:
                priority_key = f"{self.priority_queue_key_prefix}{priority_index}"
                self.redis.rpush(priority_key, url_obj.json())
            
            return True
    
    def get_next_url(self) -> Optional[URL]:
        """Get the next URL to crawl"""
        with self.lock:
            # Try each priority queue
            for i in range(self.priority_count):
                if self.use_memory:
                    queue = self.memory_storage['priority_queues'][i]
                    if queue:
                        return queue.pop(0)
                else:
                    priority_key = f"{self.priority_queue_key_prefix}{i}"
                    url_data = self.redis.lpop(priority_key)
                    if url_data:
                        return URL.parse_raw(url_data)
            return None
    
    def _check_url_seen(self, url: str) -> bool:
        """Check if URL has been seen"""
        if self.use_memory:
            return url in self.memory_storage['seen_urls']
        elif self.use_simple_mode:
            return self.redis.sismember(self.url_seen_key, url)
        else:
            # Using Redis Bloom filter
            return bool(self.redis.getbit(self.url_seen_key, self._hash_url(url)))
    
    def _mark_url_seen(self, url: str) -> None:
        """Mark URL as seen"""
        if self.use_memory:
            self.memory_storage['seen_urls'].add(url)
        elif self.use_simple_mode:
            self.redis.sadd(self.url_seen_key, url)
        else:
            # Using Redis Bloom filter
            self.redis.setbit(self.url_seen_key, self._hash_url(url), 1)
    
    def _hash_url(self, url: str) -> int:
        """Hash URL for Bloom filter"""
        return hash(url) % (1 << 32)  # 32-bit hash
    
    def size(self) -> int:
        """Get the total size of all queues"""
        if self.use_memory:
            return sum(len(q) for q in self.memory_storage['priority_queues'])
        else:
            total = 0
            for i in range(self.priority_count):
                priority_key = f"{self.priority_queue_key_prefix}{i}"
                total += self.redis.llen(priority_key)
            return total
    
    def get_stats(self) -> Dict[str, Any]:
        """Get frontier statistics"""
        stats = {
            "size": self.size(),
            "priority_queues": {},
            "host_queues": {},
        }
        
        try:
            # Get priority queue stats
            for priority in range(1, self.priority_count + 1):
                queue_key = f"{self.priority_queue_key_prefix}{priority}"
                stats["priority_queues"][f"priority_{priority}"] = self.redis.llen(queue_key)
            
            # Get host queue stats (just count total host queues with items)
            host_queue_count = 0
            for host_id in range(self.host_count):
                queue_key = f"{self.host_queue_key_prefix}{host_id}"
                if self.redis.llen(queue_key) > 0:
                    host_queue_count += 1
            
            stats["host_queues"]["count_with_items"] = host_queue_count
            
            # Add URLs seen count if using simple mode
            if self.use_simple_mode:
                stats["urls_seen"] = self.redis.scard(self.url_seen_key)
            
            return stats
        except RedisError as e:
            logger.error(f"Error getting frontier stats: {e}")
            return stats
    
    def checkpoint(self) -> bool:
        """Save frontier state"""
        if self.use_memory:
            # No need to checkpoint in-memory storage
            return True
            
        try:
            # Save priority queues
            for i in range(self.priority_count):
                priority_key = f"{self.priority_queue_key_prefix}{i}"
                queue_data = []
                while True:
                    url_data = self.redis.lpop(priority_key)
                    if not url_data:
                        break
                    queue_data.append(url_data)
                
                # Save to file
                checkpoint_file = os.path.join(config.STORAGE_PATH, f"priority_queue_{i}.json")
                with open(checkpoint_file, 'w') as f:
                    json.dump(queue_data, f)
                
                # Restore queue
                for url_data in reversed(queue_data):
                    self.redis.rpush(priority_key, url_data)
            
            return True
        except Exception as e:
            logger.error(f"Error creating frontier checkpoint: {e}")
            return False
    
    def restore(self) -> bool:
        """Restore frontier state"""
        if self.use_memory:
            # No need to restore in-memory storage
            return True
            
        try:
            # Restore priority queues
            for i in range(self.priority_count):
                checkpoint_file = os.path.join(config.STORAGE_PATH, f"priority_queue_{i}.json")
                if os.path.exists(checkpoint_file):
                    with open(checkpoint_file, 'r') as f:
                        queue_data = json.load(f)
                    
                    # Clear existing queue
                    priority_key = f"{self.priority_queue_key_prefix}{i}"
                    self.redis.delete(priority_key)
                    
                    # Restore queue
                    for url_data in queue_data:
                        self.redis.rpush(priority_key, url_data)
            
            return True
        except Exception as e:
            logger.error(f"Error restoring frontier checkpoint: {e}")
            return False
    
    def clear(self) -> bool:
        """
        Clear all queues in the frontier
        
        Returns:
            bool: True if successful, False otherwise
        """
        try:
            # Delete all queue keys
            keys = []
            for priority in range(1, self.priority_count + 1):
                keys.append(f"{self.priority_queue_key_prefix}{priority}")
            
            for host_id in range(self.host_count):
                keys.append(f"{self.host_queue_key_prefix}{host_id}")
            
            if keys:
                self.redis.delete(*keys)
            
            # Reset URL seen filter (optional)
            self.redis.delete(self.url_seen_key)
            
            logger.info("Frontier cleared")
            return True
        except Exception as e:
            logger.error(f"Error clearing frontier: {e}")
            return False