KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.34 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package scaler
import (
"context"
"os"
"path"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
"github.com/weaviate/weaviate/entities/backup"
"github.com/weaviate/weaviate/usecases/sharding"
)
func TestScalerScale(t *testing.T) {
ctx := context.Background()
t.Run("NoShardingState", func(t *testing.T) {
f := newFakeFactory()
f.ShardingState.M = nil
scaler := f.Scaler("")
old := sharding.Config{}
_, err := scaler.Scale(ctx, "C", old, 1, 2)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "no sharding state")
})
t.Run("SameReplicationFactor", func(t *testing.T) {
scaler := newFakeFactory().Scaler("")
old := sharding.Config{}
_, err := scaler.Scale(ctx, "C", old, 2, 2)
assert.Nil(t, err)
})
t.Run("ScaleInNotSupported", func(t *testing.T) {
scaler := newFakeFactory().Scaler("")
old := sharding.Config{}
_, err := scaler.Scale(ctx, "C", old, 2, 1)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "not supported")
})
}
func TestScalerScaleOut(t *testing.T) {
var (
dataDir = t.TempDir()
ctx = context.Background()
cls = "C"
old = sharding.Config{}
bak = backup.ClassDescriptor{
Name: "C",
Shards: []*backup.ShardDescriptor{
{
Name: "S1", Files: []string{"f1"},
PropLengthTrackerPath: "f4",
ShardVersionPath: "f4",
DocIDCounterPath: "f4",
},
},
}
)
for i := 1; i < 5; i++ {
file, err := os.Create(path.Join(dataDir, "f"+strconv.Itoa(i)))
assert.Nil(t, err)
file.Close()
}
t.Run("UnresolvedName", func(t *testing.T) {
f := newFakeFactory()
delete(f.NodeHostMap, "N3")
scaler := f.Scaler(dataDir)
_, err := scaler.Scale(ctx, "C", old, 1, 3)
assert.ErrorIs(t, err, ErrUnresolvedName)
})
t.Run("GetLocalShards", func(t *testing.T) {
f := newFakeFactory()
f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, errAny)
// shard doesn't exist locally
f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(nil)
f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil)
f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(nil)
scaler := f.Scaler(dataDir)
_, err := scaler.Scale(ctx, "C", old, 1, 3)
assert.ErrorIs(t, err, errAny)
})
t.Run("IncreaseFactor", func(t *testing.T) {
f := newFakeFactory()
f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, nil)
// sync update to remote node N2
f.Client.On("CreateShard", anyVal, "H2", cls, "S1").Return(nil)
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f1", anyVal).Return(nil)
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f4", anyVal).Return(nil)
f.Client.On("ReInitShard", anyVal, "H2", cls, "S1").Return(nil)
// sync update to remote node N3
f.Client.On("CreateShard", anyVal, "H3", cls, "S1").Return(nil)
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f1", anyVal).Return(nil)
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f4", anyVal).Return(nil)
f.Client.On("ReInitShard", anyVal, "H3", cls, "S1").Return(nil)
// shard doesn't exist locally
f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(errAny)
f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil)
f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(nil)
scaler := f.Scaler(dataDir)
_, err := scaler.Scale(ctx, "C", old, 1, 3)
assert.ErrorIs(t, err, errAny)
})
t.Run("Success", func(t *testing.T) {
f := newFakeFactory()
f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, nil)
// sync update to remote node N2
f.Client.On("CreateShard", anyVal, "H2", cls, "S1").Return(nil)
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f1", anyVal).Return(nil)
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f4", anyVal).Return(nil)
f.Client.On("ReInitShard", anyVal, "H2", cls, "S1").Return(nil)
// sync update to remote node N3
f.Client.On("CreateShard", anyVal, "H3", cls, "S1").Return(nil)
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f1", anyVal).Return(nil)
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f4", anyVal).Return(nil)
f.Client.On("ReInitShard", anyVal, "H3", cls, "S1").Return(nil)
// shard doesn't exist locally
f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(nil)
f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil)
f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(nil)
scaler := f.Scaler(dataDir)
_, err := scaler.Scale(ctx, "C", old, 1, 3)
assert.Nil(t, err)
})
t.Run("ReleaseBackupAsync", func(t *testing.T) {
f := newFakeFactory()
f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, nil)
// sync update to remote node N2
f.Client.On("CreateShard", anyVal, "H2", cls, "S1").Return(nil)
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f1", anyVal).Return(nil)
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f4", anyVal).Return(nil)
f.Client.On("ReInitShard", anyVal, "H2", cls, "S1").Return(nil)
// sync update to remote node N3
f.Client.On("CreateShard", anyVal, "H3", cls, "S1").Return(nil)
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f1", anyVal).Return(nil)
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f4", anyVal).Return(nil)
f.Client.On("ReInitShard", anyVal, "H3", cls, "S1").Return(nil)
// shard doesn't exist locally
f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(nil)
f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil)
f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(errAny)
scaler := f.Scaler(dataDir)
_, err := scaler.Scale(ctx, "C", old, 1, 3)
assert.Nil(t, err)
})
}