# src/vector_store_manager/chroma_manager.py
import logging
from typing import Any, Dict, List, Optional

from langchain_chroma import Chroma  # cite: embed_pipeline.py, query_pipeline.py
from langchain.schema import Document  # cite: embed_pipeline.py

from config.settings import PERSIST_DIR, CHROMADB_COLLECTION_NAME  # cite: embed_pipeline.py, query_pipeline.py
from src.embedding_generator.embedder import EmbeddingGenerator

logger = logging.getLogger(__name__)


class ChromaManager:
    """Manages interactions with the ChromaDB vector store.

    Wraps a Langchain ``Chroma`` instance configured with a persistent
    directory and an injected embedding generator, and exposes
    add/update/delete/get operations plus a retriever factory.
    """

    def __init__(self, embedding_generator: EmbeddingGenerator):
        """Initialize the Chroma collection with persistence and embeddings.

        Args:
            embedding_generator: Supplies the Langchain embedder instance
                (``.embedder``) that Chroma uses for documents and queries.

        Raises:
            Exception: Propagated if the ChromaDB client cannot be created.
        """
        self.embedding_generator = embedding_generator
        # --- Financial Ministry Adaptation ---
        # TODO: Configure the Chroma client against a scalable backend
        # (e.g., ClickHouse) instead of / in addition to persist_directory
        # for production — likely chromadb.HttpClient or backend-specific
        # settings. Handle connection errors and retries to the backend,
        # and implement authentication/authorization for ChromaDB access.
        # ------------------------------------
        try:
            # For production, persist_directory may be replaced with client
            # settings pointing at a remote/ClickHouse-backed deployment.
            self.vectordb = Chroma(
                persist_directory=PERSIST_DIR,  # cite: embed_pipeline.py, query_pipeline.py
                collection_name=CHROMADB_COLLECTION_NAME,  # cite: embed_pipeline.py, query_pipeline.py
                embedding_function=self.embedding_generator.embedder,
            )
            logger.info(
                "Initialized ChromaDB collection: '%s' at '%s'",
                CHROMADB_COLLECTION_NAME,
                PERSIST_DIR,
            )
            # TODO: optionally verify the collection exists and is healthy.
        except Exception as e:
            logger.critical("Failed to initialize ChromaDB: %s", e)
            raise  # bare raise preserves the original traceback

    def add_documents(self, chunks: List[Document]) -> None:
        """Add document chunks to the ChromaDB collection.

        Embedding happens inside Chroma via the embedding function supplied
        at construction. If chunks carry no IDs, Chroma generates them; for
        stable update/delete semantics, generate deterministic IDs upstream
        (e.g., a hash of source + chunk index) and pass them through.

        Args:
            chunks: Langchain Document chunks with metadata. An empty list
                is a no-op (logged as a warning).

        Raises:
            Exception: Propagated if the batch insert fails, so the caller
                sees the failure instead of silently losing data.
        """
        # --- Financial Ministry Adaptation ---
        # TODO: retry logic / transactional behavior for large batches, and
        # consistent ID management (stable hash of source + chunk index).
        # ------------------------------------
        if not chunks:
            logger.warning("No chunks to add to ChromaDB.")
            return
        try:
            # Example of explicit ID control (generate stable IDs upstream):
            #   ids = [f"{c.metadata.get('source', 'unknown')}_{i}"
            #          for i, c in enumerate(chunks)]
            #   self.vectordb.add_documents(chunks, ids=ids)
            self.vectordb.add_documents(chunks)  # Chroma assigns IDs if absent
            logger.info("Added %d chunks to ChromaDB.", len(chunks))
        except Exception as e:
            logger.error("Failed to add documents to ChromaDB: %s", e)
            # Fix: re-raise instead of swallowing — the original returned
            # None on failure, making ingestion errors invisible to callers.
            raise

    def update_documents(
        self,
        ids: List[str],
        documents: List[str],
        metadatas: List[Dict[str, Any]],
    ) -> None:
        """Update documents in the ChromaDB collection by ID.

        Args:
            ids: Document IDs to update.
            documents: New document content, parallel to ``ids``.
            metadatas: New metadata dictionaries, parallel to ``ids``.

        Raises:
            Exception: Propagated if the underlying update fails.
        """
        # --- Financial Ministry Adaptation ---
        # TODO: retry logic; validate that IDs exist before updating.
        # ------------------------------------
        try:
            # NOTE: uses the private underlying collection — Langchain's
            # Chroma wrapper does not expose update/delete directly.
            self.vectordb._collection.update(
                ids=ids,
                documents=documents,
                metadatas=metadatas,
            )
            logger.info("Updated documents with IDs: %s", ids)
        except Exception as e:
            logger.error("Failed to update documents with IDs %s: %s", ids, e)
            raise

    def delete_documents(
        self,
        ids: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Delete documents from the collection by ID or metadata filter.

        Exactly one of ``ids`` or ``where`` should be given; if both are
        given, ``ids`` wins. With neither, a warning is logged and nothing
        is deleted.

        Args:
            ids: Document IDs to delete.
            where: Metadata filter (e.g., ``{"source": "old_file.txt"}``).

        Raises:
            Exception: Propagated if the underlying delete fails.
        """
        # --- Financial Ministry Adaptation ---
        # TODO: retry logic; audit-log which documents were deleted and why.
        # ------------------------------------
        try:
            if ids:
                # Private collection access — see update_documents note.
                self.vectordb._collection.delete(ids=ids)
                logger.info("Deleted documents with IDs: %s", ids)
            elif where:
                self.vectordb._collection.delete(where=where)
                logger.info("Deleted documents matching metadata filter: %s", where)
            else:
                logger.warning("Delete called without specifying ids or where filter.")
        except Exception as e:
            logger.error(
                "Failed to delete documents (ids: %s, where: %s): %s", ids, where, e
            )
            raise

    def get_documents(
        self,
        ids: Optional[List[str]] = None,
        where: Optional[Dict[str, Any]] = None,
        where_document: Optional[Dict[str, Any]] = None,
        limit: Optional[int] = None,
        offset: Optional[int] = None,
        include: Optional[List[str]] = None,
    ) -> Dict[str, List[Any]]:
        """Retrieve documents and their details from the collection.

        Args:
            ids: Document IDs to retrieve.
            where: Metadata filter.
            where_document: Document-content filter.
            limit: Maximum number of results.
            offset: Offset for pagination.
            include: Fields to include (e.g., ``['metadatas', 'documents']``).
                IDs are always included. Defaults to metadatas + documents.

        Returns:
            The Chroma ``get`` result dict (ids, documents, metadatas, ...).

        Raises:
            Exception: Propagated if the underlying query fails.
        """
        # --- Financial Ministry Adaptation ---
        # TODO: retry logic; handle sensitive metadata appropriately.
        # ------------------------------------
        try:
            if include is None:
                include = ['metadatas', 'documents']  # Chroma's documented default
            # Private collection access — see update_documents note.
            results = self.vectordb._collection.get(
                ids=ids,
                where=where,
                where_document=where_document,
                limit=limit,
                offset=offset,
                include=include,
            )
            logger.debug(
                "Retrieved %d documents from ChromaDB.", len(results.get('ids', []))
            )
            return results
        except Exception as e:
            logger.error("Failed to retrieve documents from ChromaDB: %s", e)
            raise

    def as_retriever(self, search_kwargs: Optional[Dict[str, Any]] = None):
        """Return a Langchain Retriever for this Chroma collection.

        The retriever automatically uses the embedding function supplied
        when the Chroma instance was created.

        Args:
            search_kwargs: Retriever arguments (e.g., ``{"k": 5}``).
                Defaults to an empty dict.

        Returns:
            A Langchain Retriever backed by this collection.
        """
        if search_kwargs is None:
            search_kwargs = {}
        return self.vectordb.as_retriever(search_kwargs=search_kwargs)  # cite: query_pipeline.py