import abc
from typing import List, Optional

import numpy as np
import openai


class BaseSentenceVectorizer(abc.ABC):
    '''
    Base class for vectorizers. The main purpose is to vectorize text (doc/query)
    for ANN/KNN indexes. The `__call__` method takes a `List[Example]` as input, extracts
    `field_to_vectorize` from every Example, and converts the strings into embeddings.
    You can customize the extraction logic in the `_extract_text_from_examples` method.
    '''
    # embeddings will be computed based on the string in this attribute of the Example object
    field_to_vectorize = 'text_to_vectorize'

    def __init__(self) -> None:
        pass

    @abc.abstractmethod
    def __call__(self, inp_examples: List["Example"]) -> np.ndarray:
        pass

    def _extract_text_from_examples(self, inp_examples: List) -> List[str]:
        # plain strings pass through as-is; Example objects are joined over their input keys
        if isinstance(inp_examples[0], str):
            return inp_examples
        return [" ".join([example[key] for key in example._input_keys]) for example in inp_examples]
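

# A minimal sketch (not part of the original module) of a custom subclass; only `__call__`
# must be implemented. `DummyHashVectorizer` and its histogram scheme are hypothetical:
#
# class DummyHashVectorizer(BaseSentenceVectorizer):
#     def __call__(self, inp_examples: List) -> np.ndarray:
#         texts = self._extract_text_from_examples(inp_examples)
#         # toy embedding: a 128-bin histogram of character codes per text
#         emb = np.zeros((len(texts), 128), dtype=np.float32)
#         for row, text in enumerate(texts):
#             for ch in text:
#                 emb[row, ord(ch) % 128] += 1.0
#         return emb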


class SentenceTransformersVectorizer(BaseSentenceVectorizer):
    '''
    Vectorizer based on `SentenceTransformers` models. You can pick any model from this link:
    https://huggingface.co/sentence-transformers
    More details about models:
    https://www.sbert.net/docs/pretrained_models.html
    '''
    def __init__(
        self,
        model_name_or_path: str = 'all-MiniLM-L6-v2',
        vectorize_bs: int = 256,
        max_gpu_devices: int = 1,
        normalize_embeddings: bool = False,
    ):
        # this isn't good practice, but with a top-level import the whole DSP
        # module import becomes slow (>5 sec), because SentenceTransformer does
        # its directory/file-related magic under the hood :(
        try:
            from sentence_transformers import SentenceTransformer
        except ImportError as e:
            raise ImportError(
                "You need to install the sentence_transformers library to use pretrained embedders. "
                "Please check the official docs https://www.sbert.net/ "
                "or simply run `pip install sentence-transformers`."
            ) from e

        from dsp.utils.ann_utils import determine_devices
        self.num_devices, self.is_gpu = determine_devices(max_gpu_devices)
        self.proxy_device = 'cuda' if self.is_gpu else 'cpu'
        self.model = SentenceTransformer(model_name_or_path, device=self.proxy_device)

        self.model_name_or_path = model_name_or_path
        self.vectorize_bs = vectorize_bs
        self.normalize_embeddings = normalize_embeddings

    def __call__(self, inp_examples: List) -> np.ndarray:
        text_to_vectorize = self._extract_text_from_examples(inp_examples)

        if self.is_gpu and self.num_devices > 1:
            target_devices = list(range(self.num_devices))
            pool = self.model.start_multi_process_pool(target_devices=target_devices)
            # compute the embeddings using the multi-process pool
            emb = self.model.encode_multi_process(
                sentences=text_to_vectorize,
                pool=pool,
                batch_size=self.vectorize_bs,
            )
            self.model.stop_multi_process_pool(pool)
            # the multi-process path doesn't accept a `normalize_embeddings` parameter,
            # so L2-normalize each row manually
            if self.normalize_embeddings:
                emb = emb / np.linalg.norm(emb, axis=1, keepdims=True)
            return emb
        else:
            emb = self.model.encode(
                sentences=text_to_vectorize,
                batch_size=self.vectorize_bs,
                normalize_embeddings=self.normalize_embeddings,
            )
            return emb
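

# Usage sketch (hypothetical snippet, not part of the original module):
#
# vectorizer = SentenceTransformersVectorizer(normalize_embeddings=True)
# emb = vectorizer(["first passage", "second passage"])
# emb.shape  # (2, 384) for `all-MiniLM-L6-v2`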


class NaiveGetFieldVectorizer(BaseSentenceVectorizer):
    '''
    If embeddings have been precomputed, we can simply extract them from the proper field
    (set by `field_with_embedding`) of each `Example`.
    '''
    def __init__(self, field_with_embedding: str = 'vectorized'):
        self.field_with_embedding = field_with_embedding

    def __call__(self, inp_examples: List["Example"]) -> np.ndarray:
        # stack the per-example vectors into a single (n_examples, emb_dim) matrix
        embeddings = [
            getattr(cur_example, self.field_with_embedding).reshape(1, -1)
            for cur_example in inp_examples
        ]
        embeddings = np.concatenate(embeddings, axis=0).astype(np.float32)
        return embeddings
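

# Usage sketch (hypothetical, assuming each Example carries a precomputed 1-D numpy
# array in its `vectorized` attribute):
#
# vectorizer = NaiveGetFieldVectorizer()
# emb = vectorizer(examples)  # shape: (len(examples), emb_dim)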


class CohereVectorizer(BaseSentenceVectorizer):
    '''
    This vectorizer uses the Cohere API to convert texts to embeddings.
    More about the available models: https://docs.cohere.com/reference/embed
    `api_key` should be passed as an argument and can be retrieved
    from https://dashboard.cohere.com/api-keys
    '''
    def __init__(
        self,
        api_key: str,
        model: str = 'embed-english-v3.0',
        embed_batch_size: int = 96,
        embedding_type: str = 'search_document',  # for details check the Cohere embed docs
    ):
        self.model = model
        self.embed_batch_size = embed_batch_size
        self.embedding_type = embedding_type

        import cohere
        self.client = cohere.Client(api_key)

    def __call__(self, inp_examples: List["Example"]) -> np.ndarray:
        text_to_vectorize = self._extract_text_from_examples(inp_examples)

        embeddings_list = []
        # ceil division: number of batches needed to cover all texts
        n_batches = (len(text_to_vectorize) - 1) // self.embed_batch_size + 1
        for cur_batch_idx in range(n_batches):
            start_idx = cur_batch_idx * self.embed_batch_size
            end_idx = (cur_batch_idx + 1) * self.embed_batch_size
            cur_batch = text_to_vectorize[start_idx:end_idx]
            response = self.client.embed(
                texts=cur_batch,
                model=self.model,
                input_type=self.embedding_type,
            )
            embeddings_list.extend(response.embeddings)

        embeddings = np.array(embeddings_list, dtype=np.float32)
        return embeddings
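

# Usage sketch (hypothetical key and inputs, not part of the original module):
#
# vectorizer = CohereVectorizer(api_key="<COHERE_API_KEY>", embedding_type="search_query")
# emb = vectorizer(["what is ANN search?"])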


# openai<1.0 exposes `openai.Embedding`; openai>=1.0 moved to `openai.embeddings`
try:
    OPENAI_LEGACY = int(openai.version.__version__[0]) == 0
except Exception:
    OPENAI_LEGACY = True


class OpenAIVectorizer(BaseSentenceVectorizer):
    '''
    This vectorizer uses the OpenAI API to convert texts to embeddings. Changing `model` is not
    recommended. More about the model: https://openai.com/blog/new-and-improved-embedding-model/
    `api_key` should be passed as an argument or set as an env variable (`OPENAI_API_KEY`).
    '''
    def __init__(
        self,
        model: str = 'text-embedding-ada-002',
        embed_batch_size: int = 1024,
        api_key: Optional[str] = None,
    ):
        self.model = model
        self.embed_batch_size = embed_batch_size

        # pick the embeddings entry point that matches the installed openai version
        if OPENAI_LEGACY:
            self.Embedding = openai.Embedding
        else:
            self.Embedding = openai.embeddings
        if api_key:
            openai.api_key = api_key

    def __call__(self, inp_examples: List["Example"]) -> np.ndarray:
        text_to_vectorize = self._extract_text_from_examples(inp_examples)
        # maybe it's better to preallocate a numpy matrix, but we don't know emb_dim upfront
        embeddings_list = []
        n_batches = (len(text_to_vectorize) - 1) // self.embed_batch_size + 1
        for cur_batch_idx in range(n_batches):  # tqdm.tqdm?
            start_idx = cur_batch_idx * self.embed_batch_size
            end_idx = (cur_batch_idx + 1) * self.embed_batch_size
            cur_batch = text_to_vectorize[start_idx:end_idx]
            # OpenAI API call:
            response = self.Embedding.create(
                model=self.model,
                input=cur_batch,
            )
            # the legacy client returns dicts; openai>=1.0 returns pydantic objects
            if OPENAI_LEGACY:
                cur_batch_embeddings = [cur_obj['embedding'] for cur_obj in response['data']]
            else:
                cur_batch_embeddings = [cur_obj.embedding for cur_obj in response.data]
            embeddings_list.extend(cur_batch_embeddings)

        embeddings = np.array(embeddings_list, dtype=np.float32)
        return embeddings
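

# Usage sketch (hypothetical, assuming `OPENAI_API_KEY` is set in the environment):
#
# vectorizer = OpenAIVectorizer()
# emb = vectorizer(["some passage to embed"])
# emb.shape  # (1, 1536) for `text-embedding-ada-002`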