KevinStephenson
Adding in weaviate code
b110593
raw
history blame
19.2 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
//go:build benchmarkSiftRecall
// +build benchmarkSiftRecall
package hnsw_test
import (
"context"
"fmt"
"io/ioutil"
"math"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv"
"github.com/weaviate/weaviate/adapters/repos/db/vector/compressionhelpers"
"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw"
"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw/distancer"
"github.com/weaviate/weaviate/adapters/repos/db/vector/testinghelpers"
"github.com/weaviate/weaviate/entities/cyclemanager"
ent "github.com/weaviate/weaviate/entities/vectorindex/hnsw"
)
func distanceWrapper(provider distancer.Provider) func(x, y []float32) float32 {
return func(x, y []float32) float32 {
dist, _, _ := provider.SingleDist(x, y)
return dist
}
}
const rootPath = "doesnt-matter-as-committlogger-is-mocked-out"
func TestRecall(t *testing.T) {
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
fmt.Println(err)
}
}(rootPath)
fmt.Println("Sift1MPQKMeans 10K/1K")
efConstruction := 64
ef := 32
maxNeighbors := 32
dimensions := 128
vectors_size := 200000
queries_size := 100
switch_at := vectors_size
before := time.Now()
vectors, queries := testinghelpers.RandomVecs(vectors_size, queries_size, dimensions)
k := 10
distancer := distancer.NewL2SquaredProvider()
fmt.Printf("generating data took %s\n", time.Since(before))
uc := ent.UserConfig{}
uc.MaxConnections = maxNeighbors
uc.EFConstruction = efConstruction
uc.EF = ef
uc.VectorCacheMaxObjects = 10e12
index, _ := hnsw.New(hnsw.Config{
RootPath: rootPath,
ID: "recallbenchmark",
MakeCommitLoggerThunk: hnsw.MakeNoopCommitLogger,
DistanceProvider: distancer,
VectorForIDThunk: func(ctx context.Context, id uint64) ([]float32, error) {
return vectors[int(id)], nil
},
}, uc, newDummyStore(t))
init := time.Now()
compressionhelpers.Concurrently(uint64(switch_at), func(_, id uint64, _ *sync.Mutex) {
index.Add(uint64(id), vectors[id])
if id%1000 == 0 {
fmt.Println(id, time.Since(before))
}
})
before = time.Now()
uc.PQ.Enabled = true
index.UpdateUserConfig(uc, func() {}) /*should have configuration.pr.enabled = true*/
fmt.Printf("Time to compress: %s", time.Since(before))
fmt.Println()
compressionhelpers.Concurrently(uint64(vectors_size-switch_at), func(_, id uint64, _ *sync.Mutex) {
idx := switch_at + int(id)
index.Add(uint64(idx), vectors[idx])
if id%1000 == 0 {
fmt.Println(idx, time.Since(before))
}
})
fmt.Printf("Building the index took %s\n", time.Since(init))
lastRecall := float32(0.0)
for _, currentEF := range []int{32, 64, 128, 256, 512} {
uc.EF = currentEF
index.UpdateUserConfig(uc, func() {})
fmt.Println(currentEF)
var relevant uint64
var retrieved int
var querying time.Duration = 0
for i := 0; i < len(queries); i++ {
truth := testinghelpers.BruteForce(vectors, queries[i], k, distanceWrapper(distancer))
before = time.Now()
results, _, _ := index.SearchByVector(queries[i], k, nil)
querying += time.Since(before)
retrieved += k
relevant += testinghelpers.MatchesInLists(truth, results)
}
recall := float32(relevant) / float32(retrieved)
assert.True(t, recall > float32(lastRecall))
lastRecall = recall
}
assert.True(t, lastRecall > 0.95)
}
func TestHnswPqGist(t *testing.T) {
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
fmt.Println(err)
}
}(rootPath)
params := [][]int{
//{64, 64, 32},
{128, 128, 64},
{256, 256, 128},
{512, 512, 256},
}
dimensions := 960
vectors_size := 1000000
queries_size := 1000
switch_at := 200000
before := time.Now()
vectors, queries := testinghelpers.ReadVecs(vectors_size, queries_size, dimensions, "gist", "../diskAnn/testdata")
testinghelpers.Normalize(vectors)
testinghelpers.Normalize(queries)
for i, v := range vectors {
for j, x := range v {
if math.IsNaN(float64(x)) {
fmt.Println(i, j, v, x)
}
}
}
k := 100
distancer := distancer.NewCosineDistanceProvider()
truths := testinghelpers.BuildTruths(queries_size, vectors_size, queries, vectors, k, distanceWrapper(distancer), "../diskAnn/testdata/gist/cosine")
fmt.Printf("generating data took %s\n", time.Since(before))
for segmentRate := 3; segmentRate < 4; segmentRate++ {
fmt.Println(segmentRate)
fmt.Println()
for i := 0; i < len(params); i++ {
efConstruction := params[i][0]
ef := params[i][1]
maxNeighbors := params[i][2]
uc := ent.UserConfig{
MaxConnections: maxNeighbors,
EFConstruction: efConstruction,
EF: ef,
VectorCacheMaxObjects: 10e12,
PQ: ent.PQConfig{
Enabled: false,
Segments: dimensions / int(math.Pow(2, float64(segmentRate))),
Encoder: ent.PQEncoder{
Type: ent.PQEncoderTypeKMeans,
Distribution: ent.PQEncoderDistributionLogNormal,
},
},
}
index, _ := hnsw.New(hnsw.Config{
RootPath: rootPath,
ID: "recallbenchmark",
MakeCommitLoggerThunk: hnsw.MakeNoopCommitLogger,
DistanceProvider: distancer,
VectorForIDThunk: func(ctx context.Context, id uint64) ([]float32, error) {
return vectors[int(id)], nil
},
}, uc, newDummyStore(t))
init := time.Now()
total := 200000
compressionhelpers.Concurrently(uint64(switch_at), func(_, id uint64, _ *sync.Mutex) {
total++
if total%100000 == 0 {
fmt.Println(total)
}
index.Add(uint64(id), vectors[id])
})
before = time.Now()
uc.PQ.Enabled = true
index.UpdateUserConfig(uc, func() {})
fmt.Printf("Time to compress: %s", time.Since(before))
fmt.Println()
compressionhelpers.Concurrently(uint64(vectors_size-switch_at), func(_, id uint64, _ *sync.Mutex) {
idx := switch_at + int(id)
index.Add(uint64(idx), vectors[idx])
})
fmt.Printf("Building the index took %s\n", time.Since(init))
var relevant uint64
var retrieved int
var querying time.Duration = 0
compressionhelpers.Concurrently(uint64(len(queries)), func(_, i uint64, _ *sync.Mutex) {
before = time.Now()
results, _, _ := index.SearchByVector(queries[i], k, nil)
querying += time.Since(before)
retrieved += k
relevant += testinghelpers.MatchesInLists(truths[i], results)
})
recall := float32(relevant) / float32(retrieved)
latency := float32(querying.Microseconds()) / float32(queries_size)
fmt.Println(recall, latency)
assert.True(t, recall > 0.9)
assert.True(t, latency < 100000)
}
}
}
/*
10K
128 segments, 16 centroids -> 5.280255291s 0.90662 387.505
64 segments, 256 centroids -> 6.585159916s 0.9326827 410.413
100K
128 segments, 16 centroids -> 1m17.634662125s 0.88258 692.081
64 segments, 256 centroids -> 1m29.259369458s 0.92157 575.844
100000
128
Building the index took 47.7846745s
0.92627 664.66
{64, 64, 32, 256, 0},
{64, 64, 32, 1024, 1},
{64, 64, 32, 4096, 1},
{64, 64, 32, 16384, 1},
{64, 64, 32, 65536, 1},
generating data took 2.072473792s
0
Time to compress: 5m39.750884042s
Building the index took 16m30.632114542s
0.91401 747.82
1
Time to compress: 13m12.011102334s
Building the index took 28m12.879802125s
0.89564 1041.358
2
Time to compress: 58m15.252058416s
Building the index took 1h37m10.217039334s
0.90836 2299.629
3
Time to compress: 3h59m39.032524584s
Building the index took 5h54m55.046038916s
0.91295 4786.8
generating data took 2.119674416s
0
Start compressing...
Time to compress: 1m3.037853584s
Building the index took 3m53.429316209s
0.40992 169.937
1
Start compressing...
Time to compress: 2m4.653952334s
Building the index took 5m44.454856667s
0.46251252 207.299
2
Start compressing...
Time to compress: 4m7.585857584s
Building the index took 8m36.2004675s
0.50494 293.362
3
Start compressing...
Time to compress: 8m16.4155035s
Building the index took 14m37.089003166s
0.54421 390.49
4
Start compressing...
Time to compress: 16m32.313318708s
Building the index took 26m46.66661125s
0.57827 442.589
*/
func TestHnswPqSift(t *testing.T) {
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
fmt.Println(err)
}
}(rootPath)
params := [][]int{
{64, 64, 32, 256, 3},
{64, 64, 32, 512, 3},
{64, 64, 32, 1024, 3},
{64, 64, 32, 2048, 3},
{64, 64, 32, 4096, 3},
{64, 64, 32, 65536, 3},
}
dimensions := 128
vectors_size := 1000000
queries_size := 1000
switch_at := 200000
fmt.Println("Sift1M PQ")
before := time.Now()
vectors, queries := testinghelpers.ReadVecs(vectors_size, queries_size, dimensions, "sift", "../diskAnn/testdata")
k := 100
distancer := distancer.NewL2SquaredProvider()
truths := testinghelpers.BuildTruths(queries_size, vectors_size, queries, vectors, k, distanceWrapper(distancer), "../diskAnn/testdata")
fmt.Printf("generating data took %s\n", time.Since(before))
for i := 0; i < len(params); i++ {
fmt.Println(i)
efConstruction := params[i][0]
ef := params[i][1]
maxNeighbors := params[i][2]
centroids := params[i][3]
segmentRate := params[i][4]
if centroids > switch_at {
fmt.Println("Increasing switch at...")
switch_at = 650000
}
uc := ent.UserConfig{
MaxConnections: maxNeighbors,
EFConstruction: efConstruction,
EF: ef,
PQ: ent.PQConfig{
Enabled: false,
Segments: dimensions / int(math.Pow(2, float64(segmentRate))),
Encoder: ent.PQEncoder{
Type: ent.PQEncoderTypeTile,
Distribution: ent.PQEncoderDistributionLogNormal,
},
},
VectorCacheMaxObjects: 10e12,
}
index, _ := hnsw.New(hnsw.Config{
RootPath: rootPath,
ID: "recallbenchmark",
MakeCommitLoggerThunk: hnsw.MakeNoopCommitLogger,
DistanceProvider: distancer,
VectorForIDThunk: func(ctx context.Context, id uint64) ([]float32, error) {
return vectors[int(id)], nil
},
}, uc, newDummyStore(t))
init := time.Now()
compressionhelpers.Concurrently(uint64(switch_at), func(_, id uint64, _ *sync.Mutex) {
index.Add(uint64(id), vectors[id])
})
before = time.Now()
fmt.Println("Start compressing...")
cfg := ent.PQConfig{
Enabled: true,
Segments: dimensions / int(math.Pow(2, float64(segmentRate))),
Centroids: centroids,
BitCompression: false,
Encoder: ent.PQEncoder{
Type: ent.PQEncoderTypeKMeans,
Distribution: ent.PQEncoderDistributionLogNormal,
},
}
index.Compress(cfg) /*should have configuration.compressed = true*/
fmt.Printf("Time to compress: %s", time.Since(before))
fmt.Println()
compressionhelpers.Concurrently(uint64(vectors_size-switch_at), func(_, id uint64, _ *sync.Mutex) {
idx := switch_at + int(id)
index.Add(uint64(idx), vectors[idx])
})
fmt.Printf("Building the index took %s\n", time.Since(init))
var relevant uint64
var retrieved int
var querying time.Duration = 0
compressionhelpers.Concurrently(uint64(len(queries)), func(_, i uint64, _ *sync.Mutex) {
before = time.Now()
results, _, _ := index.SearchByVector(queries[i], k, nil)
querying += time.Since(before)
retrieved += k
relevant += testinghelpers.MatchesInLists(truths[i], results)
})
recall := float32(relevant) / float32(retrieved)
latency := float32(querying.Microseconds()) / float32(queries_size)
fmt.Println(recall, latency)
assert.True(t, recall > 0.9)
assert.True(t, latency < 100000)
}
}
func TestHnswPqSiftDeletes(t *testing.T) {
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
fmt.Println(err)
}
}(rootPath)
params := [][]int{
{64, 64, 32},
}
dimensions := 128
vectors_size := 10000
queries_size := 1000
switch_at := 2000
fmt.Println("Sift1M PQ Deletes")
before := time.Now()
vectors, queries := testinghelpers.ReadVecs(vectors_size, queries_size, dimensions, "sift", "../diskAnn/testdata")
k := 100
distancer := distancer.NewL2SquaredProvider()
truths := testinghelpers.BuildTruths(queries_size, vectors_size, queries, vectors, k, distanceWrapper(distancer), "../diskAnn/testdata")
fmt.Printf("generating data took %s\n", time.Since(before))
for segmentRate := 0; segmentRate < 1; segmentRate++ {
fmt.Println(segmentRate)
fmt.Println()
for i := 0; i < len(params); i++ {
efConstruction := params[i][0]
ef := params[i][1]
maxNeighbors := params[i][2]
uc := ent.UserConfig{
MaxConnections: maxNeighbors,
EFConstruction: efConstruction,
EF: ef,
PQ: ent.PQConfig{
Enabled: false,
Segments: dimensions / int(math.Pow(2, float64(segmentRate))),
Encoder: ent.PQEncoder{
Type: "tile",
Distribution: "log-normal",
},
},
VectorCacheMaxObjects: 10e12,
}
index, _ := hnsw.New(hnsw.Config{
RootPath: rootPath,
ID: "recallbenchmark",
MakeCommitLoggerThunk: hnsw.MakeNoopCommitLogger,
DistanceProvider: distancer,
VectorForIDThunk: func(ctx context.Context, id uint64) ([]float32, error) {
return vectors[int(id)], nil
},
}, uc, newDummyStore(t))
init := time.Now()
compressionhelpers.Concurrently(uint64(switch_at), func(_, id uint64, _ *sync.Mutex) {
index.Add(uint64(id), vectors[id])
})
before = time.Now()
uc.PQ.Enabled = true
index.UpdateUserConfig(uc, func() {}) /*should have configuration.compressed = true*/
fmt.Printf("Time to compress: %s", time.Since(before))
fmt.Println()
compressionhelpers.Concurrently(uint64(vectors_size-switch_at), func(_, id uint64, _ *sync.Mutex) {
idx := switch_at + int(id)
index.Add(uint64(idx), vectors[idx])
})
fmt.Printf("Building the index took %s\n", time.Since(init))
var relevant uint64
var retrieved int
var querying time.Duration = 0
compressionhelpers.Concurrently(uint64(len(queries)), func(_, i uint64, _ *sync.Mutex) {
before = time.Now()
results, _, _ := index.SearchByVector(queries[i], k, nil)
querying += time.Since(before)
retrieved += k
relevant += testinghelpers.MatchesInLists(truths[i], results)
})
recall := float32(relevant) / float32(retrieved)
latency := float32(querying.Microseconds()) / float32(queries_size)
fmt.Println(recall, latency)
assert.True(t, recall > 0.9)
assert.True(t, latency < 100000)
}
}
}
func TestHnswPqDeepImage(t *testing.T) {
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
fmt.Println(err)
}
}(rootPath)
vectors_size := 9990000
queries_size := 1000
vectors := parseFromTxt("../diskAnn/testdata/deep-image/train.txt", vectors_size)
queries := parseFromTxt("../diskAnn/testdata/deep-image/test.txt", queries_size)
dimensions := 96
params := [][]int{
{64, 64, 32},
{128, 128, 64},
{256, 256, 128},
{512, 512, 256},
}
switch_at := 1000000
fmt.Println("Sift1MPQKMeans 10K/10K")
before := time.Now()
k := 100
distancer := distancer.NewL2SquaredProvider()
truths := testinghelpers.BuildTruths(queries_size, vectors_size, queries, vectors, k, distanceWrapper(distancer), "../diskAnn/testdata/deep-image")
fmt.Printf("generating data took %s\n", time.Since(before))
for segmentRate := 1; segmentRate < 4; segmentRate++ {
fmt.Println(segmentRate)
fmt.Println()
for i := 0; i < len(params); i++ {
efConstruction := params[i][0]
ef := params[i][1]
maxNeighbors := params[i][2]
uc := ent.UserConfig{
MaxConnections: maxNeighbors,
EFConstruction: efConstruction,
EF: ef,
PQ: ent.PQConfig{
Enabled: false,
Segments: dimensions / int(math.Pow(2, float64(segmentRate))),
Encoder: ent.PQEncoder{
Type: ent.PQEncoderTypeKMeans,
Distribution: ent.PQEncoderDistributionNormal,
},
},
VectorCacheMaxObjects: 10e12,
}
index, _ := hnsw.New(hnsw.Config{
RootPath: rootPath,
ID: "recallbenchmark",
MakeCommitLoggerThunk: hnsw.MakeNoopCommitLogger,
DistanceProvider: distancer,
VectorForIDThunk: func(ctx context.Context, id uint64) ([]float32, error) {
return vectors[int(id)], nil
},
}, uc, newDummyStore(t))
init := time.Now()
compressionhelpers.Concurrently(uint64(switch_at), func(_, id uint64, _ *sync.Mutex) {
index.Add(uint64(id), vectors[id])
})
before = time.Now()
uc.PQ.Enabled = true
index.UpdateUserConfig(uc, func() {})
fmt.Printf("Time to compress: %s", time.Since(before))
fmt.Println()
compressionhelpers.Concurrently(uint64(vectors_size-switch_at), func(_, id uint64, _ *sync.Mutex) {
idx := switch_at + int(id)
index.Add(uint64(idx), vectors[idx])
})
fmt.Printf("Building the index took %s\n", time.Since(init))
var relevant uint64
var retrieved int
var querying time.Duration = 0
compressionhelpers.Concurrently(uint64(len(queries)), func(_, i uint64, _ *sync.Mutex) {
before = time.Now()
results, _, _ := index.SearchByVector(queries[i], k, nil)
querying += time.Since(before)
retrieved += k
relevant += testinghelpers.MatchesInLists(truths[i], results)
})
recall := float32(relevant) / float32(retrieved)
latency := float32(querying.Microseconds()) / float32(queries_size)
fmt.Println(recall, latency)
assert.True(t, recall > 0.9)
assert.True(t, latency < 100000)
}
}
}
func parseFromTxt(file string, size int) [][]float32 {
content, _ := ioutil.ReadFile(file)
strContent := string(content)
testArray := strings.Split(strContent, "\n")
test := make([][]float32, 0, len(testArray))
for j := 0; j < size; j++ {
elementArray := strings.Split(testArray[j], " ")
test = append(test, make([]float32, len(elementArray)))
for i := range elementArray {
f, _ := strconv.ParseFloat(elementArray[i], 16)
test[j][i] = float32(f)
}
}
return test
}
func newDummyStore(t *testing.T) *lsmkv.Store {
logger, _ := test.NewNullLogger()
storeDir := t.TempDir()
store, err := lsmkv.New(storeDir, storeDir, logger, nil,
cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop())
require.Nil(t, err)
return store
}