File size: 24,541 Bytes
5ee0a10
 
 
7ccde22
dd6b309
7ccde22
017ee94
7ccde22
dd6b309
 
fc636d4
265c29d
fc636d4
dd6b309
 
 
 
 
 
 
 
 
 
7ccde22
 
 
d286a45
 
7ccde22
 
017ee94
f9e4fd2
dd6b309
 
d286a45
 
7ccde22
dd6b309
 
7ccde22
d286a45
 
 
 
 
 
4e62c61
d286a45
 
4e62c61
 
d286a45
 
 
 
 
 
 
7ccde22
4e62c61
 
 
 
7ccde22
d286a45
4e62c61
 
 
 
 
 
 
 
 
 
 
d286a45
 
4e62c61
 
d286a45
 
4e62c61
d286a45
4e62c61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d286a45
7ccde22
d286a45
dd6b309
d286a45
dd6b309
d286a45
 
dd6b309
d286a45
 
 
 
 
 
 
dd6b309
 
d286a45
 
dd6b309
 
 
d286a45
 
 
 
 
 
 
 
 
dd6b309
d286a45
 
 
dd6b309
d286a45
dd6b309
d286a45
 
 
 
 
dd6b309
 
 
 
 
 
 
 
 
 
 
 
7ccde22
017ee94
a223079
2dec497
 
dd6b309
a223079
7ccde22
2dec497
 
dd6b309
2dec497
a223079
2dec497
dd6b309
 
 
 
a223079
dd6b309
a223079
 
 
 
dd6b309
 
a223079
 
7ccde22
 
 
 
 
a223079
7ccde22
a223079
dd6b309
 
 
 
 
a223079
 
 
7ccde22
a223079
dd6b309
 
 
a223079
dd6b309
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a223079
 
dd6b309
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a223079
dd6b309
a223079
 
dd6b309
 
 
 
 
 
 
 
7ccde22
f9e4fd2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b1812af
f9e4fd2
b1812af
f9e4fd2
 
 
 
 
 
 
 
 
 
b1812af
f9e4fd2
 
 
 
 
 
 
 
b1812af
 
 
 
 
 
 
f9e4fd2
b1812af
f9e4fd2
 
b1812af
f9e4fd2
5ee0a10
017ee94
 
5ee0a10
 
dd6b309
017ee94
b2bcde5
dd6b309
 
 
 
5ee0a10
 
 
017ee94
5ee0a10
dd6b309
 
5ee0a10
dd6b309
 
 
017ee94
5ee0a10
017ee94
 
dd6b309
 
 
 
 
 
 
 
 
017ee94
dd6b309
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
017ee94
 
 
 
5ee0a10
 
a223079
dd6b309
 
 
 
 
 
 
 
 
a223079
 
 
dd6b309
a223079
dd6b309
a223079
dd6b309
a223079
 
017ee94
 
a223079
 
dd6b309
5ee0a10
a223079
dd6b309
a223079
 
 
dd6b309
a223079
dd6b309
 
 
 
 
 
 
 
a223079
 
dd6b309
a223079
dd6b309
 
a223079
dd6b309
a223079
 
dd6b309
 
 
 
 
 
a223079
b73a811
dd6b309
 
 
b73a811
 
 
dd6b309
 
2dec497
 
dd6b309
b73a811
 
 
2dec497
dd6b309
 
 
 
b73a811
dd6b309
 
 
 
 
2dec497
 
dd6b309
 
 
 
 
 
 
 
 
2dec497
 
dd6b309
b73a811
 
dd6b309
 
 
 
 
f9e4fd2
 
 
 
 
 
 
dd6b309
2dec497
dd6b309
d7bc2ed
dd6b309
b73a811
dd6b309
 
 
 
b73a811
 
 
dd6b309
 
f9e4fd2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
import html
import logging
import os
import shutil
import time
import zipfile
from pathlib import Path
from urllib.parse import quote

import faiss
import numpy as np
import pandas as pd
import requests
import streamlit as st
from sentence_transformers import SentenceTransformer, util


# Root logging configuration: INFO and above, one stream handler, and a
# record layout of "timestamp - logger name - level - message".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)
# Module-level logger used by the MetadataManager methods below.
logger = logging.getLogger("MetadataManager")

class MetadataManager:
    def __init__(self):
        """Prepare paths and in-memory state, then index the metadata shards.

        Construction creates the cache directories, extracts the metadata
        archive when no parquet shard is present, and builds the map from
        document index ranges to shard files.
        """
        # In-memory state populated by the setup steps and by later lookups.
        self.shard_map = {}      # (start, end) index range -> shard file name
        self.loaded_shards = {}  # shard file name -> DataFrame already read
        self.api_cache = {}      # paper title -> dict of resolved links
        self.total_docs = 0

        # On-disk layout: <cache_dir>/metadata_shards/*.parquet
        self.cache_dir = Path("unzipped_cache")
        self.shard_dir = self.cache_dir / "metadata_shards"

        logger.info("Initializing MetadataManager")
        self._ensure_directories()
        self._unzip_if_needed()
        self._build_shard_map()

        shard_count = len(self.shard_map)
        logger.info(f"Total documents indexed: {self.total_docs}")
        logger.info(f"Total shards found: {shard_count}")

    def _ensure_directories(self):
        """Create necessary directories if they don't exist"""
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.shard_dir.mkdir(parents=True, exist_ok=True)

    def _unzip_if_needed(self):
        """Handle ZIP extraction with nested directory handling"""
        zip_path = Path("metadata_shards.zip")
        
        # Check if we need to unzip by looking for parquet files in any subdirectory
        if not any(self.shard_dir.rglob("*.parquet")):
            logger.info("No parquet files found, checking for zip archive")
            
            if not zip_path.exists():
                raise FileNotFoundError(f"Metadata ZIP file not found at {zip_path}")
                
            logger.info(f"Extracting {zip_path} to {self.shard_dir}")
            try:
                with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                    # Check for nested directory structure in zip
                    zip_root = self._get_zip_root(zip_ref)
                    
                    # Extract while preserving structure
                    zip_ref.extractall(self.shard_dir)
                    
                    # Handle nested directory if exists
                    if zip_root:
                        nested_dir = self.shard_dir / zip_root
                        if nested_dir.exists():
                            # Move files up from nested directory
                            self._flatten_directory(nested_dir, self.shard_dir)
                            nested_dir.rmdir()

                    # Verify extraction
                    parquet_files = list(self.shard_dir.rglob("*.parquet"))
                    if not parquet_files:
                        raise RuntimeError("Extraction completed but no parquet files found")
                        
                    logger.info(f"Found {len(parquet_files)} parquet files after extraction")
                    
            except Exception as e:
                logger.error(f"Failed to extract zip file: {str(e)}")
                self._clean_failed_extraction()
                raise

    def _get_zip_root(self, zip_ref):
        """Identify common root directory in zip file"""
        try:
            first_file = zip_ref.namelist()[0]
            if '/' in first_file:
                return first_file.split('/')[0]
            return ""
        except Exception as e:
            logger.warning(f"Error detecting zip root: {str(e)}")
            return ""

    def _flatten_directory(self, src_dir, dest_dir):
        """Move files from nested directory to destination"""
        for item in src_dir.iterdir():
            if item.is_dir():
                self._flatten_directory(item, dest_dir)
                item.rmdir()
            else:
                target = dest_dir / item.name
                if target.exists():
                    target.unlink()
                item.rename(target)

    def _clean_failed_extraction(self):
        """Remove any extracted files after failed attempt"""
        logger.info("Cleaning up failed extraction")
        for item in self.shard_dir.iterdir():
            if item.is_dir():
                shutil.rmtree(item)
            else:
                item.unlink()

    def _build_shard_map(self):
        """Create validated index range to shard mapping"""
        logger.info("Building shard map from parquet files")
        parquet_files = list(self.shard_dir.glob("*.parquet"))
        
        if not parquet_files:
            raise FileNotFoundError("No parquet files found after extraction")
            
        # Sort files by numerical order
        parquet_files = sorted(parquet_files, key=lambda x: int(x.stem.split("_")[1]))
        
        # Track expected next index
        expected_start = 0
        
        for f in parquet_files:
            try:
                parts = f.stem.split("_")
                if len(parts) != 3:
                    raise ValueError("Invalid filename format")
                    
                start = int(parts[1])
                end = int(parts[2])
                
                # Validate continuity
                if start != expected_start:
                    raise ValueError(f"Non-contiguous shard start: expected {expected_start}, got {start}")
                    
                # Validate range
                if end <= start:
                    raise ValueError(f"Invalid shard range: {start}-{end}")
                    
                self.shard_map[(start, end)] = f.name
                self.total_docs = end + 1
                expected_start = end + 1
                
                logger.debug(f"Mapped shard {f.name}: indices {start}-{end}")
                
            except Exception as e:
                logger.error(f"Error processing shard {f.name}: {str(e)}")
                raise RuntimeError("Invalid shard structure") from e
                
        logger.info(f"Validated {len(self.shard_map)} continuous shards")
        logger.info(f"Total document count: {self.total_docs}")
        
        # Log shard statistics
        logger.info(f"Shard map built with {len(self.shard_map)} shards")
        logger.info(f"Total document count: {self.total_docs}")
        
        # Validate shard boundaries for gaps or overlaps
        sorted_ranges = sorted(self.shard_map.keys())
        for i in range(1, len(sorted_ranges)):
            prev_end = sorted_ranges[i-1][1]
            curr_start = sorted_ranges[i][0]
            if curr_start != prev_end + 1:
                logger.warning(f"Gap or overlap detected between shards: {prev_end} to {curr_start}")

    def get_metadata(self, global_indices):
        """Retrieve metadata with validation"""
        # Check for empty numpy array properly
        if isinstance(global_indices, np.ndarray) and global_indices.size == 0:
            logger.warning("Empty indices array passed to get_metadata")
            return pd.DataFrame(columns=["title", "summary", "source", "similarity"])
        
        # Convert numpy array to list for processing
        indices_list = global_indices.tolist() if isinstance(global_indices, np.ndarray) else global_indices
        logger.info(f"Retrieving metadata for {len(indices_list)} indices")
        
        # Filter valid indices
        valid_indices = [idx for idx in indices_list if 0 <= idx < self.total_docs]
        invalid_count = len(indices_list) - len(valid_indices)
        if invalid_count > 0:
            logger.warning(f"Filtered out {invalid_count} invalid indices")
        
        if not valid_indices:
            logger.warning("No valid indices remain after filtering")
            return pd.DataFrame(columns=["title", "summary", "source", "similarity"])
    
        # Group indices by shard with boundary check
        shard_groups = {}
        unassigned_indices = []
        
        for idx in valid_indices:
            found = False
            for (start, end), shard in self.shard_map.items():
                if start <= idx <= end:
                    if shard not in shard_groups:
                        shard_groups[shard] = []
                    shard_groups[shard].append(idx - start)
                    found = True
                    break
            if not found:
                unassigned_indices.append(idx)
                logger.warning(f"Index {idx} not found in any shard range")
        
        if unassigned_indices:
            logger.warning(f"Could not assign {len(unassigned_indices)} indices to any shard")
    
        # Load and process shards
        results = []
        for shard, local_indices in shard_groups.items():
            try:
                logger.info(f"Processing shard {shard} with {len(local_indices)} indices")
                start_time = time.time()
                
                if shard not in self.loaded_shards:
                    logger.info(f"Loading shard file: {shard}")
                    shard_path = self.shard_dir / shard
                    
                    # Verify file exists
                    if not shard_path.exists():
                        logger.error(f"Shard file not found: {shard_path}")
                        continue
                        
                    # Log file size
                    file_size_mb = os.path.getsize(shard_path) / (1024 * 1024)
                    logger.info(f"Shard file size: {file_size_mb:.2f} MB")
                    
                    # Attempt to read the parquet file
                    try:
                        self.loaded_shards[shard] = pd.read_parquet(
                            shard_path,
                            columns=["title", "summary", "source"]
                        )
                        logger.info(f"Successfully loaded shard {shard} with {len(self.loaded_shards[shard])} rows")
                    except Exception as e:
                        logger.error(f"Failed to read parquet file {shard}: {str(e)}")
                        
                        # Try to read file schema for debugging
                        try:
                            schema = pd.read_parquet(shard_path, engine='pyarrow').dtypes
                            logger.info(f"Parquet schema: {schema}")
                        except:
                            pass
                        continue
                
                if local_indices:
                    # Validate indices are within dataframe bounds
                    df_len = len(self.loaded_shards[shard])
                    valid_local_indices = [idx for idx in local_indices if 0 <= idx < df_len]
                    
                    if len(valid_local_indices) != len(local_indices):
                        logger.warning(f"Filtered {len(local_indices) - len(valid_local_indices)} out-of-bounds indices")
                    
                    if valid_local_indices:
                        logger.debug(f"Retrieving rows at indices: {valid_local_indices}")
                        chunk = self.loaded_shards[shard].iloc[valid_local_indices]
                        results.append(chunk)
                        logger.info(f"Retrieved {len(chunk)} records from shard {shard}")
                
                logger.info(f"Shard processing completed in {time.time() - start_time:.2f} seconds")
                    
            except Exception as e:
                logger.error(f"Error processing shard {shard}: {str(e)}", exc_info=True)
                continue
    
        # Combine results
        if results:
            combined = pd.concat(results).reset_index(drop=True)
            logger.info(f"Combined metadata: {len(combined)} records from {len(results)} shards")
            return combined
        else:
            logger.warning("No metadata records retrieved")
            return pd.DataFrame(columns=["title", "summary", "source", "similarity"])


    def _resolve_paper_url(self, title):
        """Find paper URL using multiple strategies"""
        # Check cache first
        if title in self.api_cache:
            return self.api_cache[title]

        links = {}

        # Try arXiv first
        arxiv_url = self._get_arxiv_url(title)
        if arxiv_url:
            links["arxiv"] = arxiv_url

        # Attempt to get a direct link using Semantic Scholar's API
        semantic_url = self._get_semantic_scholar_url(title)
        if semantic_url:
            links["semantic_search"] = semantic_url


        # Fallback to Google Scholar search
        scholar_url = f"https://scholar.google.com/scholar?q={quote(title)}"
        links["google"] = scholar_url

        self.api_cache[title] = links

        return links


    def _get_arxiv_url(self, title):
        """Look a title up through the arXiv query API.

        Returns:
            The ``<id>`` text of the first Atom entry in the response with
            ``http:`` replaced by ``https:``, or ``None`` when the response
            has no entry or anything fails (failures are logged).
        """
        atom_ns = '{http://www.w3.org/2005/Atom}'
        try:
            query = {
                "search_query": f'ti:"{title}"',
                "max_results": 1,
                "sortBy": "relevance"
            }
            response = requests.get(
                "http://export.arxiv.org/api/query",
                params=query,
                timeout=5
            )
            response.raise_for_status()

            # The reply is an Atom feed; only its first entry is considered.
            from xml.etree import ElementTree as ET
            feed = ET.fromstring(response.content)
            first_hit = feed.find(atom_ns + 'entry')
            if first_hit is None:
                return None
            entry_id = first_hit.find(atom_ns + 'id').text
            return entry_id.replace('http:', 'https:')  # Force HTTPS
        except Exception as e:
            logger.error(f"arXiv API failed for '{title}': {str(e)}")
        return None



    def _get_semantic_scholar_url(self, title):
        """Search Semantic Scholar API for a paper by title and return its URL."""
        try:
            response = requests.get(
                "https://api.semanticscholar.org/graph/v1/paper/search",
                params={
                    "query": title,
                    "limit": 1,
                    "fields": "paperId,url,title"
                },
                timeout=5
            )
            response.raise_for_status()  # This raises for 429 or other errors
            data = response.json()
            
            if "data" in data and len(data["data"]) > 0:
                paper = data["data"][0]
                if paper.get("url"):
                    return paper["url"]
                elif paper.get("paperId"):
                    return f"https://www.semanticscholar.org/paper/{paper['paperId']}"
        except requests.exceptions.HTTPError as http_err:
            if response.status_code == 429:
                # logger.error(f"Rate limit exceeded for Semantic Scholar API for '{title}'. Falling back.")
                # Optionally, add a sleep delay here for backoff
                time.sleep(1)  # simple backoff delay; consider exponential backoff
            else:
                # logger.error(f"Semantic Scholar API failed for '{title}': {http_err}")
        except Exception as e:
            # logger.error(f"Semantic Scholar API failed for '{title}': {e}")
        return None


    
class SemanticSearch:
    def __init__(self):
        """Create an unloaded search engine and its metadata manager.

        The embedding model and the FAISS shards are not loaded here; that
        happens in ``initialize_system``. Building the ``MetadataManager``
        runs its own setup.
        """
        # Filled in by initialize_system().
        self.model = None
        self.index_shards = []
        self.shard_sizes = []

        self.shard_dir = Path("compressed_shards")
        self.metadata_mgr = MetadataManager()

        # Dedicated logger for the search side.
        search_logger = logging.getLogger("SemanticSearch")
        search_logger.info("Initializing SemanticSearch")
        self.logger = search_logger
        
    @st.cache_resource
    def load_model(_self):
        """Build the ``all-MiniLM-L6-v2`` sentence-embedding model.

        The function is wrapped in ``st.cache_resource``.
        NOTE(review): the parameter is presumably named ``_self`` so that
        Streamlit leaves the instance out of its cache key - confirm.
        """
        checkpoint = 'all-MiniLM-L6-v2'
        return SentenceTransformer(checkpoint)

    def initialize_system(self):
        """Load the embedding model, then every FAISS index shard."""
        log = self.logger

        log.info("Loading sentence transformer model")
        began = time.time()
        self.model = self.load_model()
        elapsed = time.time() - began
        log.info(f"Model loaded in {elapsed:.2f} seconds")

        log.info("Loading FAISS indices")
        self._load_faiss_shards()

    def _load_faiss_shards(self):
        """Load all FAISS index shards"""
        self.logger.info(f"Searching for index files in {self.shard_dir}")
        
        if not self.shard_dir.exists():
            self.logger.error(f"Shard directory not found: {self.shard_dir}")
            return
            
        index_files = list(self.shard_dir.glob("*.index"))
        self.logger.info(f"Found {len(index_files)} index files")
        
        self.shard_sizes = []
        self.index_shards = []
        
        for shard_path in sorted(index_files):
            try:
                self.logger.info(f"Loading index: {shard_path}")
                start_time = time.time()
                
                # Log file size
                file_size_mb = os.path.getsize(shard_path) / (1024 * 1024)
                self.logger.info(f"Index file size: {file_size_mb:.2f} MB")
                
                index = faiss.read_index(str(shard_path))
                self.index_shards.append(index)
                self.shard_sizes.append(index.ntotal)
                
                self.logger.info(f"Loaded index with {index.ntotal} vectors in {time.time() - start_time:.2f} seconds")
            except Exception as e:
                self.logger.error(f"Failed to load index {shard_path}: {str(e)}")
        
        total_vectors = sum(self.shard_sizes)
        self.logger.info(f"Total loaded vectors: {total_vectors} across {len(self.index_shards)} shards")

    def _global_index(self, shard_idx, local_idx):
        """Convert local index to global index"""
        return sum(self.shard_sizes[:shard_idx]) + local_idx

    def search(self, query, top_k=5):
        """Search with validation"""
        self.logger.info(f"Searching for query: '{query}' (top_k={top_k})")
        start_time = time.time()
        
        if not query:
            self.logger.warning("Empty query provided")
            return pd.DataFrame()
            
        if not self.index_shards:
            self.logger.error("No index shards loaded")
            return pd.DataFrame()
        
        try:
            self.logger.info("Encoding query")
            query_embedding = self.model.encode([query], convert_to_numpy=True)
            self.logger.debug(f"Query encoded to shape {query_embedding.shape}")
        except Exception as e:
            self.logger.error(f"Query encoding failed: {str(e)}")
            return pd.DataFrame()
    
        all_distances = []
        all_global_indices = []
    
        # Search with index validation
        self.logger.info(f"Searching across {len(self.index_shards)} shards")
        for shard_idx, index in enumerate(self.index_shards):
            if index.ntotal == 0:
                self.logger.warning(f"Skipping empty shard {shard_idx}")
                continue
                
            try:
                shard_start = time.time()
                distances, indices = index.search(query_embedding, top_k)
                
                valid_mask = (indices[0] >= 0) & (indices[0] < index.ntotal)
                valid_indices = indices[0][valid_mask].tolist()
                valid_distances = distances[0][valid_mask].tolist()
                
                if len(valid_indices) != top_k:
                    self.logger.debug(f"Shard {shard_idx}: Found {len(valid_indices)} valid results out of {top_k}")
                
                global_indices = [self._global_index(shard_idx, idx) for idx in valid_indices]
                
                all_distances.extend(valid_distances)
                all_global_indices.extend(global_indices)
                
                self.logger.debug(f"Shard {shard_idx} search completed in {time.time() - shard_start:.3f}s")
            except Exception as e:
                self.logger.error(f"Search failed in shard {shard_idx}: {str(e)}")
                continue
    
        self.logger.info(f"Search found {len(all_global_indices)} results across all shards")
        
        # Process results
        results = self._process_results(
            np.array(all_distances), 
            np.array(all_global_indices), 
            top_k
        )
        
        self.logger.info(f"Search completed in {time.time() - start_time:.2f} seconds with {len(results)} final results")
        return results

    def _process_results(self, distances, global_indices, top_k):
        """Process raw search results into formatted DataFrame"""
        process_start = time.time()
        
        # Proper numpy array emptiness checks
        if global_indices.size == 0 or distances.size == 0:
            self.logger.warning("No search results to process")
            return pd.DataFrame(columns=["title", "summary", "source", "similarity"])
        
        try:
            # Get metadata for matched indices
            self.logger.info(f"Retrieving metadata for {len(global_indices)} indices")
            metadata_start = time.time()
            results = self.metadata_mgr.get_metadata(global_indices)
            self.logger.info(f"Metadata retrieved in {time.time() - metadata_start:.2f}s, got {len(results)} records")
            
            # Empty results check
            if len(results) == 0:
                self.logger.warning("No metadata found for indices")
                return pd.DataFrame(columns=["title", "summary", "source", "similarity"])
                
            # Ensure distances match results length
            if len(results) != len(distances):
                self.logger.warning(f"Mismatch between distances ({len(distances)}) and results ({len(results)})")
                
                if len(results) < len(distances):
                    self.logger.info("Truncating distances array to match results length")
                    distances = distances[:len(results)]
                else:
                    # Should not happen but handle it anyway
                    self.logger.error("More results than distances - this shouldn't happen")
                    distances = np.pad(distances, (0, len(results) - len(distances)), 'constant', constant_values=1.0)
                
            # Calculate similarity scores
            self.logger.debug("Calculating similarity scores")
            results['similarity'] = 1 - (distances / 2)
            
            # Log similarity statistics
            if not results.empty:
                self.logger.debug(f"Similarity stats: min={results['similarity'].min():.3f}, " +
                                 f"max={results['similarity'].max():.3f}, " +
                                 f"mean={results['similarity'].mean():.3f}")


            results['source'] = results['title'].apply(
                lambda title: self._format_source_links(
                    self.metadata_mgr._resolve_paper_url(title)
                )
            )
            
            # Deduplicate and sort results
            pre_dedup = len(results)
            results = results.drop_duplicates(subset=["title", "source"]).sort_values("similarity", ascending=False).head(top_k)
            post_dedup = len(results)
            
            if pre_dedup > post_dedup:
                self.logger.info(f"Removed {pre_dedup - post_dedup} duplicate results")
            
            self.logger.info(f"Results processed in {time.time() - process_start:.2f}s, returning {len(results)} items")
            return results.reset_index(drop=True)
            
        except Exception as e:
            self.logger.error(f"Result processing failed: {str(e)}", exc_info=True)
            return pd.DataFrame(columns=["title", "summary", "source", "similarity"])


    def _format_source_links(self, links):
        """Generate an HTML snippet for the available source links."""
        html_parts = []
        if "arxiv" in links:
            html_parts.append(
                f"<a class='source-link' href='{links['arxiv']}' target='_blank' rel='noopener noreferrer'> πŸ“œ arXiv</a>"
            )
        if "semantic" in links:
            html_parts.append(
                f"<a class='source-link' href='{links['semantic']}' target='_blank' rel='noopener noreferrer'> 🌐 Semantic Scholar</a>"
            )
        if "google" in links:
            html_parts.append(
                f"<a class='source-link' href='{links['google']}' target='_blank' rel='noopener noreferrer'> πŸ” Google Scholar</a>"
            )
        return " | ".join(html_parts)