"""
CPU-optimized retrieval for efficient context handling.
"""
import logging
import heapq
from typing import List, Dict, Any, Optional, Tuple, Union
import numpy as np
from efficient_context.retrieval.base import BaseRetriever
from efficient_context.chunking.base import Chunk
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CPUOptimizedRetriever(BaseRetriever):
    """
    Retriever optimized for CPU performance and low memory usage.

    This retriever uses techniques to minimize computational requirements
    while still providing high-quality retrieval results.
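
    Example (a minimal sketch; assumes ``Chunk`` objects expose the ``content``
    and ``chunk_id`` attributes this retriever relies on)::

        retriever = CPUOptimizedRetriever(embedding_model="lightweight")
        retriever.index_chunks(chunks)  # chunks: List[Chunk]
        top_chunks = retriever.retrieve("battery life of the device", top_k=3)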
"""

    def __init__(
        self,
        embedding_model: str = "lightweight",
        similarity_metric: str = "cosine",
        use_batching: bool = True,
        batch_size: int = 32,
        max_index_size: Optional[int] = None,
    ):
        """
        Initialize the CPUOptimizedRetriever.

        Args:
            embedding_model: Model to use for embeddings
            similarity_metric: Metric for comparing embeddings ("cosine", "dot", or "euclidean")
            use_batching: Whether to batch embedding operations
            batch_size: Size of batches for embedding
            max_index_size: Maximum number of chunks to keep in the index
        """
        self.embedding_model = embedding_model
        self.similarity_metric = similarity_metric
        self.use_batching = use_batching
        self.batch_size = batch_size
        self.max_index_size = max_index_size

        # Initialize storage
        self.chunks = []
        self.chunk_embeddings = None
        self.chunk_ids_to_index = {}
        self._fallback_vectorizer = None  # TF-IDF vectorizer used when sentence-transformers is unavailable

        # Initialize the embedding model
        self._init_embedding_model()
        logger.info("CPUOptimizedRetriever initialized with model: %s", embedding_model)

    def _init_embedding_model(self):
        """Initialize the embedding model."""
        try:
            from sentence_transformers import SentenceTransformer

            # Choose a lightweight model for CPU efficiency
            if self.embedding_model == "lightweight":
                # MiniLM models are lightweight and efficient
                self.model = SentenceTransformer('paraphrase-MiniLM-L3-v2')
            else:
                # Default to a balanced model
                self.model = SentenceTransformer(self.embedding_model)
            logger.info("Loaded embedding model (dimension: %d)", self.model.get_sentence_embedding_dimension())
        except ImportError:
            logger.warning("SentenceTransformer not available, using TF-IDF fallback (less accurate)")
            self.model = None

    def _get_embeddings(self, texts: List[str]) -> np.ndarray:
        """
        Get embeddings for a list of texts.

        Args:
            texts: List of texts to embed

        Returns:
            embeddings: Array of text embeddings
        """
        if not texts:
            return np.array([])

        if self.model is not None:
            # Use the sentence transformer if available.
            # Apply batching for memory efficiency.
            if self.use_batching and len(texts) > self.batch_size:
                embeddings = []
                for i in range(0, len(texts), self.batch_size):
                    batch = texts[i:i + self.batch_size]
                    batch_embeddings = self.model.encode(
                        batch,
                        show_progress_bar=False,
                        convert_to_numpy=True
                    )
                    embeddings.append(batch_embeddings)
                return np.vstack(embeddings)
            else:
                return self.model.encode(texts, show_progress_bar=False)
        else:
            # Fallback to a simple TF-IDF bag-of-words approach. Fit the vectorizer
            # once (when the chunks are first embedded) and reuse it afterwards so
            # that query and chunk vectors share the same feature space.
            from sklearn.feature_extraction.text import TfidfVectorizer

            if self._fallback_vectorizer is None:
                self._fallback_vectorizer = TfidfVectorizer(max_features=5000)
                return self._fallback_vectorizer.fit_transform(texts).toarray()
            return self._fallback_vectorizer.transform(texts).toarray()

    def _compute_similarities(self, query_embedding: np.ndarray, chunk_embeddings: np.ndarray) -> np.ndarray:
        """
        Compute similarities between query and chunk embeddings.

        Args:
            query_embedding: Embedding of the query
            chunk_embeddings: Embeddings of the chunks

        Returns:
            similarities: Array of similarity scores
        """
        if self.similarity_metric == "cosine":
            # Normalize the query embedding; chunk embeddings are normalized at index time
            query_norm = np.linalg.norm(query_embedding)
            if query_norm > 0:
                query_embedding = query_embedding / query_norm
            # Cosine similarity then reduces to a single matrix-vector product
            return np.dot(chunk_embeddings, query_embedding)
        elif self.similarity_metric == "dot":
            # Simple dot product
            return np.dot(chunk_embeddings, query_embedding)
        elif self.similarity_metric == "euclidean":
            # Negative Euclidean distance (higher is more similar)
            return -np.sqrt(np.sum((chunk_embeddings - query_embedding) ** 2, axis=1))
        else:
            # Unknown metric: fall back to a plain dot product
            return np.dot(chunk_embeddings, query_embedding)

    def index_chunks(self, chunks: List[Chunk]) -> None:
        """
        Index chunks for future retrieval.

        Args:
            chunks: Chunks to index
        """
        if not chunks:
            return

        # Add new chunks
        for chunk in chunks:
            # Skip if chunk is already indexed
            if chunk.chunk_id in self.chunk_ids_to_index:
                continue
            self.chunks.append(chunk)
            self.chunk_ids_to_index[chunk.chunk_id] = len(self.chunks) - 1

        # Get embeddings for all chunks
        chunk_texts = [chunk.content for chunk in self.chunks]
        self.chunk_embeddings = self._get_embeddings(chunk_texts)

        # Prune the index to the most recent chunks if it exceeds max_index_size
        if (self.max_index_size is not None and
                len(self.chunks) > self.max_index_size and
                self.model is not None):
            # Keep only the most recent chunks
            self.chunks = self.chunks[-self.max_index_size:]
            # Update the index mapping
            self.chunk_ids_to_index = {
                chunk.chunk_id: i for i, chunk in enumerate(self.chunks)
            }
            # Recalculate embeddings for the pruned set
            chunk_texts = [chunk.content for chunk in self.chunks]
            self.chunk_embeddings = self._get_embeddings(chunk_texts)

        # Normalize embeddings for cosine similarity
        if self.similarity_metric == "cosine" and self.chunk_embeddings is not None:
            # Compute norms of each embedding vector
            norms = np.linalg.norm(self.chunk_embeddings, axis=1, keepdims=True)
            # Avoid division by zero - normalize only where norm > 0
            non_zero_norms = norms > 0
            if np.any(non_zero_norms):
                # Substitute 1.0 for zero norms so the division never divides by zero,
                # then keep the original rows wherever the norm was zero
                safe_norms = np.where(non_zero_norms, norms, 1.0)
                self.chunk_embeddings = np.where(
                    non_zero_norms,
                    self.chunk_embeddings / safe_norms,
                    self.chunk_embeddings
                )

        logger.info("Indexed %d chunks (total: %d)", len(chunks), len(self.chunks))

    def retrieve(self, query: str, top_k: Optional[int] = None) -> List[Chunk]:
        """
        Retrieve chunks relevant to a query.

        Args:
            query: Query to retrieve chunks for
            top_k: Number of chunks to retrieve (default: 5)

        Returns:
            chunks: List of retrieved chunks
        """
        if not self.chunks:
            logger.warning("No chunks indexed for retrieval")
            return []
        if not query:
            logger.warning("Empty query provided")
            return []

        # Default top_k
        top_k = top_k or 5

        # Get query embedding
        query_embedding = self._get_embeddings([query])[0]

        # Compute similarities
        similarities = self._compute_similarities(query_embedding, self.chunk_embeddings)

        # Get indices of top-k most similar chunks
        if top_k >= len(similarities):
            top_indices = list(range(len(similarities)))
            top_indices.sort(key=lambda i: similarities[i], reverse=True)
        else:
            # More efficient partial sort for large indices
            top_indices = heapq.nlargest(top_k, range(len(similarities)), key=lambda i: similarities[i])

        # Get the corresponding chunks
        retrieved_chunks = [self.chunks[i] for i in top_indices]
        logger.info("Retrieved %d chunks for query", len(retrieved_chunks))
        return retrieved_chunks

    def clear(self) -> None:
        """Clear all indexed chunks."""
        self.chunks = []
        self.chunk_embeddings = None
        self.chunk_ids_to_index = {}
        self._fallback_vectorizer = None
        logger.info("Cleared chunk index")