|
import os |
|
from pinecone import Pinecone |
|
from dotenv import load_dotenv |
|
import logging |
|
from typing import Optional, List, Dict, Any, Union, Tuple |
|
import time |
|
from langchain_google_genai import GoogleGenerativeAIEmbeddings |
|
import google.generativeai as genai |
|
from langchain_core.retrievers import BaseRetriever |
|
from langchain.callbacks.manager import Callbacks |
|
from langchain_core.documents import Document |
|
from langchain_core.pydantic_v1 import Field |
|
|
|
|
|
# Module-level logger; handler and level configuration is left to the application.
logger = logging.getLogger(name=__name__)

# Load variables from a .env file into the process environment so that the
# os.getenv() lookups that follow can see them.
load_dotenv()
|
|
|
|
|
# Credentials and index selection, read from the environment (None when unset).
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

# Retrieval defaults, each overridable through the named environment variable.
# limit_k = candidates requested from Pinecone; top_k = results kept after filtering.
DEFAULT_LIMIT_K = int(os.getenv("PINECONE_DEFAULT_LIMIT_K", "10"))
DEFAULT_TOP_K = int(os.getenv("PINECONE_DEFAULT_TOP_K", "6"))
DEFAULT_SIMILARITY_METRIC = os.getenv("PINECONE_DEFAULT_SIMILARITY_METRIC", "cosine")
DEFAULT_SIMILARITY_THRESHOLD = float(os.getenv("PINECONE_DEFAULT_SIMILARITY_THRESHOLD", "0.75"))

# Comma-separated metric names. Whitespace around entries and empty entries are
# discarded so that values such as "cosine, dotproduct," still match exactly in
# membership checks against this list.
ALLOWED_METRICS = [
    metric.strip()
    for metric in os.getenv("PINECONE_ALLOWED_METRICS", "cosine,dotproduct,euclidean").split(",")
    if metric.strip()
]
|
|
|
|
|
# Names exported by ``from <this module> import *``.
__all__ = [
    # Connection helpers
    "get_pinecone_index",
    "check_db_connection",
    # Vector operations
    "search_vectors",
    "upsert_vectors",
    "delete_vectors",
    "fetch_metadata",
    # Retrieval chain factory
    "get_chain",
    # Tunable defaults
    "DEFAULT_TOP_K",
    "DEFAULT_LIMIT_K",
    "DEFAULT_SIMILARITY_METRIC",
    "DEFAULT_SIMILARITY_THRESHOLD",
    "ALLOWED_METRICS",
    # Retriever class
    "ThresholdRetriever",
]
|
|
|
|
|
# The google.generativeai SDK is configured only when a key was found.
if GOOGLE_API_KEY:
    genai.configure(api_key=GOOGLE_API_KEY)

# Lazily-populated module state: the Pinecone client and index handle are set by
# init_pinecone()/get_pinecone_index(), the cached retriever by get_chain().
pc = index = _retriever_instance = None

# Report missing Pinecone configuration as early as import time.
if not PINECONE_API_KEY:
    logger.error("PINECONE_API_KEY is not set in environment variables")
if not PINECONE_INDEX_NAME:
    logger.error("PINECONE_INDEX_NAME is not set in environment variables")
|
|
|
|
|
def init_pinecone():
    """Initialize the Pinecone client and index handle, caching both in module globals.

    The client is stored in ``pc`` and the index handle in ``index``. Once a handle
    has been obtained it is returned directly. Until then, every call re-checks that
    the index exists. A missing index or a transient connection error is therefore
    retried on the next call instead of being cached as a permanent failure.

    Returns:
        The Pinecone ``Index`` handle, or None when configuration is missing, the
        index does not exist, or connecting fails. Errors are logged, not raised.
    """
    global pc, index

    # Already initialized: reuse the cached handle without any network calls.
    if index is not None:
        return index

    try:
        if pc is None:
            logger.info(f"Initializing Pinecone connection to index {PINECONE_INDEX_NAME}...")

            if not PINECONE_API_KEY:
                logger.error("PINECONE_API_KEY is not set in environment variables")
                return None

            if not PINECONE_INDEX_NAME:
                logger.error("PINECONE_INDEX_NAME is not set in environment variables")
                return None

            pc = Pinecone(api_key=PINECONE_API_KEY)

        # Deliberately outside the `pc is None` branch. A previous call may have
        # created the client but failed to resolve the index; this retries it.
        try:
            index_list = pc.list_indexes()

            if not hasattr(index_list, 'names') or PINECONE_INDEX_NAME not in index_list.names():
                logger.error(f"Index {PINECONE_INDEX_NAME} does not exist in Pinecone")
                return None

            index = pc.Index(PINECONE_INDEX_NAME)
            logger.info(f"Pinecone connection established to index {PINECONE_INDEX_NAME}")
        except Exception as connection_error:
            logger.error(f"Error connecting to Pinecone index: {connection_error}")
            return None

        return index
    except ImportError as e:
        logger.error(f"Required package for Pinecone is missing: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error initializing Pinecone: {e}")
        return None
|
|
|
|
|
def get_pinecone_index():
    """Return the cached Pinecone index handle, initializing it on first use.

    Returns:
        The index handle, or None if ``init_pinecone`` could not provide one.
    """
    global index
    if index is not None:
        return index
    index = init_pinecone()
    return index
|
|
|
|
|
def check_db_connection():
    """Report whether the Pinecone index is reachable.

    Fetches the index statistics and logs a vector count. The count is the sum of
    per-namespace ``vector_count`` values when the stats object exposes
    ``namespaces``; otherwise it is the stats' ``total_vector_count``.

    Returns:
        True if the statistics call succeeded. False if the index is unavailable
        or any error occurs; errors are logged, never raised.
    """
    try:
        pinecone_index = get_pinecone_index()
        if pinecone_index is None:
            return False

        stats = pinecone_index.describe_index_stats()
        reported_total = stats.get('total_vector_count', 0)
        total_vectors = (
            sum(ns.get('vector_count', 0) for ns in stats.namespaces.values())
            if hasattr(stats, 'namespaces')
            else reported_total
        )

        logger.info(f"Pinecone connection is working. Total vectors: {total_vectors}")
        return True
    except Exception as e:
        logger.error(f"Error in Pinecone connection: {e}")
        return False
|
|
|
|
|
def convert_score(score: float, metric: str) -> float:
    """
    Convert a raw Pinecone score into a float where higher means more similar.

    For "euclidean"/"l2" (case-insensitive) the raw score is a distance, where lower
    is better. It is mapped to ``max(0.0, 1 - score / 2)``.
    NOTE: assuming the score is the squared L2 distance, as Pinecone documents for
    euclidean indexes, this equals the cosine similarity of unit-length vectors,
    clamped at 0. Confirm if vectors are not normalized.

    Every other metric is returned unchanged, only coerced to float. The result is
    therefore NOT guaranteed to lie in [0, 1]: cosine can be negative and
    dotproduct is unbounded.

    Args:
        score: The raw score reported for a match.
        metric: The similarity metric name the index uses.

    Returns:
        The normalized score, always a float.
    """
    if metric.lower() in ("euclidean", "l2"):
        # 0.0 (not 0) keeps the return type a float even when clamped.
        return max(0.0, 1.0 - score / 2.0)
    return float(score)
|
|
|
|
|
def filter_by_threshold(results, threshold: float, metric: str) -> List[Dict]:
    """
    Keep only the matches whose normalized score reaches the threshold.

    Each match that survives is annotated in place with a ``normalized_score``
    attribute holding the value produced by ``convert_score``.

    Args:
        results: A query response; only its ``matches`` attribute is read.
        threshold: Minimum normalized score (inclusive) a match must reach.
        metric: Metric name forwarded to ``convert_score``.

    Returns:
        The surviving matches in their original order. If ``results`` has no
        ``matches`` attribute, an empty list is returned.
    """
    if not hasattr(results, 'matches'):
        return []

    kept = []
    for candidate in results.matches:
        candidate_score = convert_score(getattr(candidate, 'score', 0), metric)
        if candidate_score >= threshold:
            candidate.normalized_score = candidate_score
            kept.append(candidate)
    return kept
|
|
|
|
|
async def search_vectors(
    query_vector,
    top_k: int = DEFAULT_TOP_K,
    limit_k: int = DEFAULT_LIMIT_K,
    similarity_metric: str = DEFAULT_SIMILARITY_METRIC,
    similarity_threshold: float = DEFAULT_SIMILARITY_THRESHOLD,
    namespace: str = "Default",
    filter: Optional[Dict] = None
) -> Dict:
    """
    Search Pinecone for the vectors most similar to ``query_vector``, then apply a threshold.

    ``limit_k`` candidates are requested from Pinecone. Matches whose normalized
    score (see ``convert_score``) is below ``similarity_threshold`` are dropped, and
    the rest are truncated to ``top_k``.

    Pinecone fixes the distance metric when an index is created, so
    ``similarity_metric`` is not sent with the query. It only selects how the
    returned scores are normalized and should match the index's configured metric.

    Although declared ``async``, the Pinecone calls here are synchronous. They block
    the running event loop while they execute.

    Args:
        query_vector: The query embedding.
        top_k: Number of results to return after threshold filtering.
        limit_k: Number of candidates to request from Pinecone. Raised to ``top_k``
            if smaller.
        similarity_metric: One of ``ALLOWED_METRICS``. Unknown values fall back to
            ``DEFAULT_SIMILARITY_METRIC``.
        similarity_threshold: Minimum normalized score a match must reach.
        namespace: Pinecone namespace to search.
        filter: Optional Pinecone metadata filter.

    Returns:
        The Pinecone query response with ``matches`` replaced by the filtered list.
        Returns None if the index is unavailable or the query fails; errors are
        logged.
    """
    try:
        if similarity_metric not in ALLOWED_METRICS:
            logger.warning(f"Invalid similarity metric: {similarity_metric}. Using default: {DEFAULT_SIMILARITY_METRIC}")
            similarity_metric = DEFAULT_SIMILARITY_METRIC

        if limit_k < top_k:
            logger.warning(f"limit_k ({limit_k}) must be greater than or equal to top_k ({top_k}). Setting limit_k to {top_k}")
            limit_k = top_k

        pinecone_index = get_pinecone_index()
        if pinecone_index is None:
            logger.error("Failed to get Pinecone index for search")
            return None

        # No `metric=` here: Index.query() has no such option, because the metric
        # is a property of the index. Forwarding it as an unknown kwarg is ignored
        # at best and rejected at worst.
        results = pinecone_index.query(
            vector=query_vector,
            top_k=limit_k,
            namespace=namespace,
            filter=filter,
            include_metadata=True,
            include_values=False,
        )

        # Threshold first, then cap at top_k so the best surviving matches are kept.
        filtered_matches = filter_by_threshold(results, similarity_threshold, similarity_metric)[:top_k]
        results.matches = filtered_matches

        match_count = len(filtered_matches)
        logger.info(f"Pinecone search returned {match_count} matches after threshold filtering (metric: {similarity_metric}, threshold: {similarity_threshold}, namespace: {namespace})")

        return results
    except Exception as e:
        logger.error(f"Error searching vectors: {e}")
        return None
|
|
|
|
|
async def upsert_vectors(vectors, namespace="Default"):
    """Insert or overwrite ``vectors`` in the given namespace of the Pinecone index.

    Returns:
        The Pinecone upsert response, or None when the index is unavailable or the
        upsert raises (the error is logged).
    """
    try:
        target_index = get_pinecone_index()
        if target_index is None:
            logger.error("Failed to get Pinecone index for upsert")
            return None

        upsert_response = target_index.upsert(vectors=vectors, namespace=namespace)

        # The response is read dict-style; a missing count is logged as 0.
        upserted_count = upsert_response.get('upserted_count', 0)
        logger.info(f"Upserted {upserted_count} vectors to Pinecone")
        return upsert_response
    except Exception as e:
        logger.error(f"Error upserting vectors: {e}")
        return None
|
|
|
|
|
async def delete_vectors(ids, namespace="Default"):
    """Delete the vectors with the given IDs from a namespace of the Pinecone index.

    Returns:
        True if the delete call completed, False when the index is unavailable or
        the call raises (the error is logged).
    """
    try:
        target_index = get_pinecone_index()
        if target_index is None:
            logger.error("Failed to get Pinecone index for delete")
            return False

        # The delete response carries nothing this function reports, so it is not kept.
        target_index.delete(ids=ids, namespace=namespace)
        logger.info(f"Deleted vectors with IDs {ids} from Pinecone")
        return True
    except Exception as e:
        logger.error(f"Error deleting vectors: {e}")
        return False
|
|
|
|
|
async def fetch_metadata(ids, namespace="Default"):
    """Fetch the stored records for the given vector IDs via ``Index.fetch``.

    Returns:
        The Pinecone fetch response, or None when the index is unavailable or the
        fetch raises (the error is logged).
    """
    try:
        target_index = get_pinecone_index()
        if target_index is None:
            logger.error("Failed to get Pinecone index for fetch")
            return None
        return target_index.fetch(ids=ids, namespace=namespace)
    except Exception as e:
        logger.error(f"Error fetching vector metadata: {e}")
        return None
|
|
|
|
|
def _run_coroutine_blocking(make_coroutine):
    """Run the coroutine produced by ``make_coroutine()`` to completion and return its result.

    A private event loop is created for every call and always closed afterwards, so
    loops do not accumulate across calls. If the calling thread already has a running
    event loop, a second loop cannot run on that thread. In that case the work is
    handed to a one-off worker thread, and this call blocks until it finishes.
    """
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

    def run_in_private_loop():
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(make_coroutine())
        finally:
            loop.close()

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running on this thread, so a private loop can be driven directly.
        return run_in_private_loop()

    with ThreadPoolExecutor(max_workers=1) as executor:
        return executor.submit(run_in_private_loop).result()


class ThresholdRetriever(BaseRetriever):
    """
    Custom retriever that supports threshold-based filtering and multiple similarity metrics.

    This integrates with the Langchain ecosystem while using our advanced retrieval logic.
    The query is embedded with ``embeddings`` and searched through the module-level
    ``search_vectors`` helper. That helper requests ``limit_k`` candidates, drops those
    below ``similarity_threshold`` and keeps at most ``top_k``.

    NOTE(review): fields are declared with ``langchain_core.pydantic_v1.Field``. Confirm
    this matches the pydantic major version that the installed ``BaseRetriever`` uses.
    """

    # Held for reference; retrieval below goes through search_vectors, not this object.
    vectorstore: Any = Field(description="Vector store to use for retrieval")
    embeddings: Any = Field(description="Embeddings model to use for retrieval")
    # Only a "filter" entry is consulted; it is forwarded as the Pinecone metadata filter.
    search_kwargs: Dict[str, Any] = Field(default_factory=dict, description="Search kwargs for the vectorstore")
    top_k: int = Field(default=DEFAULT_TOP_K, description="Number of results to return after filtering")
    limit_k: int = Field(default=DEFAULT_LIMIT_K, description="Maximum number of results to retrieve from Pinecone")
    similarity_metric: str = Field(default=DEFAULT_SIMILARITY_METRIC, description="Similarity metric to use")
    similarity_threshold: float = Field(default=DEFAULT_SIMILARITY_THRESHOLD, description="Threshold for similarity")
    namespace: str = "Default"

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    async def search_vectors_sync(
        self,
        query_vector,
        top_k: int = DEFAULT_TOP_K,
        limit_k: int = DEFAULT_LIMIT_K,
        similarity_metric: str = DEFAULT_SIMILARITY_METRIC,
        similarity_threshold: float = DEFAULT_SIMILARITY_THRESHOLD,
        namespace: str = "Default",
        filter: Optional[Dict] = None
    ) -> Dict:
        """Await ``search_vectors`` with the given parameters.

        Despite its historical name this is a coroutine, so callers must ``await`` it.
        Inside a coroutine the event loop is necessarily running, so the search is
        awaited directly.

        Returns:
            The ``search_vectors`` result, or None if awaiting it raises (logged).
        """
        try:
            return await search_vectors(
                query_vector=query_vector,
                top_k=top_k,
                limit_k=limit_k,
                similarity_metric=similarity_metric,
                similarity_threshold=similarity_threshold,
                namespace=namespace,
                filter=filter
            )
        except Exception as e:
            logger.error(f"Error in search_vectors_sync: {e}")
            return None

    def _get_relevant_documents(
        self, query: str, *, run_manager: Callbacks = None
    ) -> List[Document]:
        """
        Get documents relevant to the query using threshold-based retrieval.

        Args:
            query: The query string.
            run_manager: The callbacks manager (not used here).

        Returns:
            One ``Document`` per surviving match. ``page_content`` is the match's
            ``text`` metadata. The remaining metadata is copied and extended with the
            raw ``score`` and ``normalized_score``. Returns an empty list if the
            search failed.
        """
        try:
            embedding = self.embeddings.embed_query(query)
        except Exception as e:
            logger.error(f"Error generating embedding: {e}")
            # Fallback: a freshly constructed Google embedding model. If this also
            # fails, the exception propagates to the caller.
            embedding_model = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
            embedding = embedding_model.embed_query(query)

        # Only a real dict is consulted; anything else means "no filter".
        search_filter = None
        if isinstance(self.search_kwargs, dict):
            search_filter = self.search_kwargs.get("filter")

        search_result = _run_coroutine_blocking(
            lambda: search_vectors(
                query_vector=embedding,
                top_k=self.top_k,
                limit_k=self.limit_k,
                similarity_metric=self.similarity_metric,
                similarity_threshold=self.similarity_threshold,
                namespace=self.namespace,
                filter=search_filter,
            )
        )

        documents = []
        if not (search_result and hasattr(search_result, 'matches')):
            return documents

        for match in search_result.matches:
            # Copy so the response object is not mutated; metadata may be absent or None.
            metadata = dict(getattr(match, 'metadata', None) or {})

            score = getattr(match, 'score', 0)
            metadata['score'] = score
            metadata['normalized_score'] = getattr(match, 'normalized_score', score)

            # The stored text becomes the page content rather than a metadata field.
            text = metadata.pop('text', '')
            documents.append(Document(page_content=text, metadata=metadata))

        return documents
|
|
|
|
|
# Arguments the cached ``_retriever_instance`` was built with. get_chain() reuses the
# cached retriever only when called with exactly the same arguments.
_retriever_config: Optional[Tuple[Any, ...]] = None


def get_chain(
    index_name=PINECONE_INDEX_NAME,
    namespace="Default",
    top_k=DEFAULT_TOP_K,
    limit_k=DEFAULT_LIMIT_K,
    similarity_metric=DEFAULT_SIMILARITY_METRIC,
    similarity_threshold=DEFAULT_SIMILARITY_THRESHOLD
):
    """
    Get the retrieval chain with Pinecone vector store using threshold-based retrieval.

    The retriever is cached in ``_retriever_instance`` and reused only when requested
    with the same arguments. A different combination builds a new retriever that
    replaces the cached one, so callers never receive a retriever configured for
    another namespace or threshold.

    NOTE(review): ``index_name`` is only used to build the LangChain vectorstore.
    The retriever's searches go through ``search_vectors``, which resolves the index
    via ``get_pinecone_index`` (i.e. ``PINECONE_INDEX_NAME``). Confirm this is
    intended when a different index name is passed.

    Args:
        index_name: Pinecone index name used for the LangChain vectorstore.
        namespace: Pinecone namespace the retriever searches.
        top_k: Number of results to return after filtering.
        limit_k: Maximum number of results to retrieve from Pinecone.
        similarity_metric: Similarity metric to use (cosine, dotproduct, euclidean).
        similarity_threshold: Threshold for similarity.

    Returns:
        ThresholdRetriever instance, or None if the index is unavailable or
        construction fails (errors are logged).
    """
    global _retriever_instance, _retriever_config

    requested_config = (index_name, namespace, top_k, limit_k, similarity_metric, similarity_threshold)

    try:
        if _retriever_instance is not None and _retriever_config == requested_config:
            return _retriever_instance

        start_time = time.time()
        logger.info("Initializing new retriever chain with threshold-based filtering")

        embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")

        pinecone_index = get_pinecone_index()
        if not pinecone_index:
            logger.error("Failed to get Pinecone index for retriever chain")
            return None

        # Stats are informational only; failing to read them does not abort setup.
        try:
            stats = pinecone_index.describe_index_stats()
            total_vectors = stats.get('total_vector_count', 0)
            logger.info(f"Pinecone index stats - Total vectors: {total_vectors}")
        except Exception as e:
            logger.error(f"Error getting index stats: {e}")

        from langchain_community.vectorstores import Pinecone as LangchainPinecone

        logger.info(f"Creating Pinecone vectorstore with index: {index_name}, namespace: {namespace}")
        vectorstore = LangchainPinecone.from_existing_index(
            embedding=embeddings,
            index_name=index_name,
            namespace=namespace,
            text_key="text"
        )

        logger.info(f"Creating ThresholdRetriever with top_k={top_k}, limit_k={limit_k}, " +
                    f"metric={similarity_metric}, threshold={similarity_threshold}")

        retriever = ThresholdRetriever(
            vectorstore=vectorstore,
            embeddings=embeddings,
            top_k=top_k,
            limit_k=limit_k,
            similarity_metric=similarity_metric,
            similarity_threshold=similarity_threshold,
            # Without this the retriever always searched the "Default" namespace.
            namespace=namespace
        )

        _retriever_instance = retriever
        _retriever_config = requested_config

        logger.info(f"Pinecone retriever initialized in {time.time() - start_time:.2f} seconds")

        return _retriever_instance
    except Exception as e:
        logger.error(f"Error creating retrieval chain: {e}")
        return None