SemanticSearchPOC / adapters /repos /db /lsmkv /concurrent_writing_integration_test.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
7.89 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
//go:build integrationTest
// +build integrationTest
package lsmkv
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"reflect"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/entities/cyclemanager"
)
// This test continuously writes into a bucket with a small memtable threshold,
// so that a lot of flushing is happening while writing. This is to ensure that
// there will be no lost writes or other inconsistencies under load
func TestConcurrentWriting_Replace(t *testing.T) {
dirName := t.TempDir()
amount := 2000
sizePerValue := 128
keys := make([][]byte, amount)
values := make([][]byte, amount)
bucket, err := NewBucket(testCtx(), dirName, "", nullLogger(), nil,
cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(),
WithStrategy(StrategyReplace),
WithMemtableThreshold(10000))
require.Nil(t, err)
t.Run("generate random data", func(t *testing.T) {
for i := range keys {
uuid, err := uuid.New().MarshalBinary()
require.Nil(t, err)
keys[i] = uuid
values[i] = make([]byte, sizePerValue)
rand.Read(values[i])
}
})
t.Run("import", func(t *testing.T) {
wg := sync.WaitGroup{}
for i := range keys {
time.Sleep(50 * time.Microsecond)
wg.Add(1)
go func(index int) {
defer wg.Done()
err := bucket.Put(keys[index], values[index])
assert.Nil(t, err)
}(i)
}
wg.Wait()
})
t.Run("verify get", func(t *testing.T) {
correct := 0
var missingKeys []int
for i := range keys {
value, err := bucket.Get(keys[i])
assert.Nil(t, err)
if bytes.Equal(values[i], value) {
correct++
} else {
missingKeys = append(missingKeys, i)
}
}
if len(missingKeys) > 0 {
fmt.Printf("missing keys: %v\n", missingKeys)
}
assert.Equal(t, amount, correct)
})
t.Run("verify cursor", func(t *testing.T) {
correct := 0
// put all key value/pairs in a map so we can access them by key
targets := map[string][]byte{}
for i := range keys {
targets[string(keys[i])] = values[i]
}
c := bucket.Cursor()
defer c.Close()
for k, v := c.First(); k != nil; k, v = c.Next() {
control := targets[string(k)]
if bytes.Equal(control, v) {
correct++
}
}
assert.Equal(t, amount, correct)
})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
require.Nil(t, bucket.Shutdown(ctx))
}
// This test continuously writes into a bucket with a small memtable threshold,
// so that a lot of flushing is happening while writing. This is to ensure that
// there will be no lost writes or other inconsistencies under load
func TestConcurrentWriting_Set(t *testing.T) {
dirName := t.TempDir()
amount := 2000
valuesPerKey := 4
sizePerValue := 32
keys := make([][]byte, amount)
values := make([][][]byte, amount)
bucket, err := NewBucket(testCtx(), dirName, "", nullLogger(), nil,
cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(),
WithStrategy(StrategySetCollection),
WithMemtableThreshold(10000))
require.Nil(t, err)
t.Run("generate random data", func(t *testing.T) {
for i := range keys {
uuid, err := uuid.New().MarshalBinary()
require.Nil(t, err)
keys[i] = uuid
values[i] = make([][]byte, valuesPerKey)
for j := range values[i] {
values[i][j] = make([]byte, sizePerValue)
rand.Read(values[i][j])
}
}
})
t.Run("import", func(t *testing.T) {
wg := sync.WaitGroup{}
for i := range keys {
time.Sleep(50 * time.Microsecond)
wg.Add(1)
go func(index int) {
defer wg.Done()
err := bucket.SetAdd(keys[index], values[index])
assert.Nil(t, err)
}(i)
}
wg.Wait()
})
t.Run("verify get", func(t *testing.T) {
correct := 0
for i := range keys {
value, err := bucket.SetList(keys[i])
assert.Nil(t, err)
if reflect.DeepEqual(values[i], value) {
correct++
}
}
assert.Equal(t, amount, correct)
})
t.Run("verify cursor", func(t *testing.T) {
correct := 0
// put all key value/pairs in a map so we can access them by key
targets := map[string][][]byte{}
for i := range keys {
targets[string(keys[i])] = values[i]
}
c := bucket.SetCursor()
defer c.Close()
for k, v := c.First(); k != nil; k, v = c.Next() {
control := targets[string(k)]
if reflect.DeepEqual(control, v) {
correct++
}
}
assert.Equal(t, amount, correct)
})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
require.Nil(t, bucket.Shutdown(ctx))
}
// This test continuously writes into a bucket with a small memtable threshold,
// so that a lot of flushing is happening while writing. This is to ensure that
// there will be no lost writes or other inconsistencies under load
func TestConcurrentWriting_Map(t *testing.T) {
dirName := t.TempDir()
amount := 2000
valuesPerKey := 4
sizePerKey := 8
sizePerValue := 32
keys := make([][]byte, amount)
values := make([][]MapPair, amount)
bucket, err := NewBucket(testCtx(), dirName, "", nullLogger(), nil,
cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(),
WithStrategy(StrategyMapCollection),
WithMemtableThreshold(5000))
require.Nil(t, err)
t.Run("generate random data", func(t *testing.T) {
for i := range keys {
uuid, err := uuid.New().MarshalBinary()
require.Nil(t, err)
keys[i] = uuid
values[i] = make([]MapPair, valuesPerKey)
for j := range values[i] {
values[i][j] = MapPair{
Key: make([]byte, sizePerKey),
Value: make([]byte, sizePerValue),
}
rand.Read(values[i][j].Key)
rand.Read(values[i][j].Value)
}
}
})
t.Run("import", func(t *testing.T) {
wg := sync.WaitGroup{}
for i := range keys {
for j := 0; j < valuesPerKey; j++ {
time.Sleep(50 * time.Microsecond)
wg.Add(1)
go func(rowIndex, valueIndex int) {
defer wg.Done()
err := bucket.MapSet(keys[rowIndex], values[rowIndex][valueIndex])
assert.Nil(t, err)
}(i, j)
}
}
wg.Wait()
})
t.Run("verify cursor", func(t *testing.T) {
correct := 0
// put all key value/pairs in a map so we can access them by key
targets := map[string][]MapPair{}
for i := range keys {
targets[string(keys[i])] = values[i]
}
c := bucket.MapCursor()
defer c.Close()
for k, v := c.First(); k != nil; k, v = c.Next() {
control := targets[string(k)]
if mapElementsMatch(control, v) {
correct++
}
}
assert.Equal(t, amount, correct)
})
t.Run("verify get", func(t *testing.T) {
correct := 0
for i := range keys {
value, err := bucket.MapList(keys[i])
assert.Nil(t, err)
if mapElementsMatch(values[i], value) {
correct++
}
}
assert.Equal(t, amount, correct)
})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
require.Nil(t, bucket.Shutdown(ctx))
}
func mapElementsMatch(a, b []MapPair) bool {
if len(a) != len(b) {
return false
}
aMap := map[string][]byte{}
for _, kv := range a {
aMap[string(kv.Key)] = kv.Value
}
for _, kv := range b {
control := aMap[string(kv.Key)]
if !bytes.Equal(kv.Value, control) {
return false
}
}
return true
}