# build_v5.py import time import gc import os import logging from tqdm import tqdm from concurrent.futures import ProcessPoolExecutor, as_completed # Import các hàm cần thiết import config from db.weaviateDB import connect_to_weaviate from rag_components import ( get_huggingface_embeddings, create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching, filter_and_serialize_complex_metadata ) from utils.process_data import process_single_file logger = logging.getLogger(__name__) # --- CÁC HÀM HỖ TRỢ CHO CHECKPOINTING --- def load_processed_files() -> set: """Đọc file checkpoint và trả về một set các tên file đã xử lý.""" if not os.path.exists(config.CHECKPOINT_FILE): return set() with open(config.CHECKPOINT_FILE, 'r', encoding='utf-8') as f: return {line.strip() for line in f if line.strip()} def log_processed_file(filename: str): """Ghi tên file đã xử lý thành công vào file checkpoint.""" with open(config.CHECKPOINT_FILE, 'a', encoding='utf-8') as f: f.write(filename + '\n') def clear_weaviate_collection(client, collection_name: str): """Hàm helper để xóa collection và file checkpoint.""" try: if client.collections.exists(collection_name): logger.warning(f"🗑️ Đang xóa collection '{collection_name}'...") client.collections.delete(collection_name) logger.info(f"✅ Đã xóa collection '{collection_name}' thành công.") # Xóa cả file checkpoint khi rebuild if os.path.exists(config.CHECKPOINT_FILE): os.remove(config.CHECKPOINT_FILE) logger.info(f"🗑️ Đã xóa file checkpoint '{config.CHECKPOINT_FILE}'.") except Exception as e: logger.error(f"❌ Lỗi khi xóa collection: {e}") raise e def build_store_v5(force_rebuild: bool = False, pool_batch_size: int = 50): """ Hàm xây dựng Vector Store với Checkpointing và Pool Restart để xử lý các job dài hơi. Args: force_rebuild (bool): Xóa dữ liệu cũ và bắt đầu lại từ đầu. pool_batch_size (int): Số lượng file xử lý trước khi tái khởi động worker pool. """ logger.info(f"🚀 Bắt đầu Quá trình Xây dựng Vector Store v5 (Checkpointing & Pool Restart) 🚀") total_start_time = time.time() total_chunks_indexed = 0 weaviate_client = None try: # --- 1. SETUP PHASE --- logger.info("⚙️ 1. Thiết lập môi trường...") device = 'cpu' # Giả định không có GPU logger.info(f"💻 Sử dụng thiết bị: {device}") weaviate_client = connect_to_weaviate(run_diagnostics=False) if not weaviate_client: raise ConnectionError("Không thể kết nối đến Weaviate.") logger.info("🧠 Tải model embedding (chạy trên CPU)...") embeddings_model = get_huggingface_embeddings(config.EMBEDDING_MODEL_NAME, device) if not embeddings_model: raise RuntimeError("Không thể khởi tạo model embedding.") collection_name = config.WEAVIATE_COLLECTION_NAME if force_rebuild: clear_weaviate_collection(weaviate_client, collection_name) create_weaviate_schema_if_not_exists(weaviate_client, collection_name) # --- 2. LỌC FILE VÀ CHUẨN BỊ BATCH --- processed_files = load_processed_files() all_txt_paths = [os.path.join(config.CORE_DATA_FOLDER, f) for f in os.listdir(config.CORE_DATA_FOLDER) if f.lower().endswith('.txt')] # Lọc ra những file chưa được xử lý files_to_process = [path for path in all_txt_paths if os.path.basename(path) not in processed_files] if not files_to_process: logger.info("✅ Tất cả các file đã được xử lý. Không có gì để làm.") return logger.info(f"🔍 Đã xử lý {len(processed_files)} files. Còn lại {len(files_to_process)} files cần xử lý.") # --- 3. XỬ LÝ THEO LÔ ĐỂ CHỐNG RÒ RỈ BỘ NHỚ --- max_workers = os.cpu_count() or 1 # Tạo thanh tiến trình tổng main_progress_bar = tqdm(total=len(files_to_process), desc="Tổng tiến trình") # Chia danh sách file thành các lô nhỏ for i in range(0, len(files_to_process), pool_batch_size): file_batch = files_to_process[i:i + pool_batch_size] logger.info(f"\n🔄 Đang xử lý lô {i//pool_batch_size + 1}, gồm {len(file_batch)} files. Tái khởi động Worker Pool...") # Tạo một Worker Pool MỚI cho mỗi lô with ProcessPoolExecutor(max_workers=max_workers) as executor: future_to_file = {executor.submit(process_single_file, path): path for path in file_batch} for future in as_completed(future_to_file): path = future_to_file[future] filename = os.path.basename(path) try: chunks_from_file = future.result() if not chunks_from_file: logger.warning(f"File '{filename}' không có chunks.") log_processed_file(filename) # Vẫn ghi nhận là đã xử lý main_progress_bar.update(1) continue processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file) ingest_chunks_with_native_batching( client=weaviate_client, collection_name=collection_name, chunks=processed_chunks, embeddings_model=embeddings_model ) total_chunks_indexed += len(chunks_from_file) log_processed_file(filename) # Ghi nhận thành công main_progress_bar.set_description(f"✅ Ingested '{filename}'") main_progress_bar.update(1) # Cập nhật thanh tiến trình tổng del chunks_from_file, processed_chunks gc.collect() except Exception as e: logger.error(f"❌ Lỗi khi xử lý file '{filename}': {e}", exc_info=True) main_progress_bar.close() logger.info(f"\n✅ Đã ingest thành công thêm {total_chunks_indexed} chunks.") except (ConnectionError, RuntimeError, Exception) as e: logger.error(f"💥 Đã xảy ra lỗi nghiêm trọng: {e}", exc_info=True) finally: # --- 4. DỌN DẸP --- logger.info("\n⚙️ 4. Dọn dẹp tài nguyên...") if 'embeddings_model' in locals(): del embeddings_model if weaviate_client and weaviate_client.is_connected(): weaviate_client.close() logger.info("🔌 Đã đóng kết nối Weaviate.") gc.collect() total_end_time = time.time() logger.info(f"⏱️ Tổng thời gian chạy lần này: {total_end_time - total_start_time:.2f} giây") logger.info("🎉 Chương trình kết thúc! 🎉") if __name__ == "__main__": # Khi chạy lại, chỉ cần chạy lệnh này. Nó sẽ tự động tiếp tục. # Đặt force_rebuild=True chỉ khi bạn muốn xóa sạch và làm lại từ đầu. build_store_v5(force_rebuild=True)