# src/retrieval_handler/retriever.py
from src.embedding_generator.embedder import EmbeddingGenerator | |
from src.vector_store_manager.chroma_manager import ChromaManager | |
from config.settings import TOP_K # cite: query_pipeline.py | |
from typing import List, Dict, Any | |
from langchain.schema import Document # To return retrieved documents | |
import logging | |
logger = logging.getLogger(__name__) | |
class RetrievalHandler:
    """
    Handles the process of retrieving relevant documents from the vector store.

    A default, unfiltered retriever is built once at construction time with
    ``k=TOP_K``. When a caller supplies metadata filters, a retriever carrying
    those filters is built for that single call so the filter is applied as
    part of the similarity search itself.
    """

    def __init__(self, embedding_generator: EmbeddingGenerator, vector_store_manager: ChromaManager):
        """
        Store the collaborators and build the default retriever.

        Args:
            embedding_generator: Kept on the instance; not used directly by the
                methods of this class.
            vector_store_manager: Manager whose ``as_retriever`` method yields
                the retriever used for similarity search.
        """
        self.embedding_generator = embedding_generator
        self.vector_store_manager = vector_store_manager
        # Default retriever for unfiltered queries; the number of results (k)
        # comes from configuration.
        self.langchain_retriever = self.vector_store_manager.as_retriever(search_kwargs={"k": TOP_K})  # cite: query_pipeline.py
        logger.info(f"Initialized retrieval handler with TOP_K={TOP_K}")

    def retrieve_documents(self, query: str, filters: Dict[str, Any] = None) -> List[Document]:
        """
        Retrieves relevant document chunks based on a query and optional filters.

        Args:
            query: The user's query string.
            filters: Optional metadata filters to apply during retrieval.
                ``None`` or an empty dict means "no filtering".

        Returns:
            A list of relevant Langchain Document objects. An empty list is
            returned if retrieval fails for any reason; the failure is logged
            with its traceback rather than raised.
        """
        try:
            if filters:
                logger.debug(f"Retrieving documents with query '{query}' and filters: {filters}")
                # Build a retriever that carries the metadata filter. Previously the
                # filters were accepted but never applied, so filtered and unfiltered
                # calls returned the same results.
                # NOTE(review): assumes ChromaManager.as_retriever forwards
                # search_kwargs to LangChain's Chroma retriever, where "filter" is
                # the metadata-filter argument of the similarity search - confirm.
                retriever = self.vector_store_manager.as_retriever(
                    search_kwargs={"k": TOP_K, "filter": filters}
                )
            else:
                # No filters applied: reuse the retriever built in __init__.
                retriever = self.langchain_retriever

            retrieved_docs = retriever.invoke(query)  # cite: query_pipeline.py
            logger.info(f"Retrieved {len(retrieved_docs)} documents for query.")
            return retrieved_docs
        except Exception as e:
            # Deliberate best-effort boundary: callers receive an empty result
            # instead of an exception. exc_info keeps the traceback in the log.
            logger.error(f"Failed to retrieve documents for query '{query}': {e}", exc_info=True)
            return []