SemanticSearchPOC / adapters /repos /db /vector /hnsw /compress_deletes_test.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
5.43 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
//go:build !race
package hnsw
import (
"context"
"fmt"
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/adapters/repos/db/vector/common"
"github.com/weaviate/weaviate/adapters/repos/db/vector/compressionhelpers"
"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw/distancer"
"github.com/weaviate/weaviate/adapters/repos/db/vector/testinghelpers"
"github.com/weaviate/weaviate/entities/cyclemanager"
"github.com/weaviate/weaviate/entities/storobj"
ent "github.com/weaviate/weaviate/entities/vectorindex/hnsw"
)
// Test_NoRaceCompressDoesNotCrash fills an index with random vectors, deletes
// a batch of ids, compresses the index with a KMeans/log-normal PQ
// configuration and then checks that every query vector can still be searched
// without SearchByVector returning an error.
func Test_NoRaceCompressDoesNotCrash(t *testing.T) {
	const (
		efConstruction = 64
		ef             = 32
		maxNeighbors   = 32
		dimensions     = 20
		vectorsSize    = 10000
		queriesSize    = 100
		k              = 100
		deletedRange   = 1000
	)

	// ids 10..1009 plus id 1; the capacity accounts for the extra element so
	// the final append does not force a reallocation.
	deleteIDs := make([]uint64, 0, deletedRange+1)
	for i := uint64(0); i < deletedRange; i++ {
		deleteIDs = append(deleteIDs, i+10)
	}
	deleteIDs = append(deleteIDs, 1)

	vectors, queries := testinghelpers.RandomVecs(vectorsSize, queriesSize, dimensions)
	// Named so that it does not shadow the imported distancer package.
	distanceProvider := distancer.NewL2SquaredProvider()

	uc := ent.UserConfig{}
	uc.MaxConnections = maxNeighbors
	uc.EFConstruction = efConstruction
	uc.EF = ef
	uc.VectorCacheMaxObjects = 10e12
	// Initial PQ settings used at construction time; they are replaced by the
	// KMeans configuration below before compress is called.
	uc.PQ = ent.PQConfig{Enabled: true, Encoder: ent.PQEncoder{Type: "title", Distribution: "normal"}}

	index, err := New(Config{
		RootPath:              t.TempDir(),
		ID:                    "recallbenchmark",
		MakeCommitLoggerThunk: MakeNoopCommitLogger,
		DistanceProvider:      distanceProvider,
		VectorForIDThunk: func(ctx context.Context, id uint64) ([]float32, error) {
			if int(id) >= len(vectors) {
				return nil, storobj.NewErrNotFoundf(id, "out of range")
			}
			return vectors[int(id)], nil
		},
		TempVectorForIDThunk: func(ctx context.Context, id uint64, container *common.VectorSlice) ([]float32, error) {
			copy(container.Slice, vectors[int(id)])
			return container.Slice, nil
		},
	}, uc, cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(),
		cyclemanager.NewCallbackGroupNoop(), testinghelpers.NewDummyStore(t))
	// Stop here on a construction failure instead of panicking on a nil index
	// in the deferred Shutdown below.
	require.NoError(t, err)
	defer index.Shutdown(context.Background())

	compressionhelpers.Concurrently(uint64(len(vectors)), func(id uint64) {
		// assert rather than require: the callback is presumably executed on
		// worker goroutines, where t.FailNow must not be called.
		assert.NoError(t, index.Add(id, vectors[id]))
	})

	// NOTE(review): the results of Delete and compress are not inspected here;
	// the test only requires that the searches afterwards succeed.
	index.Delete(deleteIDs...)

	uc.PQ = ent.PQConfig{
		Enabled: true,
		Encoder: ent.PQEncoder{
			Type:         ent.PQEncoderTypeKMeans,
			Distribution: ent.PQEncoderDistributionLogNormal,
		},
		Segments:  dimensions,
		Centroids: 256,
	}
	index.compress(uc)

	for _, query := range queries {
		_, _, err := index.SearchByVector(query, k, nil)
		assert.Nil(t, err)
	}
}
// TestHnswPqNilVectors checks that an index keeps accepting inserts when one
// of the backing vectors is nil: the first half of the vectors is added
// uncompressed, PQ (tile encoder) is enabled through UpdateUserConfig, and the
// second half is added afterwards. Nil vectors are skipped on insert, and
// VectorForIDThunk reports them as not found.
func TestHnswPqNilVectors(t *testing.T) {
	dimensions := 20
	vectorsSize := 10_000
	queriesSize := 10
	vectors, _ := testinghelpers.RandomVecs(vectorsSize, queriesSize, dimensions)

	// Leave a hole in the data set: id 500 has no vector.
	vectors[500] = nil

	userConfig := ent.UserConfig{
		MaxConnections: 30,
		EFConstruction: 64,
		EF:             32,

		// The actual size does not matter for this test, but if it defaults to
		// zero it will constantly think it's full and needs to be deleted - even
		// after just being deleted, so make sure to use a positive number here.
		VectorCacheMaxObjects: 1000000,
	}

	rootPath := "doesnt-matter-as-committlogger-is-mocked-out"
	// Best-effort cleanup of anything written below rootPath; a failure is
	// only printed and does not fail the test.
	defer func(path string) {
		if err := os.RemoveAll(path); err != nil {
			fmt.Println(err)
		}
	}(rootPath)

	index, err := New(Config{
		RootPath:              rootPath,
		ID:                    "nil-vector-test",
		MakeCommitLoggerThunk: MakeNoopCommitLogger,
		DistanceProvider:      distancer.NewCosineDistanceProvider(),
		VectorForIDThunk: func(ctx context.Context, id uint64) ([]float32, error) {
			vec := vectors[int(id)]
			if vec == nil {
				return nil, storobj.NewErrNotFoundf(id, "nil vec")
			}
			return vec, nil
		},
		TempVectorForIDThunk: TempVectorForIDThunk(vectors),
	}, userConfig, cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(), testinghelpers.NewDummyStore(t))
	require.NoError(t, err)

	half := uint64(len(vectors) / 2)

	// First half: inserted before compression is enabled.
	compressionhelpers.Concurrently(half, func(id uint64) {
		if vectors[id] == nil {
			return
		}
		// assert rather than require: the callback is presumably executed on
		// worker goroutines, where t.FailNow must not be called.
		assert.NoError(t, index.Add(id, vectors[id]))
	})

	userConfig.PQ = ent.PQConfig{
		Enabled: true,
		Encoder: ent.PQEncoder{
			Type:         ent.PQEncoderTypeTile,
			Distribution: ent.PQEncoderDistributionLogNormal,
		},
		BitCompression: false,
		Segments:       dimensions,
		Centroids:      256,
	}

	// The callback passed to UpdateUserConfig closes done; block on it before
	// inserting the remaining vectors.
	done := make(chan struct{})
	err = index.UpdateUserConfig(userConfig, func() {
		close(done)
	})
	require.NoError(t, err)
	<-done

	// Second half: inserted after the config update has signalled completion.
	compressionhelpers.Concurrently(half, func(id uint64) {
		target := id + half
		if vectors[target] == nil {
			return
		}
		// The result is passed straight to assert so that the callbacks do
		// not all write to the err variable declared in the test function.
		assert.NoError(t, index.Add(target, vectors[target]))
	})
}