juribot-backend / services /document_service.py
entidi2608's picture
update: retriever
97c3215
from pathlib import Path
from llama_parse import LlamaParse
import docx
import pypandoc
from datetime import datetime, timezone
from rag_components import get_huggingface_embeddings
from io import BytesIO
import os
import logging
from langchain_core.documents import Document
import config
from db.weaviateDB import connect_to_weaviate
from db.mongoDB import mongo_db
from fastapi.concurrency import run_in_threadpool
logger = logging.getLogger(__name__)
from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching
from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata
class ApiKeyManager:
"""Quản lý một danh sách các API key."""
def __init__(self, api_key_string: str):
self.keys = [key.strip() for key in api_key_string.split(',') if key.strip()]
if not self.keys:
raise ValueError("Chuỗi API key không hợp lệ hoặc rỗng.")
self.current_key_index = 0
logger.info(f"Đã khởi tạo Key Manager với {len(self.keys)} key.")
def get_key(self) -> str | None:
"""Trả về key hiện tại."""
if self.current_key_index < len(self.keys):
return self.keys[self.current_key_index]
return None
def get_next_key(self) -> str | None:
"""Chuyển sang key tiếp theo và trả về nó."""
self.current_key_index += 1
logger.warning(f"Chuyển sang sử dụng API key tiếp theo (index: {self.current_key_index}).")
return self.get_key()
def reset(self):
"""Reset lại index để bắt đầu từ key đầu tiên cho lần xử lý mới."""
self.current_key_index = 0
logger.info("Key Manager đã được reset.")
llama_key_manager = ApiKeyManager(config.LLAMA_CLOUD_API_KEYS)
# --- SỬA LẠI HÀM NÀY ĐỂ NHẬN STREAM ---
def convert_to_text_content(source_stream: BytesIO, original_filename: str) -> str:
"""Trích xuất nội dung text từ một stream trong bộ nhớ."""
file_extension = Path(original_filename).suffix.lower()
logger.info(f"Extracting content from: {original_filename}")
content = ""
source_stream.seek(0)
if file_extension == ".pdf":
# Do LlamaParse cần đường dẫn file, chúng ta sẽ ghi stream ra file tạm MỘT LẦN
# và tái sử dụng đường dẫn này trong vòng lặp thử key.
# Tạo tên file tạm duy nhất để tránh xung đột khi xử lý song song
temp_pdf_path = f"/tmp/{original_filename}"
try:
with open(temp_pdf_path, "wb") as f:
f.write(source_stream.getvalue())
# Reset key manager trước khi bắt đầu để đảm bảo nó luôn thử từ key đầu tiên
llama_key_manager.reset()
# Bắt đầu vòng lặp để thử các API key
while (current_key := llama_key_manager.get_key()) is not None:
try:
logger.info(f"Đang thử chuyển đổi PDF '{original_filename}' bằng key index: {llama_key_manager.current_key_index}...")
parser = LlamaParse(
api_key=current_key,
result_type="text",
verbose=True, # Giữ để debug
language="vi"
)
# Sử dụng đường dẫn file tạm đã tạo
documents = parser.load_data([temp_pdf_path])
if documents and documents[0].text.strip():
content = documents[0].text
logger.info(f"✅ Chuyển đổi PDF thành công bằng key index: {llama_key_manager.current_key_index}.")
break # Thành công, thoát khỏi vòng lặp
else:
raise ValueError("LlamaParse trả về nội dung rỗng.")
except Exception as e:
logger.error(f"❌ Lỗi với key index {llama_key_manager.current_key_index} cho file '{original_filename}': {e}")
if llama_key_manager.get_next_key() is None:
logger.critical("Đã thử hết tất cả các API key nhưng đều thất bại cho file PDF.")
raise Exception(f"Không thể chuyển đổi file '{original_filename}' sau khi đã thử tất cả các API key.") from e
if not content:
raise ValueError(f"Không thể trích xuất nội dung từ PDF '{original_filename}' sau khi thử các key.")
finally:
# Luôn dọn dẹp file tạm, dù thành công hay thất bại
if os.path.exists(temp_pdf_path):
os.remove(temp_pdf_path)
logger.debug(f"Đã dọn dẹp file tạm: {temp_pdf_path}")
elif file_extension == ".docx":
# docx có thể đọc trực tiếp từ stream
doc = docx.Document(source_stream)
content = '\n'.join([para.text for para in doc.paragraphs])
elif file_extension == ".doc":
# pypandoc cần file trên đĩa
temp_doc_path = f"/tmp/{original_filename}"
try:
with open(temp_doc_path, "wb") as f:
f.write(source_stream.getvalue())
content = pypandoc.convert_file(temp_doc_path, 'plain', format='doc')
finally:
if os.path.exists(temp_doc_path):
os.remove(temp_doc_path)
logger.debug(f"Đã dọn dẹp file tạm: {temp_doc_path}")
else:
raise ValueError(f"Định dạng file không được hỗ trợ: {file_extension}")
if not content.strip():
raise ValueError(f"Nội dung trích xuất từ '{original_filename}' bị rỗng.")
logger.info(f"✅ Trích xuất nội dung thành công từ stream của file: {original_filename}.")
return content
async def full_process_and_ingest_pipeline(raw_content: str, filename: str, file_hash: str):
"""
Pipeline xử lý nền đã được tối ưu hoàn toàn.
"""
logger.info(f"BACKGROUND TASK: Starting NLP and Ingestion for: {filename} (Hash: {file_hash[:10]}...)")
weaviate_client = None
try:
embeddings_model = get_huggingface_embeddings(
config.EMBEDDING_MODEL_NAME
)
# Giai đoạn 1: Xử lý NLP (CPU-bound)
doc_metadata = await run_in_threadpool(extract_document_metadata, raw_content, filename)
doc_metadata["source"] = filename
cleaned_content = await run_in_threadpool(clean_document_text, raw_content)
doc_metadata["field"] = await run_in_threadpool(infer_field, cleaned_content, doc_metadata.get("ten_van_ban"))
doc_metadata["entity_type"] = await run_in_threadpool(infer_entity_type, cleaned_content, doc_metadata.get("field", ""))
doc_to_split = Document(page_content=cleaned_content, metadata=doc_metadata)
chunks_from_file = await run_in_threadpool(hierarchical_split_law_document, doc_to_split)
if not chunks_from_file:
raise ValueError("File did not yield any chunks after processing.")
processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
# Giai đoạn 2: Ingest vào Weaviate (I/O-bound và CPU-bound)
weaviate_client = connect_to_weaviate(run_diagnostics=False)
await run_in_threadpool(create_weaviate_schema_if_not_exists, weaviate_client, config.WEAVIATE_COLLECTION_NAME)
await run_in_threadpool(
ingest_chunks_with_native_batching,
weaviate_client, config.WEAVIATE_COLLECTION_NAME, processed_chunks, embeddings_model
)
# Giai đoạn 3: Ghi log thành công
await log_processed_hash(file_hash, filename)
logger.info(f"✅✅✅ All tasks completed for '{filename}'.")
except Exception as e:
logger.error(f"❌ FAILED pipeline for '{filename}': {e}", exc_info=True)
await log_failed_process(file_hash, filename, str(e))
finally:
if weaviate_client and weaviate_client.is_connected():
weaviate_client.close()
async def log_processed_hash(file_hash: str, filename: str, status: str = "SUCCESS", error_message: str = None):
"""Ghi lại trạng thái xử lý (thành công hoặc thất bại) vào MongoDB."""
try:
record = {
"file_hash": file_hash,
"original_filename": filename,
"processed_at": datetime.now(timezone.utc),
"status": status,
}
if error_message:
record["error_message"] = error_message
await mongo_db.processed_documents.insert_one(record)
except Exception as e:
logger.error(f"Could not write process record to MongoDB for hash {file_hash}: {e}")
# Wrapper cho việc log lỗi
async def log_failed_process(file_hash: str, filename: str, error_message: str):
await log_processed_hash(file_hash, filename, status="FAILED", error_message=error_message)
# Hàm kiểm tra trùng lặp
async def check_if_hash_exists(file_hash: str) -> bool:
count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash, "status": "SUCCESS"})
return count > 0