Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
//go:build integrationTest | |
// +build integrationTest | |
package lsmkv | |
import ( | |
"bytes" | |
"context" | |
"crypto/rand" | |
"fmt" | |
"reflect" | |
"sync" | |
"testing" | |
"time" | |
"github.com/google/uuid" | |
"github.com/stretchr/testify/assert" | |
"github.com/stretchr/testify/require" | |
"github.com/weaviate/weaviate/entities/cyclemanager" | |
) | |
// This test continuously writes into a bucket with a small memtable threshold, | |
// so that a lot of flushing is happening while writing. This is to ensure that | |
// there will be no lost writes or other inconsistencies under load | |
func TestConcurrentWriting_Replace(t *testing.T) { | |
dirName := t.TempDir() | |
amount := 2000 | |
sizePerValue := 128 | |
keys := make([][]byte, amount) | |
values := make([][]byte, amount) | |
bucket, err := NewBucket(testCtx(), dirName, "", nullLogger(), nil, | |
cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(), | |
WithStrategy(StrategyReplace), | |
WithMemtableThreshold(10000)) | |
require.Nil(t, err) | |
t.Run("generate random data", func(t *testing.T) { | |
for i := range keys { | |
uuid, err := uuid.New().MarshalBinary() | |
require.Nil(t, err) | |
keys[i] = uuid | |
values[i] = make([]byte, sizePerValue) | |
rand.Read(values[i]) | |
} | |
}) | |
t.Run("import", func(t *testing.T) { | |
wg := sync.WaitGroup{} | |
for i := range keys { | |
time.Sleep(50 * time.Microsecond) | |
wg.Add(1) | |
go func(index int) { | |
defer wg.Done() | |
err := bucket.Put(keys[index], values[index]) | |
assert.Nil(t, err) | |
}(i) | |
} | |
wg.Wait() | |
}) | |
t.Run("verify get", func(t *testing.T) { | |
correct := 0 | |
var missingKeys []int | |
for i := range keys { | |
value, err := bucket.Get(keys[i]) | |
assert.Nil(t, err) | |
if bytes.Equal(values[i], value) { | |
correct++ | |
} else { | |
missingKeys = append(missingKeys, i) | |
} | |
} | |
if len(missingKeys) > 0 { | |
fmt.Printf("missing keys: %v\n", missingKeys) | |
} | |
assert.Equal(t, amount, correct) | |
}) | |
t.Run("verify cursor", func(t *testing.T) { | |
correct := 0 | |
// put all key value/pairs in a map so we can access them by key | |
targets := map[string][]byte{} | |
for i := range keys { | |
targets[string(keys[i])] = values[i] | |
} | |
c := bucket.Cursor() | |
defer c.Close() | |
for k, v := c.First(); k != nil; k, v = c.Next() { | |
control := targets[string(k)] | |
if bytes.Equal(control, v) { | |
correct++ | |
} | |
} | |
assert.Equal(t, amount, correct) | |
}) | |
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) | |
defer cancel() | |
require.Nil(t, bucket.Shutdown(ctx)) | |
} | |
// This test continuously writes into a bucket with a small memtable threshold, | |
// so that a lot of flushing is happening while writing. This is to ensure that | |
// there will be no lost writes or other inconsistencies under load | |
func TestConcurrentWriting_Set(t *testing.T) { | |
dirName := t.TempDir() | |
amount := 2000 | |
valuesPerKey := 4 | |
sizePerValue := 32 | |
keys := make([][]byte, amount) | |
values := make([][][]byte, amount) | |
bucket, err := NewBucket(testCtx(), dirName, "", nullLogger(), nil, | |
cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(), | |
WithStrategy(StrategySetCollection), | |
WithMemtableThreshold(10000)) | |
require.Nil(t, err) | |
t.Run("generate random data", func(t *testing.T) { | |
for i := range keys { | |
uuid, err := uuid.New().MarshalBinary() | |
require.Nil(t, err) | |
keys[i] = uuid | |
values[i] = make([][]byte, valuesPerKey) | |
for j := range values[i] { | |
values[i][j] = make([]byte, sizePerValue) | |
rand.Read(values[i][j]) | |
} | |
} | |
}) | |
t.Run("import", func(t *testing.T) { | |
wg := sync.WaitGroup{} | |
for i := range keys { | |
time.Sleep(50 * time.Microsecond) | |
wg.Add(1) | |
go func(index int) { | |
defer wg.Done() | |
err := bucket.SetAdd(keys[index], values[index]) | |
assert.Nil(t, err) | |
}(i) | |
} | |
wg.Wait() | |
}) | |
t.Run("verify get", func(t *testing.T) { | |
correct := 0 | |
for i := range keys { | |
value, err := bucket.SetList(keys[i]) | |
assert.Nil(t, err) | |
if reflect.DeepEqual(values[i], value) { | |
correct++ | |
} | |
} | |
assert.Equal(t, amount, correct) | |
}) | |
t.Run("verify cursor", func(t *testing.T) { | |
correct := 0 | |
// put all key value/pairs in a map so we can access them by key | |
targets := map[string][][]byte{} | |
for i := range keys { | |
targets[string(keys[i])] = values[i] | |
} | |
c := bucket.SetCursor() | |
defer c.Close() | |
for k, v := c.First(); k != nil; k, v = c.Next() { | |
control := targets[string(k)] | |
if reflect.DeepEqual(control, v) { | |
correct++ | |
} | |
} | |
assert.Equal(t, amount, correct) | |
}) | |
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) | |
defer cancel() | |
require.Nil(t, bucket.Shutdown(ctx)) | |
} | |
// This test continuously writes into a bucket with a small memtable threshold, | |
// so that a lot of flushing is happening while writing. This is to ensure that | |
// there will be no lost writes or other inconsistencies under load | |
func TestConcurrentWriting_Map(t *testing.T) { | |
dirName := t.TempDir() | |
amount := 2000 | |
valuesPerKey := 4 | |
sizePerKey := 8 | |
sizePerValue := 32 | |
keys := make([][]byte, amount) | |
values := make([][]MapPair, amount) | |
bucket, err := NewBucket(testCtx(), dirName, "", nullLogger(), nil, | |
cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(), | |
WithStrategy(StrategyMapCollection), | |
WithMemtableThreshold(5000)) | |
require.Nil(t, err) | |
t.Run("generate random data", func(t *testing.T) { | |
for i := range keys { | |
uuid, err := uuid.New().MarshalBinary() | |
require.Nil(t, err) | |
keys[i] = uuid | |
values[i] = make([]MapPair, valuesPerKey) | |
for j := range values[i] { | |
values[i][j] = MapPair{ | |
Key: make([]byte, sizePerKey), | |
Value: make([]byte, sizePerValue), | |
} | |
rand.Read(values[i][j].Key) | |
rand.Read(values[i][j].Value) | |
} | |
} | |
}) | |
t.Run("import", func(t *testing.T) { | |
wg := sync.WaitGroup{} | |
for i := range keys { | |
for j := 0; j < valuesPerKey; j++ { | |
time.Sleep(50 * time.Microsecond) | |
wg.Add(1) | |
go func(rowIndex, valueIndex int) { | |
defer wg.Done() | |
err := bucket.MapSet(keys[rowIndex], values[rowIndex][valueIndex]) | |
assert.Nil(t, err) | |
}(i, j) | |
} | |
} | |
wg.Wait() | |
}) | |
t.Run("verify cursor", func(t *testing.T) { | |
correct := 0 | |
// put all key value/pairs in a map so we can access them by key | |
targets := map[string][]MapPair{} | |
for i := range keys { | |
targets[string(keys[i])] = values[i] | |
} | |
c := bucket.MapCursor() | |
defer c.Close() | |
for k, v := c.First(); k != nil; k, v = c.Next() { | |
control := targets[string(k)] | |
if mapElementsMatch(control, v) { | |
correct++ | |
} | |
} | |
assert.Equal(t, amount, correct) | |
}) | |
t.Run("verify get", func(t *testing.T) { | |
correct := 0 | |
for i := range keys { | |
value, err := bucket.MapList(keys[i]) | |
assert.Nil(t, err) | |
if mapElementsMatch(values[i], value) { | |
correct++ | |
} | |
} | |
assert.Equal(t, amount, correct) | |
}) | |
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) | |
defer cancel() | |
require.Nil(t, bucket.Shutdown(ctx)) | |
} | |
func mapElementsMatch(a, b []MapPair) bool { | |
if len(a) != len(b) { | |
return false | |
} | |
aMap := map[string][]byte{} | |
for _, kv := range a { | |
aMap[string(kv.Key)] = kv.Value | |
} | |
for _, kv := range b { | |
control := aMap[string(kv.Key)] | |
if !bytes.Equal(kv.Value, control) { | |
return false | |
} | |
} | |
return true | |
} | |