Spaces:
Running
Running
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//
package compressionhelpers | |
import ( | |
"context" | |
"encoding/binary" | |
"fmt" | |
"github.com/pkg/errors" | |
"github.com/sirupsen/logrus" | |
"github.com/weaviate/weaviate/adapters/repos/db/helpers" | |
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv" | |
"github.com/weaviate/weaviate/adapters/repos/db/vector/cache" | |
"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw/distancer" | |
"github.com/weaviate/weaviate/entities/storobj" | |
"github.com/weaviate/weaviate/entities/vectorindex/hnsw" | |
) | |
// CompressorDistancer measures distances against the reference vector it was
// created for (see VectorCompressor.NewDistancer and NewDistancerFromID).
//
// NOTE(review): the meaning of the bool result is defined by the underlying
// quantizer distancer and is not visible from this file — confirm there.
type CompressorDistancer interface {
	// DistanceToNode measures the distance to the stored vector identified
	// by id.
	DistanceToNode(id uint64) (float32, bool, error)

	// DistanceToFloat measures the distance to the uncompressed vector vec.
	DistanceToFloat(vec []float32) (float32, bool, error)
}
// ReturnDistancerFn is the release callback handed out together with a
// distancer by VectorCompressor.NewDistancer. Calling it gives the distancer
// back to the compressor that created it.
type ReturnDistancerFn func()
type VectorCompressor interface { | |
Drop() error | |
GrowCache(size uint64) | |
SetCacheMaxSize(size int64) | |
GetCacheMaxSize() int64 | |
Delete(ctx context.Context, id uint64) | |
Preload(id uint64, vector []float32) | |
Prefetch(id uint64) | |
PrefillCache() | |
DistanceBetweenCompressedVectorsFromIDs(ctx context.Context, x, y uint64) (float32, error) | |
DistanceBetweenCompressedAndUncompressedVectorsFromID(ctx context.Context, x uint64, y []float32) (float32, error) | |
NewDistancer(vector []float32) (CompressorDistancer, ReturnDistancerFn) | |
NewDistancerFromID(id uint64) CompressorDistancer | |
NewBag() CompressionDistanceBag | |
ExposeFields() PQData | |
} | |
type quantizedVectorsCompressor[T byte | uint64] struct { | |
cache cache.Cache[T] | |
compressedStore *lsmkv.Store | |
quantizer quantizer[T] | |
} | |
func (compressor *quantizedVectorsCompressor[T]) Drop() error { | |
compressor.cache.Drop() | |
return nil | |
} | |
func (compressor *quantizedVectorsCompressor[T]) GrowCache(size uint64) { | |
compressor.cache.Grow(size) | |
} | |
func (compressor *quantizedVectorsCompressor[T]) SetCacheMaxSize(size int64) { | |
compressor.cache.UpdateMaxSize(size) | |
} | |
func (compressor *quantizedVectorsCompressor[T]) GetCacheMaxSize() int64 { | |
return compressor.cache.CopyMaxSize() | |
} | |
func (compressor *quantizedVectorsCompressor[T]) Delete(ctx context.Context, id uint64) { | |
compressor.cache.Delete(ctx, id) | |
idBytes := make([]byte, 8) | |
binary.BigEndian.PutUint64(idBytes, id) | |
compressor.compressedStore.Bucket(helpers.VectorsCompressedBucketLSM).Delete(idBytes) | |
} | |
func (compressor *quantizedVectorsCompressor[T]) Preload(id uint64, vector []float32) { | |
compressedVector := compressor.quantizer.Encode(vector) | |
idBytes := make([]byte, 8) | |
binary.BigEndian.PutUint64(idBytes, id) | |
compressor.compressedStore.Bucket(helpers.VectorsCompressedBucketLSM).Put(idBytes, compressor.quantizer.CompressedBytes(compressedVector)) | |
compressor.cache.Grow(id) | |
compressor.cache.Preload(id, compressedVector) | |
} | |
func (compressor *quantizedVectorsCompressor[T]) Prefetch(id uint64) { | |
compressor.cache.Prefetch(id) | |
} | |
func (compressor *quantizedVectorsCompressor[T]) DistanceBetweenCompressedVectors(x, y []T) (float32, error) { | |
return compressor.quantizer.DistanceBetweenCompressedVectors(x, y) | |
} | |
func (compressor *quantizedVectorsCompressor[T]) DistanceBetweenCompressedAndUncompressedVectors(x []T, y []float32) (float32, error) { | |
return compressor.quantizer.DistanceBetweenCompressedAndUncompressedVectors(y, x) | |
} | |
func (compressor *quantizedVectorsCompressor[T]) compressedVectorFromID(ctx context.Context, id uint64) ([]T, error) { | |
compressedVector, err := compressor.cache.Get(ctx, id) | |
if err != nil { | |
return nil, err | |
} | |
if len(compressedVector) == 0 { | |
return nil, fmt.Errorf("got a nil or zero-length vector at docID %d", id) | |
} | |
return compressedVector, nil | |
} | |
func (compressor *quantizedVectorsCompressor[T]) DistanceBetweenCompressedVectorsFromIDs(ctx context.Context, id1, id2 uint64) (float32, error) { | |
compressedVector1, err := compressor.compressedVectorFromID(ctx, id1) | |
if err != nil { | |
return 0, err | |
} | |
compressedVector2, err := compressor.compressedVectorFromID(ctx, id2) | |
if err != nil { | |
return 0, err | |
} | |
dist, err := compressor.DistanceBetweenCompressedVectors(compressedVector1, compressedVector2) | |
return dist, err | |
} | |
func (compressor *quantizedVectorsCompressor[T]) DistanceBetweenCompressedAndUncompressedVectorsFromID(ctx context.Context, id uint64, vector []float32) (float32, error) { | |
compressedVector, err := compressor.compressedVectorFromID(ctx, id) | |
if err != nil { | |
return 0, err | |
} | |
dist, err := compressor.DistanceBetweenCompressedAndUncompressedVectors(compressedVector, vector) | |
return dist, err | |
} | |
func (compressor *quantizedVectorsCompressor[T]) getCompressedVectorForID(ctx context.Context, id uint64) ([]T, error) { | |
idBytes := make([]byte, 8) | |
binary.BigEndian.PutUint64(idBytes, id) | |
compressedVector, err := compressor.compressedStore.Bucket(helpers.VectorsCompressedBucketLSM).Get(idBytes) | |
if err != nil { | |
return nil, errors.Wrap(err, "Getting vector for id") | |
} | |
if len(compressedVector) == 0 { | |
return nil, storobj.NewErrNotFoundf(id, "getCompressedVectorForID") | |
} | |
return compressor.quantizer.FromCompressedBytes(compressedVector), nil | |
} | |
func (compressor *quantizedVectorsCompressor[T]) NewDistancer(vector []float32) (CompressorDistancer, ReturnDistancerFn) { | |
d := &quantizedCompressorDistancer[T]{ | |
compressor: compressor, | |
distancer: compressor.quantizer.NewQuantizerDistancer(vector), | |
} | |
return d, func() { | |
compressor.returnDistancer(d) | |
} | |
} | |
func (compressor *quantizedVectorsCompressor[T]) NewDistancerFromID(id uint64) CompressorDistancer { | |
compressedVector, _ := compressor.compressedVectorFromID(context.Background(), id) | |
d := &quantizedCompressorDistancer[T]{ | |
compressor: compressor, | |
distancer: compressor.quantizer.NewCompressedQuantizerDistancer(compressedVector), | |
} | |
return d | |
} | |
func (compressor *quantizedVectorsCompressor[T]) returnDistancer(distancer CompressorDistancer) { | |
dst := distancer.(*quantizedCompressorDistancer[T]).distancer | |
if dst == nil { | |
return | |
} | |
compressor.quantizer.ReturnQuantizerDistancer(dst) | |
} | |
func (compressor *quantizedVectorsCompressor[T]) NewBag() CompressionDistanceBag { | |
return &quantizedDistanceBag[T]{ | |
compressor: compressor, | |
elements: make(map[uint64][]T), | |
} | |
} | |
func (compressor *quantizedVectorsCompressor[T]) initCompressedStore() error { | |
err := compressor.compressedStore.CreateOrLoadBucket(context.Background(), helpers.VectorsCompressedBucketLSM) | |
if err != nil { | |
return errors.Wrapf(err, "Create or load bucket (compressed vectors store)") | |
} | |
return nil | |
} | |
func (compressor *quantizedVectorsCompressor[T]) PrefillCache() { | |
cursor := compressor.compressedStore.Bucket(helpers.VectorsCompressedBucketLSM).Cursor() | |
for k, v := cursor.First(); k != nil; k, v = cursor.Next() { | |
id := binary.BigEndian.Uint64(k) | |
compressor.cache.Grow(id) | |
vc := make([]byte, len(v)) | |
copy(vc, v) | |
compressor.cache.Preload(id, compressor.quantizer.FromCompressedBytes(vc)) | |
} | |
cursor.Close() | |
} | |
func (compressor *quantizedVectorsCompressor[T]) ExposeFields() PQData { | |
return compressor.quantizer.ExposeFields() | |
} | |
func NewPQCompressor( | |
cfg hnsw.PQConfig, | |
distance distancer.Provider, | |
dimensions int, | |
vectorCacheMaxObjects int, | |
logger logrus.FieldLogger, | |
data [][]float32, | |
store *lsmkv.Store, | |
) (VectorCompressor, error) { | |
quantizer, err := NewProductQuantizer(cfg, distance, dimensions) | |
if err != nil { | |
return nil, err | |
} | |
pqVectorsCompressor := &quantizedVectorsCompressor[byte]{ | |
quantizer: quantizer, | |
compressedStore: store, | |
} | |
pqVectorsCompressor.initCompressedStore() | |
pqVectorsCompressor.cache = cache.NewShardedByteLockCache(pqVectorsCompressor.getCompressedVectorForID, vectorCacheMaxObjects, logger, 0) | |
pqVectorsCompressor.cache.Grow(uint64(len(data))) | |
quantizer.Fit(data) | |
return pqVectorsCompressor, nil | |
} | |
func RestorePQCompressor( | |
cfg hnsw.PQConfig, | |
distance distancer.Provider, | |
dimensions int, | |
vectorCacheMaxObjects int, | |
logger logrus.FieldLogger, | |
encoders []PQEncoder, | |
store *lsmkv.Store, | |
) (VectorCompressor, error) { | |
quantizer, err := NewProductQuantizerWithEncoders(cfg, distance, dimensions, encoders) | |
if err != nil { | |
return nil, err | |
} | |
pqVectorsCompressor := &quantizedVectorsCompressor[byte]{ | |
quantizer: quantizer, | |
compressedStore: store, | |
} | |
pqVectorsCompressor.initCompressedStore() | |
pqVectorsCompressor.cache = cache.NewShardedByteLockCache(pqVectorsCompressor.getCompressedVectorForID, vectorCacheMaxObjects, logger, 0) | |
return pqVectorsCompressor, nil | |
} | |
func NewBQCompressor( | |
distance distancer.Provider, | |
vectorCacheMaxObjects int, | |
logger logrus.FieldLogger, | |
store *lsmkv.Store, | |
) (VectorCompressor, error) { | |
quantizer := NewBinaryQuantizer(distance) | |
bqVectorsCompressor := &quantizedVectorsCompressor[uint64]{ | |
quantizer: &quantizer, | |
compressedStore: store, | |
} | |
bqVectorsCompressor.initCompressedStore() | |
bqVectorsCompressor.cache = cache.NewShardedUInt64LockCache(bqVectorsCompressor.getCompressedVectorForID, vectorCacheMaxObjects, logger, 0) | |
return bqVectorsCompressor, nil | |
} | |
type quantizedCompressorDistancer[T byte | uint64] struct { | |
compressor *quantizedVectorsCompressor[T] | |
distancer quantizerDistancer[T] | |
} | |
func (distancer *quantizedCompressorDistancer[T]) DistanceToNode(id uint64) (float32, bool, error) { | |
compressedVector, err := distancer.compressor.cache.Get(context.Background(), id) | |
if err != nil { | |
return 0, false, err | |
} | |
return distancer.distancer.Distance(compressedVector) | |
} | |
func (distancer *quantizedCompressorDistancer[T]) DistanceToFloat(vector []float32) (float32, bool, error) { | |
return distancer.distancer.DistanceToFloat(vector) | |
} | |