File size: 14,888 Bytes
5ee0a10
eb5ebce
5ee0a10
7ccde22
dd6b309
7ccde22
eb5ebce
 
 
dd6b309
eb5ebce
265c29d
eb5ebce
 
801d9f2
dd6b309
 
eb5ebce
 
 
 
 
 
7ccde22
eb5ebce
7ccde22
74dd725
 
eb5ebce
 
 
74dd725
eb5ebce
74dd725
 
 
 
eb5ebce
74dd725
 
 
 
 
 
 
 
 
 
 
 
 
 
eb5ebce
74dd725
 
 
eb5ebce
ff6741a
eb5ebce
74dd725
eb5ebce
74dd725
 
 
ff6741a
 
 
 
 
 
74dd725
 
 
a138102
 
eb5ebce
 
 
 
 
 
 
 
3c95d1f
eb5ebce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
801d9f2
eb5ebce
 
 
801d9f2
eb5ebce
 
801d9f2
eb5ebce
 
 
 
 
801d9f2
eb5ebce
3c95d1f
29bdbcf
eb5ebce
 
 
 
 
 
 
 
 
 
 
 
 
 
ff6741a
 
 
 
 
 
eb5ebce
 
 
ff6741a
eb5ebce
 
 
 
 
 
 
 
 
 
 
ff6741a
 
eb5ebce
 
 
 
801d9f2
eb5ebce
 
 
 
 
 
 
 
 
 
 
 
 
ff6741a
 
 
 
 
 
eb5ebce
ff6741a
 
 
 
 
 
 
 
 
 
 
eb5ebce
 
 
 
ff6741a
eb5ebce
 
 
0880e2f
eb5ebce
 
 
0880e2f
ff6741a
 
 
 
 
 
 
 
 
eb5ebce
 
0880e2f
eb5ebce
ff6741a
 
 
 
 
 
 
 
eb5ebce
ff6741a
eb5ebce
 
 
3c95d1f
eb5ebce
ff6741a
eb5ebce
 
 
0880e2f
3c95d1f
eb5ebce
 
 
 
 
3c95d1f
eb5ebce
 
0880e2f
3c95d1f
a138102
eb5ebce
 
a138102
 
 
 
ff6741a
 
4709458
a138102
 
0880e2f
eb5ebce
a138102
 
 
 
 
eb5ebce
a138102
eb5ebce
 
a138102
 
3c95d1f
a138102
eb5ebce
 
ff6741a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
import numpy as np
import pandas as pd
import faiss
import zipfile
import logging
from pathlib import Path
from sentence_transformers import SentenceTransformer, util
import streamlit as st
import time
import os
from urllib.parse import quote
import requests
import shutil
import concurrent.futures
from functools import lru_cache

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("MetadataManager")

class MetadataManager:
    def __init__(self):
        self.metadata_path = Path("combined.parquet")
        self.df = None
        self.total_docs = 0
        
        logger.info("Initializing MetadataManager")
        self._load_metadata()
        logger.info(f"Total documents indexed: {self.total_docs}")

    def _load_metadata(self):
        """Load the combined parquet file directly"""
        logger.info("Loading metadata from combined.parquet")
        try:
            # Load the parquet file
            self.df = pd.read_parquet(self.metadata_path)
            
            # Clean and format the data
            self.df['source'] = self.df['source'].apply(
                lambda x: [
                    url.strip()
                    for url in str(x).split(';') 
                    if url.strip()
                ]
            )
            self.total_docs = len(self.df)
            
            logger.info(f"Successfully loaded {self.total_docs} documents")
        except Exception as e:
            logger.error(f"Failed to load metadata: {str(e)}")
            raise

    def get_metadata(self, global_indices):
        """Retrieve metadata for given indices with deduplication by title"""
        if isinstance(global_indices, np.ndarray) and global_indices.size == 0:
            return pd.DataFrame(columns=["title", "summary", 'authors', "similarity", "source"])
        
        try:
            # Directly index the DataFrame
            results = self.df.iloc[global_indices].copy()
            
            # Deduplicate by title to avoid near-duplicate results
            if len(results) > 1:
                results = results.drop_duplicates(subset=["title"])
            
            return results
        except Exception as e:
            logger.error(f"Metadata retrieval failed: {str(e)}")
            return pd.DataFrame(columns=["title", "summary", "similarity", "source", 'authors'])


class SemanticSearch:
    def __init__(self):
        self.shard_dir = Path("compressed_shards")
        self.model = None
        self.index_shards = []
        self.metadata_mgr = MetadataManager()
        self.shard_sizes = []
        self.cumulative_offsets = None
        self.total_vectors = 0
        self.logger = logging.getLogger("SemanticSearch")
        self.logger.info("Initializing SemanticSearch")
    
    @st.cache_resource
    def load_model(_self):
        return SentenceTransformer('all-MiniLM-L6-v2')
    
    def initialize_system(self):
        self.logger.info("Loading sentence transformer model")
        start_time = time.time()
        self.model = self.load_model()
        self.logger.info(f"Model loaded in {time.time() - start_time:.2f} seconds")
        self.logger.info("Loading FAISS indices")
        self._load_faiss_shards()
    
    def _load_faiss_shards(self):
        """Load FAISS shards concurrently and precompute cumulative offsets for global indexing."""
        self.logger.info(f"Searching for index files in {self.shard_dir}")
        if not self.shard_dir.exists():
            self.logger.error(f"Shard directory not found: {self.shard_dir}")
            return
        index_files = sorted(self.shard_dir.glob("*.index"))
        self.logger.info(f"Found {len(index_files)} index files")
        self.index_shards = []
        self.shard_sizes = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_file = {
                executor.submit(self._load_single_index, shard_path): shard_path
                for shard_path in index_files
            }
            for future in concurrent.futures.as_completed(future_to_file):
                shard_path = future_to_file[future]
                try:
                    index, size = future.result()
                    if index is not None:
                        self.index_shards.append(index)
                        self.shard_sizes.append(size)
                        self.logger.info(f"Loaded index {shard_path.name} with {size} vectors")
                except Exception as e:
                    self.logger.error(f"Error loading index {shard_path}: {str(e)}")
        self.total_vectors = sum(self.shard_sizes)
        self.logger.info(f"Total loaded vectors: {self.total_vectors} across {len(self.index_shards)} shards")
        self.cumulative_offsets = np.cumsum([0] + self.shard_sizes)
    
    def _load_single_index(self, shard_path):
        """Load a single FAISS index shard."""
        self.logger.info(f"Loading index: {shard_path}")
        start_time = time.time()
        file_size_mb = os.path.getsize(shard_path) / (1024 * 1024)
        self.logger.info(f"Index file size: {file_size_mb:.2f} MB")
        index = faiss.read_index(str(shard_path))
        size = index.ntotal
        self.logger.info(f"Index loaded in {time.time() - start_time:.2f} seconds")
        return index, size
    
    def _global_index(self, shard_idx, local_idx):
        """Convert a local index (within a shard) to a global index using precomputed offsets with validation."""
        if shard_idx < 0 or shard_idx >= len(self.index_shards):
            self.logger.error(f"Invalid shard index: {shard_idx}")
            return -1
        if local_idx < 0 or local_idx >= self.shard_sizes[shard_idx]:
            self.logger.warning(f"Local index {local_idx} may be out of bounds for shard {shard_idx}")
        return int(self.cumulative_offsets[shard_idx] + local_idx)
    
    def search(self, query, top_k=5):
        """Search for a query using parallel FAISS shard search with normalized vectors for proper cosine similarity."""
        self.logger.info(f"Searching for query: '{query}' (top_k={top_k})")
        start_time = time.time()
        if not query:
            self.logger.warning("Empty query provided")
            return pd.DataFrame()
        if not self.index_shards:
            self.logger.error("No index shards loaded")
            return pd.DataFrame()
        try:
            self.logger.info("Encoding query")
            query_embedding = self.model.encode([query], convert_to_numpy=True)
            # Normalize query embedding for proper cosine similarity comparison
            query_embedding = query_embedding / np.linalg.norm(query_embedding, axis=1, keepdims=True)
            self.logger.debug(f"Query encoded to shape {query_embedding.shape}")
        except Exception as e:
            self.logger.error(f"Query encoding failed: {str(e)}")
            return pd.DataFrame()
        
        all_distances = []
        all_global_indices = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {
                executor.submit(self._search_shard, shard_idx, index, query_embedding, top_k): shard_idx
                for shard_idx, index in enumerate(self.index_shards)
            }
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result is not None:
                    distances_part, global_indices_part = result
                    all_distances.extend(distances_part)
                    all_global_indices.extend(global_indices_part)
        
        # If no results found across all shards
        if not all_global_indices:
            self.logger.warning("No results found across any shards")
            return pd.DataFrame(columns=["title", "summary", "source", "authors", "similarity"])
            
        self.logger.info(f"Search found {len(all_global_indices)} results across all shards")
        
        # Sort all results by distance before processing
        combined = list(zip(all_distances, all_global_indices))
        combined.sort(reverse=True)  # Sort by distance (higher is better for cosine similarity)
        sorted_distances, sorted_indices = zip(*combined)
        
        # Limit to top-k across all shards
        top_distances = np.array(sorted_distances[:top_k])
        top_indices = np.array(sorted_indices[:top_k])
        
        results = self._process_results(top_distances, top_indices, top_k)
        self.logger.info(f"Search completed in {time.time() - start_time:.2f} seconds with {len(results)} final results")
        return results
    
    def _search_shard(self, shard_idx, index, query_embedding, top_k):
        """Search a single FAISS shard for the query embedding with proper error handling."""
        if index.ntotal == 0:
            self.logger.warning(f"Skipping empty shard {shard_idx}")
            return None
            
        try:
            shard_start = time.time()
            distances, indices = index.search(query_embedding, top_k)
            
            # Filter out invalid indices (-1 is returned by FAISS for insufficient results)
            valid_mask = (indices[0] >= 0) & (indices[0] < index.ntotal)
            valid_indices = indices[0][valid_mask]
            valid_distances = distances[0][valid_mask]
            
            if len(valid_indices) == 0:
                self.logger.debug(f"Shard {shard_idx}: No valid results found")
                return None
                
            if len(valid_indices) != top_k:
                self.logger.debug(f"Shard {shard_idx}: Found {len(valid_indices)} valid results out of {top_k}")
                
            global_indices = [self._global_index(shard_idx, idx) for idx in valid_indices]
            
            # Filter out any invalid global indices (could happen if _global_index validation fails)
            valid_global = [(d, i) for d, i in zip(valid_distances, global_indices) if i >= 0]
            if not valid_global:
                return None
                
            final_distances, final_indices = zip(*valid_global)
            
            self.logger.debug(f"Shard {shard_idx} search completed in {time.time() - shard_start:.3f}s")
            return final_distances, final_indices
        except Exception as e:
            self.logger.error(f"Search failed in shard {shard_idx}: {str(e)}")
            return None

    def _process_results(self, distances, global_indices, top_k):
        """Process raw search results with correct similarity calculation for cosine similarity."""
        process_start = time.time()
        if global_indices.size == 0 or distances.size == 0:
            self.logger.warning("No search results to process")
            return pd.DataFrame(columns=["title", "summary", "source", "authors", "similarity"])
    
        try:
            self.logger.info(f"Retrieving metadata for {len(global_indices)} indices")
            metadata_start = time.time()
            results = self.metadata_mgr.get_metadata(global_indices)
            self.logger.info(f"Metadata retrieved in {time.time() - metadata_start:.2f}s, got {len(results)} records")
    
            if len(results) == 0:
                self.logger.warning("No metadata found for indices")
                return pd.DataFrame(columns=["title", "summary", "source", "authors", "similarity"])
    
            # Handle distance-results alignment
            if len(results) != len(distances):
                self.logger.warning(f"Mismatch between distances ({len(distances)}) and results ({len(results)})")
                min_len = min(len(results), len(distances))
                results = results.iloc[:min_len]
                distances = distances[:min_len]

            # For inner product with normalized vectors, similarity is directly the distance
            # (FAISS IP search already returns higher scores for more similar items)
            results['similarity'] = 1 - (distances/2)
            
            # Deduplicate and sort
            required_columns = ["title", "summary", "authors", "source", "similarity"]
            pre_dedup = len(results)
            results = (
                results.drop_duplicates(subset=["title", "authors"])
                .sort_values("similarity", ascending=False)
                .head(top_k)
            )
            post_dedup = len(results)
            
            if pre_dedup > post_dedup:
                self.logger.info(f"Removed {pre_dedup - post_dedup} duplicate results")

            self.logger.info(f"Results processed in {time.time() - process_start:.2f}s")
            return results[required_columns].reset_index(drop=True)
            
        except Exception as e:
            self.logger.error(f"Result processing failed: {str(e)}", exc_info=True)
            return pd.DataFrame(columns=["title", "summary", "source", "authors", "similarity"])
    
    def search_with_threshold(self, query, top_k=10, similarity_threshold=0.6):
        """
        Search with a fixed similarity threshold, returning only results above the threshold.
        For cosine similarity with normalized vectors, threshold should be between 0 and 1.
        """
        # Get more results initially to ensure we have enough after filtering
        initial_results = self.search(query, top_k=top_k*2)
        
        if initial_results.empty:
            return initial_results
            
        # Filter by similarity threshold
        filtered_results = initial_results[initial_results['similarity'] >= similarity_threshold]
        
        # Return top-k of filtered results
        return filtered_results.head(top_k).reset_index(drop=True)
        
    def search_with_adaptive_threshold(self, query, top_k=10, percentile=75):
        """
        Search with an adaptive threshold based on the distribution of similarity scores.
        Returns results above the specified percentile of similarity scores.
        """
        # Get more results initially to determine distribution
        initial_results = self.search(query, top_k=top_k*3)
        
        if initial_results.empty or len(initial_results) < 2:
            return initial_results
            
        # Calculate threshold based on percentile of similarity scores
        threshold = np.percentile(initial_results['similarity'], percentile)
        self.logger.info(f"Adaptive threshold set to {threshold:.4f} (percentile: {percentile})")
        
        # Filter results above threshold
        filtered_results = initial_results[initial_results['similarity'] >= threshold]
        
        # Return top-k of filtered results
        return filtered_results.head(top_k).reset_index(drop=True)