|
""" |
|
Improved Pinecone connection handling with dimension validation. |
|
This module provides more robust connection and error handling for Pinecone operations. |
|
""" |
|
import logging |
|
import time |
|
from typing import Optional, Dict, Any, Tuple, List |
|
import pinecone |
|
from pinecone import Pinecone, ServerlessSpec, PodSpec |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
DEFAULT_MAX_RETRIES = 3 |
|
DEFAULT_RETRY_DELAY = 2 |
|
|
|
class PineconeConnectionManager: |
|
""" |
|
Manages Pinecone connections with enhanced error handling and dimension validation. |
|
|
|
This class centralizes Pinecone connection logic, providing: |
|
- Connection pooling/reuse |
|
- Automatic retries with exponential backoff |
|
- Dimension validation before operations |
|
- Detailed error logging for better debugging |
|
""" |
|
|
|
|
|
_clients = {} |
|
|
|
@classmethod |
|
def get_client(cls, api_key: str) -> Pinecone: |
|
""" |
|
Returns a Pinecone client for the given API key, creating one if needed. |
|
|
|
Args: |
|
api_key: Pinecone API key |
|
|
|
Returns: |
|
Initialized Pinecone client |
|
""" |
|
if not api_key: |
|
raise ValueError("Pinecone API key cannot be empty") |
|
|
|
|
|
if api_key in cls._clients: |
|
return cls._clients[api_key] |
|
|
|
|
|
key_prefix = api_key[:4] + "..." if len(api_key) > 4 else "invalid" |
|
logger.info(f"Creating new Pinecone client with API key (first 4 chars: {key_prefix}...)") |
|
|
|
try: |
|
|
|
client = Pinecone(api_key=api_key) |
|
cls._clients[api_key] = client |
|
logger.info("Pinecone client created successfully") |
|
return client |
|
except Exception as e: |
|
logger.error(f"Failed to create Pinecone client: {str(e)}") |
|
raise RuntimeError(f"Pinecone client initialization failed: {str(e)}") from e |
|
|
|
@classmethod |
|
def get_index(cls, |
|
api_key: str, |
|
index_name: str, |
|
max_retries: int = DEFAULT_MAX_RETRIES) -> Any: |
|
""" |
|
Get a Pinecone index with retry logic. |
|
|
|
Args: |
|
api_key: Pinecone API key |
|
index_name: Name of the index to connect to |
|
max_retries: Maximum number of retry attempts |
|
|
|
Returns: |
|
Pinecone index |
|
""" |
|
client = cls.get_client(api_key) |
|
|
|
|
|
for attempt in range(max_retries): |
|
try: |
|
index = client.Index(index_name) |
|
|
|
_ = index.describe_index_stats() |
|
logger.info(f"Connected to Pinecone index: {index_name}") |
|
return index |
|
except Exception as e: |
|
if attempt < max_retries - 1: |
|
wait_time = DEFAULT_RETRY_DELAY * (2 ** attempt) |
|
logger.warning(f"Pinecone connection attempt {attempt+1} failed: {e}. Retrying in {wait_time}s...") |
|
time.sleep(wait_time) |
|
else: |
|
logger.error(f"Failed to connect to Pinecone index after {max_retries} attempts: {e}") |
|
raise RuntimeError(f"Pinecone index connection failed: {str(e)}") from e |
|
|
|
@classmethod |
|
def validate_dimensions(cls, |
|
index: Any, |
|
vector_dimensions: int) -> Tuple[bool, Optional[str]]: |
|
""" |
|
Validate that the vector dimensions match the Pinecone index configuration. |
|
|
|
Args: |
|
index: Pinecone index |
|
vector_dimensions: Dimensions of the vectors to be uploaded |
|
|
|
Returns: |
|
Tuple of (is_valid, error_message) |
|
""" |
|
try: |
|
|
|
stats = index.describe_index_stats() |
|
index_dimensions = stats.dimension |
|
|
|
if index_dimensions != vector_dimensions: |
|
error_msg = (f"Vector dimensions mismatch: Your vectors have {vector_dimensions} dimensions, " |
|
f"but Pinecone index expects {index_dimensions} dimensions") |
|
logger.error(error_msg) |
|
return False, error_msg |
|
|
|
return True, None |
|
except Exception as e: |
|
error_msg = f"Failed to validate dimensions: {str(e)}" |
|
logger.error(error_msg) |
|
return False, error_msg |
|
|
|
@classmethod |
|
def upsert_vectors_with_validation(cls, |
|
index: Any, |
|
vectors: List[Dict[str, Any]], |
|
namespace: str = "", |
|
batch_size: int = 100) -> Dict[str, Any]: |
|
""" |
|
Upsert vectors with dimension validation and batching. |
|
|
|
Args: |
|
index: Pinecone index |
|
vectors: List of vectors to upsert, each with 'id', 'values', and optional 'metadata' |
|
namespace: Namespace to upsert to |
|
batch_size: Size of batches for upserting |
|
|
|
Returns: |
|
Result of upsert operation |
|
""" |
|
if not vectors: |
|
return {"upserted_count": 0, "success": True} |
|
|
|
|
|
if "values" in vectors[0] and len(vectors[0]["values"]) > 0: |
|
vector_dim = len(vectors[0]["values"]) |
|
is_valid, error_msg = cls.validate_dimensions(index, vector_dim) |
|
|
|
if not is_valid: |
|
logger.error(f"Dimension validation failed: {error_msg}") |
|
raise ValueError(f"Vector dimensions do not match Pinecone index configuration: {error_msg}") |
|
|
|
|
|
total_upserted = 0 |
|
for i in range(0, len(vectors), batch_size): |
|
batch = vectors[i:i+batch_size] |
|
try: |
|
result = index.upsert(vectors=batch, namespace=namespace) |
|
batch_upserted = result.get("upserted_count", len(batch)) |
|
total_upserted += batch_upserted |
|
logger.info(f"Upserted batch {i//batch_size + 1}: {batch_upserted} vectors") |
|
except Exception as e: |
|
logger.error(f"Failed to upsert batch {i//batch_size + 1}: {str(e)}") |
|
raise RuntimeError(f"Vector upsert failed: {str(e)}") from e |
|
|
|
return {"upserted_count": total_upserted, "success": True} |
|
|
|
|
|
def check_connection(api_key: str, index_name: str) -> bool: |
|
""" |
|
Test Pinecone connection and validate index exists. |
|
|
|
Args: |
|
api_key: Pinecone API key |
|
index_name: Name of index to test |
|
|
|
Returns: |
|
True if connection successful, False otherwise |
|
""" |
|
try: |
|
index = PineconeConnectionManager.get_index(api_key, index_name) |
|
stats = index.describe_index_stats() |
|
total_vectors = stats.total_vector_count |
|
logger.info(f"Pinecone connection is working. Total vectors: {total_vectors}") |
|
return True |
|
except Exception as e: |
|
logger.error(f"Pinecone connection failed: {str(e)}") |
|
return False |