Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package scaler | |
import ( | |
"context" | |
"os" | |
"path" | |
"strconv" | |
"testing" | |
"github.com/stretchr/testify/assert" | |
"github.com/weaviate/weaviate/entities/backup" | |
"github.com/weaviate/weaviate/usecases/sharding" | |
) | |
func TestScalerScale(t *testing.T) { | |
ctx := context.Background() | |
t.Run("NoShardingState", func(t *testing.T) { | |
f := newFakeFactory() | |
f.ShardingState.M = nil | |
scaler := f.Scaler("") | |
old := sharding.Config{} | |
_, err := scaler.Scale(ctx, "C", old, 1, 2) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "no sharding state") | |
}) | |
t.Run("SameReplicationFactor", func(t *testing.T) { | |
scaler := newFakeFactory().Scaler("") | |
old := sharding.Config{} | |
_, err := scaler.Scale(ctx, "C", old, 2, 2) | |
assert.Nil(t, err) | |
}) | |
t.Run("ScaleInNotSupported", func(t *testing.T) { | |
scaler := newFakeFactory().Scaler("") | |
old := sharding.Config{} | |
_, err := scaler.Scale(ctx, "C", old, 2, 1) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "not supported") | |
}) | |
} | |
func TestScalerScaleOut(t *testing.T) { | |
var ( | |
dataDir = t.TempDir() | |
ctx = context.Background() | |
cls = "C" | |
old = sharding.Config{} | |
bak = backup.ClassDescriptor{ | |
Name: "C", | |
Shards: []*backup.ShardDescriptor{ | |
{ | |
Name: "S1", Files: []string{"f1"}, | |
PropLengthTrackerPath: "f4", | |
ShardVersionPath: "f4", | |
DocIDCounterPath: "f4", | |
}, | |
}, | |
} | |
) | |
for i := 1; i < 5; i++ { | |
file, err := os.Create(path.Join(dataDir, "f"+strconv.Itoa(i))) | |
assert.Nil(t, err) | |
file.Close() | |
} | |
t.Run("UnresolvedName", func(t *testing.T) { | |
f := newFakeFactory() | |
delete(f.NodeHostMap, "N3") | |
scaler := f.Scaler(dataDir) | |
_, err := scaler.Scale(ctx, "C", old, 1, 3) | |
assert.ErrorIs(t, err, ErrUnresolvedName) | |
}) | |
t.Run("GetLocalShards", func(t *testing.T) { | |
f := newFakeFactory() | |
f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, errAny) | |
// shard doesn't exist locally | |
f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(nil) | |
f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil) | |
f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(nil) | |
scaler := f.Scaler(dataDir) | |
_, err := scaler.Scale(ctx, "C", old, 1, 3) | |
assert.ErrorIs(t, err, errAny) | |
}) | |
t.Run("IncreaseFactor", func(t *testing.T) { | |
f := newFakeFactory() | |
f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, nil) | |
// sync update to remote node N2 | |
f.Client.On("CreateShard", anyVal, "H2", cls, "S1").Return(nil) | |
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f1", anyVal).Return(nil) | |
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f4", anyVal).Return(nil) | |
f.Client.On("ReInitShard", anyVal, "H2", cls, "S1").Return(nil) | |
// sync update to remote node N3 | |
f.Client.On("CreateShard", anyVal, "H3", cls, "S1").Return(nil) | |
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f1", anyVal).Return(nil) | |
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f4", anyVal).Return(nil) | |
f.Client.On("ReInitShard", anyVal, "H3", cls, "S1").Return(nil) | |
// shard doesn't exist locally | |
f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(errAny) | |
f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil) | |
f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(nil) | |
scaler := f.Scaler(dataDir) | |
_, err := scaler.Scale(ctx, "C", old, 1, 3) | |
assert.ErrorIs(t, err, errAny) | |
}) | |
t.Run("Success", func(t *testing.T) { | |
f := newFakeFactory() | |
f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, nil) | |
// sync update to remote node N2 | |
f.Client.On("CreateShard", anyVal, "H2", cls, "S1").Return(nil) | |
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f1", anyVal).Return(nil) | |
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f4", anyVal).Return(nil) | |
f.Client.On("ReInitShard", anyVal, "H2", cls, "S1").Return(nil) | |
// sync update to remote node N3 | |
f.Client.On("CreateShard", anyVal, "H3", cls, "S1").Return(nil) | |
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f1", anyVal).Return(nil) | |
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f4", anyVal).Return(nil) | |
f.Client.On("ReInitShard", anyVal, "H3", cls, "S1").Return(nil) | |
// shard doesn't exist locally | |
f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(nil) | |
f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil) | |
f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(nil) | |
scaler := f.Scaler(dataDir) | |
_, err := scaler.Scale(ctx, "C", old, 1, 3) | |
assert.Nil(t, err) | |
}) | |
t.Run("ReleaseBackupAsync", func(t *testing.T) { | |
f := newFakeFactory() | |
f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, nil) | |
// sync update to remote node N2 | |
f.Client.On("CreateShard", anyVal, "H2", cls, "S1").Return(nil) | |
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f1", anyVal).Return(nil) | |
f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f4", anyVal).Return(nil) | |
f.Client.On("ReInitShard", anyVal, "H2", cls, "S1").Return(nil) | |
// sync update to remote node N3 | |
f.Client.On("CreateShard", anyVal, "H3", cls, "S1").Return(nil) | |
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f1", anyVal).Return(nil) | |
f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f4", anyVal).Return(nil) | |
f.Client.On("ReInitShard", anyVal, "H3", cls, "S1").Return(nil) | |
// shard doesn't exist locally | |
f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(nil) | |
f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil) | |
f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(errAny) | |
scaler := f.Scaler(dataDir) | |
_, err := scaler.Scale(ctx, "C", old, 1, 3) | |
assert.Nil(t, err) | |
}) | |
} | |