Spaces:
Running
Running
# src/ingestion_orchestrator/orchestrator.py
from src.data_loader.loader import load_documents | |
from src.document_processor.processor import process_documents | |
from src.embedding_generator.embedder import EmbeddingGenerator | |
from src.vector_store_manager.chroma_manager import ChromaManager | |
from config.settings import DOCS_FOLDER | |
import logging | |
logger = logging.getLogger(__name__) | |
class IngestionOrchestrator:
    """
    Orchestrates the end-to-end data ingestion pipeline.

    The orchestrator owns an ``EmbeddingGenerator`` and a ``ChromaManager``
    constructed from it, and drives the load -> process -> store sequence
    implemented in ``run_ingestion_pipeline``.
    """

    def __init__(self) -> None:
        """
        Initialize the components the pipeline depends on.

        Raises:
            Exception: Any error raised while constructing the embedding
                generator or the vector store manager. The error is logged
                at CRITICAL level (with traceback) and re-raised unchanged.
        """
        # Initialize the necessary components
        try:
            self.embedding_generator = EmbeddingGenerator()
            self.vector_store_manager = ChromaManager(self.embedding_generator)
        except Exception as e:
            # exc_info=True attaches the traceback to the log record so the
            # root cause of a failed start-up is not lost.
            logger.critical(
                f"Failed to initialize ingestion orchestrator components: {e}",
                exc_info=True,
            )
            # Bare `raise` re-raises the active exception as-is.
            raise
        else:
            logger.info("Initialized ingestion orchestrator components.")

    def run_ingestion_pipeline(self, docs_folder: str = DOCS_FOLDER) -> None:
        """
        Runs the complete ingestion pipeline: loads, processes, and embeds documents.

        The run ends early, after logging a warning, when loading produces no
        documents or when processing produces no chunks. Errors raised by the
        loader, the processor, or the vector store manager are not caught here
        and propagate to the caller.

        Args:
            docs_folder: The folder containing the source documents.
        """
        logger.info(f"Starting ingestion pipeline from folder: {docs_folder}")

        # 1. Load documents
        # --- Financial Ministry Adaptation ---
        # Implement logic to identify *new* or *modified* documents
        # instead of reloading everything each time for efficiency.
        # Handle potential large number of files efficiently.
        # ------------------------------------
        raw_documents = load_documents(docs_folder)
        if not raw_documents:
            logger.warning("No documents loaded. Ingestion pipeline finished.")
            return

        # 2. Process documents (split and extract metadata)
        processed_chunks = process_documents(raw_documents)
        if not processed_chunks:
            logger.warning("No processed chunks generated. Ingestion pipeline finished.")
            return

        # 3. Add documents to the vector store
        # The add_documents method handles embedding internally
        # --- Financial Ministry Adaptation ---
        # Implement logic for updating or deleting documents if the source data changed.
        # This requires comparing current source data with what's in ChromaDB
        # (e.g., by source path and modification date or version).
        # Use the vector_store_manager's update_documents and delete_documents methods.
        # Implement batching for adding documents to avoid overwhelming ChromaDB or the backend.
        # ------------------------------------
        self.vector_store_manager.add_documents(processed_chunks)
        logger.info("Ingestion pipeline finished successfully.")
# --- Financial Ministry Adaptation ---
# TODO: Add methods for handling updates and deletions specifically.
# def update_changed_documents(self, changed_files: List[str]): pass
# def delete_removed_documents(self, removed_files: List[str]): pass
# ------------------------------------