import abc
from typing import List, Optional

import numpy as np
import openai


class BaseSentenceVectorizer(abc.ABC):
    '''
    Base class for vectorizers. The main purpose is to vectorize text (doc/query)
    for ANN/KNN indexes. The `__call__` method takes a `List[Example]` as a single input, extracts
    `field_to_vectorize` from every Example and converts the texts into embeddings.
    You can customize the extraction logic in the `_extract_text_from_examples` method.
    '''

    field_to_vectorize = 'text_to_vectorize'

    def __init__(self) -> None:
        pass

    @abc.abstractmethod
    def __call__(self, inp_examples: List["Example"]) -> np.ndarray:
        pass

    def _extract_text_from_examples(self, inp_examples: List) -> List[str]:
        # Plain strings are used as-is; for Example objects, all input fields are joined into one string.
        if isinstance(inp_examples[0], str):
            return inp_examples
        return [" ".join([example[key] for key in example._input_keys]) for example in inp_examples]


class SentenceTransformersVectorizer(BaseSentenceVectorizer):
    '''
    Vectorizer based on `SentenceTransformers` models. You can pick any model from this link:
    https://huggingface.co/sentence-transformers
    More details about models:
    https://www.sbert.net/docs/pretrained_models.html
    '''
    def __init__(
        self,
        model_name_or_path: str = 'all-MiniLM-L6-v2',
        vectorize_bs: int = 256,
        max_gpu_devices: int = 1,
        normalize_embeddings: bool = False
    ):
        try:
            from sentence_transformers import SentenceTransformer
        except ImportError as e:
            raise ImportError(
                "You need to install the sentence_transformers library to use pretrained embedders. "
                "Please check the official docs https://www.sbert.net/ "
                "or simply run `pip install sentence-transformers`"
            ) from e
        from dsp.utils.ann_utils import determine_devices

        self.num_devices, self.is_gpu = determine_devices(max_gpu_devices)
        self.proxy_device = 'cuda' if self.is_gpu else 'cpu'

        self.model = SentenceTransformer(model_name_or_path, device=self.proxy_device)

        self.model_name_or_path = model_name_or_path
        self.vectorize_bs = vectorize_bs
        self.normalize_embeddings = normalize_embeddings

    def __call__(self, inp_examples: List) -> np.ndarray:
        text_to_vectorize = self._extract_text_from_examples(inp_examples)

        if self.is_gpu and self.num_devices > 1:
            # Spread encoding across multiple GPUs via a worker pool.
            target_devices = list(range(self.num_devices))
            pool = self.model.start_multi_process_pool(target_devices=target_devices)

            emb = self.model.encode_multi_process(
                sentences=text_to_vectorize,
                pool=pool,
                batch_size=self.vectorize_bs
            )
            self.model.stop_multi_process_pool(pool)

            if self.normalize_embeddings:
                # Normalize each embedding to unit L2 norm (row-wise), matching the behaviour
                # of `normalize_embeddings=True` in the single-device branch below.
                emb = emb / np.linalg.norm(emb, axis=1, keepdims=True)

            return emb
        else:
            emb = self.model.encode(
                sentences=text_to_vectorize,
                batch_size=self.vectorize_bs,
                normalize_embeddings=self.normalize_embeddings
            )
            return emb
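
# Usage sketch (illustrative only; the model name and output shape are assumptions):
#
#     vectorizer = SentenceTransformersVectorizer(model_name_or_path='all-MiniLM-L6-v2')
#     emb = vectorizer(["first passage", "second passage"])
#     # emb -> np.ndarray of shape (2, 384) for this model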


class NaiveGetFieldVectorizer(BaseSentenceVectorizer):
    '''
    If embeddings were precomputed, we can just extract them from the proper field
    (set by `field_with_embedding`) of each `Example`.
    '''
    def __init__(self, field_with_embedding: str = 'vectorized'):
        self.field_with_embedding = field_with_embedding

    def __call__(self, inp_examples: List["Example"]) -> np.ndarray:
        # Stack the per-example vectors into a single (n_examples, emb_dim) float32 matrix.
        embeddings = [
            getattr(cur_example, self.field_with_embedding).reshape(1, -1)
            for cur_example in inp_examples
        ]
        embeddings = np.concatenate(embeddings, axis=0).astype(np.float32)
        return embeddings
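
# Usage sketch (illustrative only): assumes each Example already carries a NumPy vector
# in its `vectorized` field, e.g. `example.vectorized = np.random.rand(384)`:
#
#     vectorizer = NaiveGetFieldVectorizer(field_with_embedding='vectorized')
#     emb = vectorizer(examples)  # -> np.ndarray of shape (len(examples), 384)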


class CohereVectorizer(BaseSentenceVectorizer):
    '''
    This vectorizer uses the Cohere API to convert texts to embeddings.
    More about the available models: https://docs.cohere.com/reference/embed
    `api_key` should be passed as an argument and can be retrieved
    from https://dashboard.cohere.com/api-keys
    '''
    def __init__(
        self,
        api_key: str,
        model: str = 'embed-english-v3.0',
        embed_batch_size: int = 96,
        embedding_type: str = 'search_document'
    ):
        self.model = model
        self.embed_batch_size = embed_batch_size
        self.embedding_type = embedding_type

        try:
            import cohere
        except ImportError as e:
            raise ImportError(
                "You need to install the cohere library to use the Cohere API. "
                "Please run `pip install cohere`"
            ) from e
        self.client = cohere.Client(api_key)

    def __call__(self, inp_examples: List["Example"]) -> np.ndarray:
        text_to_vectorize = self._extract_text_from_examples(inp_examples)

        embeddings_list = []

        # Send texts to the API in batches of at most `embed_batch_size`.
        n_batches = (len(text_to_vectorize) - 1) // self.embed_batch_size + 1
        for cur_batch_idx in range(n_batches):
            start_idx = cur_batch_idx * self.embed_batch_size
            end_idx = (cur_batch_idx + 1) * self.embed_batch_size
            cur_batch = text_to_vectorize[start_idx: end_idx]

            response = self.client.embed(
                texts=cur_batch,
                model=self.model,
                input_type=self.embedding_type
            )

            embeddings_list.extend(response.embeddings)

        embeddings = np.array(embeddings_list, dtype=np.float32)
        return embeddings
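
# Usage sketch (illustrative only; the key is a placeholder and the output shape is an assumption):
#
#     vectorizer = CohereVectorizer(api_key='<COHERE_API_KEY>', model='embed-english-v3.0')
#     emb = vectorizer(["first passage", "second passage"])
#     # emb -> np.ndarray of shape (2, 1024) for this model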


# openai<1.0 exposes the legacy `openai.Embedding` interface; newer releases use `openai.embeddings`.
try:
    OPENAI_LEGACY = int(openai.version.__version__[0]) == 0
except Exception:
    OPENAI_LEGACY = True


class OpenAIVectorizer(BaseSentenceVectorizer):
    '''
    This vectorizer uses the OpenAI API to convert texts to embeddings. Changing `model` is not
    recommended. More about the model: https://openai.com/blog/new-and-improved-embedding-model/
    `api_key` should be passed as an argument or as an env variable (`OPENAI_API_KEY`).
    '''
    def __init__(
        self,
        model: str = 'text-embedding-ada-002',
        embed_batch_size: int = 1024,
        api_key: Optional[str] = None
    ):
        self.model = model
        self.embed_batch_size = embed_batch_size

        # Pick the embeddings entry point that matches the installed openai version.
        if OPENAI_LEGACY:
            self.Embedding = openai.Embedding
        else:
            self.Embedding = openai.embeddings

        if api_key:
            openai.api_key = api_key

    def __call__(self, inp_examples: List["Example"]) -> np.ndarray:
        text_to_vectorize = self._extract_text_from_examples(inp_examples)

        embeddings_list = []

        # Send texts to the API in batches of at most `embed_batch_size`.
        n_batches = (len(text_to_vectorize) - 1) // self.embed_batch_size + 1
        for cur_batch_idx in range(n_batches):
            start_idx = cur_batch_idx * self.embed_batch_size
            end_idx = (cur_batch_idx + 1) * self.embed_batch_size
            cur_batch = text_to_vectorize[start_idx: end_idx]

            response = self.Embedding.create(
                model=self.model,
                input=cur_batch
            )

            # Legacy responses are dict-like; openai>=1.0 returns objects with attribute access.
            if OPENAI_LEGACY:
                cur_batch_embeddings = [cur_obj['embedding'] for cur_obj in response['data']]
            else:
                cur_batch_embeddings = [cur_obj.embedding for cur_obj in response.data]
            embeddings_list.extend(cur_batch_embeddings)

        embeddings = np.array(embeddings_list, dtype=np.float32)
        return embeddings
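
# Usage sketch (illustrative only; the key is a placeholder and the shape assumes
# `text-embedding-ada-002`, which returns 1536-dimensional vectors):
#
#     vectorizer = OpenAIVectorizer(api_key='<OPENAI_API_KEY>')
#     emb = vectorizer(["first passage", "second passage"])
#     # emb -> np.ndarray of shape (2, 1536)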