File size: 8,934 Bytes
e83f5e9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
import os
import time
import uuid
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import logging
from app.database.pinecone import get_pinecone_index, init_pinecone
# Cấu hình logging
logger = logging.getLogger(__name__)
# Khởi tạo embeddings model
embeddings_model = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
class PDFProcessor:
"""Lớp xử lý file PDF và tạo embeddings"""
def __init__(self, index_name="testbot768", namespace="Default"):
"""Khởi tạo với tên index và namespace Pinecone mặc định"""
self.index_name = index_name
self.namespace = namespace
self.pinecone_index = None
def _init_pinecone_connection(self):
"""Khởi tạo kết nối đến Pinecone"""
try:
# Sử dụng singleton pattern từ module database.pinecone
self.pinecone_index = get_pinecone_index()
if not self.pinecone_index:
logger.error("Không thể kết nối đến Pinecone")
return False
return True
except Exception as e:
logger.error(f"Lỗi khi kết nối Pinecone: {str(e)}")
return False
async def process_pdf(self, file_path, document_id=None, metadata=None, progress_callback=None):
"""
Xử lý file PDF, chia thành chunks và tạo embeddings
Args:
file_path (str): Đường dẫn tới file PDF
document_id (str, optional): ID của tài liệu, nếu không cung cấp sẽ tạo ID mới
metadata (dict, optional): Metadata bổ sung cho tài liệu
progress_callback (callable, optional): Callback function để cập nhật tiến độ
Returns:
dict: Thông tin kết quả xử lý gồm document_id và số chunks đã xử lý
"""
try:
# Khởi tạo kết nối Pinecone nếu chưa có
if not self.pinecone_index:
if not self._init_pinecone_connection():
return {"success": False, "error": "Không thể kết nối đến Pinecone"}
# Tạo document_id nếu không có
if not document_id:
document_id = str(uuid.uuid4())
# Đọc file PDF bằng PyPDFLoader
logger.info(f"Đang đọc file PDF: {file_path}")
if progress_callback:
await progress_callback("pdf_loading", 0.5, "Loading PDF file")
loader = PyPDFLoader(file_path)
pages = loader.load()
# Trích xuất và nối text từ tất cả các trang
all_text = ""
for page in pages:
all_text += page.page_content + "\n"
if progress_callback:
await progress_callback("text_extraction", 0.6, "Extracted text from PDF")
# Chia văn bản thành các chunk
text_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=300)
chunks = text_splitter.split_text(all_text)
logger.info(f"Đã chia file PDF thành {len(chunks)} chunks")
if progress_callback:
await progress_callback("chunking", 0.7, f"Split document into {len(chunks)} chunks")
# Xử lý embedding cho từng chunk và upsert lên Pinecone
vectors = []
for i, chunk in enumerate(chunks):
# Cập nhật tiến độ embedding
if progress_callback and i % 5 == 0: # Cập nhật sau mỗi 5 chunks để tránh quá nhiều thông báo
embedding_progress = 0.7 + (0.3 * (i / len(chunks)))
await progress_callback("embedding", embedding_progress, f"Processing chunk {i+1}/{len(chunks)}")
# Tạo vector embedding cho từng chunk
vector = embeddings_model.embed_query(chunk)
# Chuẩn bị metadata cho vector
vector_metadata = {
"document_id": document_id,
"chunk_index": i,
"text": chunk
}
# Thêm metadata bổ sung nếu có
if metadata:
for key, value in metadata.items():
if key not in vector_metadata:
vector_metadata[key] = value
# Thêm vector vào danh sách để upsert
vectors.append({
"id": f"{document_id}_{i}",
"values": vector,
"metadata": vector_metadata
})
# Upsert mỗi 100 vectors để tránh quá lớn
if len(vectors) >= 100:
await self._upsert_vectors(vectors)
vectors = []
# Upsert các vectors còn lại
if vectors:
await self._upsert_vectors(vectors)
logger.info(f"Đã embedding và lưu {len(chunks)} chunks từ PDF với document_id: {document_id}")
# Final progress update
if progress_callback:
await progress_callback("completed", 1.0, "PDF processing complete")
return {
"success": True,
"document_id": document_id,
"chunks_processed": len(chunks),
"total_text_length": len(all_text)
}
except Exception as e:
logger.error(f"Lỗi khi xử lý PDF: {str(e)}")
if progress_callback:
await progress_callback("error", 0, f"Error processing PDF: {str(e)}")
return {
"success": False,
"error": str(e)
}
async def _upsert_vectors(self, vectors):
"""Upsert vectors vào Pinecone"""
try:
if not vectors:
return
result = self.pinecone_index.upsert(
vectors=vectors,
namespace=self.namespace
)
logger.info(f"Đã upsert {len(vectors)} vectors vào Pinecone")
return result
except Exception as e:
logger.error(f"Lỗi khi upsert vectors: {str(e)}")
raise
async def delete_namespace(self):
"""
Xóa toàn bộ vectors trong namespace hiện tại (tương đương xoá namespace).
"""
# Khởi tạo kết nối nếu cần
if not self.pinecone_index and not self._init_pinecone_connection():
return {"success": False, "error": "Không thể kết nối đến Pinecone"}
try:
# delete_all=True sẽ xóa toàn bộ vectors trong namespace
result = self.pinecone_index.delete(
delete_all=True,
namespace=self.namespace
)
logger.info(f"Đã xóa namespace '{self.namespace}' (tất cả vectors).")
return {"success": True, "detail": result}
except Exception as e:
logger.error(f"Lỗi khi xóa namespace '{self.namespace}': {e}")
return {"success": False, "error": str(e)}
async def list_documents(self):
"""Lấy danh sách tất cả document_id từ Pinecone"""
try:
# Khởi tạo kết nối Pinecone nếu chưa có
if not self.pinecone_index:
if not self._init_pinecone_connection():
return {"success": False, "error": "Không thể kết nối đến Pinecone"}
# Lấy thông tin index
stats = self.pinecone_index.describe_index_stats()
# Thực hiện truy vấn để lấy danh sách tất cả document_id duy nhất
# Phương pháp này có thể không hiệu quả với dataset lớn, nhưng là cách đơn giản nhất
# Trong thực tế, nên lưu danh sách document_id trong một database riêng
return {
"success": True,
"total_vectors": stats.get('total_vector_count', 0),
"namespace": self.namespace,
"index_name": self.index_name
}
except Exception as e:
logger.error(f"Lỗi khi lấy danh sách documents: {str(e)}")
return {
"success": False,
"error": str(e)
} |