import os
import logging
import uuid
import time
from typing import List, Dict, Any, Optional

import pinecone
from app.utils.pinecone_fix import PineconeConnectionManager, check_connection

from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import google.generativeai as genai

logger = logging.getLogger(__name__)


class PDFProcessor:
    """Process PDF files and create embeddings in Pinecone"""
    def __init__(self, index_name="testbot768", namespace="Default", api_key=None,
                 vector_db_id=None, mock_mode=False, correlation_id=None):
        self.index_name = index_name
        self.namespace = namespace
        self.api_key = api_key
        self.vector_db_id = vector_db_id
        self.pinecone_index = None
        self.mock_mode = mock_mode
        self.correlation_id = correlation_id or str(uuid.uuid4())[:8]
        self.google_api_key = os.environ.get("GOOGLE_API_KEY")

        if self.api_key:
            try:
                logger.info(f"[{self.correlation_id}] Initializing Pinecone connection to {self.index_name}")
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)
                logger.info(f"[{self.correlation_id}] Successfully connected to Pinecone index {self.index_name}")
            except Exception as e:
                logger.error(f"[{self.correlation_id}] Failed to initialize Pinecone: {str(e)}")

    async def process_pdf(self, file_path, document_id=None, metadata=None, progress_callback=None):
        """Process a PDF file and create vector embeddings

        This method:
        1. Extracts text from the PDF using PyPDFLoader
        2. Splits the text into chunks using RecursiveCharacterTextSplitter
        3. Creates embeddings using the Google Gemini embedding model
        4. Stores the embeddings in Pinecone
        """
        logger.info(f"[{self.correlation_id}] Processing PDF: {file_path}")

        try:
            if metadata is None:
                metadata = {}

            if document_id is None:
                document_id = str(uuid.uuid4())

            metadata["document_id"] = document_id
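            # Vectors for a specific vector DB are isolated in their own Pinecone
            # namespace ("vdb-<id>"); otherwise the processor's default namespace is used.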
actual_namespace = f"vdb-{self.vector_db_id}" if self.vector_db_id else self.namespace |
|
|
|
|
|
logger.info(f"[{self.correlation_id}] Extracting text from PDF: {file_path}") |
|
if progress_callback: |
|
await progress_callback(None, document_id, "text_extraction", 0.2, "Extracting text from PDF") |
|
|
|
loader = PyPDFLoader(file_path) |
|
documents = loader.load() |
|
total_text_length = sum(len(doc.page_content) for doc in documents) |
|
|
|
logger.info(f"[{self.correlation_id}] Extracted {len(documents)} pages, total text length: {total_text_length}") |
|
|
|
|
|
if progress_callback: |
|
await progress_callback(None, document_id, "chunking", 0.4, "Splitting text into chunks") |
|
|
|
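            # chunk_size and chunk_overlap below are measured in characters
            # (length_function=len): 1000-character chunks with 100 characters of overlap.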
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=100,
                length_function=len,
                separators=["\n\n", "\n", " ", ""]
            )

            chunks = text_splitter.split_documents(documents)

            logger.info(f"[{self.correlation_id}] Split into {len(chunks)} chunks")

            if progress_callback:
                await progress_callback(None, document_id, "embedding", 0.6, "Creating embeddings")

            if not self.google_api_key:
                raise ValueError("Google API key not found in environment variables")

            genai.configure(api_key=self.google_api_key)

            logger.info(f"[{self.correlation_id}] Checking Pinecone index dimensions")
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            stats = self.pinecone_index.describe_index_stats()
            pinecone_dimension = stats.dimension
            logger.info(f"[{self.correlation_id}] Pinecone index dimension: {pinecone_dimension}")
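            # Note: models/embedding-001 typically produces 768-dimensional vectors. Rather
            # than assuming a size, a short probe string is embedded below and its length is
            # compared against the dimension reported by the Pinecone index.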
            embedding_model = GoogleGenerativeAIEmbeddings(
                model="models/embedding-001",
                google_api_key=self.google_api_key,
                task_type="retrieval_document"
            )

            sample_embedding = embedding_model.embed_query("test")
            embedding_dimension = len(sample_embedding)

            logger.info(f"[{self.correlation_id}] Generated embeddings with dimension: {embedding_dimension}")
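            # If the embedding dimension does not match the index, adjust each vector:
            #   - exactly half the index dimension: duplicate every value (upscale),
            #   - otherwise smaller: pad with zeros up to the index dimension,
            #   - larger: truncate to the index dimension.
            # This keeps upserts valid, at the cost of some geometric fidelity.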
            if embedding_dimension != pinecone_dimension:
                logger.warning(f"[{self.correlation_id}] Embedding dimension mismatch: got {embedding_dimension}, need {pinecone_dimension}")

                if embedding_dimension < pinecone_dimension:
                    logger.info(f"[{self.correlation_id}] Upscaling embeddings from {embedding_dimension} to {pinecone_dimension} (duplication or zero-padding)")

                    if embedding_dimension * 2 == pinecone_dimension:
                        def adjust_embedding(embedding):
                            return [val for val in embedding for _ in range(2)]
                    else:
                        pad_size = pinecone_dimension - embedding_dimension

                        def adjust_embedding(embedding):
                            return embedding + [0.0] * pad_size
                else:
                    logger.info(f"[{self.correlation_id}] Will truncate embeddings from {embedding_dimension} to {pinecone_dimension}")

                    def adjust_embedding(embedding):
                        return embedding[:pinecone_dimension]
            else:
                def adjust_embedding(embedding):
                    return embedding
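            # Embed the chunks in batches of 10 so each embed_documents() call stays small;
            # the resulting vectors are collected and upserted together below.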
            batch_size = 10
            vectors_to_upsert = []

            for i in range(0, len(chunks), batch_size):
                batch = chunks[i:i+batch_size]

                texts = [chunk.page_content for chunk in batch]

                embeddings = embedding_model.embed_documents(texts)

                for j, (chunk, embedding) in enumerate(zip(batch, embeddings)):
                    adjusted_embedding = adjust_embedding(embedding)

                    if len(adjusted_embedding) != pinecone_dimension:
                        raise ValueError(f"Dimension mismatch after adjustment: got {len(adjusted_embedding)}, expected {pinecone_dimension}")

                    chunk_metadata = {
                        "document_id": document_id,
                        "page": chunk.metadata.get("page", 0),
                        "chunk_id": f"{document_id}-chunk-{i+j}",
                        "text": chunk.page_content[:1000],
                        **metadata
                    }

                    vector = {
                        "id": f"{document_id}-{i+j}",
                        "values": adjusted_embedding,
                        "metadata": chunk_metadata
                    }

                    vectors_to_upsert.append(vector)

                logger.info(f"[{self.correlation_id}] Processed batch {i//batch_size + 1}/{(len(chunks)-1)//batch_size + 1}")

            if progress_callback:
                await progress_callback(None, document_id, "storing", 0.8, f"Storing {len(vectors_to_upsert)} vectors in Pinecone")

            logger.info(f"[{self.correlation_id}] Upserting {len(vectors_to_upsert)} vectors to Pinecone index {self.index_name}, namespace {actual_namespace}")

            result = PineconeConnectionManager.upsert_vectors_with_validation(
                self.pinecone_index,
                vectors_to_upsert,
                namespace=actual_namespace
            )

            logger.info(f"[{self.correlation_id}] Successfully upserted {result.get('upserted_count', 0)} vectors to Pinecone")

            if progress_callback:
                await progress_callback(None, document_id, "embedding_complete", 1.0, "Processing completed")

            return {
                "success": True,
                "document_id": document_id,
                "chunks_processed": len(chunks),
                "total_text_length": total_text_length,
                "vectors_created": len(vectors_to_upsert),
                "vectors_upserted": result.get('upserted_count', 0),
                "message": "PDF processed successfully"
            }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error processing PDF: {str(e)}")
            return {
                "success": False,
                "error": f"Error processing PDF: {str(e)}"
            }

    async def list_namespaces(self):
        """List all namespaces in the Pinecone index"""
        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            stats = self.pinecone_index.describe_index_stats()
            namespaces = list(stats.get("namespaces", {}).keys())

            return {
                "success": True,
                "namespaces": namespaces
            }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error listing namespaces: {str(e)}")
            return {
                "success": False,
                "error": f"Error listing namespaces: {str(e)}"
            }

    async def delete_namespace(self):
        """Delete all vectors in a namespace"""
        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            logger.info(f"[{self.correlation_id}] Deleting namespace '{self.namespace}' from index '{self.index_name}'")

            stats = self.pinecone_index.describe_index_stats()
            namespaces = stats.get("namespaces", {})

            if self.namespace in namespaces:
                vector_count = namespaces[self.namespace].get("vector_count", 0)

                self.pinecone_index.delete(delete_all=True, namespace=self.namespace)
                return {
                    "success": True,
                    "namespace": self.namespace,
                    "deleted_count": vector_count,
                    "message": f"Successfully deleted namespace '{self.namespace}' with {vector_count} vectors"
                }
            else:
                return {
                    "success": True,
                    "namespace": self.namespace,
                    "deleted_count": 0,
                    "message": f"Namespace '{self.namespace}' does not exist - nothing to delete"
                }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error deleting namespace: {str(e)}")
            return {
                "success": False,
                "namespace": self.namespace,
                "error": f"Error deleting namespace: {str(e)}"
            }

    async def delete_document(self, document_id, additional_metadata=None):
        """Delete vectors associated with a specific document ID or name"""
        logger.info(f"[{self.correlation_id}] Deleting vectors for document '{document_id}' from namespace '{self.namespace}'")

        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            actual_namespace = f"vdb-{self.vector_db_id}" if self.vector_db_id else self.namespace
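            # Build a list of candidate metadata filters and try them in order: the raw id,
            # dash-stripped / canonical UUID forms, and (for short human-readable ids) the
            # title or filename, so documents indexed under slightly different ids still match.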
            filters = []

            filters.append({"document_id": document_id})

            if len(document_id) >= 32:
                if "-" in document_id:
                    filters.append({"document_id": document_id.replace("-", "")})
                else:
                    try:
                        formatted_uuid = str(uuid.UUID(document_id))
                        filters.append({"document_id": formatted_uuid})
                    except ValueError:
                        pass

            if not document_id.startswith("doc-") and not document_id.startswith("test-doc-") and len(document_id) < 36:
                filters.append({"title": document_id})

            if additional_metadata:
                if "document_name" in additional_metadata:
                    filters.append({"title": additional_metadata["document_name"]})

                    if "." in additional_metadata["document_name"]:
                        filters.append({"filename": additional_metadata["document_name"]})

            found_vectors = False
            deleted_count = 0
            filter_used = ""

            logger.info(f"[{self.correlation_id}] Will try {len(filters)} different filters to find document")

            # The probe query only needs a zero vector of the index's actual dimension;
            # the metadata filter does the matching.
            index_dimension = self.pinecone_index.describe_index_stats().dimension

            for i, filter_query in enumerate(filters):
                logger.info(f"[{self.correlation_id}] Searching for vectors with filter #{i+1}: {filter_query}")

                try:
                    results = self.pinecone_index.query(
                        vector=[0] * index_dimension,
                        top_k=1,
                        include_metadata=True,
                        filter=filter_query,
                        namespace=actual_namespace
                    )

                    if results and results.get("matches") and len(results.get("matches", [])) > 0:
                        logger.info(f"[{self.correlation_id}] Found vectors matching filter: {filter_query}")
                        found_vectors = True
                        filter_used = str(filter_query)

                        delete_result = self.pinecone_index.delete(
                            filter=filter_query,
                            namespace=actual_namespace
                        )

                        deleted_count = delete_result.get("deleted_count", 0)
                        logger.info(f"[{self.correlation_id}] Deleted {deleted_count} vectors with filter: {filter_query}")
                        break
                except Exception as filter_error:
                    logger.warning(f"[{self.correlation_id}] Error searching with filter {filter_query}: {str(filter_error)}")
                    continue
            if not found_vectors:
                logger.warning(f"[{self.correlation_id}] No vectors found for document '{document_id}' in namespace '{actual_namespace}'")
                return {
                    "success": True,
                    "document_id": document_id,
                    "namespace": actual_namespace,
                    "deleted_count": 0,
                    "warning": f"No vectors found for document '{document_id}' in namespace '{actual_namespace}'",
                    "message": f"Found 0 vectors for document '{document_id}' in namespace '{actual_namespace}'",
                    "vectors_found": False,
                    "vectors_deleted": 0
                }

            return {
                "success": True,
                "document_id": document_id,
                "namespace": actual_namespace,
                "deleted_count": deleted_count,
                "filter_used": filter_used,
                "message": f"Successfully deleted {deleted_count} vectors for document '{document_id}' from namespace '{actual_namespace}'",
                "vectors_found": True,
                "vectors_deleted": deleted_count
            }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error deleting document vectors: {str(e)}")
            return {
                "success": False,
                "document_id": document_id,
                "error": f"Error deleting document vectors: {str(e)}",
                "vectors_found": False,
                "vectors_deleted": 0
            }

    async def list_documents(self):
        """List all documents in a namespace"""
        actual_namespace = f"vdb-{self.vector_db_id}" if self.vector_db_id else self.namespace

        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            logger.info(f"[{self.correlation_id}] Listing documents in namespace '{actual_namespace}'")

            stats = self.pinecone_index.describe_index_stats()
            namespace_stats = stats.get("namespaces", {}).get(actual_namespace, {})
            vector_count = namespace_stats.get("vector_count", 0)

            if vector_count == 0:
                # Return a plain dict, consistent with the other methods in this class
                return {
                    "success": True,
                    "total_vectors": 0,
                    "namespace": actual_namespace,
                    "index_name": self.index_name,
                    "documents": []
                }
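            # Enumerate vectors by querying with a zero vector of the index dimension
            # (top_k capped at 1000), then group the matches by the document_id stored
            # in their metadata.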
            results = self.pinecone_index.query(
                vector=[0] * stats.dimension,
                top_k=min(vector_count, 1000),
                include_metadata=True,
                namespace=actual_namespace
            )

            seen_documents = set()
            documents = []

            for match in results.get("matches", []):
                metadata = match.get("metadata", {})
                document_id = metadata.get("document_id")

                if document_id and document_id not in seen_documents:
                    seen_documents.add(document_id)
                    doc_info = {
                        "id": document_id,
                        "title": metadata.get("title"),
                        "filename": metadata.get("filename"),
                        "content_type": metadata.get("content_type"),
                        "chunk_count": 0
                    }
                    documents.append(doc_info)

                for doc in documents:
                    if doc["id"] == document_id:
                        doc["chunk_count"] += 1
                        break

            return {
                "success": True,
                "total_vectors": vector_count,
                "namespace": actual_namespace,
                "index_name": self.index_name,
                "documents": documents
            }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error listing documents: {str(e)}")
            return {
                "success": False,
                "error": f"Error listing documents: {str(e)}"
            }
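

if __name__ == "__main__":
    # Minimal usage sketch (not part of the processing pipeline): the PDF path and
    # Pinecone API key are taken from the command line, GOOGLE_API_KEY is assumed to
    # be set in the environment, and the index/namespace fall back to the class defaults.
    import asyncio
    import sys

    async def _demo(pdf_path: str, pinecone_api_key: str) -> None:
        processor = PDFProcessor(api_key=pinecone_api_key)
        result = await processor.process_pdf(pdf_path)
        print(result)

    if len(sys.argv) >= 3:
        asyncio.run(_demo(sys.argv[1], sys.argv[2]))
    else:
        print(f"Usage: {sys.argv[0]} <pdf_path> <pinecone_api_key>")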