"""
Semantic deduplication for compressing context content.
"""

import logging
from typing import List, Optional

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

from efficient_context.compression.base import BaseCompressor
from efficient_context.utils.text import split_into_sentences, get_sentence_importance

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SemanticDeduplicator(BaseCompressor):
    """
    Compressor that removes semantically duplicate or redundant content.
    
    This compressor identifies and removes sentences that are semantically
    similar to others in the content, keeping only the most representative ones.
    It's designed to be CPU-friendly and memory-efficient.
    """
    
    def __init__(
        self,
        threshold: float = 0.85,
        embedding_model: str = "lightweight",
        min_sentence_length: int = 10,
        importance_weight: float = 0.3,
    ):
        """
        Initialize the SemanticDeduplicator.
        
        Args:
            threshold: Similarity threshold for considering content duplicated (0.0 to 1.0)
            embedding_model: "lightweight" for a small default model, or any
                sentence-transformers model name
            min_sentence_length: Minimum sentence length, in words, for a sentence
                to be considered for deduplication
            importance_weight: Weight given to sentence importance vs. deduplication
        """
        self.threshold = threshold
        self.embedding_model = embedding_model
        self.min_sentence_length = min_sentence_length
        self.importance_weight = importance_weight
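        # Note: importance_weight is stored but is not currently referenced by
        # the deduplication logic below.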
        
        # Initialize the embedding model
        self._init_embedding_model()
        
        logger.info("SemanticDeduplicator initialized with threshold: %.2f", threshold)
    
    def _init_embedding_model(self):
        """Initialize the embedding model based on the selected type."""
        try:
            from sentence_transformers import SentenceTransformer
            
            # Choose a lightweight model for CPU efficiency
            if self.embedding_model == "lightweight":
                # MiniLM models are lightweight and efficient
                self.model = SentenceTransformer('paraphrase-MiniLM-L3-v2')
            else:
                # Otherwise treat embedding_model as a sentence-transformers model name
                self.model = SentenceTransformer(self.embedding_model)
                
            logger.info("Using embedding model: %s", self.model.get_sentence_embedding_dimension())
        except ImportError:
            logger.warning("SentenceTransformer not available, using numpy fallback (less accurate)")
            self.model = None
    
    def _get_embeddings(self, sentences: List[str]) -> np.ndarray:
        """
        Get embeddings for a list of sentences.
        
        Args:
            sentences: List of sentences to embed
            
        Returns:
            embeddings: Array of sentence embeddings
        """
        if not sentences:
            return np.array([])
            
        if self.model is not None:
            # Use the sentence transformer if available
            return self.model.encode(sentences, show_progress_bar=False)
        else:
            # Fall back to TF-IDF vectors via scikit-learn; much less accurate
            # than transformer embeddings, but avoids the sentence-transformers dependency
            from sklearn.feature_extraction.text import TfidfVectorizer
            vectorizer = TfidfVectorizer(max_features=5000)
            return vectorizer.fit_transform(sentences).toarray()
    
    def _compute_similarity_matrix(self, embeddings: np.ndarray) -> np.ndarray:
        """
        Compute pairwise similarity between embeddings.
        
        Args:
            embeddings: Array of sentence embeddings
            
        Returns:
            similarity_matrix: Matrix of pairwise similarities
        """
        # Return empty array for empty input
        if embeddings.shape[0] == 0:
            return np.array([])
            
        # Compute cosine similarity
        return cosine_similarity(embeddings)
    
    def _deduplicate_sentences(
        self, 
        sentences: List[str], 
        importances: Optional[List[float]] = None
    ) -> List[int]:
        """
        Identify non-redundant sentence indices.
        
        Args:
            sentences: List of sentences to deduplicate
            importances: Optional list of importance scores
            
        Returns:
            kept_indices: Indices of sentences to keep
        """
        if not sentences:
            return []
            
        # Filter out sentences that are too short
        valid_indices = [i for i, s in enumerate(sentences) if len(s.split()) >= self.min_sentence_length]
        
        if not valid_indices:
            # If no sentences meet the min length, return all indices
            return list(range(len(sentences)))
            
        # Get embeddings for valid sentences
        valid_sentences = [sentences[i] for i in valid_indices]
        embeddings = self._get_embeddings(valid_sentences)
        
        # Compute pairwise similarity
        similarity_matrix = self._compute_similarity_matrix(embeddings)
        
        # Set diagonal to 0 to avoid self-similarity
        np.fill_diagonal(similarity_matrix, 0)
        
        # Determine which sentences to keep
        kept_indices = []
        remaining_indices = set(range(len(valid_indices)))
        
        # If importances are provided, start with most important sentences
        if importances is not None:
            valid_importances = [importances[i] for i in valid_indices]
            ordered_indices = [i for i, _ in sorted(
                enumerate(valid_importances), 
                key=lambda x: x[1], 
                reverse=True
            )]
        else:
            # Otherwise, use sentence length as a simple importance proxy
            ordered_indices = [i for i, _ in sorted(
                enumerate(valid_sentences), 
                key=lambda x: len(x[1].split()), 
                reverse=True
            )]
        
        # Process sentences in order of importance
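        # Greedy pass: each kept sentence suppresses every not-yet-kept candidate
        # whose cosine similarity to it exceeds the threshold.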
        for idx in ordered_indices:
            if idx not in remaining_indices:
                continue
                
            # Keep this sentence
            kept_indices.append(valid_indices[idx])
            remaining_indices.remove(idx)
            
            # Remove similar sentences
            similar_indices = [
                i for i in remaining_indices 
                if similarity_matrix[idx, i] > self.threshold
            ]
            
            remaining_indices -= set(similar_indices)
            
            # Break if we've processed all indices
            if not remaining_indices:
                break
        
        # Re-add the short sentences that were excluded from deduplication
        short_indices = [i for i, s in enumerate(sentences) if len(s.split()) < self.min_sentence_length]
        kept_indices.extend(short_indices)
        
        # Sort to maintain original order
        return sorted(kept_indices)
    
    def compress(self, content: str, target_size: Optional[int] = None) -> str:
        """
        Compress content by removing semantic duplicates.
        
        Args:
            content: The content to compress
            target_size: Optional target size in whitespace-delimited tokens (words)
            
        Returns:
            compressed_content: The compressed content
        """
        # Split content into sentences
        sentences = split_into_sentences(content)
        
        if not sentences:
            return content
            
        # Get sentence importance scores
        importances = get_sentence_importance(sentences)
        
        # Deduplicate sentences
        kept_indices = self._deduplicate_sentences(sentences, importances)
        
        # Combine kept sentences
        kept_sentences = [sentences[i] for i in kept_indices]
        compressed = " ".join(kept_sentences)
        
        # If we need to compress further to meet target size
        if target_size and len(compressed.split()) > target_size:
            # Calculate how many more sentences to remove
            current_size = len(compressed.split())
            reduction_needed = current_size - target_size
            
            # Sort sentences by importance (lowest first)
            sentence_priorities = [(i, importances[i]) for i in kept_indices]
            sorted_priorities = sorted(sentence_priorities, key=lambda x: x[1])
            
            # Remove the least important sentences until the target size is met
            tokens_removed = 0
            indices_to_remove = []
            
            for idx, _ in sorted_priorities:
                tokens_removed += len(sentences[idx].split())
                indices_to_remove.append(idx)
                
                if tokens_removed >= reduction_needed:
                    break
            
            # Remove the low-importance sentences
            final_indices = [i for i in kept_indices if i not in indices_to_remove]
            
            # Recombine
            compressed = " ".join(sentences[i] for i in sorted(final_indices))
        
        # Log compression stats
        original_tokens = len(content.split())
        compressed_tokens = len(compressed.split())
        reduction = (1 - compressed_tokens / original_tokens) * 100 if original_tokens > 0 else 0
        
        logger.info(
            "Compressed from %d to %d tokens (%.1f%% reduction)",
            original_tokens, compressed_tokens, reduction
        )
        
        return compressed
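
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): assumes the efficient_context package
# layout referenced by the imports above and, ideally, an installed
# sentence-transformers (otherwise the TF-IDF fallback is used).
# The sample text below is made up.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample = (
        "The cache stores recent results so they do not have to be recomputed. "
        "Recent results are kept in a cache to avoid recomputing them. "
        "Entries are evicted with a least-recently-used policy."
    )
    deduplicator = SemanticDeduplicator(threshold=0.8, min_sentence_length=5)
    print(deduplicator.compress(sample))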