"""
Semantic deduplication for compressing context content.
"""
import logging
from typing import List, Optional
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from efficient_context.compression.base import BaseCompressor
from efficient_context.utils.text import split_into_sentences, get_sentence_importance
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SemanticDeduplicator(BaseCompressor):
"""
Compressor that removes semantically duplicate or redundant content.
This compressor identifies and removes sentences that are semantically
similar to others in the content, keeping only the most representative ones.
It's designed to be CPU-friendly and memory-efficient.
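
    Example (illustrative; `document_text` stands for any long input string):
        >>> deduplicator = SemanticDeduplicator(threshold=0.85)
        >>> compressed = deduplicator.compress(document_text, target_size=500)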
"""
def __init__(
self,
threshold: float = 0.85,
embedding_model: str = "lightweight",
min_sentence_length: int = 10,
importance_weight: float = 0.3,
):
"""
Initialize the SemanticDeduplicator.
Args:
threshold: Similarity threshold for considering content duplicated (0.0 to 1.0)
embedding_model: The model to use for generating embeddings
            min_sentence_length: Minimum sentence length, in words, for a sentence to be considered for deduplication
importance_weight: Weight given to sentence importance vs. deduplication
"""
self.threshold = threshold
self.embedding_model = embedding_model
self.min_sentence_length = min_sentence_length
self.importance_weight = importance_weight
# Initialize the embedding model
self._init_embedding_model()
logger.info("SemanticDeduplicator initialized with threshold: %.2f", threshold)
def _init_embedding_model(self):
"""Initialize the embedding model based on the selected type."""
try:
from sentence_transformers import SentenceTransformer
# Choose a lightweight model for CPU efficiency
if self.embedding_model == "lightweight":
# MiniLM models are lightweight and efficient
self.model = SentenceTransformer('paraphrase-MiniLM-L3-v2')
else:
                # Otherwise, treat the value as a sentence-transformers model name
self.model = SentenceTransformer(self.embedding_model)
logger.info("Using embedding model: %s", self.model.get_sentence_embedding_dimension())
except ImportError:
logger.warning("SentenceTransformer not available, using numpy fallback (less accurate)")
self.model = None
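            # A None model makes _get_embeddings fall back to TF-IDF vectors.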
def _get_embeddings(self, sentences: List[str]) -> np.ndarray:
"""
Get embeddings for a list of sentences.
Args:
sentences: List of sentences to embed
Returns:
embeddings: Array of sentence embeddings
"""
if not sentences:
return np.array([])
if self.model is not None:
# Use the sentence transformer if available
return self.model.encode(sentences, show_progress_bar=False)
else:
            # Fall back to TF-IDF vectors: much less accurate than learned
            # embeddings, but needs only scikit-learn, which is already imported above
from sklearn.feature_extraction.text import TfidfVectorizer
vectorizer = TfidfVectorizer(max_features=5000)
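            # Note: the vectorizer is refit on every call, so these vectors are
            # only comparable to one another within a single call.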
return vectorizer.fit_transform(sentences).toarray()
def _compute_similarity_matrix(self, embeddings: np.ndarray) -> np.ndarray:
"""
Compute pairwise similarity between embeddings.
Args:
embeddings: Array of sentence embeddings
Returns:
similarity_matrix: Matrix of pairwise similarities
"""
# Return empty array for empty input
if embeddings.shape[0] == 0:
return np.array([])
# Compute cosine similarity
return cosine_similarity(embeddings)
def _deduplicate_sentences(
self,
sentences: List[str],
importances: Optional[List[float]] = None
) -> List[int]:
"""
Identify non-redundant sentence indices.
Args:
sentences: List of sentences to deduplicate
importances: Optional list of importance scores
Returns:
kept_indices: Indices of sentences to keep
"""
if not sentences:
return []
# Filter out sentences that are too short
valid_indices = [i for i, s in enumerate(sentences) if len(s.split()) >= self.min_sentence_length]
if not valid_indices:
# If no sentences meet the min length, return all indices
return list(range(len(sentences)))
# Get embeddings for valid sentences
valid_sentences = [sentences[i] for i in valid_indices]
embeddings = self._get_embeddings(valid_sentences)
# Compute pairwise similarity
similarity_matrix = self._compute_similarity_matrix(embeddings)
# Set diagonal to 0 to avoid self-similarity
np.fill_diagonal(similarity_matrix, 0)
# Determine which sentences to keep
kept_indices = []
remaining_indices = set(range(len(valid_indices)))
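        # Greedy strategy: visit sentences from most to least important, keep
        # each one reached, and drop every remaining sentence whose similarity
        # to it exceeds the threshold.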
# If importances are provided, start with most important sentences
if importances is not None:
valid_importances = [importances[i] for i in valid_indices]
ordered_indices = [i for i, _ in sorted(
enumerate(valid_importances),
key=lambda x: x[1],
reverse=True
)]
else:
# Otherwise, use sentence length as a simple importance proxy
ordered_indices = [i for i, _ in sorted(
enumerate(valid_sentences),
key=lambda x: len(x[1].split()),
reverse=True
)]
# Process sentences in order of importance
for idx in ordered_indices:
if idx not in remaining_indices:
continue
# Keep this sentence
kept_indices.append(valid_indices[idx])
remaining_indices.remove(idx)
# Remove similar sentences
similar_indices = [
i for i in remaining_indices
if similarity_matrix[idx, i] > self.threshold
]
remaining_indices -= set(similar_indices)
# Break if we've processed all indices
if not remaining_indices:
break
        # Re-add the short sentences that were excluded from deduplication above
short_indices = [i for i, s in enumerate(sentences) if len(s.split()) < self.min_sentence_length]
kept_indices.extend(short_indices)
# Sort to maintain original order
return sorted(kept_indices)
def compress(self, content: str, target_size: Optional[int] = None) -> str:
"""
Compress content by removing semantic duplicates.
Args:
content: The content to compress
            target_size: Optional target size in tokens (approximated here as whitespace-delimited words)
Returns:
compressed_content: The compressed content
"""
# Split content into sentences
sentences = split_into_sentences(content)
if not sentences:
return content
# Get sentence importance scores
importances = get_sentence_importance(sentences)
# Deduplicate sentences
kept_indices = self._deduplicate_sentences(sentences, importances)
# Combine kept sentences
kept_sentences = [sentences[i] for i in kept_indices]
compressed = " ".join(kept_sentences)
# If we need to compress further to meet target size
if target_size and len(compressed.split()) > target_size:
# Calculate how many more sentences to remove
current_size = len(compressed.split())
reduction_needed = current_size - target_size
# Sort sentences by importance (lowest first)
sentence_priorities = [(i, importances[i]) for i in kept_indices]
sorted_priorities = sorted(sentence_priorities, key=lambda x: x[1])
            # Remove the least important sentences until the target size is met
            tokens_removed = 0
            indices_to_remove = []
            for idx, _ in sorted_priorities:
                tokens_removed += len(sentences[idx].split())
                indices_to_remove.append(idx)
                if tokens_removed >= reduction_needed:
                    break
# Remove the low-importance sentences
final_indices = [i for i in kept_indices if i not in indices_to_remove]
# Recombine
compressed = " ".join(sentences[i] for i in sorted(final_indices))
# Log compression stats
original_tokens = len(content.split())
compressed_tokens = len(compressed.split())
reduction = (1 - compressed_tokens / original_tokens) * 100 if original_tokens > 0 else 0
logger.info(
"Compressed from %d to %d tokens (%.1f%% reduction)",
original_tokens, compressed_tokens, reduction
)
return compressed
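

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): it assumes this module is part of
# the efficient_context package so that the imports at the top resolve, and
# that either sentence-transformers or scikit-learn is installed.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample = (
        "The cache layer stores recent query results. "
        "Recent query results are kept in the cache layer. "
        "Cache entries expire after sixty seconds. "
        "Entries in the cache are evicted after roughly a minute. "
        "Deployment happens every Friday afternoon."
    )
    deduplicator = SemanticDeduplicator(threshold=0.8, min_sentence_length=4)
    print(deduplicator.compress(sample, target_size=40))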