""" | |
Content deduplication component for the web crawler. | |
Provides functionality to detect duplicate pages efficiently | |
1. Exact content hashing | |
2. Shingling and MinHash for near-duplicate detection | |
3. SimHash for fuzzy matching | |
""" | |
import hashlib
import logging
import re
import time
from collections import defaultdict
from typing import Dict, List, Optional, Set, Tuple, Union

import config

# Configure logging
logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format=config.LOG_FORMAT,
)
logger = logging.getLogger(__name__)


class ContentDeduplicator:
    """
    Content deduplication using multiple techniques:
    - Exact match (MD5 hash)
    - Near-duplicate detection (MinHash with LSH banding)
    - Fuzzy matching (SimHash)
    """

    def __init__(self):
        """Initialize the deduplicator"""
        # Exact content hashing
        self.content_hashes = set()
        self.hash_to_url = {}  # content hash -> first URL seen with that hash

        # MinHash parameters
        self.num_hashes = 100
        self.minhash_signatures = {}  # URL -> MinHash signature
        self.minhash_bands = defaultdict(set)  # band_id -> set of URLs
        self.band_size = 5  # Each band covers 5 signature positions
        self.shingle_size = 3  # k-shingles of 3 consecutive tokens

        # SimHash parameters
        self.simhash_dim = 64
        self.simhash_values = {}  # URL -> SimHash value
        self.hamming_threshold = 3  # Maximum Hamming distance for similarity

        # Cache of previously computed duplicates for quick lookups
        self.duplicate_cache = {}  # URL -> set of duplicate URLs

        # Token preprocessing
        self.token_pattern = re.compile(r'\w+')
        self.stop_words = {'the', 'and', 'a', 'to', 'of', 'in', 'is', 'that', 'for', 'on', 'with'}

        # Statistics
        self.stats = {
            'exact_duplicates': 0,
            'near_duplicates': 0,
            'fuzzy_duplicates': 0,
            'processing_time': 0.0,
            'total_documents': 0,
        }

    def is_duplicate(self, url: str, content: str) -> Tuple[bool, Optional[str]]:
        """
        Check if content is a duplicate

        Args:
            url: URL of the page
            content: Page content

        Returns:
            (is_duplicate, duplicate_url): Tuple indicating whether the content
            is a duplicate and, if so, which URL it duplicates
        """
        start_time = time.time()

        # Check exact match first (fastest)
        content_hash = self._hash_content(content)
        if content_hash in self.content_hashes:
            self.stats['exact_duplicates'] += 1
            self.stats['processing_time'] += time.time() - start_time

            # Look up the URL that first produced this hash
            existing_url = self._get_hash_map().get(content_hash)
            if existing_url is not None and existing_url != url:
                logger.debug(f"Exact duplicate detected: {url} duplicates {existing_url}")
                return True, existing_url
            return True, None

        # Check cache for quick lookup
        if url in self.duplicate_cache:
            duplicate_url = next(iter(self.duplicate_cache[url]))
            logger.debug(f"Duplicate found in cache: {url} duplicates {duplicate_url}")
            return True, duplicate_url

        # Only perform more expensive checks if configured to do so
        if config.NEAR_DUPLICATE_DETECTION:
            # Check for near-duplicates using MinHash
            near_duplicate = self._check_minhash(url, content)
            if near_duplicate:
                self.stats['near_duplicates'] += 1
                self.stats['processing_time'] += time.time() - start_time
                logger.debug(f"Near-duplicate detected: {url} is similar to {near_duplicate}")
                self._add_to_duplicate_cache(url, near_duplicate)
                return True, near_duplicate

        if config.FUZZY_DUPLICATE_DETECTION:
            # Check for fuzzy matches using SimHash
            fuzzy_duplicate = self._check_simhash(url, content)
            if fuzzy_duplicate:
                self.stats['fuzzy_duplicates'] += 1
                self.stats['processing_time'] += time.time() - start_time
                logger.debug(f"Fuzzy duplicate detected: {url} is similar to {fuzzy_duplicate}")
                self._add_to_duplicate_cache(url, fuzzy_duplicate)
                return True, fuzzy_duplicate

        # Not a duplicate, add to index
        self._add_to_index(url, content, content_hash)
        self.stats['total_documents'] += 1
        self.stats['processing_time'] += time.time() - start_time
        return False, None

    def _add_to_duplicate_cache(self, url: str, duplicate_url: str) -> None:
        """Add URL to the duplicate cache for faster lookups"""
        self.duplicate_cache.setdefault(url, set()).add(duplicate_url)
        # Also record the reverse relationship
        self.duplicate_cache.setdefault(duplicate_url, set()).add(url)

    def _get_hash_map(self) -> Dict[str, str]:
        """Get mapping of content hashes to the URL that first produced them"""
        return dict(self.hash_to_url)

    def _hash_content(self, content: str) -> str:
        """Create MD5 hash of content"""
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def _preprocess_content(self, content: str) -> List[str]:
        """
        Preprocess content for tokenization:
        1. Remove HTML tags
        2. Convert to lowercase
        3. Extract tokens
        4. Remove stop words
        """
        # Remove HTML tags
        content = re.sub(r'<[^>]+>', ' ', content)

        # Tokenize (lowercased)
        tokens = self.token_pattern.findall(content.lower())

        # Remove stop words
        return [token for token in tokens if token not in self.stop_words]

    def _add_to_index(self, url: str, content: str, content_hash: Optional[str] = None) -> None:
        """
        Add content to the deduplication index

        Args:
            url: URL of the page
            content: Page content
            content_hash: Optional pre-computed hash
        """
        # Add exact hash and remember which URL first produced it
        if content_hash is None:
            content_hash = self._hash_content(content)
        self.content_hashes.add(content_hash)
        self.hash_to_url.setdefault(content_hash, url)

        # Add MinHash signature
        if config.NEAR_DUPLICATE_DETECTION:
            signature = self._compute_minhash(content)
            self.minhash_signatures[url] = signature

            # Add to LSH bands: pages that agree on every value within any one
            # band become candidate pairs for the full similarity check
            for i in range(0, self.num_hashes, self.band_size):
                band = tuple(signature[i:i + self.band_size])
                band_id = hash(band)
                self.minhash_bands[band_id].add(url)

        # Add SimHash
        if config.FUZZY_DUPLICATE_DETECTION:
            simhash_value = self._compute_simhash(content)
            self.simhash_values[url] = simhash_value

    def _create_shingles(self, tokens: List[str], k: int = 3) -> Set[str]:
        """
        Create k-shingles (runs of k consecutive tokens) from tokens

        Args:
            tokens: List of tokens
            k: Size of shingles

        Returns:
            Set of shingles
        """
        # e.g. ['web', 'crawler', 'dedup', 'demo'] with k=3 yields
        # {'web crawler dedup', 'crawler dedup demo'}
        return {' '.join(tokens[i:i + k]) for i in range(len(tokens) - k + 1)}

    def _compute_minhash(self, content: str) -> List[int]:
        """
        Compute MinHash signature for content

        Args:
            content: Page content

        Returns:
            MinHash signature (list of hash values)
        """
        tokens = self._preprocess_content(content)
        shingles = self._create_shingles(tokens, self.shingle_size)

        # Largest 32-bit value, used as both the hash modulus and the initial signature value
        max_hash = 2 ** 32 - 1

        # Create signature
        signature = [max_hash] * self.num_hashes

        # For each shingle, compute the family of hash values and keep the minimums
        for shingle in shingles:
            # Note: Python's built-in hash() of a string is salted per process
            # (PYTHONHASHSEED), so signatures are only comparable within one run
            shingle_hash = hash(shingle)
            for i in range(self.num_hashes):
                # Simple linear hash function: (a * x + b) mod c
                a = i + 1  # Different 'a' for each hash function
                b = i * i  # Different 'b' for each hash function
                hash_value = (a * shingle_hash + b) % max_hash

                # Keep the minimum hash value
                signature[i] = min(signature[i], hash_value)

        return signature
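
    # Why this works (sketch, and only an approximation for the simple linear hash
    # family above): for an ideal MinHash family, the probability that two documents
    # share the same minimum hash value at any one signature position equals the
    # Jaccard similarity of their shingle sets, so the fraction of matching positions
    # across num_hashes values estimates that similarity.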

    def _check_minhash(self, url: str, content: str) -> Optional[str]:
        """
        Check for near-duplicates using MinHash and LSH

        Args:
            url: URL of the page
            content: Page content

        Returns:
            URL of duplicate page if found, None otherwise
        """
        # Compute MinHash signature
        signature = self._compute_minhash(content)

        # Check each band for potential matches
        candidate_urls = set()
        for i in range(0, self.num_hashes, self.band_size):
            band = tuple(signature[i:i + self.band_size])
            band_id = hash(band)

            # Get URLs that share this band
            if band_id in self.minhash_bands:
                candidate_urls.update(self.minhash_bands[band_id])

        # Check estimated Jaccard similarity with candidates
        for candidate_url in candidate_urls:
            if candidate_url == url:
                continue

            candidate_signature = self.minhash_signatures[candidate_url]
            similarity = self._jaccard_similarity(signature, candidate_signature)
            if similarity >= config.SIMILARITY_THRESHOLD:
                return candidate_url

        return None

    def _jaccard_similarity(self, sig1: List[int], sig2: List[int]) -> float:
        """
        Estimate Jaccard similarity from MinHash signatures

        Args:
            sig1: First signature
            sig2: Second signature

        Returns:
            Estimated Jaccard similarity (0-1)
        """
        if len(sig1) != len(sig2):
            raise ValueError("Signatures must have the same length")

        # Count matching hash values and estimate similarity as their fraction
        matches = sum(1 for a, b in zip(sig1, sig2) if a == b)
        return matches / len(sig1)
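
    # Illustrative arithmetic (numbers made up, not taken from config): two 100-value
    # signatures that agree in 85 positions give an estimated Jaccard similarity of
    # 85 / 100 = 0.85, which would count as a near-duplicate under a
    # SIMILARITY_THRESHOLD of 0.8.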

    def _compute_simhash(self, content: str) -> int:
        """
        Compute SimHash for content

        Args:
            content: Page content

        Returns:
            SimHash value (64-bit fingerprint)
        """
        tokens = self._preprocess_content(content)

        # Initialize vector
        v = [0] * self.simhash_dim

        # For each token, compute its hash and update the vector
        for token in tokens:
            # Compute hash of token (MD5 gives 128 bits; only the first 64 are used)
            token_hash = hashlib.md5(token.encode('utf-8')).digest()

            # Convert to binary representation
            token_bits = ''.join(format(byte, '08b') for byte in token_hash)

            # Use the first self.simhash_dim bits
            token_bits = token_bits[:self.simhash_dim]

            # Update vector: +1 where the bit is set, -1 where it is not
            for i, bit in enumerate(token_bits):
                if bit == '1':
                    v[i] += 1
                else:
                    v[i] -= 1

        # Create fingerprint: bit i is set if the vote for that position is positive
        fingerprint = 0
        for i, val in enumerate(v):
            if val > 0:
                fingerprint |= (1 << i)

        return fingerprint

    def _check_simhash(self, url: str, content: str) -> Optional[str]:
        """
        Check for fuzzy duplicates using SimHash

        Args:
            url: URL of the page
            content: Page content

        Returns:
            URL of duplicate page if found, None otherwise
        """
        # Compute SimHash
        simhash_value = self._compute_simhash(content)

        # Compare with existing SimHash values (linear scan over indexed pages)
        for existing_url, existing_simhash in self.simhash_values.items():
            if existing_url == url:
                continue

            # Calculate Hamming distance (number of differing bits)
            hamming_distance = bin(simhash_value ^ existing_simhash).count('1')
            if hamming_distance <= self.hamming_threshold:
                return existing_url

        return None
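
    # Illustrative check (values made up): two 64-bit fingerprints that differ in
    # 2 bit positions have Hamming distance 2, which is within the default
    # hamming_threshold of 3, so the pages would be reported as fuzzy duplicates.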

    def clear(self) -> None:
        """Clear all indexes and caches"""
        self.content_hashes.clear()
        self.hash_to_url.clear()
        self.minhash_signatures.clear()
        self.minhash_bands.clear()
        self.simhash_values.clear()
        self.duplicate_cache.clear()

        # Reset statistics
        self.stats = {
            'exact_duplicates': 0,
            'near_duplicates': 0,
            'fuzzy_duplicates': 0,
            'processing_time': 0.0,
            'total_documents': 0,
        }

    def get_stats(self) -> Dict[str, Union[int, float]]:
        """Get deduplication statistics"""
        stats_copy = self.stats.copy()

        # Calculate average processing time per indexed document
        total_docs = self.stats['total_documents']
        if total_docs > 0:
            stats_copy['avg_processing_time'] = self.stats['processing_time'] / total_docs
        else:
            stats_copy['avg_processing_time'] = 0

        # Calculate total duplicates
        total_duplicates = (self.stats['exact_duplicates'] +
                            self.stats['near_duplicates'] +
                            self.stats['fuzzy_duplicates'])
        stats_copy['total_duplicates'] = total_duplicates

        # Calculate duplicate percentage over all documents processed
        # (unique documents plus detected duplicates)
        total_processed = total_docs + total_duplicates
        if total_processed > 0:
            stats_copy['duplicate_percentage'] = (total_duplicates / total_processed) * 100
        else:
            stats_copy['duplicate_percentage'] = 0

        return stats_copy
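

# Minimal usage sketch, not wired into the crawler pipeline itself: it only
# illustrates how ContentDeduplicator is driven. It assumes the same config module
# the component already imports (LOG_LEVEL, LOG_FORMAT, NEAR_DUPLICATE_DETECTION,
# FUZZY_DUPLICATE_DETECTION, SIMILARITY_THRESHOLD); the URLs and page bodies below
# are made up for illustration.
if __name__ == "__main__":
    deduplicator = ContentDeduplicator()

    pages = {
        "http://example.com/a": "<p>Breaking news about the web crawler project</p>",
        "http://example.com/b": "<p>Breaking news about the web crawler project</p>",  # exact copy of /a
        "http://example.com/c": "<p>A completely different article body</p>",
    }

    for page_url, page_html in pages.items():
        duplicate, duplicate_of = deduplicator.is_duplicate(page_url, page_html)
        print(f"{page_url}: duplicate={duplicate} duplicate_of={duplicate_of}")

    print(deduplicator.get_stats())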