File size: 9,421 Bytes
a6fd1a3
 
 
 
97c3215
 
 
a6fd1a3
 
 
 
 
97c3215
 
 
a6fd1a3
 
 
 
 
4f7e10b
97c3215
4f7e10b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97c3215
 
 
 
4f7e10b
97c3215
4f7e10b
97c3215
 
 
 
 
25e6e74
4f7e10b
97c3215
25e6e74
97c3215
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
253c2e1
a6fd1a3
97c3215
 
a6fd1a3
97c3215
a6fd1a3
97c3215
 
 
 
 
 
 
 
 
 
 
 
a6fd1a3
4f7e10b
 
 
97c3215
253c2e1
97c3215
a6fd1a3
 
97c3215
 
 
 
 
 
 
 
a6fd1a3
 
97c3215
 
 
 
 
25e6e74
97c3215
 
 
 
a6fd1a3
 
97c3215
a6fd1a3
 
 
 
 
 
97c3215
 
 
 
 
 
 
 
 
253c2e1
97c3215
 
 
253c2e1
a6fd1a3
 
97c3215
a6fd1a3
 
97c3215
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
from pathlib import Path
from llama_parse import LlamaParse
import docx
import pypandoc
from datetime import datetime, timezone
from rag_components import get_huggingface_embeddings
from io import BytesIO
import os
import logging
from langchain_core.documents import Document
import config
from db.weaviateDB import connect_to_weaviate
from db.mongoDB import mongo_db

from fastapi.concurrency import run_in_threadpool
logger = logging.getLogger(__name__)

from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching
from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata

class ApiKeyManager:
    """Quản lý một danh sách các API key."""
    def __init__(self, api_key_string: str):
        self.keys = [key.strip() for key in api_key_string.split(',') if key.strip()]
        if not self.keys:
            raise ValueError("Chuỗi API key không hợp lệ hoặc rỗng.")
        self.current_key_index = 0
        logger.info(f"Đã khởi tạo Key Manager với {len(self.keys)} key.")

    def get_key(self) -> str | None:
        """Trả về key hiện tại."""
        if self.current_key_index < len(self.keys):
            return self.keys[self.current_key_index]
        return None

    def get_next_key(self) -> str | None:
        """Chuyển sang key tiếp theo và trả về nó."""
        self.current_key_index += 1
        logger.warning(f"Chuyển sang sử dụng API key tiếp theo (index: {self.current_key_index}).")
        return self.get_key()

    def reset(self):
        """Reset lại index để bắt đầu từ key đầu tiên cho lần xử lý mới."""
        self.current_key_index = 0
        logger.info("Key Manager đã được reset.")

llama_key_manager = ApiKeyManager(config.LLAMA_CLOUD_API_KEYS)

# --- SỬA LẠI HÀM NÀY ĐỂ NHẬN STREAM ---
def convert_to_text_content(source_stream: BytesIO, original_filename: str) -> str:
    """Trích xuất nội dung text từ một stream trong bộ nhớ."""
    file_extension = Path(original_filename).suffix.lower()
    logger.info(f"Extracting content from: {original_filename}")
    content = ""

    source_stream.seek(0)

    if file_extension == ".pdf":
        # Do LlamaParse cần đường dẫn file, chúng ta sẽ ghi stream ra file tạm MỘT LẦN
        # và tái sử dụng đường dẫn này trong vòng lặp thử key.

        # Tạo tên file tạm duy nhất để tránh xung đột khi xử lý song song

        temp_pdf_path = f"/tmp/{original_filename}"

        try:
            with open(temp_pdf_path, "wb") as f:
                f.write(source_stream.getvalue())

            # Reset key manager trước khi bắt đầu để đảm bảo nó luôn thử từ key đầu tiên
            llama_key_manager.reset()

            # Bắt đầu vòng lặp để thử các API key
            while (current_key := llama_key_manager.get_key()) is not None:
                try:
                    logger.info(f"Đang thử chuyển đổi PDF '{original_filename}' bằng key index: {llama_key_manager.current_key_index}...")
                    parser = LlamaParse(
                        api_key=current_key,
                        result_type="text",
                        verbose=True, # Giữ để debug
                        language="vi"
                    )
                    # Sử dụng đường dẫn file tạm đã tạo
                    documents = parser.load_data([temp_pdf_path])

                    if documents and documents[0].text.strip():
                        content = documents[0].text
                        logger.info(f"✅ Chuyển đổi PDF thành công bằng key index: {llama_key_manager.current_key_index}.")
                        break # Thành công, thoát khỏi vòng lặp
                    else:
                        raise ValueError("LlamaParse trả về nội dung rỗng.")

                except Exception as e:
                    logger.error(f"❌ Lỗi với key index {llama_key_manager.current_key_index} cho file '{original_filename}': {e}")
                    if llama_key_manager.get_next_key() is None:
                        logger.critical("Đã thử hết tất cả các API key nhưng đều thất bại cho file PDF.")
                        raise Exception(f"Không thể chuyển đổi file '{original_filename}' sau khi đã thử tất cả các API key.") from e

            if not content:
                raise ValueError(f"Không thể trích xuất nội dung từ PDF '{original_filename}' sau khi thử các key.")

        finally:
            # Luôn dọn dẹp file tạm, dù thành công hay thất bại
            if os.path.exists(temp_pdf_path):
                os.remove(temp_pdf_path)
                logger.debug(f"Đã dọn dẹp file tạm: {temp_pdf_path}")

    elif file_extension == ".docx":
        # docx có thể đọc trực tiếp từ stream
        doc = docx.Document(source_stream)
        content = '\n'.join([para.text for para in doc.paragraphs])

    elif file_extension == ".doc":
        # pypandoc cần file trên đĩa

        temp_doc_path = f"/tmp/{original_filename}"
        try:
            with open(temp_doc_path, "wb") as f:
                f.write(source_stream.getvalue())
            content = pypandoc.convert_file(temp_doc_path, 'plain', format='doc')
        finally:
             if os.path.exists(temp_doc_path):
                os.remove(temp_doc_path)
                logger.debug(f"Đã dọn dẹp file tạm: {temp_doc_path}")

    else:
        raise ValueError(f"Định dạng file không được hỗ trợ: {file_extension}")

    if not content.strip():
        raise ValueError(f"Nội dung trích xuất từ '{original_filename}' bị rỗng.")

    logger.info(f"✅ Trích xuất nội dung thành công từ stream của file: {original_filename}.")
    return content




async def full_process_and_ingest_pipeline(raw_content: str, filename: str, file_hash: str):
    """
    Pipeline xử lý nền đã được tối ưu hoàn toàn.
    """
    logger.info(f"BACKGROUND TASK: Starting NLP and Ingestion for: {filename} (Hash: {file_hash[:10]}...)")
    weaviate_client = None
    try:
        embeddings_model = get_huggingface_embeddings(
            config.EMBEDDING_MODEL_NAME
        )
        # Giai đoạn 1: Xử lý NLP (CPU-bound)
        doc_metadata = await run_in_threadpool(extract_document_metadata, raw_content, filename)
        doc_metadata["source"] = filename

        cleaned_content = await run_in_threadpool(clean_document_text, raw_content)
        doc_metadata["field"] = await run_in_threadpool(infer_field, cleaned_content, doc_metadata.get("ten_van_ban"))
        doc_metadata["entity_type"] = await run_in_threadpool(infer_entity_type, cleaned_content, doc_metadata.get("field", ""))

        doc_to_split = Document(page_content=cleaned_content, metadata=doc_metadata)
        chunks_from_file = await run_in_threadpool(hierarchical_split_law_document, doc_to_split)

        if not chunks_from_file:
            raise ValueError("File did not yield any chunks after processing.")

        processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)

        # Giai đoạn 2: Ingest vào Weaviate (I/O-bound và CPU-bound)
        weaviate_client = connect_to_weaviate(run_diagnostics=False)


        await run_in_threadpool(create_weaviate_schema_if_not_exists, weaviate_client, config.WEAVIATE_COLLECTION_NAME)
        await run_in_threadpool(
            ingest_chunks_with_native_batching,
            weaviate_client, config.WEAVIATE_COLLECTION_NAME, processed_chunks, embeddings_model
        )

        # Giai đoạn 3: Ghi log thành công
        await log_processed_hash(file_hash, filename)
        logger.info(f"✅✅✅ All tasks completed for '{filename}'.")

    except Exception as e:
        logger.error(f"❌ FAILED pipeline for '{filename}': {e}", exc_info=True)
        await log_failed_process(file_hash, filename, str(e))
    finally:
        if weaviate_client and weaviate_client.is_connected():
            weaviate_client.close()


async def log_processed_hash(file_hash: str, filename: str, status: str = "SUCCESS", error_message: str = None):
    """Ghi lại trạng thái xử lý (thành công hoặc thất bại) vào MongoDB."""
    try:
        record = {
            "file_hash": file_hash,
            "original_filename": filename,
            "processed_at": datetime.now(timezone.utc),
            "status": status,
        }
        if error_message:
            record["error_message"] = error_message

        await mongo_db.processed_documents.insert_one(record)
    except Exception as e:
        logger.error(f"Could not write process record to MongoDB for hash {file_hash}: {e}")

# Wrapper cho việc log lỗi
async def log_failed_process(file_hash: str, filename: str, error_message: str):
    await log_processed_hash(file_hash, filename, status="FAILED", error_message=error_message)

# Hàm kiểm tra trùng lặp
async def check_if_hash_exists(file_hash: str) -> bool:
    count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash, "status": "SUCCESS"})
    return count > 0