Spaces:
Runtime error
Runtime error
File size: 10,361 Bytes
9c8703c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
from pinecone import Pinecone # 3P
from dotenv import load_dotenv, find_dotenv # 3P
import os
import numpy as np
from typing import Dict
from collections import defaultdict
from backend.interfaces import Retriever, Query, MergedResult
from backend.embeddings import DefaultDenseEmbeddingModel, DefaultSparseEmbeddingModel
from backend.survey import Preferences # or: from backend.src_ubc.survey import Preferences
from backend.utils import * # (better: import only what you need)
load_dotenv(find_dotenv())
class DefaultRetriever(Retriever):
def __init__(self, config: dict = None):
super().__init__(config)
pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY_UBC", ""))
# Initialize Pinecone client or any other necessary components here
self.dense_db = pc.Index("museum-ai-dense")
self.sparse_db = pc.Index("museum-ai-sparse")
self.dense_embedding_model = DefaultDenseEmbeddingModel({})
self.sparse_embedding_model = DefaultSparseEmbeddingModel({})
def process(self, query: str, k=20, dense_weight=.2, sparse_weight=.8) -> list[dict]:
results = self._retrieve_with_text(query, dense_weight, sparse_weight, k)
return results
# DO NOT NEED
def _retrieve_parent_documents(self, results: list[dict], query: Query, k: int = 5):
section_id_freqency = {}
for result in results:
section_id = result["metadata"].get("section_id")
if section_id:
section_id_freqency[section_id] = (
section_id_freqency.get(section_id, 0) + 1
)
# Sort section IDs by frequency
sorted_section_ids = sorted(
section_id_freqency.items(), key=lambda x: x[1], reverse=True
)
# Retrieve top k section IDs
top_section_ids = [section_id for section_id, _ in sorted_section_ids[:k]]
section_results = []
for section_id in top_section_ids:
section_results.extend(self._retrieve_by_section_id(section_id))
def _search_tags(self, tags: Preferences, top_k=100):
"""
Perform a metadata search based on the provided Preferences object.
Args:
tags: A Preferences object with attributes:
- time_period: List[str]
- themes: List[str]
- exhibits: List[str]
- art_medium: List[str]
- additional_interests: List[str]
top_k: Number of results to return.
Returns:
List of matching documents from dense_db.
"""
# Convert the Preferences object to a dictionary.
tag_dict = tags.__dict__
# Remove any keys with empty lists.
tag_dict = {k: v for k, v in tag_dict.items() if v}
# Dummy vector for metadata filtering (using the dimensions expected by your dense index)
dummy_vector = np.zeros(512).tolist()
# Build metadata filter conditions. Each condition looks for documents where a given field contains at least one of the values.
filter_conditions = []
for key, values in tag_dict.items():
filter_conditions.append({key: {"$in": values}})
# Use $or operator so that if any condition matches the document is returned
metadata_filter = {"$or": filter_conditions} if filter_conditions else {}
# Perform the semantic search using the dummy vector and only filter by metadata.
response = self.dense_db.query(
namespace="umag",
vector=dummy_vector,
top_k=top_k,
include_metadata=True,
filter=metadata_filter
)
for i in range(len(response.matches)):
# normalize the score based on the number of preferences
response.matches[i]['score'] = get_number_tag_matches(tags, response.matches[i]) / tags.count_preferences()
# sort matches by tag score
response.matches.sort(key=lambda match: match['score'], reverse=True)
return response.matches
def _rrf_merge(self, dense_results, sparse_results, tag_results, dense_weight=.3, sparse_weight=.2, tag_weight=.5, k=60):
def rank_dict(results):
return {doc['id']: rank for rank, doc in enumerate(results)}
# Create rank mappings
dense_ranks = rank_dict(dense_results)
sparse_ranks = rank_dict(sparse_results)
tag_ranks = rank_dict(tag_results)
# Create lookup for original docs and scores
id_to_doc = {}
dense_scores = {doc['id']: doc.get('score', 0) for doc in dense_results}
sparse_scores = {doc['id']: doc.get('score', 0) for doc in sparse_results}
tag_scores = {doc['id']: doc.get('score', 0) for doc in tag_results}
for result_set in [sparse_results, dense_results, tag_results]:
for doc in result_set:
if doc['id'] not in id_to_doc:
id_to_doc[doc['id']] = doc
merged = {}
# Merge all IDs
all_ids = set(sparse_ranks) | set(dense_ranks) | set(tag_ranks)
for id_ in all_ids:
sparse_rank = sparse_ranks.get(id_)
dense_rank = dense_ranks.get(id_)
tag_rank = tag_ranks.get(id_)
sparse_score_rrf = 1 / (k + sparse_rank) if sparse_rank is not None else 0
dense_score_rrf = 1 / (k + dense_rank) if dense_rank is not None else 0
tag_score_rrf = 1 / (k + tag_rank) if tag_rank is not None else 0
final_score = sparse_score_rrf*sparse_weight + dense_score_rrf*dense_weight + tag_score_rrf*tag_weight
base_doc = id_to_doc[id_]
merged[id_] = {
'id': id_,
'metadata': base_doc.get('metadata', {}),
'original_sparse_score': sparse_scores.get(id_, 0),
'original_dense_score': dense_scores.get(id_, 0),
'original_tag_score': tag_scores.get(id_, 0),
'sparse_rrf_score': sparse_score_rrf,
'dense_rrf_score': dense_score_rrf,
'tag_rrf_score': tag_score_rrf,
'final_score': final_score,
'sparse_rank': sparse_rank,
'dense_rank': dense_rank,
'tag_rank': tag_rank,
}
return sorted(merged.values(), key=lambda x: x['final_score'], reverse=True)
def _retrieve_with_text(self, query: str, tags: Preferences, dense_weight=.25, sparse_weight=.25, tag_weight=.5, k=20) -> list[dict]:
db_size = self.dense_db.describe_index_stats(namespace="umag")['namespaces']['umag']['vector_count']
tag_results = self._search_tags(tags, top_k=db_size)
# if there are no additional interests, only use tag results
if len(tags.additional_interests) != 0:
dense_results = self.dense_db.query(
namespace="umag",
top_k=50,
include_values=False,
include_metadata=True,
vector=self.dense_embedding_model.encode_text([query])[0],
).matches
sparse_results = self.sparse_db.query(
namespace="umag",
top_k=50,
include_values=True,
include_metadata=True,
sparse_vector=self.sparse_embedding_model.encode_text([query])[0],
).matches
merged_results = self._rrf_merge(sparse_results, dense_results, tag_results, dense_weight, sparse_weight, tag_weight)
else:
dense_results = []
sparse_results = []
merged_results = self._rrf_merge(sparse_results, dense_results, tag_results, dense_weight=0, sparse_weight=0, tag_weight=1)
# Extract only text from the top k results
relevant_content = []
for results in merged_results[:k]:
try:
relevant_content.append(results['metadata']['content'])
except KeyError:
# Skip this result if 'metadata' or 'content' key is missing
continue
return relevant_content
# For debugging purposes, you can uncomment the following lines to see the merged results
# # extract chunk id and text from top k results
# final_results = []
# for results in merged_results[:k]:
# try:
# result_dict = {
# 'id': results['id'],
# 'text': results['metadata']['content'],
# 'final_score': results['final_score'],
# 'dense_score': results['original_dense_score'],
# 'sparse_score': results['original_sparse_score'],
# 'tag_score': results['original_tag_score'],
# 'rrf_dense_score': results['dense_rrf_score'],
# 'rrf_sparse_score': results['sparse_rrf_score'],
# 'rrf_tag_score': results['tag_rrf_score'],
# 'dense_rank': results['dense_rank'],
# 'sparse_rank': results['sparse_rank'],
# 'tag_rank': results['tag_rank'],
# }
# final_results.append(result_dict)
# except KeyError:
# # Skip this result if 'metadata' or 'content' key is missing
# continue
# return final_results
def _retrieve_with_image(self, query: Query, k=20) -> list[dict]:
dense_results = self.dense_db.query(
namespace="umag",
top_k=k,
include_values=False,
include_metadata=True,
vector=self.dense_embedding_model.encode_image([query["content"]])[
0
].tolist(),
)
return [
MergedResult(
id=match["id"],
content=match["metadata"].get("img_path", ""),
metadata=match["metadata"],
dense_score=match["score"],
sparse_score=None,
final_score=match["score"],
sources=["dense"],
)
for match in dense_results.get("matches", [])
]
|