# src/ingestion_orchestrator/orchestrator.py
import logging

from config.settings import DOCS_FOLDER
from src.data_loader.loader import load_documents
from src.document_processor.processor import process_documents
from src.embedding_generator.embedder import EmbeddingGenerator
from src.vector_store_manager.chroma_manager import ChromaManager

logger = logging.getLogger(__name__)

class IngestionOrchestrator:
    """
    Orchestrates the end-to-end data ingestion pipeline.
    """
    def __init__(self):
        # Initialize the necessary components
        try:
            self.embedding_generator = EmbeddingGenerator()
            self.vector_store_manager = ChromaManager(self.embedding_generator)
            logger.info("Initialized ingestion orchestrator components.")
        except Exception as e:
            logger.critical(f"Failed to initialize ingestion orchestrator components: {e}")
            raise

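    # --- Financial Ministry Adaptation ---
    # Hedged sketch of incremental ingestion: compare file modification times
    # against a JSON manifest written on the previous run. The manifest path
    # and format are illustrative (not part of this codebase), and the method
    # is not yet wired into run_ingestion_pipeline.
    def _find_changed_files(self, docs_folder: str,
                            manifest_path: str = "ingest_manifest.json") -> list[str]:
        """Return file paths under docs_folder that are new or modified."""
        import json
        import os  # local imports keep this sketch self-contained

        previous: dict[str, float] = {}
        if os.path.exists(manifest_path):
            with open(manifest_path, "r", encoding="utf-8") as f:
                previous = json.load(f)

        changed: list[str] = []
        current: dict[str, float] = {}
        for root, _dirs, files in os.walk(docs_folder):
            for name in files:
                path = os.path.join(root, name)
                mtime = os.path.getmtime(path)
                current[path] = mtime
                # A file is new or modified if its mtime is newer than the
                # one recorded on the previous run (missing entries count
                # as new).
                if mtime > previous.get(path, 0.0):
                    changed.append(path)

        with open(manifest_path, "w", encoding="utf-8") as f:
            json.dump(current, f)
        return changed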

    def run_ingestion_pipeline(self, docs_folder: str = DOCS_FOLDER):
        """
        Runs the complete ingestion pipeline: loads, processes, and embeds documents.

        Args:
            docs_folder: The folder containing the source documents.
        """
        logger.info(f"Starting ingestion pipeline from folder: {docs_folder}")

        # 1. Load documents
        # --- Financial Ministry Adaptation ---
        # Identify *new* or *modified* documents instead of reloading
        # everything on each run (see the _find_changed_files sketch above),
        # and handle a potentially large number of files efficiently.
        # ------------------------------------
        raw_documents = load_documents(docs_folder)
        if not raw_documents:
            logger.warning("No documents loaded. Ingestion pipeline finished.")
            return

        # 2. Process documents (split and extract metadata)
        processed_chunks = process_documents(raw_documents)
        if not processed_chunks:
            logger.warning("No processed chunks generated. Ingestion pipeline finished.")
            return

        # 3. Add documents to the vector store
        # The add_documents method handles embedding internally
        # --- Financial Ministry Adaptation ---
        # Implement updates and deletions when the source data changes. This
        # requires comparing the current source files with what is already in
        # ChromaDB (e.g., by source path and modification date or version),
        # using the vector_store_manager's update_documents and
        # delete_documents methods. Batch additions so large ingests do not
        # overwhelm ChromaDB or the embedding backend (sketched below).
        # ------------------------------------
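        # A hedged sketch of batched insertion (the batch size is
        # illustrative; tune it to the embedding backend's limits):
        #
        #     BATCH_SIZE = 100
        #     for i in range(0, len(processed_chunks), BATCH_SIZE):
        #         self.vector_store_manager.add_documents(
        #             processed_chunks[i:i + BATCH_SIZE])
        #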
        self.vector_store_manager.add_documents(processed_chunks)

        logger.info("Ingestion pipeline finished successfully.")

    # --- Financial Ministry Adaptation ---
    # TODO: Handle updates and deletions specifically; hedged sketches follow.
    # ------------------------------------
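
    # Hedged sketches for the TODO above. The signatures, and the assumption
    # that ChromaManager's update_documents / delete_documents are keyed by
    # source path, are illustrative; adjust them to the real API.
    def update_changed_documents(self, changed_files: list[str]) -> None:
        """Sketch: re-process and re-embed documents whose files changed."""
        if not changed_files:
            return
        logger.info(f"Updating {len(changed_files)} changed document(s).")
        # Assumption: load_documents also accepts an explicit list of file
        # paths (elsewhere in this module it is only shown taking a folder).
        raw_documents = load_documents(changed_files)
        processed_chunks = process_documents(raw_documents)
        self.vector_store_manager.update_documents(processed_chunks)

    def delete_removed_documents(self, removed_files: list[str]) -> None:
        """Sketch: drop vector-store entries whose source files were removed."""
        if not removed_files:
            return
        logger.info(f"Deleting entries for {len(removed_files)} removed file(s).")
        self.vector_store_manager.delete_documents(removed_files)


if __name__ == "__main__":
    # Minimal usage sketch: run the full pipeline against DOCS_FOLDER.
    logging.basicConfig(level=logging.INFO)
    IngestionOrchestrator().run_ingestion_pipeline()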