import os
import logging
import uuid
import pinecone
from app.utils.pinecone_fix import PineconeConnectionManager, check_connection
import time
from typing import List, Dict, Any, Optional

# Langchain imports for document processing
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import google.generativeai as genai

# Response schema used by list_documents(). NOTE: the import path below is an assumption
# about the project layout; adjust it to wherever DocumentsListResponse is actually defined.
from app.models.schemas import DocumentsListResponse

# Configure logger
logger = logging.getLogger(__name__)


class PDFProcessor:
    """Process PDF files and create embeddings in Pinecone"""

    def __init__(self, index_name="testbot768", namespace="Default", api_key=None,
                 vector_db_id=None, mock_mode=False, correlation_id=None):
        self.index_name = index_name
        self.namespace = namespace
        self.api_key = api_key
        self.vector_db_id = vector_db_id
        self.pinecone_index = None
        self.mock_mode = False  # Always set mock_mode to False to use real database
        self.correlation_id = correlation_id or str(uuid.uuid4())[:8]
        self.google_api_key = os.environ.get("GOOGLE_API_KEY")

        # Initialize Pinecone connection
        if self.api_key:
            try:
                # Use connection manager from pinecone_fix
                logger.info(f"[{self.correlation_id}] Initializing Pinecone connection to {self.index_name}")
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)
                logger.info(f"[{self.correlation_id}] Successfully connected to Pinecone index {self.index_name}")
            except Exception as e:
                logger.error(f"[{self.correlation_id}] Failed to initialize Pinecone: {str(e)}")
                # No fallback to mock mode - require a valid connection

    async def process_pdf(self, file_path, document_id=None, metadata=None, progress_callback=None):
        """Process a PDF file and create vector embeddings

        This method:
        1. Extracts text from PDF using PyPDFLoader
        2. Splits text into chunks using RecursiveCharacterTextSplitter
        3. Creates embeddings using Google Gemini model
        4. Stores embeddings in Pinecone
        """
        logger.info(f"[{self.correlation_id}] Processing PDF: {file_path}")

        try:
            # Initialize metadata if not provided
            if metadata is None:
                metadata = {}

            # Ensure document_id is included
            if document_id is None:
                document_id = str(uuid.uuid4())

            # Add document_id to metadata
            metadata["document_id"] = document_id

            # The namespace to use might be in vdb-X format if vector_db_id provided
            actual_namespace = f"vdb-{self.vector_db_id}" if self.vector_db_id else self.namespace

            # 1. Extract text from PDF
            logger.info(f"[{self.correlation_id}] Extracting text from PDF: {file_path}")
            if progress_callback:
                await progress_callback(None, document_id, "text_extraction", 0.2, "Extracting text from PDF")

            loader = PyPDFLoader(file_path)
            documents = loader.load()
            total_text_length = sum(len(doc.page_content) for doc in documents)
            logger.info(f"[{self.correlation_id}] Extracted {len(documents)} pages, total text length: {total_text_length}")

            # 2. Split text into chunks
            if progress_callback:
                await progress_callback(None, document_id, "chunking", 0.4, "Splitting text into chunks")

            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000,
                chunk_overlap=100,
                length_function=len,
                separators=["\n\n", "\n", " ", ""]
            )
            chunks = text_splitter.split_documents(documents)
            logger.info(f"[{self.correlation_id}] Split into {len(chunks)} chunks")

            # 3. Create embeddings
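            # Note: Google's models/embedding-001 typically returns 768-dimensional vectors,
            # while the Pinecone index may have been created with a different dimension
            # (e.g. 1536). The index dimension is read below and each embedding is adjusted
            # to match it before upserting.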
            if progress_callback:
                await progress_callback(None, document_id, "embedding", 0.6, "Creating embeddings")

            # Initialize Google Gemini for embeddings
            if not self.google_api_key:
                raise ValueError("Google API key not found in environment variables")

            genai.configure(api_key=self.google_api_key)

            # First, get the expected dimensions from Pinecone
            logger.info(f"[{self.correlation_id}] Checking Pinecone index dimensions")
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            stats = self.pinecone_index.describe_index_stats()
            pinecone_dimension = stats.dimension
            logger.info(f"[{self.correlation_id}] Pinecone index dimension: {pinecone_dimension}")

            # Create embedding model
            embedding_model = GoogleGenerativeAIEmbeddings(
                model="models/embedding-001",
                google_api_key=self.google_api_key,
                task_type="retrieval_document"  # Use document embedding mode for longer text
            )

            # Get a sample embedding to check dimensions
            sample_embedding = embedding_model.embed_query("test")
            embedding_dimension = len(sample_embedding)
            logger.info(f"[{self.correlation_id}] Generated embeddings with dimension: {embedding_dimension}")

            # Dimension handling - if mismatch, we handle it appropriately
            if embedding_dimension != pinecone_dimension:
                logger.warning(f"[{self.correlation_id}] Embedding dimension mismatch: got {embedding_dimension}, need {pinecone_dimension}")

                if embedding_dimension < pinecone_dimension:
                    # For upscaling from 768 to 1536: duplicate each value and scale appropriately
                    # This is one approach to handle dimension mismatches while preserving semantic information
                    logger.info(f"[{self.correlation_id}] Using duplication strategy to upscale from {embedding_dimension} to {pinecone_dimension}")

                    if embedding_dimension * 2 == pinecone_dimension:
                        # Perfect doubling (768 -> 1536)
                        def adjust_embedding(embedding):
                            # Duplicate each value to double the dimension
                            return [val for val in embedding for _ in range(2)]
                    else:
                        # Generic padding with zeros
                        pad_size = pinecone_dimension - embedding_dimension

                        def adjust_embedding(embedding):
                            return embedding + [0.0] * pad_size
                else:
                    # Truncation strategy - take first pinecone_dimension values
                    logger.info(f"[{self.correlation_id}] Will truncate embeddings from {embedding_dimension} to {pinecone_dimension}")

                    def adjust_embedding(embedding):
                        return embedding[:pinecone_dimension]
            else:
                # No adjustment needed
                def adjust_embedding(embedding):
                    return embedding
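            # Rough illustration of the adjust_embedding strategies above (values are made up,
            # using tiny dimensions for readability):
            #   duplication (4 -> 8):  [0.1, 0.2, 0.3, 0.4] -> [0.1, 0.1, 0.2, 0.2, 0.3, 0.3, 0.4, 0.4]
            #   zero padding (4 -> 6): [0.1, 0.2, 0.3, 0.4] -> [0.1, 0.2, 0.3, 0.4, 0.0, 0.0]
            #   truncation (4 -> 3):   [0.1, 0.2, 0.3, 0.4] -> [0.1, 0.2, 0.3]
            # The adjusted vectors are not true embeddings in the target space; the adjustment
            # only keeps upserts compatible with the configured index dimension.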
            # Process in batches to avoid memory issues
            batch_size = 10
            vectors_to_upsert = []

            for i in range(0, len(chunks), batch_size):
                batch = chunks[i:i+batch_size]

                # Extract text content
                texts = [chunk.page_content for chunk in batch]

                # Create embeddings for batch
                embeddings = embedding_model.embed_documents(texts)

                # Prepare vectors for Pinecone
                for j, (chunk, embedding) in enumerate(zip(batch, embeddings)):
                    # Adjust embedding dimensions if needed
                    adjusted_embedding = adjust_embedding(embedding)

                    # Verify dimensions are correct
                    if len(adjusted_embedding) != pinecone_dimension:
                        raise ValueError(f"Dimension mismatch after adjustment: got {len(adjusted_embedding)}, expected {pinecone_dimension}")

                    # Create metadata for this chunk
                    chunk_metadata = {
                        "document_id": document_id,
                        "page": chunk.metadata.get("page", 0),
                        "chunk_id": f"{document_id}-chunk-{i+j}",
                        "text": chunk.page_content[:1000],  # Store first 1000 chars of text
                        **metadata  # Include original metadata
                    }

                    # Create vector record
                    vector = {
                        "id": f"{document_id}-{i+j}",
                        "values": adjusted_embedding,
                        "metadata": chunk_metadata
                    }
                    vectors_to_upsert.append(vector)

                logger.info(f"[{self.correlation_id}] Processed batch {i//batch_size + 1}/{(len(chunks)-1)//batch_size + 1}")

            # 4. Store embeddings in Pinecone
            if progress_callback:
                await progress_callback(None, document_id, "storing", 0.8, f"Storing {len(vectors_to_upsert)} vectors in Pinecone")

            logger.info(f"[{self.correlation_id}] Upserting {len(vectors_to_upsert)} vectors to Pinecone index {self.index_name}, namespace {actual_namespace}")

            # Use PineconeConnectionManager for better error handling
            result = PineconeConnectionManager.upsert_vectors_with_validation(
                self.pinecone_index,
                vectors_to_upsert,
                namespace=actual_namespace
            )

            logger.info(f"[{self.correlation_id}] Successfully upserted {result.get('upserted_count', 0)} vectors to Pinecone")

            if progress_callback:
                await progress_callback(None, document_id, "embedding_complete", 1.0, "Processing completed")

            # Return success with stats
            return {
                "success": True,
                "document_id": document_id,
                "chunks_processed": len(chunks),
                "total_text_length": total_text_length,
                "vectors_created": len(vectors_to_upsert),
                "vectors_upserted": result.get('upserted_count', 0),
                "message": "PDF processed successfully"
            }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error processing PDF: {str(e)}")
            return {
                "success": False,
                "error": f"Error processing PDF: {str(e)}"
            }

    async def list_namespaces(self):
        """List all namespaces in the Pinecone index"""
        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            # Get index stats which includes namespaces
            stats = self.pinecone_index.describe_index_stats()
            namespaces = list(stats.get("namespaces", {}).keys())

            return {
                "success": True,
                "namespaces": namespaces
            }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error listing namespaces: {str(e)}")
            return {
                "success": False,
                "error": f"Error listing namespaces: {str(e)}"
            }

    async def delete_namespace(self):
        """Delete all vectors in a namespace"""
        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            logger.info(f"[{self.correlation_id}] Deleting namespace '{self.namespace}' from index '{self.index_name}'")

            # Check if namespace exists
            stats = self.pinecone_index.describe_index_stats()
            namespaces = stats.get("namespaces", {})

            if self.namespace in namespaces:
                vector_count = namespaces[self.namespace].get("vector_count", 0)

                # Delete all vectors in namespace
                self.pinecone_index.delete(delete_all=True, namespace=self.namespace)

                return {
                    "success": True,
                    "namespace": self.namespace,
                    "deleted_count": vector_count,
                    "message": f"Successfully deleted namespace '{self.namespace}' with {vector_count} vectors"
                }
            else:
                return {
                    "success": True,
                    "namespace": self.namespace,
                    "deleted_count": 0,
                    "message": f"Namespace '{self.namespace}' does not exist - nothing to delete"
                }
        except Exception as e:
            logger.error(f"[{self.correlation_id}] Error deleting namespace: {str(e)}")
            return {
                "success": False,
                "namespace": self.namespace,
                "error": f"Error deleting namespace: {str(e)}"
            }

    async def delete_document(self, document_id, additional_metadata=None):
        """Delete vectors associated with a specific document ID or name"""
        logger.info(f"[{self.correlation_id}] Deleting vectors for document '{document_id}' from namespace '{self.namespace}'")

        try:
            if not self.pinecone_index:
                self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name)

            # Use metadata filtering to find vectors with matching document_id
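            # Rough illustration of the fallback filters built below (values are hypothetical):
            #   document_id "123e4567-e89b-12d3-a456-426614174000" tries
            #       {"document_id": "123e4567-e89b-12d3-a456-426614174000"} and
            #       {"document_id": "123e4567e89b12d3a456426614174000"}   (hyphens stripped)
            #   a short name like "report.pdf" tries
            #       {"document_id": "report.pdf"} and {"title": "report.pdf"}
            #   additional_metadata={"document_name": "report.pdf"} also adds
            #       {"title": "report.pdf"} and {"filename": "report.pdf"}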
            # The specific namespace to use might be vdb-X format if vector_db_id provided
            actual_namespace = f"vdb-{self.vector_db_id}" if self.vector_db_id else self.namespace

            # Try to find vectors using multiple approaches
            filters = []

            # First try with exact document_id which could be UUID (preferred)
            filters.append({"document_id": document_id})

            # If this is a UUID, try with different formats (with/without hyphens)
            if len(document_id) >= 32:
                # This looks like it might be a UUID - try variations
                if "-" in document_id:
                    # If it has hyphens, try without
                    filters.append({"document_id": document_id.replace("-", "")})
                else:
                    # If it doesn't have hyphens, try to format it as UUID
                    try:
                        formatted_uuid = str(uuid.UUID(document_id))
                        filters.append({"document_id": formatted_uuid})
                    except ValueError:
                        pass

            # Also try with title field if it could be a document name
            if not document_id.startswith("doc-") and not document_id.startswith("test-doc-") and len(document_id) < 36:
                # This might be a document title/name
                filters.append({"title": document_id})

            # If additional metadata was provided, use it to make extra filters
            if additional_metadata:
                if "document_name" in additional_metadata:
                    # Try exact name match
                    filters.append({"title": additional_metadata["document_name"]})

                    # Also try filename if name has extension
                    if "." in additional_metadata["document_name"]:
                        filters.append({"filename": additional_metadata["document_name"]})

            # Search for vectors with any of these filters
            found_vectors = False
            deleted_count = 0
            filter_used = ""

            logger.info(f"[{self.correlation_id}] Will try {len(filters)} different filters to find document")

            for i, filter_query in enumerate(filters):
                logger.info(f"[{self.correlation_id}] Searching for vectors with filter #{i+1}: {filter_query}")

                # Search for vectors with this filter
                try:
                    results = self.pinecone_index.query(
                        vector=[0] * 1536,  # Dummy vector, we only care about metadata filter
                        top_k=1,
                        include_metadata=True,
                        filter=filter_query,
                        namespace=actual_namespace
                    )

                    if results and results.get("matches") and len(results.get("matches", [])) > 0:
                        logger.info(f"[{self.correlation_id}] Found vectors matching filter: {filter_query}")
                        found_vectors = True
                        filter_used = str(filter_query)

                        # Delete vectors by filter
                        delete_result = self.pinecone_index.delete(
                            filter=filter_query,
                            namespace=actual_namespace
                        )

                        # Get delete count from result
                        deleted_count = delete_result.get("deleted_count", 0)
                        logger.info(f"[{self.correlation_id}] Deleted {deleted_count} vectors with filter: {filter_query}")
                        break
                except Exception as filter_error:
                    logger.warning(f"[{self.correlation_id}] Error searching with filter {filter_query}: {str(filter_error)}")
                    continue

            # If no vectors found with any filter
            if not found_vectors:
                logger.warning(f"[{self.correlation_id}] No vectors found for document '{document_id}' in namespace '{actual_namespace}'")
                return {
                    "success": True,  # Still return success=True to maintain backward compatibility
                    "document_id": document_id,
                    "namespace": actual_namespace,
                    "deleted_count": 0,
                    "warning": f"No vectors found for document '{document_id}' in namespace '{actual_namespace}'",
                    "message": f"Found 0 vectors for document '{document_id}' in namespace '{actual_namespace}'",
                    "vectors_found": False,
                    "vectors_deleted": 0
                }

            return {
                "success": True,
                "document_id": document_id,
                "namespace": actual_namespace,
                "deleted_count": deleted_count,
                "filter_used": filter_used,
                "message": f"Successfully deleted {deleted_count} vectors for document '{document_id}' from namespace '{actual_namespace}'",
"vectors_found": True, "vectors_deleted": deleted_count } except Exception as e: logger.error(f"[{self.correlation_id}] Error deleting document vectors: {str(e)}") return { "success": False, "document_id": document_id, "error": f"Error deleting document vectors: {str(e)}", "vectors_found": False, "vectors_deleted": 0 } async def list_documents(self): """List all documents in a namespace""" # The namespace to use might be vdb-X format if vector_db_id provided actual_namespace = f"vdb-{self.vector_db_id}" if self.vector_db_id else self.namespace try: if not self.pinecone_index: self.pinecone_index = PineconeConnectionManager.get_index(self.api_key, self.index_name) logger.info(f"[{self.correlation_id}] Listing documents in namespace '{actual_namespace}'") # Get index stats for namespace stats = self.pinecone_index.describe_index_stats() namespace_stats = stats.get("namespaces", {}).get(actual_namespace, {}) vector_count = namespace_stats.get("vector_count", 0) if vector_count == 0: # No vectors in namespace return DocumentsListResponse( success=True, total_vectors=0, namespace=actual_namespace, index_name=self.index_name, documents=[] ).dict() # Query for vectors with a dummy vector to get back metadata # This is not efficient but is a simple approach to extract document info results = self.pinecone_index.query( vector=[0] * stats.dimension, # Use index dimensions top_k=min(vector_count, 1000), # Get at most 1000 vectors include_metadata=True, namespace=actual_namespace ) # Process results to extract unique documents seen_documents = set() documents = [] for match in results.get("matches", []): metadata = match.get("metadata", {}) document_id = metadata.get("document_id") if document_id and document_id not in seen_documents: seen_documents.add(document_id) doc_info = { "id": document_id, "title": metadata.get("title"), "filename": metadata.get("filename"), "content_type": metadata.get("content_type"), "chunk_count": 0 } documents.append(doc_info) # Count chunks for this document for doc in documents: if doc["id"] == document_id: doc["chunk_count"] += 1 break return DocumentsListResponse( success=True, total_vectors=vector_count, namespace=actual_namespace, index_name=self.index_name, documents=documents ).dict() except Exception as e: logger.error(f"[{self.correlation_id}] Error listing documents: {str(e)}") return DocumentsListResponse( success=False, error=f"Error listing documents: {str(e)}" ).dict()