juribot-backend / build_vectorstore.py
entidi2608's picture
Initial backend deployment
a6fd1a3
# build_v5.py
import time
import gc
import os
import logging
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed
# Import các hàm cần thiết
import config
from db.weaviateDB import connect_to_weaviate
from rag_components import (
get_huggingface_embeddings,
create_weaviate_schema_if_not_exists,
ingest_chunks_with_native_batching,
filter_and_serialize_complex_metadata
)
from utils.process_data import process_single_file
logger = logging.getLogger(__name__)
# --- CÁC HÀM HỖ TRỢ CHO CHECKPOINTING ---
def load_processed_files() -> set:
"""Đọc file checkpoint và trả về một set các tên file đã xử lý."""
if not os.path.exists(config.CHECKPOINT_FILE):
return set()
with open(config.CHECKPOINT_FILE, 'r', encoding='utf-8') as f:
return {line.strip() for line in f if line.strip()}
def log_processed_file(filename: str):
"""Ghi tên file đã xử lý thành công vào file checkpoint."""
with open(config.CHECKPOINT_FILE, 'a', encoding='utf-8') as f:
f.write(filename + '\n')
def clear_weaviate_collection(client, collection_name: str):
"""Hàm helper để xóa collection và file checkpoint."""
try:
if client.collections.exists(collection_name):
logger.warning(f"🗑️ Đang xóa collection '{collection_name}'...")
client.collections.delete(collection_name)
logger.info(f"✅ Đã xóa collection '{collection_name}' thành công.")
# Xóa cả file checkpoint khi rebuild
if os.path.exists(config.CHECKPOINT_FILE):
os.remove(config.CHECKPOINT_FILE)
logger.info(f"🗑️ Đã xóa file checkpoint '{config.CHECKPOINT_FILE}'.")
except Exception as e:
logger.error(f"❌ Lỗi khi xóa collection: {e}")
raise e
def build_store_v5(force_rebuild: bool = False, pool_batch_size: int = 50):
"""
Hàm xây dựng Vector Store với Checkpointing và Pool Restart để xử lý các job dài hơi.
Args:
force_rebuild (bool): Xóa dữ liệu cũ và bắt đầu lại từ đầu.
pool_batch_size (int): Số lượng file xử lý trước khi tái khởi động worker pool.
"""
logger.info(f"🚀 Bắt đầu Quá trình Xây dựng Vector Store v5 (Checkpointing & Pool Restart) 🚀")
total_start_time = time.time()
total_chunks_indexed = 0
weaviate_client = None
try:
# --- 1. SETUP PHASE ---
logger.info("⚙️ 1. Thiết lập môi trường...")
device = 'cpu' # Giả định không có GPU
logger.info(f"💻 Sử dụng thiết bị: {device}")
weaviate_client = connect_to_weaviate(run_diagnostics=False)
if not weaviate_client: raise ConnectionError("Không thể kết nối đến Weaviate.")
logger.info("🧠 Tải model embedding (chạy trên CPU)...")
embeddings_model = get_huggingface_embeddings(config.EMBEDDING_MODEL_NAME, device)
if not embeddings_model: raise RuntimeError("Không thể khởi tạo model embedding.")
collection_name = config.WEAVIATE_COLLECTION_NAME
if force_rebuild:
clear_weaviate_collection(weaviate_client, collection_name)
create_weaviate_schema_if_not_exists(weaviate_client, collection_name)
# --- 2. LỌC FILE VÀ CHUẨN BỊ BATCH ---
processed_files = load_processed_files()
all_txt_paths = [os.path.join(config.CORE_DATA_FOLDER, f) for f in os.listdir(config.CORE_DATA_FOLDER) if f.lower().endswith('.txt')]
# Lọc ra những file chưa được xử lý
files_to_process = [path for path in all_txt_paths if os.path.basename(path) not in processed_files]
if not files_to_process:
logger.info("✅ Tất cả các file đã được xử lý. Không có gì để làm.")
return
logger.info(f"🔍 Đã xử lý {len(processed_files)} files. Còn lại {len(files_to_process)} files cần xử lý.")
# --- 3. XỬ LÝ THEO LÔ ĐỂ CHỐNG RÒ RỈ BỘ NHỚ ---
max_workers = os.cpu_count() or 1
# Tạo thanh tiến trình tổng
main_progress_bar = tqdm(total=len(files_to_process), desc="Tổng tiến trình")
# Chia danh sách file thành các lô nhỏ
for i in range(0, len(files_to_process), pool_batch_size):
file_batch = files_to_process[i:i + pool_batch_size]
logger.info(f"\n🔄 Đang xử lý lô {i//pool_batch_size + 1}, gồm {len(file_batch)} files. Tái khởi động Worker Pool...")
# Tạo một Worker Pool MỚI cho mỗi lô
with ProcessPoolExecutor(max_workers=max_workers) as executor:
future_to_file = {executor.submit(process_single_file, path): path for path in file_batch}
for future in as_completed(future_to_file):
path = future_to_file[future]
filename = os.path.basename(path)
try:
chunks_from_file = future.result()
if not chunks_from_file:
logger.warning(f"File '{filename}' không có chunks.")
log_processed_file(filename) # Vẫn ghi nhận là đã xử lý
main_progress_bar.update(1)
continue
processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
ingest_chunks_with_native_batching(
client=weaviate_client,
collection_name=collection_name,
chunks=processed_chunks,
embeddings_model=embeddings_model
)
total_chunks_indexed += len(chunks_from_file)
log_processed_file(filename) # Ghi nhận thành công
main_progress_bar.set_description(f"✅ Ingested '{filename}'")
main_progress_bar.update(1) # Cập nhật thanh tiến trình tổng
del chunks_from_file, processed_chunks
gc.collect()
except Exception as e:
logger.error(f"❌ Lỗi khi xử lý file '{filename}': {e}", exc_info=True)
main_progress_bar.close()
logger.info(f"\n✅ Đã ingest thành công thêm {total_chunks_indexed} chunks.")
except (ConnectionError, RuntimeError, Exception) as e:
logger.error(f"💥 Đã xảy ra lỗi nghiêm trọng: {e}", exc_info=True)
finally:
# --- 4. DỌN DẸP ---
logger.info("\n⚙️ 4. Dọn dẹp tài nguyên...")
if 'embeddings_model' in locals(): del embeddings_model
if weaviate_client and weaviate_client.is_connected():
weaviate_client.close()
logger.info("🔌 Đã đóng kết nối Weaviate.")
gc.collect()
total_end_time = time.time()
logger.info(f"⏱️ Tổng thời gian chạy lần này: {total_end_time - total_start_time:.2f} giây")
logger.info("🎉 Chương trình kết thúc! 🎉")
if __name__ == "__main__":
# Khi chạy lại, chỉ cần chạy lệnh này. Nó sẽ tự động tiếp tục.
# Đặt force_rebuild=True chỉ khi bạn muốn xóa sạch và làm lại từ đầu.
build_store_v5(force_rebuild=True)