File size: 19,223 Bytes
5ee0a10
eb5ebce
5ee0a10
7ccde22
dd6b309
7ccde22
eb5ebce
 
 
dd6b309
eb5ebce
265c29d
eb5ebce
 
 
801d9f2
dd6b309
 
eb5ebce
 
 
 
 
 
7ccde22
eb5ebce
7ccde22
eb5ebce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0880e2f
 
eb5ebce
 
0880e2f
eb5ebce
0880e2f
eb5ebce
0880e2f
eb5ebce
 
 
 
 
 
 
0880e2f
eb5ebce
 
 
 
 
 
 
 
 
0880e2f
eb5ebce
 
0880e2f
eb5ebce
 
 
 
 
2906fee
801d9f2
eb5ebce
 
 
 
 
 
 
 
2906fee
eb5ebce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2906fee
eb5ebce
 
 
 
 
 
 
 
 
 
3c95d1f
eb5ebce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
801d9f2
eb5ebce
 
 
801d9f2
eb5ebce
 
801d9f2
eb5ebce
 
 
 
 
801d9f2
eb5ebce
3c95d1f
29bdbcf
eb5ebce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
801d9f2
eb5ebce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0880e2f
eb5ebce
 
 
 
 
 
0880e2f
eb5ebce
 
0880e2f
eb5ebce
 
 
 
 
 
3c95d1f
eb5ebce
 
 
 
 
0880e2f
3c95d1f
eb5ebce
 
 
 
 
3c95d1f
eb5ebce
 
0880e2f
3c95d1f
eb5ebce
 
 
 
 
 
3c95d1f
eb5ebce
 
3c95d1f
 
2906fee
3c95d1f
0880e2f
 
 
 
3c95d1f
eb5ebce
3c95d1f
 
eb5ebce
 
 
3c95d1f
eb5ebce
3c95d1f
eb5ebce
 
3c95d1f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
import numpy as np
import pandas as pd
import faiss
import zipfile
import logging
from pathlib import Path
from sentence_transformers import SentenceTransformer, util
import streamlit as st
import time
import os
from urllib.parse import quote
import requests
import shutil
import concurrent.futures
# lru_cache is imported for optional memoization of instance methods; nothing in this file uses it yet.
from functools import lru_cache

# Send log records of level INFO and above to a stream handler, using a
# "time - logger name - level - message" layout.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)
# Module-level logger used by the MetadataManager class below.
logger = logging.getLogger("MetadataManager")

class MetadataManager:
    def __init__(self):
        self.cache_dir = Path("unzipped_cache")
        self.shard_dir = self.cache_dir / "metadata_shards"
        self.shard_map = {}
        self.loaded_shards = {}
        self.total_docs = 0
        self.api_cache = {}
        
        logger.info("Initializing MetadataManager")
        self._ensure_directories()
        self._unzip_if_needed()
        self._build_shard_map()
        logger.info(f"Total documents indexed: {self.total_docs}")
        logger.info(f"Total shards found: {len(self.shard_map)}")
    
    def _ensure_directories(self):
        """Create necessary directories if they don't exist."""
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.shard_dir.mkdir(parents=True, exist_ok=True)
    
    def _unzip_if_needed(self):
        """Extract the ZIP archive if no parquet files are found."""
        zip_path = Path("metadata_shards.zip")
        if not any(self.shard_dir.rglob("*.parquet")):
            logger.info("No parquet files found, checking for zip archive")
            if not zip_path.exists():
                raise FileNotFoundError(f"Metadata ZIP file not found at {zip_path}")
            logger.info(f"Extracting {zip_path} to {self.shard_dir}")
            try:
                with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                    zip_root = self._get_zip_root(zip_ref)
                    zip_ref.extractall(self.shard_dir)
                    if zip_root:
                        nested_dir = self.shard_dir / zip_root
                        if nested_dir.exists():
                            self._flatten_directory(nested_dir, self.shard_dir)
                            nested_dir.rmdir()
                    parquet_files = list(self.shard_dir.rglob("*.parquet"))
                    if not parquet_files:
                        raise RuntimeError("Extraction completed but no parquet files found")
                    logger.info(f"Found {len(parquet_files)} parquet files after extraction")
            except Exception as e:
                logger.error(f"Failed to extract zip file: {str(e)}")
                self._clean_failed_extraction()
                raise
    
    def _get_zip_root(self, zip_ref):
        """Identify the common root directory within the ZIP file."""
        try:
            first_file = zip_ref.namelist()[0]
            if '/' in first_file:
                return first_file.split('/')[0]
            return ""
        except Exception as e:
            logger.warning(f"Error detecting zip root: {str(e)}")
            return ""
    
    def _flatten_directory(self, src_dir, dest_dir):
        """Move files from a nested directory up to the destination."""
        for item in src_dir.iterdir():
            if item.is_dir():
                self._flatten_directory(item, dest_dir)
                item.rmdir()
            else:
                target = dest_dir / item.name
                if target.exists():
                    target.unlink()
                item.rename(target)
    
    def _clean_failed_extraction(self):
        """Clean up files from a failed extraction attempt."""
        logger.info("Cleaning up failed extraction")
        for item in self.shard_dir.iterdir():
            if item.is_dir():
                shutil.rmtree(item)
            else:
                item.unlink()
    
    def _build_shard_map(self):
        """Build a map from global index ranges to shard filenames."""
        logger.info("Building shard map from parquet files")
        parquet_files = list(self.shard_dir.glob("*.parquet"))
        if not parquet_files:
            raise FileNotFoundError("No parquet files found after extraction")
        parquet_files = sorted(parquet_files, key=lambda x: int(x.stem.split("_")[1]))
        expected_start = 0
        for f in parquet_files:
            try:
                parts = f.stem.split("_")
                if len(parts) != 3:
                    raise ValueError("Invalid filename format")
                start = int(parts[1])
                end = int(parts[2])
                if start != expected_start:
                    raise ValueError(f"Non-contiguous shard start: expected {expected_start}, got {start}")
                if end <= start:
                    raise ValueError(f"Invalid shard range: {start}-{end}")
                self.shard_map[(start, end)] = f.name
                self.total_docs = end + 1
                expected_start = end + 1
                logger.debug(f"Mapped shard {f.name}: indices {start}-{end}")
            except Exception as e:
                logger.error(f"Error processing shard {f.name}: {str(e)}")
                raise RuntimeError("Invalid shard structure") from e
        logger.info(f"Validated {len(self.shard_map)} continuous shards")
        logger.info(f"Total document count: {self.total_docs}")
        sorted_ranges = sorted(self.shard_map.keys())
        for i in range(1, len(sorted_ranges)):
            prev_end = sorted_ranges[i-1][1]
            curr_start = sorted_ranges[i][0]
            if curr_start != prev_end + 1:
                logger.warning(f"Gap or overlap detected between shards: {prev_end} to {curr_start}")
    
    def _process_shard(self, shard, local_indices):
        """Load a shard (if not already loaded) and retrieve the specified rows."""
        try:
            if shard not in self.loaded_shards:
                shard_path = self.shard_dir / shard
                if not shard_path.exists():
                    logger.error(f"Shard file not found: {shard_path}")
                    return pd.DataFrame(columns=["title", "summary", "similarity","authors", "source"])
                    
                file_size_mb = os.path.getsize(shard_path) / (1024 * 1024)
                logger.info(f"Loading shard file: {shard} (size: {file_size_mb:.2f} MB)")
                
                try:
                    self.loaded_shards[shard] = pd.read_parquet(shard_path, columns=["title", "summary", "source", "authors"])
                    logger.info(f"Loaded shard {shard} with {len(self.loaded_shards[shard])} rows")
                    
                except Exception as e:
                    logger.error(f"Failed to read parquet file {shard}: {str(e)}")
                    try:
                        schema = pd.read_parquet(shard_path, engine='pyarrow').dtypes
                        logger.info(f"Parquet schema: {schema}")
                    except Exception:
                        pass
                    return pd.DataFrame(columns=["title", "summary", "similarity", "source", "authors"])
            df = self.loaded_shards[shard]
            df_len = len(df)
            valid_local_indices = [idx for idx in local_indices if 0 <= idx < df_len]
            if len(valid_local_indices) != len(local_indices):
                logger.warning(f"Filtered {len(local_indices) - len(valid_local_indices)} out-of-bounds indices in shard {shard}")
            if valid_local_indices:
                chunk = df.iloc[valid_local_indices]
                logger.info(f"Retrieved {len(chunk)} records from shard {shard}")
                return chunk
                
        except Exception as e:
            logger.error(f"Error processing shard {shard}: {str(e)}", exc_info=True)
        return pd.DataFrame(columns=["title", "summary", "similarity", "source", "authors"])
    
    def get_metadata(self, global_indices):
        """Retrieve metadata for a batch of global indices using parallel shard processing."""
        if isinstance(global_indices, np.ndarray) and global_indices.size == 0:
            logger.warning("Empty indices array passed to get_metadata")
            return pd.DataFrame(columns=["title", "summary", "similarity", "source"])
        
        indices_list = global_indices.tolist() if isinstance(global_indices, np.ndarray) else global_indices
        logger.info(f"Retrieving metadata for {len(indices_list)} indices")
        valid_indices = [idx for idx in indices_list if 0 <= idx < self.total_docs]
        invalid_count = len(indices_list) - len(valid_indices)
        if invalid_count > 0:
            logger.warning(f"Filtered out {invalid_count} invalid indices")
        if not valid_indices:
            logger.warning("No valid indices remain after filtering")
            return pd.DataFrame(columns=["title", "summary", "similarity", "source"])
        
        # Group indices by shard
        shard_groups = {}
        for idx in valid_indices:
            found = False
            for (start, end), shard in self.shard_map.items():
                if start <= idx <= end:
                    shard_groups.setdefault(shard, []).append(idx - start)
                    found = True
                    break
            if not found:
                logger.warning(f"Index {idx} not found in any shard range")
        
        # Process shards concurrently
        results = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(self._process_shard, shard, local_indices)
                       for shard, local_indices in shard_groups.items()]
            for future in concurrent.futures.as_completed(futures):
                df_chunk = future.result()
                if not df_chunk.empty:
                    results.append(df_chunk)
        
        if results:
            combined = pd.concat(results).reset_index(drop=True)
            logger.info(f"Combined metadata: {len(combined)} records from {len(results)} shards")
            return combined
        else:
            logger.warning("No metadata records retrieved")
            return pd.DataFrame(columns=["title", "summary", "similarity", "source"])
    
    
class SemanticSearch:
    def __init__(self):
        """Create an unloaded search engine and its metadata manager.

        The embedding model and FAISS indexes are not loaded here; call
        ``initialize_system`` for that. Constructing the MetadataManager
        runs its own setup, so its errors propagate from this constructor.
        """
        self.logger = logging.getLogger("SemanticSearch")
        self.shard_dir = Path("compressed_shards")
        # Filled in by initialize_system / _load_faiss_shards.
        self.model = None
        self.index_shards = []
        self.shard_sizes = []
        self.cumulative_offsets = None
        self.total_vectors = 0
        self.metadata_mgr = MetadataManager()
        self.logger.info("Initializing SemanticSearch")
    
    @st.cache_resource
    def load_model(_self):
        """Return the ``all-MiniLM-L6-v2`` sentence-transformer model.

        The method is wrapped in ``st.cache_resource``.
        NOTE(review): the parameter is presumably named ``_self`` so that
        Streamlit does not try to hash the instance - confirm against the
        Streamlit caching documentation.
        """
        encoder = SentenceTransformer('all-MiniLM-L6-v2')
        return encoder
    
    def initialize_system(self):
        """Load the embedding model, then the FAISS index shards."""
        log = self.logger
        log.info("Loading sentence transformer model")
        began = time.time()
        self.model = self.load_model()
        elapsed = time.time() - began
        log.info(f"Model loaded in {elapsed:.2f} seconds")
        log.info("Loading FAISS indices")
        self._load_faiss_shards()
    
    def _load_faiss_shards(self):
        """Load all FAISS shards in parallel and compute their global offsets.

        Index files (``*.index`` in ``self.shard_dir``) are read concurrently,
        but the loaded indexes are stored in sorted-filename order. That order
        fixes ``self.cumulative_offsets`` and therefore every global index, so
        it must not depend on which thread finishes first.

        A file that fails to load is logged and skipped. Nothing is loaded,
        and no offsets are computed, when the shard directory does not exist.
        """
        self.logger.info(f"Searching for index files in {self.shard_dir}")
        if not self.shard_dir.exists():
            self.logger.error(f"Shard directory not found: {self.shard_dir}")
            return
        # NOTE(review): this is a lexicographic path sort; confirm it matches
        # the order in which the shards' vectors were numbered.
        index_files = sorted(self.shard_dir.glob("*.index"))
        self.logger.info(f"Found {len(index_files)} index files")
        self.index_shards = []
        self.shard_sizes = []
        failed = 0
        with concurrent.futures.ThreadPoolExecutor() as executor:
            pending = [
                (shard_path, executor.submit(self._load_single_index, shard_path))
                for shard_path in index_files
            ]
            # Consume in submission order rather than completion order.
            for shard_path, future in pending:
                try:
                    index, size = future.result()
                except Exception as e:
                    failed += 1
                    self.logger.error(f"Error loading index {shard_path}: {str(e)}")
                    continue
                if index is not None:
                    self.index_shards.append(index)
                    self.shard_sizes.append(size)
                    self.logger.info(f"Loaded index {shard_path.name} with {size} vectors")
        if failed:
            # Later shards now start at a lower offset than they would have
            # had every file loaded.
            self.logger.warning(
                f"{failed} index files failed to load; global indices of later shards are shifted"
            )
        self.total_vectors = sum(self.shard_sizes)
        self.logger.info(f"Total loaded vectors: {self.total_vectors} across {len(self.index_shards)} shards")
        self.cumulative_offsets = np.cumsum([0] + self.shard_sizes)
    
    def _load_single_index(self, shard_path):
        """Read one FAISS index file and report how many vectors it holds.

        Returns:
            ``(index, size)`` where ``size`` is the index's ``ntotal``.
        """
        log = self.logger
        log.info(f"Loading index: {shard_path}")
        began = time.time()
        size_in_mb = os.path.getsize(shard_path) / (1024 * 1024)
        log.info(f"Index file size: {size_in_mb:.2f} MB")
        loaded_index = faiss.read_index(str(shard_path))
        vector_count = loaded_index.ntotal
        log.info(f"Index loaded in {time.time() - began:.2f} seconds")
        return loaded_index, vector_count
    
    def _global_index(self, shard_idx, local_idx):
        """Translate a position inside shard ``shard_idx`` into a global index.

        The shard's entry in ``self.cumulative_offsets`` is added to
        ``local_idx`` and the sum is returned as a plain ``int``.
        """
        shard_offset = self.cumulative_offsets[shard_idx]
        return int(shard_offset + local_idx)
    
    def search(self, query, top_k=5):
        """Embed ``query`` and look it up in every loaded FAISS shard in parallel.

        Args:
            query: The search text.
            top_k: Number of hits requested from each shard and the maximum
                passed on to ``_process_results``.

        Returns:
            The DataFrame produced by ``_process_results``. A bare empty
            DataFrame is returned when the query is empty, no shards are
            loaded, or encoding the query fails.
        """
        log = self.logger
        log.info(f"Searching for query: '{query}' (top_k={top_k})")
        started_at = time.time()
        if not query:
            log.warning("Empty query provided")
            return pd.DataFrame()
        if not self.index_shards:
            log.error("No index shards loaded")
            return pd.DataFrame()
        try:
            log.info("Encoding query")
            query_embedding = self.model.encode([query], convert_to_numpy=True)
            log.debug(f"Query encoded to shape {query_embedding.shape}")
        except Exception as e:
            log.error(f"Query encoding failed: {str(e)}")
            return pd.DataFrame()

        merged_distances = []
        merged_indices = []
        # One task per shard; hits are merged as each shard finishes.
        with concurrent.futures.ThreadPoolExecutor() as pool:
            pending = [
                pool.submit(self._search_shard, shard_no, shard_index, query_embedding, top_k)
                for shard_no, shard_index in enumerate(self.index_shards)
            ]
            for finished in concurrent.futures.as_completed(pending):
                outcome = finished.result()
                if outcome is None:
                    # Shard was empty or its search failed.
                    continue
                shard_distances, shard_globals = outcome
                merged_distances.extend(shard_distances)
                merged_indices.extend(shard_globals)
        log.info(f"Search found {len(merged_indices)} results across all shards")
        results = self._process_results(np.array(merged_distances), np.array(merged_indices), top_k)
        log.info(f"Search completed in {time.time() - started_at:.2f} seconds with {len(results)} final results")
        return results
    
    def _search_shard(self, shard_idx, index, query_embedding, top_k):
        """Run the query against one FAISS shard.

        Returns:
            ``(distances, global_indices)`` as two parallel lists holding only
            the hits whose local id lies in ``[0, index.ntotal)``, or ``None``
            when the shard is empty or the search raises.
        """
        if index.ntotal == 0:
            self.logger.warning(f"Skipping empty shard {shard_idx}")
            return None

        try:
            began = time.time()
            raw_distances, raw_ids = index.search(query_embedding, top_k)
            # A single query was searched, so only row 0 is read.
            hit_ids = raw_ids[0]
            hit_distances = raw_distances[0]
            in_range = (hit_ids >= 0) & (hit_ids < index.ntotal)
            kept_ids = hit_ids[in_range].tolist()
            kept_distances = hit_distances[in_range].tolist()

            if len(kept_ids) != top_k:
                self.logger.debug(f"Shard {shard_idx}: Found {len(kept_ids)} valid results out of {top_k}")

            global_ids = [self._global_index(shard_idx, local_id) for local_id in kept_ids]
            self.logger.debug(f"Shard {shard_idx} search completed in {time.time() - began:.3f}s")
            return kept_distances, global_ids
        except Exception as e:
            self.logger.error(f"Search failed in shard {shard_idx}: {str(e)}")
            return None

    def _process_results(self, distances, global_indices, top_k):
        """Turn raw shard hits into the final ranked, de-duplicated result table.

        Candidates are visited from smallest to largest distance. Metadata is
        fetched one index at a time: a batch ``get_metadata`` call silently
        drops indices it cannot resolve, so its rows cannot be matched
        reliably to their distances. The walk stops as soon as ``top_k``
        distinct documents have been collected, and of several hits sharing
        title, authors and source only the closest one is kept.

        Args:
            distances: 1-D array of distances, one per candidate.
            global_indices: 1-D array of global document indices, parallel
                to ``distances``.
            top_k: Maximum number of rows to return.

        Returns:
            A DataFrame with columns ``title``, ``summary``, ``authors``,
            ``source`` and ``similarity``, best match first. It is empty
            (same columns) when there is nothing to return or processing
            fails; this method never raises.
        """
        process_start = time.time()
        required_columns = ["title", "summary", "authors", "source", "similarity"]
        if global_indices.size == 0 or distances.size == 0:
            self.logger.warning("No search results to process")
            return pd.DataFrame(columns=required_columns)

        try:
            candidate_count = min(len(distances), len(global_indices))
            if len(distances) != len(global_indices):
                # Only the positions present in both arrays form valid pairs.
                self.logger.warning(
                    f"Mismatch between distances ({len(distances)}) and indices ({len(global_indices)})"
                )

            self.logger.info(f"Retrieving metadata for up to {candidate_count} indices")
            metadata_start = time.time()
            rows = []
            seen_keys = set()
            duplicates = 0
            for pos in np.argsort(distances[:candidate_count], kind="stable"):
                if len(rows) >= top_k:
                    break
                record = self.metadata_mgr.get_metadata([int(global_indices[pos])])
                if len(record) == 0:
                    # No metadata for this index; try the next candidate.
                    continue
                row = record.iloc[0].to_dict()
                # str() keeps the key hashable even if a field holds a list.
                dedup_key = tuple(str(row.get(col)) for col in ("title", "authors", "source"))
                if dedup_key in seen_keys:
                    duplicates += 1
                    continue
                seen_keys.add(dedup_key)
                # NOTE(review): presumably the distances are squared L2
                # distances between unit vectors, for which 1 - d/2 is the
                # cosine similarity - confirm against the index type.
                row["similarity"] = 1 - (float(distances[pos]) / 2)
                rows.append(row)
            self.logger.info(f"Metadata retrieved in {time.time() - metadata_start:.2f}s, got {len(rows)} records")

            if not rows:
                self.logger.warning("No metadata found for indices")
                return pd.DataFrame(columns=required_columns)
            if duplicates:
                self.logger.info(f"Removed {duplicates} duplicate results")

            # Rows were collected by increasing distance, so they are already
            # ordered by decreasing similarity.
            results = pd.DataFrame(rows)
            for col in required_columns:
                if col not in results.columns:
                    results[col] = None  # Fill missing columns with None

            self.logger.info(f"Results processed in {time.time() - process_start:.2f}s, returning {len(results)} items")
            return results[required_columns].reset_index(drop=True)
        except Exception as e:
            self.logger.error(f"Result processing failed: {str(e)}", exc_info=True)
            return pd.DataFrame(columns=required_columns)