Spaces:
Running
Running
Update search_utils.py
Browse files- search_utils.py +95 -25
search_utils.py
CHANGED
@@ -55,18 +55,22 @@ class MetadataManager:
|
|
55 |
raise
|
56 |
|
57 |
def get_metadata(self, global_indices):
|
58 |
-
"""Retrieve metadata for given indices"""
|
59 |
if isinstance(global_indices, np.ndarray) and global_indices.size == 0:
|
60 |
return pd.DataFrame(columns=["title", "summary", 'authors', "similarity", "source"])
|
61 |
|
62 |
try:
|
63 |
# Directly index the DataFrame
|
64 |
results = self.df.iloc[global_indices].copy()
|
65 |
-
|
|
|
|
|
|
|
|
|
|
|
66 |
except Exception as e:
|
67 |
logger.error(f"Metadata retrieval failed: {str(e)}")
|
68 |
return pd.DataFrame(columns=["title", "summary", "similarity", "source", 'authors'])
|
69 |
-
|
70 |
|
71 |
|
72 |
class SemanticSearch:
|
@@ -134,11 +138,16 @@ class SemanticSearch:
|
|
134 |
return index, size
|
135 |
|
136 |
def _global_index(self, shard_idx, local_idx):
|
137 |
-
"""Convert a local index (within a shard) to a global index using precomputed offsets."""
|
|
|
|
|
|
|
|
|
|
|
138 |
return int(self.cumulative_offsets[shard_idx] + local_idx)
|
139 |
|
140 |
def search(self, query, top_k=5):
|
141 |
-
"""Search for a query using parallel FAISS shard search."""
|
142 |
self.logger.info(f"Searching for query: '{query}' (top_k={top_k})")
|
143 |
start_time = time.time()
|
144 |
if not query:
|
@@ -150,6 +159,8 @@ class SemanticSearch:
|
|
150 |
try:
|
151 |
self.logger.info("Encoding query")
|
152 |
query_embedding = self.model.encode([query], convert_to_numpy=True)
|
|
|
|
|
153 |
self.logger.debug(f"Query encoded to shape {query_embedding.shape}")
|
154 |
except Exception as e:
|
155 |
self.logger.error(f"Query encoding failed: {str(e)}")
|
@@ -168,13 +179,29 @@ class SemanticSearch:
|
|
168 |
distances_part, global_indices_part = result
|
169 |
all_distances.extend(distances_part)
|
170 |
all_global_indices.extend(global_indices_part)
|
|
|
|
|
|
|
|
|
|
|
|
|
171 |
self.logger.info(f"Search found {len(all_global_indices)} results across all shards")
|
172 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
173 |
self.logger.info(f"Search completed in {time.time() - start_time:.2f} seconds with {len(results)} final results")
|
174 |
return results
|
175 |
|
176 |
def _search_shard(self, shard_idx, index, query_embedding, top_k):
|
177 |
-
"""Search a single FAISS shard for the query embedding."""
|
178 |
if index.ntotal == 0:
|
179 |
self.logger.warning(f"Skipping empty shard {shard_idx}")
|
180 |
return None
|
@@ -182,23 +209,36 @@ class SemanticSearch:
|
|
182 |
try:
|
183 |
shard_start = time.time()
|
184 |
distances, indices = index.search(query_embedding, top_k)
|
185 |
-
# valid_mask = (indices[0] >= 0) & (indices[0] < index.ntotal)
|
186 |
-
valid_indices = indices[0]
|
187 |
-
valid_distances = distances[0]
|
188 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
189 |
if len(valid_indices) != top_k:
|
190 |
self.logger.debug(f"Shard {shard_idx}: Found {len(valid_indices)} valid results out of {top_k}")
|
191 |
|
192 |
global_indices = [self._global_index(shard_idx, idx) for idx in valid_indices]
|
193 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
194 |
self.logger.debug(f"Shard {shard_idx} search completed in {time.time() - shard_start:.3f}s")
|
195 |
-
return
|
196 |
except Exception as e:
|
197 |
self.logger.error(f"Search failed in shard {shard_idx}: {str(e)}")
|
198 |
return None
|
199 |
|
200 |
def _process_results(self, distances, global_indices, top_k):
|
201 |
-
"""Process raw search results with correct similarity calculation."""
|
202 |
process_start = time.time()
|
203 |
if global_indices.size == 0 or distances.size == 0:
|
204 |
self.logger.warning("No search results to process")
|
@@ -221,18 +261,10 @@ class SemanticSearch:
|
|
221 |
results = results.iloc[:min_len]
|
222 |
distances = distances[:min_len]
|
223 |
|
224 |
-
#
|
225 |
-
|
|
|
226 |
|
227 |
-
# Ensure URL lists are properly formatted
|
228 |
-
# results['source'] = results['source'].apply(
|
229 |
-
# lambda x: [
|
230 |
-
# url.strip().rstrip(')') # Clean trailing parentheses and whitespace
|
231 |
-
# for url in str(x).split(';') # Split on semicolons
|
232 |
-
# if url.strip() # Remove empty strings
|
233 |
-
# ] if isinstance(x, (str, list)) else []
|
234 |
-
# )
|
235 |
-
|
236 |
# Deduplicate and sort
|
237 |
required_columns = ["title", "summary", "authors", "source", "similarity"]
|
238 |
pre_dedup = len(results)
|
@@ -251,4 +283,42 @@ class SemanticSearch:
|
|
251 |
|
252 |
except Exception as e:
|
253 |
self.logger.error(f"Result processing failed: {str(e)}", exc_info=True)
|
254 |
-
return pd.DataFrame(columns=["title", "summary", "source", "similarity"])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
55 |
raise
|
56 |
|
57 |
def get_metadata(self, global_indices):
|
58 |
+
"""Retrieve metadata for given indices with deduplication by title"""
|
59 |
if isinstance(global_indices, np.ndarray) and global_indices.size == 0:
|
60 |
return pd.DataFrame(columns=["title", "summary", 'authors', "similarity", "source"])
|
61 |
|
62 |
try:
|
63 |
# Directly index the DataFrame
|
64 |
results = self.df.iloc[global_indices].copy()
|
65 |
+
|
66 |
+
# Deduplicate by title to avoid near-duplicate results
|
67 |
+
if len(results) > 1:
|
68 |
+
results = results.drop_duplicates(subset=["title"])
|
69 |
+
|
70 |
+
return results
|
71 |
except Exception as e:
|
72 |
logger.error(f"Metadata retrieval failed: {str(e)}")
|
73 |
return pd.DataFrame(columns=["title", "summary", "similarity", "source", 'authors'])
|
|
|
74 |
|
75 |
|
76 |
class SemanticSearch:
|
|
|
138 |
return index, size
|
139 |
|
140 |
def _global_index(self, shard_idx, local_idx):
|
141 |
+
"""Convert a local index (within a shard) to a global index using precomputed offsets with validation."""
|
142 |
+
if shard_idx < 0 or shard_idx >= len(self.index_shards):
|
143 |
+
self.logger.error(f"Invalid shard index: {shard_idx}")
|
144 |
+
return -1
|
145 |
+
if local_idx < 0 or local_idx >= self.shard_sizes[shard_idx]:
|
146 |
+
self.logger.warning(f"Local index {local_idx} may be out of bounds for shard {shard_idx}")
|
147 |
return int(self.cumulative_offsets[shard_idx] + local_idx)
|
148 |
|
149 |
def search(self, query, top_k=5):
|
150 |
+
"""Search for a query using parallel FAISS shard search with normalized vectors for proper cosine similarity."""
|
151 |
self.logger.info(f"Searching for query: '{query}' (top_k={top_k})")
|
152 |
start_time = time.time()
|
153 |
if not query:
|
|
|
159 |
try:
|
160 |
self.logger.info("Encoding query")
|
161 |
query_embedding = self.model.encode([query], convert_to_numpy=True)
|
162 |
+
# Normalize query embedding for proper cosine similarity comparison
|
163 |
+
query_embedding = query_embedding / np.linalg.norm(query_embedding, axis=1, keepdims=True)
|
164 |
self.logger.debug(f"Query encoded to shape {query_embedding.shape}")
|
165 |
except Exception as e:
|
166 |
self.logger.error(f"Query encoding failed: {str(e)}")
|
|
|
179 |
distances_part, global_indices_part = result
|
180 |
all_distances.extend(distances_part)
|
181 |
all_global_indices.extend(global_indices_part)
|
182 |
+
|
183 |
+
# If no results found across all shards
|
184 |
+
if not all_global_indices:
|
185 |
+
self.logger.warning("No results found across any shards")
|
186 |
+
return pd.DataFrame(columns=["title", "summary", "source", "authors", "similarity"])
|
187 |
+
|
188 |
self.logger.info(f"Search found {len(all_global_indices)} results across all shards")
|
189 |
+
|
190 |
+
# Sort all results by distance before processing
|
191 |
+
combined = list(zip(all_distances, all_global_indices))
|
192 |
+
combined.sort(reverse=True) # Sort by distance (higher is better for cosine similarity)
|
193 |
+
sorted_distances, sorted_indices = zip(*combined)
|
194 |
+
|
195 |
+
# Limit to top-k across all shards
|
196 |
+
top_distances = np.array(sorted_distances[:top_k])
|
197 |
+
top_indices = np.array(sorted_indices[:top_k])
|
198 |
+
|
199 |
+
results = self._process_results(top_distances, top_indices, top_k)
|
200 |
self.logger.info(f"Search completed in {time.time() - start_time:.2f} seconds with {len(results)} final results")
|
201 |
return results
|
202 |
|
203 |
def _search_shard(self, shard_idx, index, query_embedding, top_k):
|
204 |
+
"""Search a single FAISS shard for the query embedding with proper error handling."""
|
205 |
if index.ntotal == 0:
|
206 |
self.logger.warning(f"Skipping empty shard {shard_idx}")
|
207 |
return None
|
|
|
209 |
try:
|
210 |
shard_start = time.time()
|
211 |
distances, indices = index.search(query_embedding, top_k)
|
|
|
|
|
|
|
212 |
|
213 |
+
# Filter out invalid indices (-1 is returned by FAISS for insufficient results)
|
214 |
+
valid_mask = (indices[0] >= 0) & (indices[0] < index.ntotal)
|
215 |
+
valid_indices = indices[0][valid_mask]
|
216 |
+
valid_distances = distances[0][valid_mask]
|
217 |
+
|
218 |
+
if len(valid_indices) == 0:
|
219 |
+
self.logger.debug(f"Shard {shard_idx}: No valid results found")
|
220 |
+
return None
|
221 |
+
|
222 |
if len(valid_indices) != top_k:
|
223 |
self.logger.debug(f"Shard {shard_idx}: Found {len(valid_indices)} valid results out of {top_k}")
|
224 |
|
225 |
global_indices = [self._global_index(shard_idx, idx) for idx in valid_indices]
|
226 |
+
|
227 |
+
# Filter out any invalid global indices (could happen if _global_index validation fails)
|
228 |
+
valid_global = [(d, i) for d, i in zip(valid_distances, global_indices) if i >= 0]
|
229 |
+
if not valid_global:
|
230 |
+
return None
|
231 |
+
|
232 |
+
final_distances, final_indices = zip(*valid_global)
|
233 |
+
|
234 |
self.logger.debug(f"Shard {shard_idx} search completed in {time.time() - shard_start:.3f}s")
|
235 |
+
return final_distances, final_indices
|
236 |
except Exception as e:
|
237 |
self.logger.error(f"Search failed in shard {shard_idx}: {str(e)}")
|
238 |
return None
|
239 |
|
240 |
def _process_results(self, distances, global_indices, top_k):
|
241 |
+
"""Process raw search results with correct similarity calculation for cosine similarity."""
|
242 |
process_start = time.time()
|
243 |
if global_indices.size == 0 or distances.size == 0:
|
244 |
self.logger.warning("No search results to process")
|
|
|
261 |
results = results.iloc[:min_len]
|
262 |
distances = distances[:min_len]
|
263 |
|
264 |
+
# For inner product with normalized vectors, similarity is directly the distance
|
265 |
+
# (FAISS IP search already returns higher scores for more similar items)
|
266 |
+
results['similarity'] = distances
|
267 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
268 |
# Deduplicate and sort
|
269 |
required_columns = ["title", "summary", "authors", "source", "similarity"]
|
270 |
pre_dedup = len(results)
|
|
|
283 |
|
284 |
except Exception as e:
|
285 |
self.logger.error(f"Result processing failed: {str(e)}", exc_info=True)
|
286 |
+
return pd.DataFrame(columns=["title", "summary", "source", "authors", "similarity"])
|
287 |
+
|
288 |
+
def search_with_threshold(self, query, top_k=10, similarity_threshold=0.6):
|
289 |
+
"""
|
290 |
+
Search with a fixed similarity threshold, returning only results above the threshold.
|
291 |
+
For cosine similarity with normalized vectors, threshold should be between 0 and 1.
|
292 |
+
"""
|
293 |
+
# Get more results initially to ensure we have enough after filtering
|
294 |
+
initial_results = self.search(query, top_k=top_k*2)
|
295 |
+
|
296 |
+
if initial_results.empty:
|
297 |
+
return initial_results
|
298 |
+
|
299 |
+
# Filter by similarity threshold
|
300 |
+
filtered_results = initial_results[initial_results['similarity'] >= similarity_threshold]
|
301 |
+
|
302 |
+
# Return top-k of filtered results
|
303 |
+
return filtered_results.head(top_k).reset_index(drop=True)
|
304 |
+
|
305 |
+
def search_with_adaptive_threshold(self, query, top_k=10, percentile=75):
|
306 |
+
"""
|
307 |
+
Search with an adaptive threshold based on the distribution of similarity scores.
|
308 |
+
Returns results above the specified percentile of similarity scores.
|
309 |
+
"""
|
310 |
+
# Get more results initially to determine distribution
|
311 |
+
initial_results = self.search(query, top_k=top_k*3)
|
312 |
+
|
313 |
+
if initial_results.empty or len(initial_results) < 2:
|
314 |
+
return initial_results
|
315 |
+
|
316 |
+
# Calculate threshold based on percentile of similarity scores
|
317 |
+
threshold = np.percentile(initial_results['similarity'], percentile)
|
318 |
+
self.logger.info(f"Adaptive threshold set to {threshold:.4f} (percentile: {percentile})")
|
319 |
+
|
320 |
+
# Filter results above threshold
|
321 |
+
filtered_results = initial_results[initial_results['similarity'] >= threshold]
|
322 |
+
|
323 |
+
# Return top-k of filtered results
|
324 |
+
return filtered_results.head(top_k).reset_index(drop=True)
|