Spaces:
Runtime error
Runtime error
File size: 9,421 Bytes
a6fd1a3 97c3215 a6fd1a3 97c3215 a6fd1a3 4f7e10b 97c3215 4f7e10b 97c3215 4f7e10b 97c3215 4f7e10b 97c3215 25e6e74 4f7e10b 97c3215 25e6e74 97c3215 253c2e1 a6fd1a3 97c3215 a6fd1a3 97c3215 a6fd1a3 97c3215 a6fd1a3 4f7e10b 97c3215 253c2e1 97c3215 a6fd1a3 97c3215 a6fd1a3 97c3215 25e6e74 97c3215 a6fd1a3 97c3215 a6fd1a3 97c3215 253c2e1 97c3215 253c2e1 a6fd1a3 97c3215 a6fd1a3 97c3215 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
from pathlib import Path
from llama_parse import LlamaParse
import docx
import pypandoc
from datetime import datetime, timezone
from rag_components import get_huggingface_embeddings
from io import BytesIO
import os
import logging
from langchain_core.documents import Document
import config
from db.weaviateDB import connect_to_weaviate
from db.mongoDB import mongo_db
from fastapi.concurrency import run_in_threadpool
logger = logging.getLogger(__name__)
from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching
from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata
class ApiKeyManager:
"""Quản lý một danh sách các API key."""
def __init__(self, api_key_string: str):
self.keys = [key.strip() for key in api_key_string.split(',') if key.strip()]
if not self.keys:
raise ValueError("Chuỗi API key không hợp lệ hoặc rỗng.")
self.current_key_index = 0
logger.info(f"Đã khởi tạo Key Manager với {len(self.keys)} key.")
def get_key(self) -> str | None:
"""Trả về key hiện tại."""
if self.current_key_index < len(self.keys):
return self.keys[self.current_key_index]
return None
def get_next_key(self) -> str | None:
"""Chuyển sang key tiếp theo và trả về nó."""
self.current_key_index += 1
logger.warning(f"Chuyển sang sử dụng API key tiếp theo (index: {self.current_key_index}).")
return self.get_key()
def reset(self):
"""Reset lại index để bắt đầu từ key đầu tiên cho lần xử lý mới."""
self.current_key_index = 0
logger.info("Key Manager đã được reset.")
llama_key_manager = ApiKeyManager(config.LLAMA_CLOUD_API_KEYS)
# --- SỬA LẠI HÀM NÀY ĐỂ NHẬN STREAM ---
def convert_to_text_content(source_stream: BytesIO, original_filename: str) -> str:
"""Trích xuất nội dung text từ một stream trong bộ nhớ."""
file_extension = Path(original_filename).suffix.lower()
logger.info(f"Extracting content from: {original_filename}")
content = ""
source_stream.seek(0)
if file_extension == ".pdf":
# Do LlamaParse cần đường dẫn file, chúng ta sẽ ghi stream ra file tạm MỘT LẦN
# và tái sử dụng đường dẫn này trong vòng lặp thử key.
# Tạo tên file tạm duy nhất để tránh xung đột khi xử lý song song
temp_pdf_path = f"/tmp/{original_filename}"
try:
with open(temp_pdf_path, "wb") as f:
f.write(source_stream.getvalue())
# Reset key manager trước khi bắt đầu để đảm bảo nó luôn thử từ key đầu tiên
llama_key_manager.reset()
# Bắt đầu vòng lặp để thử các API key
while (current_key := llama_key_manager.get_key()) is not None:
try:
logger.info(f"Đang thử chuyển đổi PDF '{original_filename}' bằng key index: {llama_key_manager.current_key_index}...")
parser = LlamaParse(
api_key=current_key,
result_type="text",
verbose=True, # Giữ để debug
language="vi"
)
# Sử dụng đường dẫn file tạm đã tạo
documents = parser.load_data([temp_pdf_path])
if documents and documents[0].text.strip():
content = documents[0].text
logger.info(f"✅ Chuyển đổi PDF thành công bằng key index: {llama_key_manager.current_key_index}.")
break # Thành công, thoát khỏi vòng lặp
else:
raise ValueError("LlamaParse trả về nội dung rỗng.")
except Exception as e:
logger.error(f"❌ Lỗi với key index {llama_key_manager.current_key_index} cho file '{original_filename}': {e}")
if llama_key_manager.get_next_key() is None:
logger.critical("Đã thử hết tất cả các API key nhưng đều thất bại cho file PDF.")
raise Exception(f"Không thể chuyển đổi file '{original_filename}' sau khi đã thử tất cả các API key.") from e
if not content:
raise ValueError(f"Không thể trích xuất nội dung từ PDF '{original_filename}' sau khi thử các key.")
finally:
# Luôn dọn dẹp file tạm, dù thành công hay thất bại
if os.path.exists(temp_pdf_path):
os.remove(temp_pdf_path)
logger.debug(f"Đã dọn dẹp file tạm: {temp_pdf_path}")
elif file_extension == ".docx":
# docx có thể đọc trực tiếp từ stream
doc = docx.Document(source_stream)
content = '\n'.join([para.text for para in doc.paragraphs])
elif file_extension == ".doc":
# pypandoc cần file trên đĩa
temp_doc_path = f"/tmp/{original_filename}"
try:
with open(temp_doc_path, "wb") as f:
f.write(source_stream.getvalue())
content = pypandoc.convert_file(temp_doc_path, 'plain', format='doc')
finally:
if os.path.exists(temp_doc_path):
os.remove(temp_doc_path)
logger.debug(f"Đã dọn dẹp file tạm: {temp_doc_path}")
else:
raise ValueError(f"Định dạng file không được hỗ trợ: {file_extension}")
if not content.strip():
raise ValueError(f"Nội dung trích xuất từ '{original_filename}' bị rỗng.")
logger.info(f"✅ Trích xuất nội dung thành công từ stream của file: {original_filename}.")
return content
async def full_process_and_ingest_pipeline(raw_content: str, filename: str, file_hash: str):
"""
Pipeline xử lý nền đã được tối ưu hoàn toàn.
"""
logger.info(f"BACKGROUND TASK: Starting NLP and Ingestion for: {filename} (Hash: {file_hash[:10]}...)")
weaviate_client = None
try:
embeddings_model = get_huggingface_embeddings(
config.EMBEDDING_MODEL_NAME
)
# Giai đoạn 1: Xử lý NLP (CPU-bound)
doc_metadata = await run_in_threadpool(extract_document_metadata, raw_content, filename)
doc_metadata["source"] = filename
cleaned_content = await run_in_threadpool(clean_document_text, raw_content)
doc_metadata["field"] = await run_in_threadpool(infer_field, cleaned_content, doc_metadata.get("ten_van_ban"))
doc_metadata["entity_type"] = await run_in_threadpool(infer_entity_type, cleaned_content, doc_metadata.get("field", ""))
doc_to_split = Document(page_content=cleaned_content, metadata=doc_metadata)
chunks_from_file = await run_in_threadpool(hierarchical_split_law_document, doc_to_split)
if not chunks_from_file:
raise ValueError("File did not yield any chunks after processing.")
processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
# Giai đoạn 2: Ingest vào Weaviate (I/O-bound và CPU-bound)
weaviate_client = connect_to_weaviate(run_diagnostics=False)
await run_in_threadpool(create_weaviate_schema_if_not_exists, weaviate_client, config.WEAVIATE_COLLECTION_NAME)
await run_in_threadpool(
ingest_chunks_with_native_batching,
weaviate_client, config.WEAVIATE_COLLECTION_NAME, processed_chunks, embeddings_model
)
# Giai đoạn 3: Ghi log thành công
await log_processed_hash(file_hash, filename)
logger.info(f"✅✅✅ All tasks completed for '{filename}'.")
except Exception as e:
logger.error(f"❌ FAILED pipeline for '{filename}': {e}", exc_info=True)
await log_failed_process(file_hash, filename, str(e))
finally:
if weaviate_client and weaviate_client.is_connected():
weaviate_client.close()
async def log_processed_hash(file_hash: str, filename: str, status: str = "SUCCESS", error_message: str = None):
"""Ghi lại trạng thái xử lý (thành công hoặc thất bại) vào MongoDB."""
try:
record = {
"file_hash": file_hash,
"original_filename": filename,
"processed_at": datetime.now(timezone.utc),
"status": status,
}
if error_message:
record["error_message"] = error_message
await mongo_db.processed_documents.insert_one(record)
except Exception as e:
logger.error(f"Could not write process record to MongoDB for hash {file_hash}: {e}")
# Wrapper cho việc log lỗi
async def log_failed_process(file_hash: str, filename: str, error_message: str):
await log_processed_hash(file_hash, filename, status="FAILED", error_message=error_message)
# Hàm kiểm tra trùng lặp
async def check_if_hash_exists(file_hash: str) -> bool:
count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash, "status": "SUCCESS"})
return count > 0
|