Spaces:
Runtime error
Runtime error
from pathlib import Path | |
from llama_parse import LlamaParse | |
import docx | |
import pypandoc | |
from datetime import datetime, timezone | |
from rag_components import get_huggingface_embeddings | |
from io import BytesIO | |
import os | |
import logging | |
from langchain_core.documents import Document | |
import config | |
from db.weaviateDB import connect_to_weaviate | |
from db.mongoDB import mongo_db | |
from fastapi.concurrency import run_in_threadpool | |
logger = logging.getLogger(__name__) | |
from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching | |
from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata | |
class ApiKeyManager: | |
"""Quản lý một danh sách các API key.""" | |
def __init__(self, api_key_string: str): | |
self.keys = [key.strip() for key in api_key_string.split(',') if key.strip()] | |
if not self.keys: | |
raise ValueError("Chuỗi API key không hợp lệ hoặc rỗng.") | |
self.current_key_index = 0 | |
logger.info(f"Đã khởi tạo Key Manager với {len(self.keys)} key.") | |
def get_key(self) -> str | None: | |
"""Trả về key hiện tại.""" | |
if self.current_key_index < len(self.keys): | |
return self.keys[self.current_key_index] | |
return None | |
def get_next_key(self) -> str | None: | |
"""Chuyển sang key tiếp theo và trả về nó.""" | |
self.current_key_index += 1 | |
logger.warning(f"Chuyển sang sử dụng API key tiếp theo (index: {self.current_key_index}).") | |
return self.get_key() | |
def reset(self): | |
"""Reset lại index để bắt đầu từ key đầu tiên cho lần xử lý mới.""" | |
self.current_key_index = 0 | |
logger.info("Key Manager đã được reset.") | |
llama_key_manager = ApiKeyManager(config.LLAMA_CLOUD_API_KEYS) | |
# --- SỬA LẠI HÀM NÀY ĐỂ NHẬN STREAM --- | |
def convert_to_text_content(source_stream: BytesIO, original_filename: str) -> str: | |
"""Trích xuất nội dung text từ một stream trong bộ nhớ.""" | |
file_extension = Path(original_filename).suffix.lower() | |
logger.info(f"Extracting content from: {original_filename}") | |
content = "" | |
source_stream.seek(0) | |
if file_extension == ".pdf": | |
# Do LlamaParse cần đường dẫn file, chúng ta sẽ ghi stream ra file tạm MỘT LẦN | |
# và tái sử dụng đường dẫn này trong vòng lặp thử key. | |
# Tạo tên file tạm duy nhất để tránh xung đột khi xử lý song song | |
temp_pdf_path = f"/tmp/{original_filename}" | |
try: | |
with open(temp_pdf_path, "wb") as f: | |
f.write(source_stream.getvalue()) | |
# Reset key manager trước khi bắt đầu để đảm bảo nó luôn thử từ key đầu tiên | |
llama_key_manager.reset() | |
# Bắt đầu vòng lặp để thử các API key | |
while (current_key := llama_key_manager.get_key()) is not None: | |
try: | |
logger.info(f"Đang thử chuyển đổi PDF '{original_filename}' bằng key index: {llama_key_manager.current_key_index}...") | |
parser = LlamaParse( | |
api_key=current_key, | |
result_type="text", | |
verbose=True, # Giữ để debug | |
language="vi" | |
) | |
# Sử dụng đường dẫn file tạm đã tạo | |
documents = parser.load_data([temp_pdf_path]) | |
if documents and documents[0].text.strip(): | |
content = documents[0].text | |
logger.info(f"✅ Chuyển đổi PDF thành công bằng key index: {llama_key_manager.current_key_index}.") | |
break # Thành công, thoát khỏi vòng lặp | |
else: | |
raise ValueError("LlamaParse trả về nội dung rỗng.") | |
except Exception as e: | |
logger.error(f"❌ Lỗi với key index {llama_key_manager.current_key_index} cho file '{original_filename}': {e}") | |
if llama_key_manager.get_next_key() is None: | |
logger.critical("Đã thử hết tất cả các API key nhưng đều thất bại cho file PDF.") | |
raise Exception(f"Không thể chuyển đổi file '{original_filename}' sau khi đã thử tất cả các API key.") from e | |
if not content: | |
raise ValueError(f"Không thể trích xuất nội dung từ PDF '{original_filename}' sau khi thử các key.") | |
finally: | |
# Luôn dọn dẹp file tạm, dù thành công hay thất bại | |
if os.path.exists(temp_pdf_path): | |
os.remove(temp_pdf_path) | |
logger.debug(f"Đã dọn dẹp file tạm: {temp_pdf_path}") | |
elif file_extension == ".docx": | |
# docx có thể đọc trực tiếp từ stream | |
doc = docx.Document(source_stream) | |
content = '\n'.join([para.text for para in doc.paragraphs]) | |
elif file_extension == ".doc": | |
# pypandoc cần file trên đĩa | |
temp_doc_path = f"/tmp/{original_filename}" | |
try: | |
with open(temp_doc_path, "wb") as f: | |
f.write(source_stream.getvalue()) | |
content = pypandoc.convert_file(temp_doc_path, 'plain', format='doc') | |
finally: | |
if os.path.exists(temp_doc_path): | |
os.remove(temp_doc_path) | |
logger.debug(f"Đã dọn dẹp file tạm: {temp_doc_path}") | |
else: | |
raise ValueError(f"Định dạng file không được hỗ trợ: {file_extension}") | |
if not content.strip(): | |
raise ValueError(f"Nội dung trích xuất từ '{original_filename}' bị rỗng.") | |
logger.info(f"✅ Trích xuất nội dung thành công từ stream của file: {original_filename}.") | |
return content | |
async def full_process_and_ingest_pipeline(raw_content: str, filename: str, file_hash: str): | |
""" | |
Pipeline xử lý nền đã được tối ưu hoàn toàn. | |
""" | |
logger.info(f"BACKGROUND TASK: Starting NLP and Ingestion for: {filename} (Hash: {file_hash[:10]}...)") | |
weaviate_client = None | |
try: | |
embeddings_model = get_huggingface_embeddings( | |
config.EMBEDDING_MODEL_NAME | |
) | |
# Giai đoạn 1: Xử lý NLP (CPU-bound) | |
doc_metadata = await run_in_threadpool(extract_document_metadata, raw_content, filename) | |
doc_metadata["source"] = filename | |
cleaned_content = await run_in_threadpool(clean_document_text, raw_content) | |
doc_metadata["field"] = await run_in_threadpool(infer_field, cleaned_content, doc_metadata.get("ten_van_ban")) | |
doc_metadata["entity_type"] = await run_in_threadpool(infer_entity_type, cleaned_content, doc_metadata.get("field", "")) | |
doc_to_split = Document(page_content=cleaned_content, metadata=doc_metadata) | |
chunks_from_file = await run_in_threadpool(hierarchical_split_law_document, doc_to_split) | |
if not chunks_from_file: | |
raise ValueError("File did not yield any chunks after processing.") | |
processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file) | |
# Giai đoạn 2: Ingest vào Weaviate (I/O-bound và CPU-bound) | |
weaviate_client = connect_to_weaviate(run_diagnostics=False) | |
await run_in_threadpool(create_weaviate_schema_if_not_exists, weaviate_client, config.WEAVIATE_COLLECTION_NAME) | |
await run_in_threadpool( | |
ingest_chunks_with_native_batching, | |
weaviate_client, config.WEAVIATE_COLLECTION_NAME, processed_chunks, embeddings_model | |
) | |
# Giai đoạn 3: Ghi log thành công | |
await log_processed_hash(file_hash, filename) | |
logger.info(f"✅✅✅ All tasks completed for '{filename}'.") | |
except Exception as e: | |
logger.error(f"❌ FAILED pipeline for '{filename}': {e}", exc_info=True) | |
await log_failed_process(file_hash, filename, str(e)) | |
finally: | |
if weaviate_client and weaviate_client.is_connected(): | |
weaviate_client.close() | |
async def log_processed_hash(file_hash: str, filename: str, status: str = "SUCCESS", error_message: str = None): | |
"""Ghi lại trạng thái xử lý (thành công hoặc thất bại) vào MongoDB.""" | |
try: | |
record = { | |
"file_hash": file_hash, | |
"original_filename": filename, | |
"processed_at": datetime.now(timezone.utc), | |
"status": status, | |
} | |
if error_message: | |
record["error_message"] = error_message | |
await mongo_db.processed_documents.insert_one(record) | |
except Exception as e: | |
logger.error(f"Could not write process record to MongoDB for hash {file_hash}: {e}") | |
# Wrapper cho việc log lỗi | |
async def log_failed_process(file_hash: str, filename: str, error_message: str): | |
await log_processed_hash(file_hash, filename, status="FAILED", error_message=error_message) | |
# Hàm kiểm tra trùng lặp | |
async def check_if_hash_exists(file_hash: str) -> bool: | |
count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash, "status": "SUCCESS"}) | |
return count > 0 | |