Pix-Agent / app /utils /pinecone_fix.py
ManTea's picture
QA version personality
c8b8c9b
raw
history blame
7.49 kB
"""
Improved Pinecone connection handling with dimension validation.
This module provides more robust connection and error handling for Pinecone operations.
"""
import logging
import time
from typing import Optional, Dict, Any, Tuple, List
import pinecone
from pinecone import Pinecone, ServerlessSpec, PodSpec
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Default retry settings
# Default number of connection attempts used by PineconeConnectionManager.get_index.
DEFAULT_MAX_RETRIES = 3
# Base wait in seconds before the first retry; get_index doubles it on each later attempt.
DEFAULT_RETRY_DELAY = 2
class PineconeConnectionManager:
    """
    Manages Pinecone connections with enhanced error handling and dimension validation.

    This class centralizes Pinecone connection logic, providing:
    - Client reuse (one cached client per API key)
    - Automatic retries with exponential backoff when opening an index
    - Dimension validation before upserts
    - Detailed error logging for better debugging
    """

    # Class-level cache of Pinecone clients, keyed by API key
    _clients = {}

    @staticmethod
    def _read_field(obj: Any, name: str) -> Any:
        """Return field `name` from an attribute-style or dict-style response, or None."""
        value = getattr(obj, name, None)
        if value is None and isinstance(obj, dict):
            value = obj.get(name)
        return value

    @staticmethod
    def _dense_length(vector: Any) -> Optional[int]:
        """Return the length of a vector's non-empty 'values', or None if it has none."""
        getter = getattr(vector, "get", None)
        if not callable(getter):
            # Not a mapping-like record (e.g. a tuple); nothing to inspect here.
            return None
        values = getter("values")
        if values is None:
            return None
        size = len(values)
        return size if size > 0 else None

    @classmethod
    def get_client(cls, api_key: str) -> Pinecone:
        """
        Returns a Pinecone client for the given API key, creating one if needed.

        Args:
            api_key: Pinecone API key

        Returns:
            Initialized Pinecone client (cached and reused for the same key)

        Raises:
            ValueError: If api_key is empty
            RuntimeError: If the client cannot be constructed
        """
        if not api_key:
            raise ValueError("Pinecone API key cannot be empty")

        # Return cached client if it exists
        cached = cls._clients.get(api_key)
        if cached is not None:
            return cached

        # Log client creation (but hide full API key). The ellipsis is added by
        # the message itself, so the prefix must not carry its own.
        key_prefix = api_key[:4] if len(api_key) > 4 else "invalid"
        logger.info(f"Creating new Pinecone client with API key (first 4 chars: {key_prefix}...)")

        try:
            client = Pinecone(api_key=api_key)
        except Exception as e:
            logger.error(f"Failed to create Pinecone client: {str(e)}")
            raise RuntimeError(f"Pinecone client initialization failed: {str(e)}") from e

        cls._clients[api_key] = client
        logger.info("Pinecone client created successfully")
        return client

    @classmethod
    def get_index(cls,
                  api_key: str,
                  index_name: str,
                  max_retries: int = DEFAULT_MAX_RETRIES) -> Any:
        """
        Get a Pinecone index with retry logic.

        Args:
            api_key: Pinecone API key
            index_name: Name of the index to connect to
            max_retries: Maximum number of connection attempts; values below 1
                are treated as a single attempt

        Returns:
            Pinecone index whose stats call succeeded

        Raises:
            ValueError: If api_key is empty
            RuntimeError: If the client cannot be created or every attempt fails
        """
        client = cls.get_client(api_key)

        # Always try at least once: with a non-positive max_retries the loop
        # would otherwise be skipped and the method would silently return None.
        attempts = max(1, max_retries)

        for attempt in range(attempts):
            try:
                index = client.Index(index_name)
                # Test the connection
                index.describe_index_stats()
            except Exception as e:
                if attempt >= attempts - 1:
                    logger.error(f"Failed to connect to Pinecone index after {attempts} attempts: {e}")
                    raise RuntimeError(f"Pinecone index connection failed: {str(e)}") from e
                wait_time = DEFAULT_RETRY_DELAY * (2 ** attempt)  # Exponential backoff
                logger.warning(f"Pinecone connection attempt {attempt+1} failed: {e}. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                logger.info(f"Connected to Pinecone index: {index_name}")
                return index

    @classmethod
    def validate_dimensions(cls,
                            index: Any,
                            vector_dimensions: int) -> Tuple[bool, Optional[str]]:
        """
        Validate that the vector dimensions match the Pinecone index configuration.

        Never raises: any failure while fetching the stats is reported through
        the returned tuple.

        Args:
            index: Pinecone index
            vector_dimensions: Dimensions of the vectors to be uploaded

        Returns:
            Tuple of (is_valid, error_message); error_message is None when valid
        """
        try:
            stats = index.describe_index_stats()
            # Stats may be attribute-style or dict-style; accept both.
            index_dimensions = cls._read_field(stats, "dimension")
        except Exception as e:
            error_msg = f"Failed to validate dimensions: {str(e)}"
            logger.error(error_msg)
            return False, error_msg

        if index_dimensions is None:
            # Report the real problem instead of "expects None dimensions".
            error_msg = "Failed to validate dimensions: index stats did not report a dimension"
            logger.error(error_msg)
            return False, error_msg

        if index_dimensions != vector_dimensions:
            error_msg = (f"Vector dimensions mismatch: Your vectors have {vector_dimensions} dimensions, "
                         f"but Pinecone index expects {index_dimensions} dimensions")
            logger.error(error_msg)
            return False, error_msg

        return True, None

    @classmethod
    def upsert_vectors_with_validation(cls,
                                       index: Any,
                                       vectors: List[Dict[str, Any]],
                                       namespace: str = "",
                                       batch_size: int = 100) -> Dict[str, Any]:
        """
        Upsert vectors with dimension validation and batching.

        Args:
            index: Pinecone index
            vectors: List of vectors to upsert, each with 'id', 'values', and optional 'metadata'
            namespace: Namespace to upsert to
            batch_size: Size of batches for upserting; must be at least 1

        Returns:
            Dict with 'upserted_count' (total across batches) and 'success'

        Raises:
            ValueError: If batch_size is below 1, the vectors disagree on their
                dimension, or the dimension does not match the index
            RuntimeError: If an upsert call fails
        """
        if not vectors:
            return {"upserted_count": 0, "success": True}

        # A negative batch size would yield an empty range and report success
        # without sending anything; zero would raise an opaque range() error.
        if batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

        # Check every vector, not just the first: a mismatch found by the
        # server mid-way would leave earlier batches already written.
        dimensions = set()
        for vector in vectors:
            size = cls._dense_length(vector)
            if size is not None:
                dimensions.add(size)

        if len(dimensions) > 1:
            error_msg = f"Vectors have inconsistent dimensions: {sorted(dimensions)}"
            logger.error(f"Dimension validation failed: {error_msg}")
            raise ValueError(f"Vector dimensions do not match Pinecone index configuration: {error_msg}")

        if dimensions:
            vector_dim = next(iter(dimensions))
            is_valid, error_msg = cls.validate_dimensions(index, vector_dim)
            if not is_valid:
                logger.error(f"Dimension validation failed: {error_msg}")
                raise ValueError(f"Vector dimensions do not match Pinecone index configuration: {error_msg}")

        # Batch upsert
        total_upserted = 0
        starts = range(0, len(vectors), batch_size)
        for batch_number, start in enumerate(starts, start=1):
            batch = vectors[start:start + batch_size]
            try:
                result = index.upsert(vectors=batch, namespace=namespace)
            except Exception as e:
                logger.error(f"Failed to upsert batch {batch_number}: {str(e)}")
                raise RuntimeError(f"Vector upsert failed: {str(e)}") from e

            # Read the count outside the try so a response-shape quirk cannot
            # be misreported as a failed upsert. The response may be a mapping
            # or an attribute-style object depending on the client version.
            batch_upserted = cls._read_field(result, "upserted_count")
            if batch_upserted is None:
                batch_upserted = len(batch)
            total_upserted += batch_upserted
            logger.info(f"Upserted batch {batch_number}: {batch_upserted} vectors")

        return {"upserted_count": total_upserted, "success": True}
# Simplified function to check connection
def check_connection(api_key: str, index_name: str) -> bool:
    """
    Report whether the given Pinecone index can be reached.

    Opens the index through PineconeConnectionManager, fetches its stats and
    logs the total vector count. Any exception is logged and turned into a
    False result rather than propagated.

    Args:
        api_key: Pinecone API key
        index_name: Name of index to test

    Returns:
        True if connection successful, False otherwise
    """
    try:
        stats = PineconeConnectionManager.get_index(api_key, index_name).describe_index_stats()
        total_vectors = stats.total_vector_count
        logger.info(f"Pinecone connection is working. Total vectors: {total_vectors}")
    except Exception as exc:
        logger.error(f"Pinecone connection failed: {str(exc)}")
        return False
    return True