Multimodal-RAG / ingestions /ingestion.py
3v324v23's picture
fix
be398ac
# core/ingestion/ingestion_service.py
import os
from typing import List, Optional, Callable
from utils.logger import logger
from qdrant_client import QdrantClient
from core.data_processing.text_processor import TextProcessor
from core.data_processing.audio_processor import AudioProcessor
from core.data_processing.image_processor import ImageProcessor
from core.embeddings.text_embedding_model import TextEmbeddingModel
from core.embeddings.image_embedding_model import ImageEmbeddingModel
from core.embeddings.audio_embedding_model import AudioEmbeddingModel
from core.retrieval.vector_db_manager import VectorDBManager
class IngestionService:
def __init__(self, client: QdrantClient):
logger.info("Initializing IngestionService (Stateless)...")
self.client = client
self.text_processor = TextProcessor()
self.image_processor = ImageProcessor()
self.audio_processor = AudioProcessor()
self.text_embedder = TextEmbeddingModel()
self.image_embedder = ImageEmbeddingModel()
self.audio_embedder = AudioEmbeddingModel()
text_embedding_dim = self.text_embedder.model.get_sentence_embedding_dimension()
self.text_db_manager = VectorDBManager(
client=self.client,
collection_name="text_collection",
embedding_dim=text_embedding_dim
)
image_embedding_dim = self.image_embedder.model.config.hidden_size
self.image_vector_db_manager = VectorDBManager(
client=self.client,
collection_name="image_collection",
embedding_dim=image_embedding_dim
)
audio_embedding_dim = self.audio_embedder.model.config.projection_dim
self.audio_vector_db_manager = VectorDBManager(
client=self.client,
collection_name="audio_collection",
embedding_dim=audio_embedding_dim
)
logger.info("IngestionService initialized successfully.")
def ingest_files(self, file_paths: List[str]):
'''Ingest files without displaying progress bar'''
return self.ingest_files_with_progress(file_paths, None)
def ingest_files_with_progress(self, file_paths: List[str], progress_callback: Optional[Callable] = None):
"""
Turn on progress bar for tracking
"""
logger.info(f"Starting ingestion for {len(file_paths)} files...")
# Kiểm tra và xử lý progress_callback an toàn
def safe_progress(value, desc=""):
try:
if progress_callback is not None:
progress_callback(value, desc=desc)
except Exception as e:
logger.warning(f"Progress callback error: {e}")
safe_progress(0.4, desc="Starting file processing...")
all_chunks_to_process = []
# 1. Walk through files to split chunks
for i, file_path in enumerate(file_paths):
try:
base_progress = 0.4 + (i / len(file_paths)) * 0.3 # 40% -> 70%
file_name = os.path.basename(file_path)
safe_progress(base_progress, desc=f"Processing file {i+1}/{len(file_paths)}: {file_name}")
file_ext = os.path.splitext(file_path)[1].lower()
chunks = []
safe_progress(base_progress + 0.01, desc=f"Reading {file_name}...")
if file_ext in ['.txt']:
chunks = self.text_processor.process(file_path)
elif file_ext in ['.png', '.jpg', '.jpeg', '.bmp', '.gif']:
chunks = self.image_processor.process(file_path)
elif file_ext in ['.wav', '.mp3']:
chunks = self.audio_processor.process(file_path)
else:
logger.warning(f"Unsupported file type '{file_ext}' for file: {file_path}. Skipping.")
continue
# Kiểm tra chunks có hợp lệ không
if not chunks or len(chunks) == 0:
logger.warning(f"No chunks generated from file: {file_path}")
continue
safe_progress(base_progress + 0.02, desc=f"Generated {len(chunks)} chunks from {file_name}")
all_chunks_to_process.extend(chunks)
except Exception as e:
logger.error(f"Error processing file {file_path}: {e}")
continue
if not all_chunks_to_process:
logger.warning("No processable chunks were generated from the provided files.")
safe_progress(1.0, desc="No chunks to process")
return
logger.info(f"Generated {len(all_chunks_to_process)} total chunks. Now generating embeddings...")
safe_progress(0.7, desc=f"Generated {len(all_chunks_to_process)} chunks. Starting embeddings...")
# 2. Create embeddings and add to batch
text_embeddings_batch, text_metadatas_batch = [], []
audio_embeddings_batch, audio_metadatas_batch = [], []
image_embeddings_batch, image_metadatas_batch = [], []
BATCH_SIZE = 32
for i, chunk_data in enumerate(all_chunks_to_process):
try:
base_progress = 0.7 + (i / len(all_chunks_to_process)) * 0.25 # 70% -> 95%
# Kiểm tra chunk_data có hợp lệ không
if not chunk_data or 'metadata' not in chunk_data or 'content' not in chunk_data:
logger.warning(f"Invalid chunk data at index {i}, skipping...")
continue
chunk_type = chunk_data['metadata'].get('type', 'unknown')
content = chunk_data['content']
chunk_id = chunk_data['metadata'].get('chunk_id', f'chunk_{i}')
# Kiểm tra content có hợp lệ không
if not content:
logger.warning(f"Empty content for chunk {chunk_id}, skipping...")
continue
safe_progress(base_progress, desc=f"Processing chunk {i+1}/{len(all_chunks_to_process)} ({chunk_type})")
if chunk_type == "text":
safe_progress(base_progress + 0.001, desc=f"Creating text embedding for chunk {i+1}")
embeddings = self.text_embedder.get_embeddings([content])
if embeddings and len(embeddings) > 0:
text_embeddings_batch.append(embeddings[0])
text_metadatas_batch.append(chunk_data)
else:
logger.warning(f"Failed to generate text embedding for chunk {chunk_id}")
elif chunk_type == "audio":
safe_progress(base_progress + 0.001, desc=f"Creating audio embedding for chunk {i+1}")
embeddings = self.audio_embedder.get_embeddings([content])
if embeddings and len(embeddings) > 0:
audio_embeddings_batch.append(embeddings[0])
audio_metadatas_batch.append(chunk_data)
else:
logger.warning(f"Failed to generate audio embedding for chunk {chunk_id}")
elif chunk_type == "image":
safe_progress(base_progress + 0.001, desc=f"Creating image embedding for chunk {i+1}")
embeddings = self.image_embedder.get_embeddings([content])
if embeddings and len(embeddings) > 0:
image_embeddings_batch.append(embeddings[0])
image_metadatas_batch.append(chunk_data)
else:
logger.warning(f"Failed to generate image embedding for chunk {chunk_id}")
# add batch when reaching BATCH_SIZE
if len(text_embeddings_batch) >= BATCH_SIZE:
safe_progress(base_progress + 0.002, desc=f"Saving batch of {len(text_embeddings_batch)} text embeddings...")
self.text_db_manager.add_vectors(text_embeddings_batch, text_metadatas_batch)
text_embeddings_batch, text_metadatas_batch = [], []
if len(audio_embeddings_batch) >= BATCH_SIZE:
safe_progress(base_progress + 0.002, desc=f"Saving batch of {len(audio_embeddings_batch)} audio embeddings...")
self.audio_vector_db_manager.add_vectors(audio_embeddings_batch, audio_metadatas_batch)
audio_embeddings_batch, audio_metadatas_batch = [], []
if len(image_embeddings_batch) >= BATCH_SIZE:
safe_progress(base_progress + 0.002, desc=f"Saving batch of {len(image_embeddings_batch)} image embeddings...")
self.image_vector_db_manager.add_vectors(image_embeddings_batch, image_metadatas_batch)
image_embeddings_batch, image_metadatas_batch = [], []
except Exception as e:
logger.error(f"Error ingesting chunk {i}: {e}")
continue
safe_progress(0.95, desc="Saving final batches...")
# adding maintaining embeddings
final_operations = []
if text_embeddings_batch:
final_operations.append(("text", len(text_embeddings_batch)))
if audio_embeddings_batch:
final_operations.append(("audio", len(audio_embeddings_batch)))
if image_embeddings_batch:
final_operations.append(("image", len(image_embeddings_batch)))
# Tránh chia cho 0
total_operations = len(final_operations)
if total_operations == 0:
safe_progress(1.0, desc="No final batches to save")
else:
for i, (batch_type, count) in enumerate(final_operations):
try:
current_progress = 0.95 + (i / total_operations) * 0.04 # 95% -> 99%
if batch_type == "text" and text_embeddings_batch:
safe_progress(current_progress, desc=f"Saving final {count} text embeddings...")
self.text_db_manager.add_vectors(text_embeddings_batch, text_metadatas_batch)
elif batch_type == "audio" and audio_embeddings_batch:
safe_progress(current_progress, desc=f"Saving final {count} audio embeddings...")
self.audio_vector_db_manager.add_vectors(audio_embeddings_batch, audio_metadatas_batch)
elif batch_type == "image" and image_embeddings_batch:
safe_progress(current_progress, desc=f"Saving final {count} image embeddings...")
self.image_vector_db_manager.add_vectors(image_embeddings_batch, image_metadatas_batch)
except Exception as e:
logger.error(f"Error saving final batch {batch_type}: {e}")
safe_progress(1.0, desc=f"✅ Successfully ingested {len(file_paths)} files with {len(all_chunks_to_process)} chunks!")
logger.success(f"Successfully completed ingestion for {len(file_paths)} files.")