# src/ingestion_orchestrator/orchestrator.py
from src.data_loader.loader import load_documents
from src.document_processor.processor import process_documents
from src.embedding_generator.embedder import EmbeddingGenerator
from src.vector_store_manager.chroma_manager import ChromaManager
from config.settings import DOCS_FOLDER
import logging

logger = logging.getLogger(__name__)


class IngestionOrchestrator:
"""
Orchestrates the end-to-end data ingestion pipeline.
"""
def __init__(self):
# Initialize the necessary components
try:
self.embedding_generator = EmbeddingGenerator()
self.vector_store_manager = ChromaManager(self.embedding_generator)
logger.info("Initialized ingestion orchestrator components.")
except Exception as e:
logger.critical(f"Failed to initialize ingestion orchestrator components: {e}")
raise e
    def run_ingestion_pipeline(self, docs_folder: str = DOCS_FOLDER):
        """
        Runs the complete ingestion pipeline: loads, processes, and embeds documents.

        Args:
            docs_folder: The folder containing the source documents.
        """
        logger.info(f"Starting ingestion pipeline from folder: {docs_folder}")

        # 1. Load documents
        # --- Financial Ministry Adaptation ---
        # Implement logic to identify *new* or *modified* documents instead of
        # reloading everything on each run, and handle a potentially large
        # number of files efficiently (see the _find_changed_files sketch below).
        # ------------------------------------
        raw_documents = load_documents(docs_folder)
        if not raw_documents:
            logger.warning("No documents loaded. Ingestion pipeline finished.")
            return

        # 2. Process documents (split and extract metadata)
        processed_chunks = process_documents(raw_documents)
        if not processed_chunks:
            logger.warning("No processed chunks generated. Ingestion pipeline finished.")
            return

        # 3. Add documents to the vector store.
        # The add_documents method handles embedding internally.
        # --- Financial Ministry Adaptation ---
        # Implement logic for updating or deleting documents when the source
        # data changes. This requires comparing the current source data with
        # what is in ChromaDB (e.g., by source path and modification date or
        # version), using the vector_store_manager's update_documents and
        # delete_documents methods. Batch the writes to avoid overwhelming
        # ChromaDB or the backend (see the _add_in_batches sketch below).
        # ------------------------------------
        self.vector_store_manager.add_documents(processed_chunks)
        logger.info("Ingestion pipeline finished successfully.")
    # --- Financial Ministry Adaptation ---
    # TODO: Add methods for handling updates and deletions specifically;
    # hedged sketches follow below.
    # def update_changed_documents(self, changed_files: List[str]): pass
    # def delete_removed_documents(self, removed_files: List[str]): pass
    # ------------------------------------
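
    # Hedged sketches of the TODO methods above. They assume the
    # update_documents / delete_documents methods mentioned in the adaptation
    # notes exist on ChromaManager, and that load_documents can also accept a
    # single file path; both assumptions would need verifying against the
    # actual managers.
    def update_changed_documents(self, changed_files: list[str]):
        """Re-ingests documents whose source files changed."""
        for path in changed_files:
            raw = load_documents(path)  # assumes the loader accepts a file path
            chunks = process_documents(raw)
            if chunks:
                # Signature assumed: replace existing entries for this source.
                self.vector_store_manager.update_documents(chunks)

    def delete_removed_documents(self, removed_files: list[str]):
        """Removes vector-store entries whose source files were deleted."""
        for path in removed_files:
            # Keying deletions by source path is an assumption about the
            # ChromaManager API.
            self.vector_store_manager.delete_documents(source=path)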