Spaces:
Sleeping
Sleeping
# core/ingestion/ingestion_service.py | |
import os | |
from typing import List, Optional, Callable | |
from utils.logger import logger | |
from qdrant_client import QdrantClient | |
from core.data_processing.text_processor import TextProcessor | |
from core.data_processing.audio_processor import AudioProcessor | |
from core.data_processing.image_processor import ImageProcessor | |
from core.embeddings.text_embedding_model import TextEmbeddingModel | |
from core.embeddings.image_embedding_model import ImageEmbeddingModel | |
from core.embeddings.audio_embedding_model import AudioEmbeddingModel | |
from core.retrieval.vector_db_manager import VectorDBManager | |
class IngestionService:
    """Ingest text, image and audio files into per-modality Qdrant collections.

    Each supported file is split into chunk dicts by a modality-specific
    processor. Each chunk's ``content`` is embedded by the matching embedding
    model, and the vector is stored (with the whole chunk dict as metadata)
    through one ``VectorDBManager`` per collection: ``text_collection``,
    ``image_collection`` and ``audio_collection``. Vectors are written in
    batches of ``BATCH_SIZE`` per modality.
    """

    # Lower-case file extensions (with leading dot) routed to each processor.
    TEXT_EXTENSIONS = ('.txt',)
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.gif')
    AUDIO_EXTENSIONS = ('.wav', '.mp3')
    # Number of pending vectors of one modality that triggers a write to the DB.
    BATCH_SIZE = 32

    def __init__(self, client: QdrantClient):
        """Build processors, embedders and one vector-DB manager per modality.

        Args:
            client: Qdrant client shared by the three collection managers.
        """
        logger.info("Initializing IngestionService (Stateless)...")
        self.client = client
        self.text_processor = TextProcessor()
        self.image_processor = ImageProcessor()
        self.audio_processor = AudioProcessor()
        self.text_embedder = TextEmbeddingModel()
        self.image_embedder = ImageEmbeddingModel()
        self.audio_embedder = AudioEmbeddingModel()
        # Each collection is created with the vector size its embedder reports.
        text_embedding_dim = self.text_embedder.model.get_sentence_embedding_dimension()
        self.text_db_manager = VectorDBManager(
            client=self.client,
            collection_name="text_collection",
            embedding_dim=text_embedding_dim
        )
        image_embedding_dim = self.image_embedder.model.config.hidden_size
        self.image_vector_db_manager = VectorDBManager(
            client=self.client,
            collection_name="image_collection",
            embedding_dim=image_embedding_dim
        )
        audio_embedding_dim = self.audio_embedder.model.config.projection_dim
        self.audio_vector_db_manager = VectorDBManager(
            client=self.client,
            collection_name="audio_collection",
            embedding_dim=audio_embedding_dim
        )
        logger.info("IngestionService initialized successfully.")

    def ingest_files(self, file_paths: List[str]):
        """Ingest ``file_paths`` without progress reporting."""
        return self.ingest_files_with_progress(file_paths, None)

    def ingest_files_with_progress(self, file_paths: List[str], progress_callback: Optional[Callable] = None):
        """Ingest files, reporting progress through ``progress_callback``.

        Progress advances in three phases:

        * 0.4 to 0.7 while files are split into chunks;
        * 0.7 to 0.95 while chunks are embedded;
        * 0.95 to 1.0 while the remaining partial batches are saved.

        Args:
            file_paths: Paths of the files to ingest. Files with unsupported
                extensions, files yielding no chunks, and files whose
                processing raises are logged and skipped.
            progress_callback: Optional callable invoked as
                ``progress_callback(fraction, desc=message)``. Any exception
                it raises is logged and ignored.

        Returns:
            None.
        """
        logger.info(f"Starting ingestion for {len(file_paths)} files...")

        # Check and call the progress callback safely: a failing UI callback
        # must never abort ingestion.
        def safe_progress(value, desc=""):
            if progress_callback is None:
                return
            try:
                progress_callback(value, desc=desc)
            except Exception as e:
                logger.warning(f"Progress callback error: {e}")

        safe_progress(0.4, desc="Starting file processing...")

        # 1. Walk through the files and split them into chunks (40% -> 70%).
        all_chunks_to_process, files_with_chunks = self._collect_chunks(file_paths, safe_progress)
        if not all_chunks_to_process:
            logger.warning("No processable chunks were generated from the provided files.")
            safe_progress(1.0, desc="No chunks to process")
            return
        skipped_files = len(file_paths) - files_with_chunks
        if skipped_files:
            # Make partial ingestion visible; the final success message counts all inputs.
            logger.warning(
                f"{skipped_files} of {len(file_paths)} files contributed no chunks "
                f"(unsupported type, empty, or processing error)."
            )

        logger.info(f"Generated {len(all_chunks_to_process)} total chunks. Now generating embeddings...")
        safe_progress(0.7, desc=f"Generated {len(all_chunks_to_process)} chunks. Starting embeddings...")

        # 2. Embed chunks, saving a modality's batch whenever it fills up (70% -> 95%).
        routes = self._modality_routes()
        batches = {kind: ([], []) for kind in routes}
        self._embed_chunks(all_chunks_to_process, routes, batches, safe_progress)

        # 3. Save whatever is left in the partially filled batches (95% -> 100%).
        safe_progress(0.95, desc="Saving final batches...")
        self._save_final_batches(routes, batches, safe_progress)

        safe_progress(1.0, desc=f"✅ Successfully ingested {len(file_paths)} files with {len(all_chunks_to_process)} chunks!")
        logger.success(f"Successfully completed ingestion for {len(file_paths)} files.")

    @staticmethod
    def _is_empty(value) -> bool:
        """Return True for ``None`` or a zero-length value.

        Uses ``len`` instead of truthiness so array-like results (e.g. a
        NumPy array with more than one element) do not raise
        "truth value is ambiguous".
        """
        if value is None:
            return True
        try:
            return len(value) == 0
        except TypeError:
            # Unsized objects: fall back to ordinary truthiness.
            return not value

    def _processor_for(self, file_ext: str):
        """Return the processor for a lower-case extension like ``'.txt'``, or None if unsupported."""
        if file_ext in self.TEXT_EXTENSIONS:
            return self.text_processor
        if file_ext in self.IMAGE_EXTENSIONS:
            return self.image_processor
        if file_ext in self.AUDIO_EXTENSIONS:
            return self.audio_processor
        return None

    def _modality_routes(self) -> dict:
        """Map each chunk ``metadata['type']`` to its ``(embedder, vector DB manager)`` pair.

        The insertion order (text, audio, image) fixes the order in which
        full and final batches are saved.
        """
        return {
            "text": (self.text_embedder, self.text_db_manager),
            "audio": (self.audio_embedder, self.audio_vector_db_manager),
            "image": (self.image_embedder, self.image_vector_db_manager),
        }

    def _collect_chunks(self, file_paths: List[str], safe_progress: Callable):
        """Split every supported file into chunks, reporting progress from 0.4 to 0.7.

        Returns:
            Tuple ``(chunks, files_with_chunks)``. ``chunks`` lists all chunks
            in file order; ``files_with_chunks`` counts the files that
            contributed at least one chunk.
        """
        all_chunks = []
        files_with_chunks = 0
        total_files = len(file_paths)
        for i, file_path in enumerate(file_paths):
            try:
                base_progress = 0.4 + (i / total_files) * 0.3  # 40% -> 70%
                file_name = os.path.basename(file_path)
                safe_progress(base_progress, desc=f"Processing file {i+1}/{total_files}: {file_name}")
                file_ext = os.path.splitext(file_path)[1].lower()
                safe_progress(base_progress + 0.01, desc=f"Reading {file_name}...")
                processor = self._processor_for(file_ext)
                if processor is None:
                    logger.warning(f"Unsupported file type '{file_ext}' for file: {file_path}. Skipping.")
                    continue
                chunks = processor.process(file_path)
                # Check whether the processor produced any chunks at all.
                if self._is_empty(chunks):
                    logger.warning(f"No chunks generated from file: {file_path}")
                    continue
                safe_progress(base_progress + 0.02, desc=f"Generated {len(chunks)} chunks from {file_name}")
                all_chunks.extend(chunks)
                files_with_chunks += 1
            except Exception as e:
                logger.error(f"Error processing file {file_path}: {e}")
        return all_chunks, files_with_chunks

    def _embed_chunks(self, chunks: list, routes: dict, batches: dict, safe_progress: Callable):
        """Embed each chunk into its modality's batch, reporting progress from 0.7 to 0.95.

        ``batches`` maps a modality to ``(vectors, metadatas)`` lists. Chunks
        are skipped, with a log message, when any of these holds:

        * the chunk is malformed or its content is empty;
        * the embedder returns nothing;
        * embedding raises.

        Chunks whose type has no route are ignored silently, as before.
        """
        total_chunks = len(chunks)
        for i, chunk_data in enumerate(chunks):
            base_progress = 0.7 + (i / total_chunks) * 0.25  # 70% -> 95%
            try:
                # Check that the chunk has the keys we read below.
                if not chunk_data or 'metadata' not in chunk_data or 'content' not in chunk_data:
                    logger.warning(f"Invalid chunk data at index {i}, skipping...")
                    continue
                chunk_type = chunk_data['metadata'].get('type', 'unknown')
                content = chunk_data['content']
                chunk_id = chunk_data['metadata'].get('chunk_id', f'chunk_{i}')
                if self._is_empty(content):
                    logger.warning(f"Empty content for chunk {chunk_id}, skipping...")
                    continue
                safe_progress(base_progress, desc=f"Processing chunk {i+1}/{total_chunks} ({chunk_type})")
                route = routes.get(chunk_type)
                if route is not None:
                    embedder = route[0]
                    safe_progress(base_progress + 0.001, desc=f"Creating {chunk_type} embedding for chunk {i+1}")
                    embeddings = embedder.get_embeddings([content])
                    if self._is_empty(embeddings):
                        logger.warning(f"Failed to generate {chunk_type} embedding for chunk {chunk_id}")
                    else:
                        vectors, metadatas = batches[chunk_type]
                        vectors.append(embeddings[0])
                        metadatas.append(chunk_data)
            except Exception as e:
                logger.error(f"Error ingesting chunk {i}: {e}")
                continue
            # Save every modality whose batch has reached BATCH_SIZE. Each save
            # handles its own errors, so one failing collection no longer
            # blocks the others.
            for kind in batches:
                pending = len(batches[kind][0])
                if pending >= self.BATCH_SIZE:
                    safe_progress(base_progress + 0.002, desc=f"Saving batch of {pending} {kind} embeddings...")
                    self._save_full_batch(kind, routes, batches)

    def _save_full_batch(self, kind: str, routes: dict, batches: dict) -> bool:
        """Write ``kind``'s pending batch and start a fresh one if the write succeeded.

        On failure the vectors are kept, so they are retried on later flush
        checks and in the final save.

        Returns:
            True if ``add_vectors`` succeeded, False otherwise.
        """
        vectors, metadatas = batches[kind]
        try:
            routes[kind][1].add_vectors(vectors, metadatas)
        except Exception as e:
            logger.error(f"Error saving {kind} batch of {len(vectors)} embeddings: {e}")
            return False
        # Rebind rather than clear(), in case the DB manager kept references to the lists.
        batches[kind] = ([], [])
        return True

    def _save_final_batches(self, routes: dict, batches: dict, safe_progress: Callable):
        """Save the remaining non-empty batches, reporting progress from 0.95 to 0.99."""
        pending = [(kind, len(vectors)) for kind, (vectors, _) in batches.items() if vectors]
        # Return early when nothing is pending; this also avoids dividing by zero below.
        if not pending:
            safe_progress(1.0, desc="No final batches to save")
            return
        total_operations = len(pending)
        for i, (kind, count) in enumerate(pending):
            current_progress = 0.95 + (i / total_operations) * 0.04  # 95% -> 99%
            safe_progress(current_progress, desc=f"Saving final {count} {kind} embeddings...")
            vectors, metadatas = batches[kind]
            try:
                routes[kind][1].add_vectors(vectors, metadatas)
            except Exception as e:
                logger.error(f"Error saving final batch {kind}: {e}")