Pix-Agent / app /utils /pdf_processor.py
Cuong2004's picture
version 1.1
e83f5e9
raw
history blame
8.93 kB
import os
import time
import uuid
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import logging
from app.database.pinecone import get_pinecone_index, init_pinecone
# Cấu hình logging
logger = logging.getLogger(__name__)
# Khởi tạo embeddings model
embeddings_model = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
class PDFProcessor:
"""Lớp xử lý file PDF và tạo embeddings"""
def __init__(self, index_name="testbot768", namespace="Default"):
"""Khởi tạo với tên index và namespace Pinecone mặc định"""
self.index_name = index_name
self.namespace = namespace
self.pinecone_index = None
def _init_pinecone_connection(self):
"""Khởi tạo kết nối đến Pinecone"""
try:
# Sử dụng singleton pattern từ module database.pinecone
self.pinecone_index = get_pinecone_index()
if not self.pinecone_index:
logger.error("Không thể kết nối đến Pinecone")
return False
return True
except Exception as e:
logger.error(f"Lỗi khi kết nối Pinecone: {str(e)}")
return False
async def process_pdf(self, file_path, document_id=None, metadata=None, progress_callback=None):
"""
Xử lý file PDF, chia thành chunks và tạo embeddings
Args:
file_path (str): Đường dẫn tới file PDF
document_id (str, optional): ID của tài liệu, nếu không cung cấp sẽ tạo ID mới
metadata (dict, optional): Metadata bổ sung cho tài liệu
progress_callback (callable, optional): Callback function để cập nhật tiến độ
Returns:
dict: Thông tin kết quả xử lý gồm document_id và số chunks đã xử lý
"""
try:
# Khởi tạo kết nối Pinecone nếu chưa có
if not self.pinecone_index:
if not self._init_pinecone_connection():
return {"success": False, "error": "Không thể kết nối đến Pinecone"}
# Tạo document_id nếu không có
if not document_id:
document_id = str(uuid.uuid4())
# Đọc file PDF bằng PyPDFLoader
logger.info(f"Đang đọc file PDF: {file_path}")
if progress_callback:
await progress_callback("pdf_loading", 0.5, "Loading PDF file")
loader = PyPDFLoader(file_path)
pages = loader.load()
# Trích xuất và nối text từ tất cả các trang
all_text = ""
for page in pages:
all_text += page.page_content + "\n"
if progress_callback:
await progress_callback("text_extraction", 0.6, "Extracted text from PDF")
# Chia văn bản thành các chunk
text_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=300)
chunks = text_splitter.split_text(all_text)
logger.info(f"Đã chia file PDF thành {len(chunks)} chunks")
if progress_callback:
await progress_callback("chunking", 0.7, f"Split document into {len(chunks)} chunks")
# Xử lý embedding cho từng chunk và upsert lên Pinecone
vectors = []
for i, chunk in enumerate(chunks):
# Cập nhật tiến độ embedding
if progress_callback and i % 5 == 0: # Cập nhật sau mỗi 5 chunks để tránh quá nhiều thông báo
embedding_progress = 0.7 + (0.3 * (i / len(chunks)))
await progress_callback("embedding", embedding_progress, f"Processing chunk {i+1}/{len(chunks)}")
# Tạo vector embedding cho từng chunk
vector = embeddings_model.embed_query(chunk)
# Chuẩn bị metadata cho vector
vector_metadata = {
"document_id": document_id,
"chunk_index": i,
"text": chunk
}
# Thêm metadata bổ sung nếu có
if metadata:
for key, value in metadata.items():
if key not in vector_metadata:
vector_metadata[key] = value
# Thêm vector vào danh sách để upsert
vectors.append({
"id": f"{document_id}_{i}",
"values": vector,
"metadata": vector_metadata
})
# Upsert mỗi 100 vectors để tránh quá lớn
if len(vectors) >= 100:
await self._upsert_vectors(vectors)
vectors = []
# Upsert các vectors còn lại
if vectors:
await self._upsert_vectors(vectors)
logger.info(f"Đã embedding và lưu {len(chunks)} chunks từ PDF với document_id: {document_id}")
# Final progress update
if progress_callback:
await progress_callback("completed", 1.0, "PDF processing complete")
return {
"success": True,
"document_id": document_id,
"chunks_processed": len(chunks),
"total_text_length": len(all_text)
}
except Exception as e:
logger.error(f"Lỗi khi xử lý PDF: {str(e)}")
if progress_callback:
await progress_callback("error", 0, f"Error processing PDF: {str(e)}")
return {
"success": False,
"error": str(e)
}
async def _upsert_vectors(self, vectors):
"""Upsert vectors vào Pinecone"""
try:
if not vectors:
return
result = self.pinecone_index.upsert(
vectors=vectors,
namespace=self.namespace
)
logger.info(f"Đã upsert {len(vectors)} vectors vào Pinecone")
return result
except Exception as e:
logger.error(f"Lỗi khi upsert vectors: {str(e)}")
raise
async def delete_namespace(self):
"""
Xóa toàn bộ vectors trong namespace hiện tại (tương đương xoá namespace).
"""
# Khởi tạo kết nối nếu cần
if not self.pinecone_index and not self._init_pinecone_connection():
return {"success": False, "error": "Không thể kết nối đến Pinecone"}
try:
# delete_all=True sẽ xóa toàn bộ vectors trong namespace
result = self.pinecone_index.delete(
delete_all=True,
namespace=self.namespace
)
logger.info(f"Đã xóa namespace '{self.namespace}' (tất cả vectors).")
return {"success": True, "detail": result}
except Exception as e:
logger.error(f"Lỗi khi xóa namespace '{self.namespace}': {e}")
return {"success": False, "error": str(e)}
async def list_documents(self):
"""Lấy danh sách tất cả document_id từ Pinecone"""
try:
# Khởi tạo kết nối Pinecone nếu chưa có
if not self.pinecone_index:
if not self._init_pinecone_connection():
return {"success": False, "error": "Không thể kết nối đến Pinecone"}
# Lấy thông tin index
stats = self.pinecone_index.describe_index_stats()
# Thực hiện truy vấn để lấy danh sách tất cả document_id duy nhất
# Phương pháp này có thể không hiệu quả với dataset lớn, nhưng là cách đơn giản nhất
# Trong thực tế, nên lưu danh sách document_id trong một database riêng
return {
"success": True,
"total_vectors": stats.get('total_vector_count', 0),
"namespace": self.namespace,
"index_name": self.index_name
}
except Exception as e:
logger.error(f"Lỗi khi lấy danh sách documents: {str(e)}")
return {
"success": False,
"error": str(e)
}