Spaces:
Running
Running
File size: 8,710 Bytes
10b392a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# src/vector_store_manager/chroma_manager.py
import logging
from typing import Any, Dict, List, Optional

from langchain.schema import Document  # cite: embed_pipeline.py
from langchain_chroma import Chroma  # cite: embed_pipeline.py, query_pipeline.py

from config.settings import PERSIST_DIR, CHROMADB_COLLECTION_NAME  # cite: embed_pipeline.py, query_pipeline.py
from src.embedding_generator.embedder import EmbeddingGenerator
logger = logging.getLogger(__name__)
class ChromaManager:
    """Manages interactions with the ChromaDB vector store.

    Wraps a Langchain ``Chroma`` collection and exposes add / update /
    delete / get operations plus a retriever factory. Embedding is
    delegated to the ``EmbeddingGenerator`` supplied at construction.
    """

    def __init__(self, embedding_generator: EmbeddingGenerator):
        """Initialize the persistent ChromaDB collection.

        Args:
            embedding_generator: Provides the Langchain embedder instance
                used by Chroma to embed documents and queries.

        Raises:
            Exception: Propagated unchanged if the ChromaDB collection
                cannot be initialized.
        """
        self.embedding_generator = embedding_generator
        # --- Financial Ministry Adaptation ---
        # TODO: Configure Chroma client to use a scalable backend (e.g., ClickHouse)
        # instead of or in addition to persist_directory for production.
        # This might involve using chromadb.HttpClient or specific backend configurations.
        # Handle connection errors and retries to the database backend.
        # Implement authentication/authorization for ChromaDB access.
        # ------------------------------------
        try:
            # For production, persist_directory may be replaced by client
            # settings pointing at a remote backend.
            self.vectordb = Chroma(
                persist_directory=PERSIST_DIR,
                collection_name=CHROMADB_COLLECTION_NAME,
                embedding_function=self.embedding_generator.embedder,
            )
            logger.info(
                "Initialized ChromaDB collection: '%s' at '%s'",
                CHROMADB_COLLECTION_NAME,
                PERSIST_DIR,
            )
            # NOTE(review): consider verifying collection existence/health here.
        except Exception:
            logger.critical("Failed to initialize ChromaDB", exc_info=True)
            raise  # bare raise keeps the original traceback intact

    def add_documents(self, chunks: List[Document]) -> None:
        """Add document chunks to the ChromaDB collection.

        Chroma embeds the chunks internally using the embedding function
        supplied at initialization, and generates IDs when none are given.

        Args:
            chunks: Langchain ``Document`` chunks (with metadata) to add.

        Raises:
            Exception: Propagated if the underlying add fails, so callers
                can retry or abort instead of silently losing data.
        """
        # --- Financial Ministry Adaptation ---
        # TODO: Implement retry logic for batch additions and transactional
        # behavior for large batches. Generate stable IDs (e.g., source +
        # chunk index, or a content hash) in the document processor and pass
        # them here so chunks can later be updated/deleted deterministically:
        #   chunk_ids = [f"{c.metadata.get('source', 'unknown')}_{i}" for i, c in enumerate(chunks)]
        #   self.vectordb.add_documents(chunks, ids=chunk_ids)
        # ------------------------------------
        if not chunks:
            logger.warning("No chunks to add to ChromaDB.")
            return
        try:
            self.vectordb.add_documents(chunks)  # Chroma assigns IDs if not provided
            logger.info("Added %d chunks to ChromaDB.", len(chunks))
        except Exception:
            # Previously this error was logged and swallowed, which could
            # silently drop data; re-raise so the caller can handle/retry.
            logger.exception("Failed to add documents to ChromaDB")
            raise

    def update_documents(
        self,
        ids: List[str],
        documents: List[str],
        metadatas: List[Dict[str, Any]],
    ) -> None:
        """Update documents in the ChromaDB collection by ID.

        Args:
            ids: Document IDs to update.
            documents: New document content, positionally matching ``ids``.
            metadatas: New metadata dicts, positionally matching ``ids``.

        Raises:
            Exception: Propagated if the underlying update fails.
        """
        # TODO(review): validate that IDs exist before updating; add retries.
        try:
            # The Langchain wrapper does not expose update; use the
            # underlying chromadb collection directly.
            self.vectordb._collection.update(
                ids=ids,
                documents=documents,
                metadatas=metadatas,
            )
            logger.info("Updated documents with IDs: %s", ids)
        except Exception:
            logger.exception("Failed to update documents with IDs %s", ids)
            raise

    def delete_documents(
        self,
        ids: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Delete documents from the collection by ID or metadata filter.

        If both ``ids`` and ``where`` are given, ``ids`` takes precedence.
        Calling with neither logs a warning and does nothing.

        Args:
            ids: Document IDs to delete.
            where: Metadata filter (e.g., ``{"source": "old_file.txt"}``).

        Raises:
            Exception: Propagated if the underlying delete fails.
        """
        try:
            if ids:
                self.vectordb._collection.delete(ids=ids)  # underlying collection
                logger.info("Deleted documents with IDs: %s", ids)
            elif where:
                self.vectordb._collection.delete(where=where)  # underlying collection
                logger.info("Deleted documents matching metadata filter: %s", where)
            else:
                logger.warning("Delete called without specifying ids or where filter.")
        except Exception:
            logger.exception(
                "Failed to delete documents (ids: %s, where: %s)", ids, where
            )
            raise

    def get_documents(
        self,
        ids: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None,
        where_document: Optional[Dict[str, Any]] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        include: Optional[List[str]] = None,
    ) -> Dict[str, List[Any]]:
        """Retrieve documents and their details from the collection.

        Args:
            ids: Document IDs to retrieve.
            where: Metadata filter.
            where_document: Document content filter.
            limit: Maximum number of results.
            offset: Offset for pagination.
            include: Fields to include (e.g., ``['metadatas', 'documents']``).
                IDs are always included. Defaults to metadatas + documents.

        Returns:
            A dict of retrieved data (ids, documents, metadatas, ...).

        Raises:
            Exception: Propagated if the underlying get fails.
        """
        # NOTE(review): ensure sensitive metadata is handled appropriately
        # by callers if retrieved.
        if include is None:
            include = ["metadatas", "documents"]  # Chroma's documented default
        try:
            results = self.vectordb._collection.get(  # underlying collection
                ids=ids,
                where=where,
                where_document=where_document,
                limit=limit,
                offset=offset,
                include=include,
            )
            logger.debug(
                "Retrieved %d documents from ChromaDB.", len(results.get("ids", []))
            )
            return results
        except Exception:
            logger.exception("Failed to retrieve documents from ChromaDB")
            raise

    def as_retriever(self, search_kwargs: Optional[Dict[str, Any]] = None):
        """Return a Langchain Retriever for the Chroma collection.

        The retriever automatically uses the embedding function provided
        when the ``Chroma`` instance was initialized.

        Args:
            search_kwargs: Retriever arguments (e.g., ``{"k": 5}``);
                defaults to an empty dict.

        Returns:
            A Langchain Retriever.
        """
        if search_kwargs is None:
            search_kwargs = {}
        return self.vectordb.as_retriever(search_kwargs=search_kwargs)  # cite: query_pipeline.py