Spaces:
Runtime error
Runtime error
File size: 7,553 Bytes
a6fd1a3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# build_v5.py
import time
import gc
import os
import logging
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
# Import các hàm cần thiết
import config
from db.weaviateDB import connect_to_weaviate
from rag_components import (
get_huggingface_embeddings,
create_weaviate_schema_if_not_exists,
ingest_chunks_with_native_batching,
filter_and_serialize_complex_metadata
)
from utils.process_data import process_single_file
logger = logging.getLogger(__name__)
# --- CÁC HÀM HỖ TRỢ CHO CHECKPOINTING ---
def load_processed_files() -> set:
"""Đọc file checkpoint và trả về một set các tên file đã xử lý."""
if not os.path.exists(config.CHECKPOINT_FILE):
return set()
with open(config.CHECKPOINT_FILE, 'r', encoding='utf-8') as f:
return {line.strip() for line in f if line.strip()}
def log_processed_file(filename: str):
"""Ghi tên file đã xử lý thành công vào file checkpoint."""
with open(config.CHECKPOINT_FILE, 'a', encoding='utf-8') as f:
f.write(filename + '\n')
def clear_weaviate_collection(client, collection_name: str):
"""Hàm helper để xóa collection và file checkpoint."""
try:
if client.collections.exists(collection_name):
logger.warning(f"🗑️ Đang xóa collection '{collection_name}'...")
client.collections.delete(collection_name)
logger.info(f"✅ Đã xóa collection '{collection_name}' thành công.")
# Xóa cả file checkpoint khi rebuild
if os.path.exists(config.CHECKPOINT_FILE):
os.remove(config.CHECKPOINT_FILE)
logger.info(f"🗑️ Đã xóa file checkpoint '{config.CHECKPOINT_FILE}'.")
except Exception as e:
logger.error(f"❌ Lỗi khi xóa collection: {e}")
raise e
def build_store_v5(force_rebuild: bool = False, pool_batch_size: int = 50):
"""
Hàm xây dựng Vector Store với Checkpointing và Pool Restart để xử lý các job dài hơi.
Args:
force_rebuild (bool): Xóa dữ liệu cũ và bắt đầu lại từ đầu.
pool_batch_size (int): Số lượng file xử lý trước khi tái khởi động worker pool.
"""
logger.info(f"🚀 Bắt đầu Quá trình Xây dựng Vector Store v5 (Checkpointing & Pool Restart) 🚀")
total_start_time = time.time()
total_chunks_indexed = 0
weaviate_client = None
try:
# --- 1. SETUP PHASE ---
logger.info("⚙️ 1. Thiết lập môi trường...")
device = 'cpu' # Giả định không có GPU
logger.info(f"💻 Sử dụng thiết bị: {device}")
weaviate_client = connect_to_weaviate(run_diagnostics=False)
if not weaviate_client: raise ConnectionError("Không thể kết nối đến Weaviate.")
logger.info("🧠 Tải model embedding (chạy trên CPU)...")
embeddings_model = get_huggingface_embeddings(config.EMBEDDING_MODEL_NAME, device)
if not embeddings_model: raise RuntimeError("Không thể khởi tạo model embedding.")
collection_name = config.WEAVIATE_COLLECTION_NAME
if force_rebuild:
clear_weaviate_collection(weaviate_client, collection_name)
create_weaviate_schema_if_not_exists(weaviate_client, collection_name)
# --- 2. LỌC FILE VÀ CHUẨN BỊ BATCH ---
processed_files = load_processed_files()
all_txt_paths = [os.path.join(config.CORE_DATA_FOLDER, f) for f in os.listdir(config.CORE_DATA_FOLDER) if f.lower().endswith('.txt')]
# Lọc ra những file chưa được xử lý
files_to_process = [path for path in all_txt_paths if os.path.basename(path) not in processed_files]
if not files_to_process:
logger.info("✅ Tất cả các file đã được xử lý. Không có gì để làm.")
return
logger.info(f"🔍 Đã xử lý {len(processed_files)} files. Còn lại {len(files_to_process)} files cần xử lý.")
# --- 3. XỬ LÝ THEO LÔ ĐỂ CHỐNG RÒ RỈ BỘ NHỚ ---
max_workers = os.cpu_count() or 1
# Tạo thanh tiến trình tổng
main_progress_bar = tqdm(total=len(files_to_process), desc="Tổng tiến trình")
# Chia danh sách file thành các lô nhỏ
for i in range(0, len(files_to_process), pool_batch_size):
file_batch = files_to_process[i:i + pool_batch_size]
logger.info(f"\n🔄 Đang xử lý lô {i//pool_batch_size + 1}, gồm {len(file_batch)} files. Tái khởi động Worker Pool...")
# Tạo một Worker Pool MỚI cho mỗi lô
with ProcessPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {executor.submit(process_single_file, path): path for path in file_batch}
for future in as_completed(future_to_file):
path = future_to_file[future]
filename = os.path.basename(path)
try:
chunks_from_file = future.result()
if not chunks_from_file:
logger.warning(f"File '{filename}' không có chunks.")
log_processed_file(filename) # Vẫn ghi nhận là đã xử lý
main_progress_bar.update(1)
continue
processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
ingest_chunks_with_native_batching(
client=weaviate_client,
collection_name=collection_name,
chunks=processed_chunks,
embeddings_model=embeddings_model
)
total_chunks_indexed += len(chunks_from_file)
log_processed_file(filename) # Ghi nhận thành công
main_progress_bar.set_description(f"✅ Ingested '{filename}'")
main_progress_bar.update(1) # Cập nhật thanh tiến trình tổng
del chunks_from_file, processed_chunks
gc.collect()
except Exception as e:
logger.error(f"❌ Lỗi khi xử lý file '{filename}': {e}", exc_info=True)
main_progress_bar.close()
logger.info(f"\n✅ Đã ingest thành công thêm {total_chunks_indexed} chunks.")
except (ConnectionError, RuntimeError, Exception) as e:
logger.error(f"💥 Đã xảy ra lỗi nghiêm trọng: {e}", exc_info=True)
finally:
# --- 4. DỌN DẸP ---
logger.info("\n⚙️ 4. Dọn dẹp tài nguyên...")
if 'embeddings_model' in locals(): del embeddings_model
if weaviate_client and weaviate_client.is_connected():
weaviate_client.close()
logger.info("🔌 Đã đóng kết nối Weaviate.")
gc.collect()
total_end_time = time.time()
logger.info(f"⏱️ Tổng thời gian chạy lần này: {total_end_time - total_start_time:.2f} giây")
logger.info("🎉 Chương trình kết thúc! 🎉")
if __name__ == "__main__":
# Khi chạy lại, chỉ cần chạy lệnh này. Nó sẽ tự động tiếp tục.
# Đặt force_rebuild=True chỉ khi bạn muốn xóa sạch và làm lại từ đầu.
build_store_v5(force_rebuild=True) |