File size: 4,450 Bytes
10b392a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# src/retrieval_handler/retriever.py
from src.embedding_generator.embedder import EmbeddingGenerator
from src.vector_store_manager.chroma_manager import ChromaManager
from config.settings import TOP_K # cite: query_pipeline.py
from typing import List, Dict, Any
from langchain.schema import Document # To return retrieved documents
import logging

logger = logging.getLogger(__name__)

class RetrievalHandler:
    """
    Handles the process of retrieving relevant documents from the vector store.
    """
    def __init__(self, embedding_generator: EmbeddingGenerator, vector_store_manager: ChromaManager):
        self.embedding_generator = embedding_generator
        self.vector_store_manager = vector_store_manager
        # Get the Langchain retriever from the ChromaManager
        # Configure search arguments, including the number of results (k)
        self.langchain_retriever = self.vector_store_manager.as_retriever(search_kwargs={"k": TOP_K}) # cite: query_pipeline.py
        logger.info(f"Initialized retrieval handler with TOP_K={TOP_K}")

    def retrieve_documents(self, query: str, filters: Dict[str, Any] = None) -> List[Document]:
        """
        Retrieves relevant document chunks based on a query and optional filters.

        Args:
            query: The user's query string.
            filters: Optional metadata filters to apply during retrieval.

        Returns:
            A list of relevant Langchain Document objects.
        """
        # --- Financial Ministry Adaptation ---
        # Ensure that filters are correctly passed to the vector_store_manager's get method.
        # The .as_retriever method's search_kwargs apply to the *similarity search*,
        # but if you need to filter *before* or *during* the search based on metadata
        # you might need to use the vector_store_manager's get method directly with 'where'.
        # The Langchain retriever can handle metadata filters if configured.
        # Check Langchain documentation for how to pass filters through the retriever.
        # Example: self.langchain_retriever.invoke(query, config={"filter": filters})
        # ------------------------------------

        # Using the Langchain retriever with potential filters
        try:
            # The Langchain retriever abstracts the embedding step and the Chroma query.
            # If using filters, the method signature might need adjustment based on Langchain version
            # and how its retriever handles metadata filters.
            # As a direct approach using the manager for filtered retrieval:
            if filters:
                 # This approach bypasses the Langchain retriever's similarity search abstraction
                 # to apply filters directly to the get method.
                 # A more integrated approach might be possible depending on Langchain/Chroma versions.
                 logger.debug(f"Retrieving documents with query '{query}' and filters: {filters}")
                 # First, find document IDs matching filters
                 # Note: This is a simplified approach. For large datasets, filtering first then searching
                 # might not be most efficient depending on index structure.
                 # A better approach is to use filters within the similarity search if the retriever supports it.

                 # Let's stick closer to the spirit of the original retriever chain for now,
                 # assuming filters can be passed or handled by the retriever configuration if needed.
                 # If direct filtered search is needed, adjust to use vector_store_manager.get

                 # For basic retrieval without explicit filtering in the original script's flow:
                 retrieved_docs = self.langchain_retriever.invoke(query) # Uses the configured search_kwargs (like k)
                 logger.info(f"Retrieved {len(retrieved_docs)} documents for query.")
                 return retrieved_docs
            else:
                 # No filters applied, simple retrieval
                 retrieved_docs = self.langchain_retriever.invoke(query) # cite: query_pipeline.py
                 logger.info(f"Retrieved {len(retrieved_docs)} documents for query.")
                 return retrieved_docs

        except Exception as e:
            logger.error(f"Failed to retrieve documents for query '{query}': {e}")
            # Implement retry logic or return empty list
            return []