KevinStephenson
Adding in weaviate code
b110593
raw
history blame
10 kB
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//
package compressionhelpers
import (
"context"
"encoding/binary"
"fmt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/adapters/repos/db/helpers"
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv"
"github.com/weaviate/weaviate/adapters/repos/db/vector/cache"
"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw/distancer"
"github.com/weaviate/weaviate/entities/storobj"
"github.com/weaviate/weaviate/entities/vectorindex/hnsw"
)
// CompressorDistancer measures distances from a reference vector that was
// fixed when the distancer was created (see VectorCompressor.NewDistancer and
// VectorCompressor.NewDistancerFromID).
//
// Both methods return (distance, bool, error). In the implementation in this
// file the bool is false whenever a lookup error is returned; otherwise it is
// whatever the underlying quantizer distancer reports — presumably an "ok"
// flag, TODO confirm against the quantizer implementations.
type CompressorDistancer interface {
// DistanceToNode returns the distance to the stored vector with the given id.
DistanceToNode(id uint64) (float32, bool, error)
// DistanceToFloat returns the distance to an uncompressed vector.
DistanceToFloat(vec []float32) (float32, bool, error)
}
// ReturnDistancerFn is the callback returned alongside a distancer by
// VectorCompressor.NewDistancer. In this file it hands the distancer back to
// the quantizer once the caller is done with it.
type ReturnDistancerFn func()
// VectorCompressor is the interface through which callers store, cache and
// compare compressed vectors. The descriptions below reflect the generic
// implementation in this file (quantizedVectorsCompressor).
type VectorCompressor interface {
// Drop drops the compressed-vector cache.
Drop() error
// GrowCache asks the cache to grow; size is forwarded unchanged.
GrowCache(size uint64)
// SetCacheMaxSize updates the cache's maximum size.
SetCacheMaxSize(size int64)
// GetCacheMaxSize returns the cache's current maximum size.
GetCacheMaxSize() int64
// Delete removes the vector for id from the cache and the compressed store.
Delete(ctx context.Context, id uint64)
// Preload compresses vector, persists it under id and places it in the cache.
Preload(id uint64, vector []float32)
// Prefetch forwards a prefetch hint for id to the cache.
Prefetch(id uint64)
// PrefillCache loads every entry of the compressed store into the cache.
PrefillCache()
// DistanceBetweenCompressedVectorsFromIDs returns the distance between the
// compressed vectors stored under ids x and y.
DistanceBetweenCompressedVectorsFromIDs(ctx context.Context, x, y uint64) (float32, error)
// DistanceBetweenCompressedAndUncompressedVectorsFromID returns the distance
// between the compressed vector stored under id x and the raw vector y.
DistanceBetweenCompressedAndUncompressedVectorsFromID(ctx context.Context, x uint64, y []float32) (float32, error)
// NewDistancer creates a distancer anchored at vector together with a
// callback that releases it.
NewDistancer(vector []float32) (CompressorDistancer, ReturnDistancerFn)
// NewDistancerFromID creates a distancer anchored at the compressed vector
// stored under id.
NewDistancerFromID(id uint64) CompressorDistancer
// NewBag creates an empty CompressionDistanceBag bound to this compressor.
NewBag() CompressionDistanceBag
// ExposeFields returns the quantizer's PQData.
ExposeFields() PQData
}
// quantizedVectorsCompressor is the generic VectorCompressor implementation.
// T is the element type of a compressed vector: the constructors in this file
// instantiate it with byte for product quantization and uint64 for binary
// quantization.
type quantizedVectorsCompressor[T byte | uint64] struct {
// cache holds compressed vectors in memory, keyed by id.
cache cache.Cache[T]
// compressedStore persists compressed vectors in the
// helpers.VectorsCompressedBucketLSM bucket.
compressedStore *lsmkv.Store
// quantizer encodes, decodes and compares vectors.
quantizer quantizer[T]
}
// Drop forwards to the cache's Drop and always reports success. The
// compressed store is not touched here.
func (c *quantizedVectorsCompressor[T]) Drop() error {
	c.cache.Drop()
	return nil
}
// GrowCache forwards size unchanged to the cache's Grow method.
func (c *quantizedVectorsCompressor[T]) GrowCache(size uint64) {
	c.cache.Grow(size)
}
// SetCacheMaxSize updates the maximum size of the compressed-vector cache.
func (c *quantizedVectorsCompressor[T]) SetCacheMaxSize(size int64) {
	c.cache.UpdateMaxSize(size)
}
// GetCacheMaxSize reports the maximum size currently configured on the
// compressed-vector cache.
func (c *quantizedVectorsCompressor[T]) GetCacheMaxSize() int64 {
	maxSize := c.cache.CopyMaxSize()
	return maxSize
}
// Delete removes the vector stored under id, first from the cache and then
// from the compressed-vectors bucket. The bucket key is the 8-byte big-endian
// encoding of id.
//
// NOTE(review): whatever the bucket's Delete returns is discarded, because
// this method has no error result to report it through.
func (c *quantizedVectorsCompressor[T]) Delete(ctx context.Context, id uint64) {
	c.cache.Delete(ctx, id)

	var key [8]byte
	binary.BigEndian.PutUint64(key[:], id)
	bucket := c.compressedStore.Bucket(helpers.VectorsCompressedBucketLSM)
	bucket.Delete(key[:])
}
// Preload compresses vector with the quantizer, writes the compressed bytes to
// the compressed-vectors bucket under the 8-byte big-endian encoding of id,
// and then places the compressed vector in the cache (growing the cache for
// id first).
//
// NOTE(review): whatever the bucket's Put returns is discarded, because this
// method has no error result to report it through.
func (c *quantizedVectorsCompressor[T]) Preload(id uint64, vector []float32) {
	encoded := c.quantizer.Encode(vector)

	var key [8]byte
	binary.BigEndian.PutUint64(key[:], id)
	bucket := c.compressedStore.Bucket(helpers.VectorsCompressedBucketLSM)
	bucket.Put(key[:], c.quantizer.CompressedBytes(encoded))

	c.cache.Grow(id)
	c.cache.Preload(id, encoded)
}
// Prefetch forwards a prefetch request for id to the cache.
func (c *quantizedVectorsCompressor[T]) Prefetch(id uint64) {
	c.cache.Prefetch(id)
}
// DistanceBetweenCompressedVectors returns the quantizer's distance between
// two already-compressed vectors.
func (c *quantizedVectorsCompressor[T]) DistanceBetweenCompressedVectors(x, y []T) (float32, error) {
	dist, err := c.quantizer.DistanceBetweenCompressedVectors(x, y)
	return dist, err
}
// DistanceBetweenCompressedAndUncompressedVectors returns the quantizer's
// distance between the compressed vector x and the raw vector y.
//
// The quantizer takes its arguments in the opposite order (raw first,
// compressed second), so they are swapped on the way through.
func (c *quantizedVectorsCompressor[T]) DistanceBetweenCompressedAndUncompressedVectors(x []T, y []float32) (float32, error) {
	compressed, raw := x, y
	return c.quantizer.DistanceBetweenCompressedAndUncompressedVectors(raw, compressed)
}
// compressedVectorFromID fetches the compressed vector for id from the cache.
// It returns the cache's error unchanged, and reports a nil or empty result
// from the cache as an error of its own.
func (c *quantizedVectorsCompressor[T]) compressedVectorFromID(ctx context.Context, id uint64) ([]T, error) {
	vec, err := c.cache.Get(ctx, id)
	switch {
	case err != nil:
		return nil, err
	case len(vec) == 0:
		return nil, fmt.Errorf("got a nil or zero-length vector at docID %d", id)
	default:
		return vec, nil
	}
}
// DistanceBetweenCompressedVectorsFromIDs looks up the compressed vectors for
// id1 and id2 and returns the distance between them. If either lookup fails,
// that error is returned with a zero distance; id2 is not looked up when id1
// fails.
func (c *quantizedVectorsCompressor[T]) DistanceBetweenCompressedVectorsFromIDs(ctx context.Context, id1, id2 uint64) (float32, error) {
	first, err := c.compressedVectorFromID(ctx, id1)
	if err != nil {
		return 0, err
	}

	second, err := c.compressedVectorFromID(ctx, id2)
	if err != nil {
		return 0, err
	}

	return c.DistanceBetweenCompressedVectors(first, second)
}
// DistanceBetweenCompressedAndUncompressedVectorsFromID looks up the
// compressed vector for id and returns its distance to the raw vector. A
// failed lookup is returned as-is with a zero distance.
func (c *quantizedVectorsCompressor[T]) DistanceBetweenCompressedAndUncompressedVectorsFromID(ctx context.Context, id uint64, vector []float32) (float32, error) {
	compressed, err := c.compressedVectorFromID(ctx, id)
	if err != nil {
		return 0, err
	}
	return c.DistanceBetweenCompressedAndUncompressedVectors(compressed, vector)
}
// getCompressedVectorForID reads the compressed vector for id straight from
// the compressed-vectors bucket, bypassing the cache. The constructors in this
// file hand it to the cache as its loader function.
//
// The bucket key is the 8-byte big-endian encoding of id. A read error is
// wrapped and returned; an empty value is reported as a not-found error.
// ctx is accepted to satisfy the loader signature but is not used here.
func (c *quantizedVectorsCompressor[T]) getCompressedVectorForID(ctx context.Context, id uint64) ([]T, error) {
	var key [8]byte
	binary.BigEndian.PutUint64(key[:], id)

	raw, err := c.compressedStore.Bucket(helpers.VectorsCompressedBucketLSM).Get(key[:])
	if err != nil {
		return nil, errors.Wrap(err, "Getting vector for id")
	}
	if len(raw) > 0 {
		return c.quantizer.FromCompressedBytes(raw), nil
	}
	return nil, storobj.NewErrNotFoundf(id, "getCompressedVectorForID")
}
// NewDistancer builds a distancer anchored at the raw vector. The second
// result is a callback that hands the underlying quantizer distancer back to
// the quantizer; callers are expected to invoke it when finished.
func (c *quantizedVectorsCompressor[T]) NewDistancer(vector []float32) (CompressorDistancer, ReturnDistancerFn) {
	qd := &quantizedCompressorDistancer[T]{
		compressor: c,
		distancer:  c.quantizer.NewQuantizerDistancer(vector),
	}
	release := func() { c.returnDistancer(qd) }
	return qd, release
}
// NewDistancerFromID builds a distancer anchored at the compressed vector
// stored under id, looked up with a background context.
//
// NOTE(review): the lookup error is discarded because the signature has no
// error result. When the lookup fails the vector is nil and is passed to the
// quantizer as-is.
func (c *quantizedVectorsCompressor[T]) NewDistancerFromID(id uint64) CompressorDistancer {
	vec, _ := c.compressedVectorFromID(context.Background(), id)
	return &quantizedCompressorDistancer[T]{
		compressor: c,
		distancer:  c.quantizer.NewCompressedQuantizerDistancer(vec),
	}
}
// returnDistancer hands the quantizer distancer wrapped by distancer back to
// the quantizer. Nothing happens when the wrapped distancer is nil.
//
// The argument must be a *quantizedCompressorDistancer[T]; the unchecked type
// assertion panics for any other dynamic type.
func (c *quantizedVectorsCompressor[T]) returnDistancer(distancer CompressorDistancer) {
	if inner := distancer.(*quantizedCompressorDistancer[T]).distancer; inner != nil {
		c.quantizer.ReturnQuantizerDistancer(inner)
	}
}
// NewBag returns an empty distance bag bound to this compressor, with its
// element map initialised and ready for insertion.
func (c *quantizedVectorsCompressor[T]) NewBag() CompressionDistanceBag {
	bag := new(quantizedDistanceBag[T])
	bag.compressor = c
	bag.elements = map[uint64][]T{}
	return bag
}
// initCompressedStore creates or loads the compressed-vectors bucket in the
// store, using a background context. A failure is wrapped with a short
// description and returned.
func (c *quantizedVectorsCompressor[T]) initCompressedStore() error {
	if err := c.compressedStore.CreateOrLoadBucket(context.Background(), helpers.VectorsCompressedBucketLSM); err != nil {
		return errors.Wrapf(err, "Create or load bucket (compressed vectors store)")
	}
	return nil
}
// PrefillCache walks every entry of the compressed-vectors bucket and loads it
// into the cache. Keys are decoded as 8-byte big-endian ids, and the cache is
// grown for each id before the decoded vector is preloaded.
//
// The cursor is closed with defer so that it is released even if the loop
// body panics (for example on a key shorter than 8 bytes).
func (c *quantizedVectorsCompressor[T]) PrefillCache() {
	cursor := c.compressedStore.Bucket(helpers.VectorsCompressedBucketLSM).Cursor()
	defer cursor.Close()

	for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
		id := binary.BigEndian.Uint64(k)
		c.cache.Grow(id)

		// Decode from a private copy of the value; presumably the cursor may
		// reuse its buffers between iterations — TODO confirm.
		vc := make([]byte, len(v))
		copy(vc, v)
		c.cache.Preload(id, c.quantizer.FromCompressedBytes(vc))
	}
}
// ExposeFields returns the PQData exposed by the underlying quantizer.
func (c *quantizedVectorsCompressor[T]) ExposeFields() PQData {
	fields := c.quantizer.ExposeFields()
	return fields
}
// NewPQCompressor builds a product-quantization VectorCompressor backed by
// store and fits the quantizer on data.
//
// Steps, in order: create the product quantizer from cfg, distance and
// dimensions; create or load the compressed-vectors bucket; create the
// byte-vector cache with getCompressedVectorForID as its loader; grow the
// cache to len(data); fit the quantizer on data.
//
// An error is returned if the quantizer cannot be created or if the
// compressed-vectors bucket cannot be created or loaded. The bucket error used
// to be ignored, which left the compressor without a usable bucket.
func NewPQCompressor(
	cfg hnsw.PQConfig,
	distance distancer.Provider,
	dimensions int,
	vectorCacheMaxObjects int,
	logger logrus.FieldLogger,
	data [][]float32,
	store *lsmkv.Store,
) (VectorCompressor, error) {
	quantizer, err := NewProductQuantizer(cfg, distance, dimensions)
	if err != nil {
		return nil, err
	}
	pqVectorsCompressor := &quantizedVectorsCompressor[byte]{
		quantizer:       quantizer,
		compressedStore: store,
	}
	if err := pqVectorsCompressor.initCompressedStore(); err != nil {
		return nil, err
	}
	pqVectorsCompressor.cache = cache.NewShardedByteLockCache(pqVectorsCompressor.getCompressedVectorForID, vectorCacheMaxObjects, logger, 0)
	pqVectorsCompressor.cache.Grow(uint64(len(data)))
	quantizer.Fit(data)
	return pqVectorsCompressor, nil
}
// RestorePQCompressor rebuilds a product-quantization VectorCompressor from
// previously trained encoders, so no fitting step is run.
//
// Steps, in order: create the product quantizer from cfg, distance,
// dimensions and encoders; create or load the compressed-vectors bucket;
// create the byte-vector cache with getCompressedVectorForID as its loader.
//
// An error is returned if the quantizer cannot be created or if the
// compressed-vectors bucket cannot be created or loaded. The bucket error used
// to be ignored, which left the compressor without a usable bucket.
func RestorePQCompressor(
	cfg hnsw.PQConfig,
	distance distancer.Provider,
	dimensions int,
	vectorCacheMaxObjects int,
	logger logrus.FieldLogger,
	encoders []PQEncoder,
	store *lsmkv.Store,
) (VectorCompressor, error) {
	quantizer, err := NewProductQuantizerWithEncoders(cfg, distance, dimensions, encoders)
	if err != nil {
		return nil, err
	}
	pqVectorsCompressor := &quantizedVectorsCompressor[byte]{
		quantizer:       quantizer,
		compressedStore: store,
	}
	if err := pqVectorsCompressor.initCompressedStore(); err != nil {
		return nil, err
	}
	pqVectorsCompressor.cache = cache.NewShardedByteLockCache(pqVectorsCompressor.getCompressedVectorForID, vectorCacheMaxObjects, logger, 0)
	return pqVectorsCompressor, nil
}
// NewBQCompressor builds a binary-quantization VectorCompressor backed by
// store.
//
// Steps, in order: create the binary quantizer for distance; create or load
// the compressed-vectors bucket; create the uint64-vector cache with
// getCompressedVectorForID as its loader.
//
// An error is returned if the compressed-vectors bucket cannot be created or
// loaded. That error used to be ignored, which left the compressor without a
// usable bucket.
func NewBQCompressor(
	distance distancer.Provider,
	vectorCacheMaxObjects int,
	logger logrus.FieldLogger,
	store *lsmkv.Store,
) (VectorCompressor, error) {
	quantizer := NewBinaryQuantizer(distance)
	bqVectorsCompressor := &quantizedVectorsCompressor[uint64]{
		quantizer:       &quantizer,
		compressedStore: store,
	}
	if err := bqVectorsCompressor.initCompressedStore(); err != nil {
		return nil, err
	}
	bqVectorsCompressor.cache = cache.NewShardedUInt64LockCache(bqVectorsCompressor.getCompressedVectorForID, vectorCacheMaxObjects, logger, 0)
	return bqVectorsCompressor, nil
}
// quantizedCompressorDistancer is the CompressorDistancer implementation
// returned by quantizedVectorsCompressor. It pairs a quantizer-level distancer
// with the compressor whose cache supplies the vectors to compare against.
type quantizedCompressorDistancer[T byte | uint64] struct {
// compressor provides the cache used to resolve node ids to compressed vectors.
compressor *quantizedVectorsCompressor[T]
// distancer performs the distance computation against the reference vector.
distancer quantizerDistancer[T]
}
// DistanceToNode returns the distance from the distancer's reference vector to
// the compressed vector stored under id, looked up with a background context.
//
// The lookup goes through compressedVectorFromID, so a nil or zero-length
// vector from the cache is reported as an error, as it is for the other
// distance helpers, rather than being passed on to the quantizer distancer.
// On any lookup error the result is (0, false, err).
func (d *quantizedCompressorDistancer[T]) DistanceToNode(id uint64) (float32, bool, error) {
	compressedVector, err := d.compressor.compressedVectorFromID(context.Background(), id)
	if err != nil {
		return 0, false, err
	}
	return d.distancer.Distance(compressedVector)
}
// DistanceToFloat returns the distance from the distancer's reference vector
// to the raw vector, as computed by the underlying quantizer distancer.
func (d *quantizedCompressorDistancer[T]) DistanceToFloat(vector []float32) (float32, bool, error) {
	dist, ok, err := d.distancer.DistanceToFloat(vector)
	return dist, ok, err
}