from pathlib import Path from llama_parse import LlamaParse import docx import pypandoc from datetime import datetime, timezone from rag_components import get_huggingface_embeddings from io import BytesIO import os import logging from langchain_core.documents import Document import config from db.weaviateDB import connect_to_weaviate from db.mongoDB import mongo_db from fastapi.concurrency import run_in_threadpool logger = logging.getLogger(__name__) from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata class ApiKeyManager: """Quản lý một danh sách các API key.""" def __init__(self, api_key_string: str): self.keys = [key.strip() for key in api_key_string.split(',') if key.strip()] if not self.keys: raise ValueError("Chuỗi API key không hợp lệ hoặc rỗng.") self.current_key_index = 0 logger.info(f"Đã khởi tạo Key Manager với {len(self.keys)} key.") def get_key(self) -> str | None: """Trả về key hiện tại.""" if self.current_key_index < len(self.keys): return self.keys[self.current_key_index] return None def get_next_key(self) -> str | None: """Chuyển sang key tiếp theo và trả về nó.""" self.current_key_index += 1 logger.warning(f"Chuyển sang sử dụng API key tiếp theo (index: {self.current_key_index}).") return self.get_key() def reset(self): """Reset lại index để bắt đầu từ key đầu tiên cho lần xử lý mới.""" self.current_key_index = 0 logger.info("Key Manager đã được reset.") llama_key_manager = ApiKeyManager(config.LLAMA_CLOUD_API_KEYS) # --- SỬA LẠI HÀM NÀY ĐỂ NHẬN STREAM --- def convert_to_text_content(source_stream: BytesIO, original_filename: str) -> str: """Trích xuất nội dung text từ một stream trong bộ nhớ.""" file_extension = Path(original_filename).suffix.lower() logger.info(f"Extracting content from: {original_filename}") content = "" source_stream.seek(0) if file_extension == ".pdf": # Do LlamaParse cần đường dẫn file, chúng ta sẽ ghi stream ra file tạm MỘT LẦN # và tái sử dụng đường dẫn này trong vòng lặp thử key. # Tạo tên file tạm duy nhất để tránh xung đột khi xử lý song song temp_pdf_path = f"/tmp/{original_filename}" try: with open(temp_pdf_path, "wb") as f: f.write(source_stream.getvalue()) # Reset key manager trước khi bắt đầu để đảm bảo nó luôn thử từ key đầu tiên llama_key_manager.reset() # Bắt đầu vòng lặp để thử các API key while (current_key := llama_key_manager.get_key()) is not None: try: logger.info(f"Đang thử chuyển đổi PDF '{original_filename}' bằng key index: {llama_key_manager.current_key_index}...") parser = LlamaParse( api_key=current_key, result_type="text", verbose=True, # Giữ để debug language="vi" ) # Sử dụng đường dẫn file tạm đã tạo documents = parser.load_data([temp_pdf_path]) if documents and documents[0].text.strip(): content = documents[0].text logger.info(f"✅ Chuyển đổi PDF thành công bằng key index: {llama_key_manager.current_key_index}.") break # Thành công, thoát khỏi vòng lặp else: raise ValueError("LlamaParse trả về nội dung rỗng.") except Exception as e: logger.error(f"❌ Lỗi với key index {llama_key_manager.current_key_index} cho file '{original_filename}': {e}") if llama_key_manager.get_next_key() is None: logger.critical("Đã thử hết tất cả các API key nhưng đều thất bại cho file PDF.") raise Exception(f"Không thể chuyển đổi file '{original_filename}' sau khi đã thử tất cả các API key.") from e if not content: raise ValueError(f"Không thể trích xuất nội dung từ PDF '{original_filename}' sau khi thử các key.") finally: # Luôn dọn dẹp file tạm, dù thành công hay thất bại if os.path.exists(temp_pdf_path): os.remove(temp_pdf_path) logger.debug(f"Đã dọn dẹp file tạm: {temp_pdf_path}") elif file_extension == ".docx": # docx có thể đọc trực tiếp từ stream doc = docx.Document(source_stream) content = '\n'.join([para.text for para in doc.paragraphs]) elif file_extension == ".doc": # pypandoc cần file trên đĩa temp_doc_path = f"/tmp/{original_filename}" try: with open(temp_doc_path, "wb") as f: f.write(source_stream.getvalue()) content = pypandoc.convert_file(temp_doc_path, 'plain', format='doc') finally: if os.path.exists(temp_doc_path): os.remove(temp_doc_path) logger.debug(f"Đã dọn dẹp file tạm: {temp_doc_path}") else: raise ValueError(f"Định dạng file không được hỗ trợ: {file_extension}") if not content.strip(): raise ValueError(f"Nội dung trích xuất từ '{original_filename}' bị rỗng.") logger.info(f"✅ Trích xuất nội dung thành công từ stream của file: {original_filename}.") return content async def full_process_and_ingest_pipeline(raw_content: str, filename: str, file_hash: str): """ Pipeline xử lý nền đã được tối ưu hoàn toàn. """ logger.info(f"BACKGROUND TASK: Starting NLP and Ingestion for: {filename} (Hash: {file_hash[:10]}...)") weaviate_client = None try: embeddings_model = get_huggingface_embeddings( config.EMBEDDING_MODEL_NAME ) # Giai đoạn 1: Xử lý NLP (CPU-bound) doc_metadata = await run_in_threadpool(extract_document_metadata, raw_content, filename) doc_metadata["source"] = filename cleaned_content = await run_in_threadpool(clean_document_text, raw_content) doc_metadata["field"] = await run_in_threadpool(infer_field, cleaned_content, doc_metadata.get("ten_van_ban")) doc_metadata["entity_type"] = await run_in_threadpool(infer_entity_type, cleaned_content, doc_metadata.get("field", "")) doc_to_split = Document(page_content=cleaned_content, metadata=doc_metadata) chunks_from_file = await run_in_threadpool(hierarchical_split_law_document, doc_to_split) if not chunks_from_file: raise ValueError("File did not yield any chunks after processing.") processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file) # Giai đoạn 2: Ingest vào Weaviate (I/O-bound và CPU-bound) weaviate_client = connect_to_weaviate(run_diagnostics=False) await run_in_threadpool(create_weaviate_schema_if_not_exists, weaviate_client, config.WEAVIATE_COLLECTION_NAME) await run_in_threadpool( ingest_chunks_with_native_batching, weaviate_client, config.WEAVIATE_COLLECTION_NAME, processed_chunks, embeddings_model ) # Giai đoạn 3: Ghi log thành công await log_processed_hash(file_hash, filename) logger.info(f"✅✅✅ All tasks completed for '{filename}'.") except Exception as e: logger.error(f"❌ FAILED pipeline for '{filename}': {e}", exc_info=True) await log_failed_process(file_hash, filename, str(e)) finally: if weaviate_client and weaviate_client.is_connected(): weaviate_client.close() async def log_processed_hash(file_hash: str, filename: str, status: str = "SUCCESS", error_message: str = None): """Ghi lại trạng thái xử lý (thành công hoặc thất bại) vào MongoDB.""" try: record = { "file_hash": file_hash, "original_filename": filename, "processed_at": datetime.now(timezone.utc), "status": status, } if error_message: record["error_message"] = error_message await mongo_db.processed_documents.insert_one(record) except Exception as e: logger.error(f"Could not write process record to MongoDB for hash {file_hash}: {e}") # Wrapper cho việc log lỗi async def log_failed_process(file_hash: str, filename: str, error_message: str): await log_processed_hash(file_hash, filename, status="FAILED", error_message=error_message) # Hàm kiểm tra trùng lặp async def check_if_hash_exists(file_hash: str) -> bool: count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash, "status": "SUCCESS"}) return count > 0