""" | |
Storage component for the web crawler. | |
Handles storing and retrieving crawled web pages using: | |
1. MongoDB for metadata, URL information, and crawl stats | |
2. Disk-based storage for HTML content | |
3. Optional Amazon S3 integration for scalable storage | |
""" | |
import os | |
import logging | |
import time | |
import datetime | |
import hashlib | |
import json | |
import gzip | |
import shutil | |
from typing import Dict, List, Optional, Union, Any, Tuple | |
from urllib.parse import urlparse | |
import pymongo | |
from pymongo import MongoClient, UpdateOne | |
from pymongo.errors import PyMongoError, BulkWriteError | |
import boto3 | |
from botocore.exceptions import ClientError | |
from models import Page, URL | |
import config | |
# Configure logging | |
logging.basicConfig( | |
level=getattr(logging, config.LOG_LEVEL), | |
format=config.LOG_FORMAT | |
) | |
logger = logging.getLogger(__name__) | |
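
# The settings below are read from config.py (as used throughout this module):
# MONGODB_URI, MONGODB_DB, STORAGE_PATH, HTML_STORAGE_PATH, LOG_PATH, S3_BUCKET,
# AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_REGION, LOG_LEVEL, and LOG_FORMAT.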


class StorageManager:
    """
    Storage manager for web crawler data.

    Handles:
    - MongoDB for metadata, URL information, and stats
    - Disk-based storage for HTML content
    - Optional Amazon S3 integration
    """

    def __init__(self,
                 mongo_uri: Optional[str] = None,
                 use_s3: bool = False,
                 compress_html: bool = True,
                 max_disk_usage_gb: float = 100.0):
        """
        Initialize the storage manager.

        Args:
            mongo_uri: MongoDB connection URI
            use_s3: Whether to use Amazon S3 for HTML storage
            compress_html: Whether to compress HTML content
            max_disk_usage_gb: Maximum disk space to use in GB
        """
        self.mongo_uri = mongo_uri or config.MONGODB_URI
        self.use_s3 = use_s3
        self.compress_html = compress_html
        self.max_disk_usage_gb = max_disk_usage_gb

        # Connect to MongoDB
        self.mongo_client = MongoClient(self.mongo_uri)
        self.db = self.mongo_client[config.MONGODB_DB]

        # MongoDB collections
        self.pages_collection = self.db['pages']
        self.urls_collection = self.db['urls']
        self.stats_collection = self.db['stats']

        # Create necessary indexes
        self._create_indexes()

        # S3 client (if enabled)
        self.s3_client = None
        if self.use_s3:
            self._init_s3_client()

        # Ensure storage directories exist
        self._ensure_directories()

        # Bulk operation buffers
        self.page_buffer = []
        self.url_buffer = []
        self.max_buffer_size = 100

        # Statistics
        self.stats = {
            'pages_stored': 0,
            'pages_retrieved': 0,
            'urls_stored': 0,
            'urls_retrieved': 0,
            'disk_space_used': 0,
            's3_objects_stored': 0,
            'mongodb_size': 0,
            'storage_errors': 0,
            'start_time': time.time()
        }

    def _create_indexes(self) -> None:
        """Create necessary indexes in MongoDB collections."""
        try:
            # Pages collection indexes
            self.pages_collection.create_index('url', unique=True)
            self.pages_collection.create_index('content_hash')
            self.pages_collection.create_index('crawled_at')
            self.pages_collection.create_index('domain')

            # URLs collection indexes
            self.urls_collection.create_index('url', unique=True)
            self.urls_collection.create_index('normalized_url')
            self.urls_collection.create_index('domain')
            self.urls_collection.create_index('status')
            self.urls_collection.create_index('priority')
            self.urls_collection.create_index('last_crawled')

            logger.info("MongoDB indexes created")
        except PyMongoError as e:
            logger.error(f"Error creating MongoDB indexes: {e}")
            self.stats['storage_errors'] += 1

    def _init_s3_client(self) -> None:
        """Initialize the AWS S3 client."""
        try:
            self.s3_client = boto3.client(
                's3',
                aws_access_key_id=config.AWS_ACCESS_KEY,
                aws_secret_access_key=config.AWS_SECRET_KEY,
                region_name=config.AWS_REGION
            )
            logger.info("S3 client initialized")

            # Create bucket if it doesn't exist
            self._ensure_s3_bucket()
        except Exception as e:
            logger.error(f"Error initializing S3 client: {e}")
            self.use_s3 = False
            self.stats['storage_errors'] += 1

    def _ensure_s3_bucket(self) -> None:
        """Create the S3 bucket if it doesn't exist."""
        if not self.s3_client:
            return

        try:
            # Check if bucket exists
            self.s3_client.head_bucket(Bucket=config.S3_BUCKET)
            logger.info(f"S3 bucket '{config.S3_BUCKET}' exists")
        except ClientError as e:
            error_code = e.response.get('Error', {}).get('Code')
            if error_code == '404':
                # Bucket doesn't exist, create it
                try:
                    self.s3_client.create_bucket(
                        Bucket=config.S3_BUCKET,
                        CreateBucketConfiguration={
                            'LocationConstraint': config.AWS_REGION
                        }
                    )
                    logger.info(f"Created S3 bucket '{config.S3_BUCKET}'")
                except ClientError as ce:
                    logger.error(f"Error creating S3 bucket: {ce}")
                    self.use_s3 = False
                    self.stats['storage_errors'] += 1
            else:
                logger.error(f"Error checking S3 bucket: {e}")
                self.use_s3 = False
                self.stats['storage_errors'] += 1

    def _ensure_directories(self) -> None:
        """Ensure storage directories exist."""
        # Create main storage directory
        os.makedirs(config.STORAGE_PATH, exist_ok=True)

        # Create HTML storage directory
        os.makedirs(config.HTML_STORAGE_PATH, exist_ok=True)

        # Create log directory
        os.makedirs(config.LOG_PATH, exist_ok=True)

        logger.info("Storage directories created")

    def store_page(self, page: Page, flush: bool = False) -> bool:
        """
        Store a crawled page.

        Args:
            page: Page object to store
            flush: Whether to flush the page buffer immediately

        Returns:
            True if successful, False otherwise
        """
        try:
            # Store page content based on configuration
            if self.use_s3:
                content_stored = self._store_content_s3(page)
            else:
                content_stored = self._store_content_disk(page)

            if not content_stored:
                logger.warning(f"Failed to store content for {page.url}")
                self.stats['storage_errors'] += 1
                return False

            # Exclude the HTML content from the MongoDB document; it lives on disk or in S3
            page_dict = page.dict(exclude={'content'})

            # Keep crawled_at as a native datetime so MongoDB stores it as a BSON date
            if page.crawled_at:
                page_dict['crawled_at'] = page.crawled_at

            # Add to buffer
            self.page_buffer.append(
                UpdateOne(
                    {'url': page.url},
                    {'$set': page_dict},
                    upsert=True
                )
            )

            # Update statistics
            self.stats['pages_stored'] += 1

            # Check if buffer should be flushed
            if flush or len(self.page_buffer) >= self.max_buffer_size:
                return self.flush_page_buffer()
            return True
        except Exception as e:
            logger.error(f"Error storing page {page.url}: {e}")
            self.stats['storage_errors'] += 1
            return False

    def _store_content_disk(self, page: Page) -> bool:
        """
        Store page content on disk.

        Args:
            page: Page to store

        Returns:
            True if successful, False otherwise
        """
        try:
            # Check disk space
            if not self._check_disk_space():
                logger.warning("Disk space limit exceeded")
                return False

            # Create directory for the domain if it doesn't exist
            domain = self._extract_domain(page.url)
            domain_dir = os.path.join(config.HTML_STORAGE_PATH, domain)
            os.makedirs(domain_dir, exist_ok=True)

            # Create filename
            filename = self._url_to_filename(page.url)

            # Full path for the file
            if self.compress_html:
                filepath = os.path.join(domain_dir, f"{filename}.gz")
                # Compress and write HTML to file
                with gzip.open(filepath, 'wt', encoding='utf-8') as f:
                    f.write(page.content)
            else:
                filepath = os.path.join(domain_dir, f"{filename}.html")
                # Write HTML to file
                with open(filepath, 'w', encoding='utf-8') as f:
                    f.write(page.content)

            # Update disk space used
            file_size = os.path.getsize(filepath)
            self.stats['disk_space_used'] += file_size

            logger.debug(f"Stored HTML content for {page.url} at {filepath}")
            return True
        except Exception as e:
            logger.error(f"Error storing content on disk for {page.url}: {e}")
            self.stats['storage_errors'] += 1
            return False

    def _store_content_s3(self, page: Page) -> bool:
        """
        Store page content in S3.

        Args:
            page: Page to store

        Returns:
            True if successful, False otherwise
        """
        if not self.s3_client:
            logger.warning("S3 client not initialized, falling back to disk storage")
            return self._store_content_disk(page)

        try:
            # Create key for the S3 object
            domain = self._extract_domain(page.url)
            filename = self._url_to_filename(page.url)

            # S3 key
            s3_key = f"{domain}/{filename}"
            if self.compress_html:
                s3_key += ".gz"
                # Compress content
                content_bytes = gzip.compress(page.content.encode('utf-8'))
                content_type = 'application/gzip'
            else:
                s3_key += ".html"
                content_bytes = page.content.encode('utf-8')
                content_type = 'text/html'

            # Upload to S3
            self.s3_client.put_object(
                Bucket=config.S3_BUCKET,
                Key=s3_key,
                Body=content_bytes,
                ContentType=content_type,
                Metadata={
                    'url': page.url,
                    'crawled_at': page.crawled_at.isoformat() if page.crawled_at else '',
                    'content_hash': page.content_hash or ''
                }
            )

            # Update statistics
            self.stats['s3_objects_stored'] += 1

            logger.debug(f"Stored HTML content for {page.url} in S3 at {s3_key}")
            return True
        except Exception as e:
            logger.error(f"Error storing content in S3 for {page.url}: {e}")
            self.stats['storage_errors'] += 1

            # Fall back to disk storage
            logger.info(f"Falling back to disk storage for {page.url}")
            return self._store_content_disk(page)

    def store_url(self, url_obj: URL, flush: bool = False) -> bool:
        """
        Store URL information.

        Args:
            url_obj: URL object to store
            flush: Whether to flush the URL buffer immediately

        Returns:
            True if successful, False otherwise
        """
        try:
            # Convert URL object to dict
            url_dict = url_obj.dict()

            # Add to buffer
            self.url_buffer.append(
                UpdateOne(
                    {'url': url_obj.url},
                    {'$set': url_dict},
                    upsert=True
                )
            )

            # Update statistics
            self.stats['urls_stored'] += 1

            # Check if buffer should be flushed
            if flush or len(self.url_buffer) >= self.max_buffer_size:
                return self.flush_url_buffer()
            return True
        except Exception as e:
            logger.error(f"Error storing URL {url_obj.url}: {e}")
            self.stats['storage_errors'] += 1
            return False

    def flush_page_buffer(self) -> bool:
        """
        Flush the page buffer to MongoDB.

        Returns:
            True if successful, False otherwise
        """
        if not self.page_buffer:
            return True

        try:
            # Execute bulk operation
            self.pages_collection.bulk_write(self.page_buffer, ordered=False)

            # Clear buffer
            buffer_size = len(self.page_buffer)
            self.page_buffer = []

            logger.debug(f"Flushed {buffer_size} pages to MongoDB")
            return True
        except BulkWriteError as e:
            logger.error(f"Error in bulk write for pages: {e.details}")
            self.stats['storage_errors'] += 1
            # Clear buffer
            self.page_buffer = []
            return False
        except Exception as e:
            logger.error(f"Error flushing page buffer: {e}")
            self.stats['storage_errors'] += 1
            # Clear buffer
            self.page_buffer = []
            return False

    def flush_url_buffer(self) -> bool:
        """
        Flush the URL buffer to MongoDB.

        Returns:
            True if successful, False otherwise
        """
        if not self.url_buffer:
            return True

        try:
            # Execute bulk operation
            self.urls_collection.bulk_write(self.url_buffer, ordered=False)

            # Clear buffer
            buffer_size = len(self.url_buffer)
            self.url_buffer = []

            logger.debug(f"Flushed {buffer_size} URLs to MongoDB")
            return True
        except BulkWriteError as e:
            logger.error(f"Error in bulk write for URLs: {e.details}")
            self.stats['storage_errors'] += 1
            # Clear buffer
            self.url_buffer = []
            return False
        except Exception as e:
            logger.error(f"Error flushing URL buffer: {e}")
            self.stats['storage_errors'] += 1
            # Clear buffer
            self.url_buffer = []
            return False

    def get_page(self, url: str) -> Optional[Page]:
        """
        Retrieve a page by URL.

        Args:
            url: URL of the page to retrieve

        Returns:
            Page object if found, None otherwise
        """
        try:
            # Get page metadata from MongoDB
            page_doc = self.pages_collection.find_one({'url': url})
            if not page_doc:
                return None

            # Create Page object from document
            page = Page(**page_doc)

            # Load content based on configuration
            if self.use_s3:
                content = self._load_content_s3(url)
            else:
                content = self._load_content_disk(url)

            if content:
                page.content = content

            # Update statistics
            self.stats['pages_retrieved'] += 1
            return page
        except Exception as e:
            logger.error(f"Error retrieving page {url}: {e}")
            self.stats['storage_errors'] += 1
            return None

    def _load_content_disk(self, url: str) -> Optional[str]:
        """
        Load page content from disk.

        Args:
            url: URL of the page

        Returns:
            Page content if found, None otherwise
        """
        try:
            # Get domain and filename
            domain = self._extract_domain(url)
            filename = self._url_to_filename(url)

            # Check for a compressed file first
            compressed_path = os.path.join(config.HTML_STORAGE_PATH, domain, f"{filename}.gz")
            uncompressed_path = os.path.join(config.HTML_STORAGE_PATH, domain, f"{filename}.html")

            if os.path.exists(compressed_path):
                # Load compressed content
                with gzip.open(compressed_path, 'rt', encoding='utf-8') as f:
                    return f.read()
            elif os.path.exists(uncompressed_path):
                # Load uncompressed content
                with open(uncompressed_path, 'r', encoding='utf-8') as f:
                    return f.read()
            else:
                logger.warning(f"Content file not found for {url}")
                return None
        except Exception as e:
            logger.error(f"Error loading content from disk for {url}: {e}")
            self.stats['storage_errors'] += 1
            return None

    def _load_content_s3(self, url: str) -> Optional[str]:
        """
        Load page content from S3.

        Args:
            url: URL of the page

        Returns:
            Page content if found, None otherwise
        """
        if not self.s3_client:
            logger.warning("S3 client not initialized, falling back to disk loading")
            return self._load_content_disk(url)

        try:
            # Get domain and filename
            domain = self._extract_domain(url)
            filename = self._url_to_filename(url)

            # Try both compressed and uncompressed keys
            s3_key_compressed = f"{domain}/{filename}.gz"
            s3_key_uncompressed = f"{domain}/{filename}.html"

            try:
                # Try the compressed object first
                response = self.s3_client.get_object(
                    Bucket=config.S3_BUCKET,
                    Key=s3_key_compressed
                )
                # Decompress content
                content_bytes = response['Body'].read()
                return gzip.decompress(content_bytes).decode('utf-8')
            except ClientError as e:
                if e.response['Error']['Code'] == 'NoSuchKey':
                    # Try the uncompressed object
                    try:
                        response = self.s3_client.get_object(
                            Bucket=config.S3_BUCKET,
                            Key=s3_key_uncompressed
                        )
                        content_bytes = response['Body'].read()
                        return content_bytes.decode('utf-8')
                    except ClientError as e2:
                        if e2.response['Error']['Code'] == 'NoSuchKey':
                            logger.warning(f"Content not found in S3 for {url}")
                            # Try loading from disk as a fallback
                            return self._load_content_disk(url)
                        else:
                            raise
                else:
                    raise
        except Exception as e:
            logger.error(f"Error loading content from S3 for {url}: {e}")
            self.stats['storage_errors'] += 1
            # Try loading from disk as a fallback
            return self._load_content_disk(url)

    def get_url(self, url: str) -> Optional[URL]:
        """
        Retrieve URL information by URL.

        Args:
            url: URL to retrieve

        Returns:
            URL object if found, None otherwise
        """
        try:
            # Get URL information from MongoDB
            url_doc = self.urls_collection.find_one({'url': url})
            if not url_doc:
                return None

            # Create URL object from document
            url_obj = URL(**url_doc)

            # Update statistics
            self.stats['urls_retrieved'] += 1
            return url_obj
        except Exception as e:
            logger.error(f"Error retrieving URL {url}: {e}")
            self.stats['storage_errors'] += 1
            return None

    def get_urls_by_status(self, status: str, limit: int = 100) -> List[URL]:
        """
        Retrieve URLs by status.

        Args:
            status: Status of URLs to retrieve
            limit: Maximum number of URLs to retrieve

        Returns:
            List of URL objects
        """
        try:
            # Get URLs from MongoDB
            url_docs = list(self.urls_collection.find({'status': status}).limit(limit))

            # Create URL objects from documents
            url_objs = [URL(**doc) for doc in url_docs]

            # Update statistics
            self.stats['urls_retrieved'] += len(url_objs)
            return url_objs
        except Exception as e:
            logger.error(f"Error retrieving URLs by status {status}: {e}")
            self.stats['storage_errors'] += 1
            return []

    def get_urls_by_domain(self, domain: str, limit: int = 100) -> List[URL]:
        """
        Retrieve URLs by domain.

        Args:
            domain: Domain of URLs to retrieve
            limit: Maximum number of URLs to retrieve

        Returns:
            List of URL objects
        """
        try:
            # Get URLs from MongoDB
            url_docs = list(self.urls_collection.find({'domain': domain}).limit(limit))

            # Create URL objects from documents
            url_objs = [URL(**doc) for doc in url_docs]

            # Update statistics
            self.stats['urls_retrieved'] += len(url_objs)
            return url_objs
        except Exception as e:
            logger.error(f"Error retrieving URLs by domain {domain}: {e}")
            self.stats['storage_errors'] += 1
            return []

    def store_stats(self, stats: Dict[str, Any]) -> bool:
        """
        Store crawler statistics.

        Args:
            stats: Statistics to store

        Returns:
            True if successful, False otherwise
        """
        try:
            # Create statistics document
            stats_doc = stats.copy()
            stats_doc['timestamp'] = datetime.datetime.now()

            # Convert sets to lists for MongoDB
            for key, value in stats_doc.items():
                if isinstance(value, set):
                    stats_doc[key] = list(value)

            # Store in MongoDB
            self.stats_collection.insert_one(stats_doc)
            return True
        except Exception as e:
            logger.error(f"Error storing statistics: {e}")
            self.stats['storage_errors'] += 1
            return False

    def _check_disk_space(self) -> bool:
        """
        Check whether the disk space limit has been exceeded.

        Returns:
            True if space is available, False otherwise
        """
        # Convert max disk usage to bytes
        max_bytes = self.max_disk_usage_gb * 1024 * 1024 * 1024

        # Check if limit is exceeded
        return self.stats['disk_space_used'] < max_bytes

    def _extract_domain(self, url: str) -> str:
        """Extract the domain from a URL."""
        parsed = urlparse(url)
        return parsed.netloc.replace(':', '_')

    def _url_to_filename(self, url: str) -> str:
        """Convert a URL to a safe filename."""
        # Hash the URL to create a safe filename
        return hashlib.md5(url.encode('utf-8')).hexdigest()
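
    # Example: _url_to_filename("https://example.com/a?b=1") always yields the
    # same 32-character hex digest, so re-crawling a URL overwrites the same
    # file under that URL's domain directory.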

    def clean_old_pages(self, days: int = 90) -> int:
        """
        Remove pages older than a specified number of days.

        Args:
            days: Number of days after which pages are considered old

        Returns:
            Number of pages removed
        """
        try:
            # Calculate cutoff date
            cutoff_date = datetime.datetime.now() - datetime.timedelta(days=days)

            # Find old pages
            old_pages = list(self.pages_collection.find({
                'crawled_at': {'$lt': cutoff_date}
            }, {'url': 1}))

            if not old_pages:
                logger.info(f"No pages older than {days} days found")
                return 0

            # Remove from database
            delete_result = self.pages_collection.delete_many({
                'crawled_at': {'$lt': cutoff_date}
            })

            # Remove content files
            count = 0
            for page in old_pages:
                url = page['url']
                domain = self._extract_domain(url)
                filename = self._url_to_filename(url)

                # Check disk
                compressed_path = os.path.join(config.HTML_STORAGE_PATH, domain, f"{filename}.gz")
                uncompressed_path = os.path.join(config.HTML_STORAGE_PATH, domain, f"{filename}.html")
                if os.path.exists(compressed_path):
                    os.remove(compressed_path)
                    count += 1
                if os.path.exists(uncompressed_path):
                    os.remove(uncompressed_path)
                    count += 1

                # Check S3
                if self.s3_client:
                    s3_key_compressed = f"{domain}/{filename}.gz"
                    s3_key_uncompressed = f"{domain}/{filename}.html"
                    try:
                        self.s3_client.delete_object(
                            Bucket=config.S3_BUCKET,
                            Key=s3_key_compressed
                        )
                        count += 1
                    except ClientError:
                        # Object may not exist; ignore and continue
                        pass
                    try:
                        self.s3_client.delete_object(
                            Bucket=config.S3_BUCKET,
                            Key=s3_key_uncompressed
                        )
                        count += 1
                    except ClientError:
                        # Object may not exist; ignore and continue
                        pass

            logger.info(f"Removed {delete_result.deleted_count} old pages and {count} content files")
            return delete_result.deleted_count
        except Exception as e:
            logger.error(f"Error cleaning old pages: {e}")
            self.stats['storage_errors'] += 1
            return 0

    def clean_failed_urls(self, retries: int = 3) -> int:
        """
        Remove URLs that have failed repeatedly.

        Args:
            retries: Number of retries after which a URL is considered permanently failed

        Returns:
            Number of URLs removed
        """
        try:
            # Delete failed URLs with too many retries
            delete_result = self.urls_collection.delete_many({
                'status': 'FAILED',
                'retries': {'$gte': retries}
            })

            logger.info(f"Removed {delete_result.deleted_count} permanently failed URLs")
            return delete_result.deleted_count
        except Exception as e:
            logger.error(f"Error cleaning failed URLs: {e}")
            self.stats['storage_errors'] += 1
            return 0

    def calculate_storage_stats(self) -> Dict[str, Any]:
        """
        Calculate storage statistics.

        Returns:
            Dictionary of storage statistics
        """
        stats = {
            'timestamp': datetime.datetime.now(),
            'pages_count': 0,
            'urls_count': 0,
            'disk_space_used_mb': 0,
            's3_objects_count': 0,
            'mongodb_size_mb': 0,
        }

        try:
            # Count pages and URLs
            stats['pages_count'] = self.pages_collection.count_documents({})
            stats['urls_count'] = self.urls_collection.count_documents({})

            # Calculate disk space used
            total_size = 0
            for root, _, files in os.walk(config.HTML_STORAGE_PATH):
                total_size += sum(os.path.getsize(os.path.join(root, name)) for name in files)
            stats['disk_space_used_mb'] = total_size / (1024 * 1024)

            # Calculate MongoDB size
            db_stats = self.db.command('dbStats')
            stats['mongodb_size_mb'] = db_stats['storageSize'] / (1024 * 1024)

            # Count S3 objects if enabled
            if self.s3_client:
                try:
                    s3_objects = 0
                    paginator = self.s3_client.get_paginator('list_objects_v2')
                    for page in paginator.paginate(Bucket=config.S3_BUCKET):
                        if 'Contents' in page:
                            s3_objects += len(page['Contents'])
                    stats['s3_objects_count'] = s3_objects
                except Exception as e:
                    logger.error(f"Error counting S3 objects: {e}")

            # Update internal statistics
            self.stats['disk_space_used'] = total_size
            self.stats['mongodb_size'] = db_stats['storageSize']

            return stats
        except Exception as e:
            logger.error(f"Error calculating storage statistics: {e}")
            self.stats['storage_errors'] += 1
            return stats

    def close(self) -> None:
        """Close connections and perform cleanup."""
        # Flush any pending buffers
        self.flush_page_buffer()
        self.flush_url_buffer()

        # Close MongoDB connection
        if self.mongo_client:
            self.mongo_client.close()
            logger.info("MongoDB connection closed")

        # Log final statistics
        logger.info(
            f"Storage manager closed. Pages stored: {self.stats['pages_stored']}, "
            f"URLs stored: {self.stats['urls_stored']}"
        )