""" Vector Database Module This module is responsible for storing and indexing vector embeddings for efficient retrieval using Pinecone with complete functionality. Technology: Pinecone """ import logging import os import time import uuid import hashlib from datetime import datetime from typing import Dict, List, Any, Optional, Union # Import Pinecone and related libraries try: import pinecone from pinecone import Pinecone, ServerlessSpec except ImportError as e: logging.warning(f"Pinecone library not installed: {e}") from utils.error_handler import VectorStorageError, error_handler, ErrorType class VectorDB: """ Stores and indexes vector embeddings for efficient retrieval using Pinecone with full functionality. Features: - Complete Pinecone integration - Index management (create, update, delete) - Batch upsert operations with optimization - Advanced similarity search with metadata filtering - Statistics and monitoring """ def __init__(self, config: Optional[Dict[str, Any]] = None): """ Initialize the VectorDB with configuration. Args: config: Configuration dictionary with Pinecone parameters """ self.config = config or {} self.logger = logging.getLogger(__name__) # Configuration settings self.api_key = self.config.get("api_key", os.environ.get("PINECONE_API_KEY")) self.environment = self.config.get("environment", "us-west1-gcp") self.index_name = self.config.get("index_name", "rag-ai-index") self.dimension = self.config.get( "dimension", 3072 ) # ✅ Fixed: Match Gemini embedding dimension self.metric = self.config.get("metric", "cosine") self.batch_size = self.config.get("batch_size", 100) # Performance settings self.max_metadata_size = self.config.get( "max_metadata_size", 40960 ) # 40KB limit self.upsert_timeout = self.config.get("upsert_timeout", 60) self.query_timeout = self.config.get("query_timeout", 30) # Statistics tracking self.stats = { "vectors_stored": 0, "vectors_queried": 0, "vectors_deleted": 0, "batch_operations": 0, "failed_operations": 0, "start_time": datetime.now(), } # Initialize Pinecone client self.pc = None self.index = None self._initialize_client() def _initialize_client(self): """Initialize Pinecone client and index with validation.""" if not self.api_key: self.logger.warning( "No Pinecone API key provided. Vector storage will not be available." ) return try: # Initialize Pinecone client self.pc = Pinecone(api_key=self.api_key) # Check if index exists, create if not self._ensure_index_exists() # Connect to index self.index = self.pc.Index(self.index_name) # Test connection self._test_connection() self.logger.info( f"Pinecone client initialized successfully with index: {self.index_name}" ) except Exception as e: self.logger.error(f" Failed to initialize Pinecone client: {str(e)}") self.pc = None self.index = None def _ensure_index_exists(self): """Ensure the Pinecone index exists, create if necessary.""" try: # List existing indexes existing_indexes = [index.name for index in self.pc.list_indexes()] if self.index_name not in existing_indexes: self.logger.info(f"Creating new Pinecone index: {self.index_name}") # Create index with serverless spec self.pc.create_index( name=self.index_name, dimension=self.dimension, metric=self.metric, spec=ServerlessSpec(cloud="aws", region=self.environment), ) # Wait for index to be ready self._wait_for_index_ready() self.logger.info(f"Index {self.index_name} created successfully") else: self.logger.info(f"Index {self.index_name} already exists") except Exception as e: raise VectorStorageError(f"Failed to ensure index exists: {str(e)}") def _wait_for_index_ready(self, max_wait_time: int = 300): """Wait for index to be ready for operations.""" start_time = time.time() while time.time() - start_time < max_wait_time: try: index_stats = self.pc.describe_index(self.index_name) if index_stats.status.ready: self.logger.info(f"Index {self.index_name} is ready") return self.logger.info(f"Waiting for index to be ready...") time.sleep(10) except Exception as e: self.logger.warning(f"Error checking index status: {str(e)}") time.sleep(5) raise VectorStorageError( f"Index {self.index_name} not ready after {max_wait_time}s" ) def _test_connection(self): """Test connection to Pinecone index.""" try: # Get index stats stats = self.index.describe_index_stats() self.logger.info(f"Connection test successful. Index stats: {stats}") except Exception as e: raise VectorStorageError(f"Connection test failed: {str(e)}") @error_handler(ErrorType.VECTOR_STORAGE) def store_embeddings(self, items: List[Dict[str, Any]]) -> bool: """ Store embeddings in the vector database with full functionality. Args: items: List of dictionaries containing content, metadata, and embeddings Returns: True if successful, False otherwise """ if not self.index or not items: self.logger.warning("No index available or empty items list") return False # Filter and validate items valid_items = self._validate_items(items) if not valid_items: self.logger.warning("No valid embeddings to store") return False self.logger.info(f"Storing {len(valid_items)} embeddings in Pinecone") start_time = time.time() try: # Process in batches total_batches = (len(valid_items) + self.batch_size - 1) // self.batch_size successful_batches = 0 for i in range(0, len(valid_items), self.batch_size): batch_num = (i // self.batch_size) + 1 batch = valid_items[i : i + self.batch_size] self.logger.info( f"Processing batch {batch_num}/{total_batches} ({len(batch)} vectors)" ) success = self._store_batch(batch) if success: successful_batches += 1 self.stats["vectors_stored"] += len(batch) else: self.stats["failed_operations"] += 1 self.logger.error(f" Batch {batch_num} failed") # Rate limiting between batches if i + self.batch_size < len(valid_items): time.sleep(0.1) self.stats["batch_operations"] += total_batches processing_time = time.time() - start_time success_rate = successful_batches / total_batches * 100 self.logger.info( f"Storage completed: {successful_batches}/{total_batches} batches successful ({success_rate:.1f}%) in {processing_time:.2f}s" ) return successful_batches > 0 except Exception as e: self.stats["failed_operations"] += 1 raise VectorStorageError(f"Failed to store embeddings: {str(e)}") def _validate_items(self, items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Validate and prepare items for storage. Args: items: List of items to validate Returns: List of valid items """ valid_items = [] for i, item in enumerate(items): try: # Check required fields if not isinstance(item, dict): self.logger.warning(f"Item {i} is not a dictionary") continue if "embedding" not in item or not item["embedding"]: self.logger.warning(f"Item {i} missing embedding") continue embedding = item["embedding"] if not isinstance(embedding, list) or len(embedding) != self.dimension: self.logger.warning( f"Item {i} has invalid embedding dimension: {len(embedding)} != {self.dimension}" ) continue # Prepare item processed_item = self._prepare_item_for_storage(item, i) valid_items.append(processed_item) except Exception as e: self.logger.warning(f"Error validating item {i}: {str(e)}") continue return valid_items def _prepare_item_for_storage( self, item: Dict[str, Any], index: int ) -> Dict[str, Any]: """ Prepare item for Pinecone storage. Args: item: Item to prepare index: Item index for ID generation Returns: Prepared item """ # 🆔 Generate unique ID item_id = item.get("id") if not item_id: # Create ID from content hash + timestamp content = item.get("content", "") timestamp = str(int(time.time() * 1000)) content_hash = hashlib.md5(content.encode()).hexdigest()[:8] item_id = f"doc_{content_hash}_{timestamp}_{index}" # Prepare metadata metadata = item.get("metadata", {}).copy() # Add essential fields to metadata metadata.update( { "content_preview": item.get("content", "")[:500], # First 500 chars "content_length": len(item.get("content", "")), "stored_at": datetime.now().isoformat(), "source": item.get("source", "unknown"), "document_type": item.get("document_type", "text"), } ) # Ensure metadata size limit metadata = self._truncate_metadata(metadata) return {"id": item_id, "values": item["embedding"], "metadata": metadata} def _truncate_metadata(self, metadata: Dict[str, Any]) -> Dict[str, Any]: """ Truncate metadata to fit Pinecone size limits. Args: metadata: Original metadata Returns: Truncated metadata """ import json # 📏 Check current size metadata_str = json.dumps(metadata, default=str) if len(metadata_str.encode()) <= self.max_metadata_size: return metadata # Truncate large fields truncated = metadata.copy() # Truncate text fields progressively text_fields = ["content_preview", "text", "description", "summary"] for field in text_fields: if field in truncated: while ( len(json.dumps(truncated, default=str).encode()) > self.max_metadata_size ): current_length = len(str(truncated[field])) if current_length <= 50: break truncated[field] = ( str(truncated[field])[: current_length // 2] + "..." ) return truncated def _store_batch(self, batch: List[Dict[str, Any]]) -> bool: """ Store a batch of embeddings in Pinecone. Args: batch: List of prepared items Returns: True if successful """ try: # Upsert vectors to Pinecone upsert_response = self.index.upsert( vectors=batch, timeout=self.upsert_timeout ) # Verify upsert success if hasattr(upsert_response, "upserted_count"): expected_count = len(batch) actual_count = upsert_response.upserted_count if actual_count != expected_count: self.logger.warning( f"Upsert count mismatch: {actual_count}/{expected_count}" ) return False self.logger.info(f"Successfully stored batch of {len(batch)} vectors") return True except Exception as e: self.logger.error(f" Error storing batch: {str(e)}") return False @error_handler(ErrorType.VECTOR_STORAGE) def search( self, query_embedding: List[float], top_k: int = 5, filter: Optional[Dict[str, Any]] = None, include_metadata: bool = True, include_values: bool = False, ) -> List[Dict[str, Any]]: """ Search for similar vectors with advanced filtering. Args: query_embedding: Query vector to search for top_k: Number of results to return filter: Optional metadata filter include_metadata: Whether to include metadata in results include_values: Whether to include vector values in results Returns: List of search results with scores and metadata """ if not self.index or not query_embedding: self.logger.warning("No index available or empty query embedding") return [] # Validate query embedding if len(query_embedding) != self.dimension: raise VectorStorageError( f"Query embedding dimension {len(query_embedding)} != {self.dimension}" ) self.logger.info(f"Searching for similar vectors (top_k={top_k})") start_time = time.time() try: # Perform similarity search search_response = self.index.query( vector=query_embedding, top_k=top_k, filter=filter, include_metadata=include_metadata, include_values=include_values, timeout=self.query_timeout, ) # Process results results = [] if hasattr(search_response, "matches"): for match in search_response.matches: result = { "id": match.id, "score": float(match.score), } if include_metadata and hasattr(match, "metadata"): result["metadata"] = ( dict(match.metadata) if match.metadata else {} ) if include_values and hasattr(match, "values"): result["values"] = match.values results.append(result) self.stats["vectors_queried"] += len(results) search_time = time.time() - start_time self.logger.info( f"Search completed: {len(results)} results in {search_time:.3f}s" ) return results except Exception as e: self.stats["failed_operations"] += 1 raise VectorStorageError(f"Search failed: {str(e)}") @error_handler(ErrorType.VECTOR_STORAGE) def delete( self, ids: Optional[List[str]] = None, filter: Optional[Dict[str, Any]] = None, delete_all: bool = False, ) -> bool: """ Delete vectors from the database. Args: ids: Optional list of vector IDs to delete filter: Optional metadata filter for vectors to delete delete_all: Whether to delete all vectors Returns: True if successful """ if not self.index: self.logger.warning("No index available") return False try: if delete_all: # Delete all vectors self.index.delete(delete_all=True) self.logger.info("Deleted all vectors from index") self.stats["vectors_deleted"] += 1 # Approximate elif ids: # Delete by IDs self.index.delete(ids=ids) self.logger.info(f"Deleted {len(ids)} vectors by ID") self.stats["vectors_deleted"] += len(ids) elif filter: # Delete by filter self.index.delete(filter=filter) self.logger.info(f"Deleted vectors by filter: {filter}") self.stats["vectors_deleted"] += 1 # Approximate else: self.logger.warning("No deletion criteria provided") return False return True except Exception as e: self.stats["failed_operations"] += 1 raise VectorStorageError(f"Delete operation failed: {str(e)}") def get_index_stats(self) -> Dict[str, Any]: """ Get comprehensive index statistics. Returns: Dictionary with index statistics """ if not self.index: return {} try: # Get Pinecone index stats pinecone_stats = self.index.describe_index_stats() # Combine with internal stats runtime = datetime.now() - self.stats["start_time"] return { "pinecone_stats": { "total_vector_count": pinecone_stats.total_vector_count, "dimension": pinecone_stats.dimension, "index_fullness": pinecone_stats.index_fullness, "namespaces": ( dict(pinecone_stats.namespaces) if pinecone_stats.namespaces else {} ), }, "internal_stats": { **self.stats, "runtime_seconds": runtime.total_seconds(), "avg_vectors_per_batch": ( self.stats["vectors_stored"] / max(1, self.stats["batch_operations"]) ), "success_rate": ( ( self.stats["batch_operations"] - self.stats["failed_operations"] ) / max(1, self.stats["batch_operations"]) * 100 ), }, "configuration": { "index_name": self.index_name, "dimension": self.dimension, "metric": self.metric, "batch_size": self.batch_size, }, } except Exception as e: self.logger.error(f" Error getting index stats: {str(e)}") return {"error": str(e)} def health_check(self) -> Dict[str, Any]: """ Perform health check on the vector database. Returns: Health check results """ health = { "status": "unknown", "timestamp": datetime.now().isoformat(), "checks": {}, } try: # Check API connection if self.pc: health["checks"]["api_connection"] = "Connected" else: health["checks"]["api_connection"] = " Not connected" health["status"] = "unhealthy" return health # Check index availability if self.index: health["checks"]["index_available"] = "Available" else: health["checks"]["index_available"] = " Not available" health["status"] = "unhealthy" return health # Test query operation try: test_vector = [0.1] * self.dimension self.index.query(vector=test_vector, top_k=1, timeout=5) health["checks"]["query_operation"] = "Working" except Exception as e: health["checks"]["query_operation"] = f" Failed: {str(e)}" health["status"] = "degraded" # Check index stats try: stats = self.index.describe_index_stats() health["checks"]["index_stats"] = f"{stats.total_vector_count} vectors" except Exception as e: health["checks"]["index_stats"] = f" Failed: {str(e)}" # 🎯 Overall status if health["status"] == "unknown": health["status"] = "healthy" except Exception as e: health["status"] = "unhealthy" health["error"] = str(e) return health def reset_stats(self): """Reset internal statistics.""" self.stats = { "vectors_stored": 0, "vectors_queried": 0, "vectors_deleted": 0, "batch_operations": 0, "failed_operations": 0, "start_time": datetime.now(), } self.logger.info("Statistics reset") def get_stats(self) -> Dict[str, Any]: """ Get simplified stats for UI display. Returns: Dictionary with basic statistics """ try: if not self.index: return {"total_vectors": 0, "status": "disconnected"} # Get Pinecone stats pinecone_stats = self.index.describe_index_stats() return { "total_vectors": pinecone_stats.total_vector_count, "dimension": pinecone_stats.dimension, "index_fullness": pinecone_stats.index_fullness, "status": "connected", } except Exception as e: self.logger.warning(f"Could not get stats: {e}") return {"total_vectors": 0, "status": "error", "error": str(e)} def get_unique_sources(self) -> List[Dict[str, Any]]: """ Get unique sources from stored vectors. Returns: List of unique sources with metadata """ try: if not self.index: return [] # This is a simplified approach - in a real implementation, # you might want to maintain a separate metadata index # For now, we'll return mock data based on what might be stored # Try to get some sample vectors to extract sources test_vector = [0.1] * self.dimension results = self.index.query( vector=test_vector, top_k=100, # Get more results to find unique sources include_metadata=True, ) sources = {} for match in results.matches: if hasattr(match, "metadata") and match.metadata: source = match.metadata.get("source", "Unknown") if source not in sources: sources[source] = { "source": source, "chunk_count": 1, "added_date": match.metadata.get("stored_at", "Unknown"), } else: sources[source]["chunk_count"] += 1 return list(sources.values()) except Exception as e: self.logger.warning(f"Could not get unique sources: {e}") return [] def list_documents(self) -> List[Dict[str, Any]]: """ List all documents in the vector database. Returns: List of document information """ try: # Get unique sources and format as documents sources = self.get_unique_sources() documents = [] for source_info in sources: documents.append( { "name": source_info["source"], "chunks": source_info["chunk_count"], "date": source_info["added_date"], } ) return documents except Exception as e: self.logger.warning(f"Could not list documents: {e}") return []