Spaces:
Sleeping
Sleeping
| // _ _ | |
| // __ _____ __ ___ ___ __ _| |_ ___ | |
| // \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
| // \ V V / __/ (_| |\ V /| | (_| | || __/ | |
| // \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
| // | |
| // Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
| // | |
| // CONTACT: [email protected] | |
| // | |
| package scaler | |
| import ( | |
| "context" | |
| "os" | |
| "path" | |
| "strconv" | |
| "testing" | |
| "github.com/stretchr/testify/assert" | |
| "github.com/weaviate/weaviate/entities/backup" | |
| "github.com/weaviate/weaviate/usecases/sharding" | |
| ) | |
| func TestScalerScale(t *testing.T) { | |
| ctx := context.Background() | |
| t.Run("NoShardingState", func(t *testing.T) { | |
| f := newFakeFactory() | |
| f.ShardingState.M = nil | |
| scaler := f.Scaler("") | |
| old := sharding.Config{} | |
| _, err := scaler.Scale(ctx, "C", old, 1, 2) | |
| assert.NotNil(t, err) | |
| assert.Contains(t, err.Error(), "no sharding state") | |
| }) | |
| t.Run("SameReplicationFactor", func(t *testing.T) { | |
| scaler := newFakeFactory().Scaler("") | |
| old := sharding.Config{} | |
| _, err := scaler.Scale(ctx, "C", old, 2, 2) | |
| assert.Nil(t, err) | |
| }) | |
| t.Run("ScaleInNotSupported", func(t *testing.T) { | |
| scaler := newFakeFactory().Scaler("") | |
| old := sharding.Config{} | |
| _, err := scaler.Scale(ctx, "C", old, 2, 1) | |
| assert.NotNil(t, err) | |
| assert.Contains(t, err.Error(), "not supported") | |
| }) | |
| } | |
| func TestScalerScaleOut(t *testing.T) { | |
| var ( | |
| dataDir = t.TempDir() | |
| ctx = context.Background() | |
| cls = "C" | |
| old = sharding.Config{} | |
| bak = backup.ClassDescriptor{ | |
| Name: "C", | |
| Shards: []*backup.ShardDescriptor{ | |
| { | |
| Name: "S1", Files: []string{"f1"}, | |
| PropLengthTrackerPath: "f4", | |
| ShardVersionPath: "f4", | |
| DocIDCounterPath: "f4", | |
| }, | |
| }, | |
| } | |
| ) | |
| for i := 1; i < 5; i++ { | |
| file, err := os.Create(path.Join(dataDir, "f"+strconv.Itoa(i))) | |
| assert.Nil(t, err) | |
| file.Close() | |
| } | |
| t.Run("UnresolvedName", func(t *testing.T) { | |
| f := newFakeFactory() | |
| delete(f.NodeHostMap, "N3") | |
| scaler := f.Scaler(dataDir) | |
| _, err := scaler.Scale(ctx, "C", old, 1, 3) | |
| assert.ErrorIs(t, err, ErrUnresolvedName) | |
| }) | |
| t.Run("GetLocalShards", func(t *testing.T) { | |
| f := newFakeFactory() | |
| f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, errAny) | |
| // shard doesn't exist locally | |
| f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(nil) | |
| f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil) | |
| f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(nil) | |
| scaler := f.Scaler(dataDir) | |
| _, err := scaler.Scale(ctx, "C", old, 1, 3) | |
| assert.ErrorIs(t, err, errAny) | |
| }) | |
| t.Run("IncreaseFactor", func(t *testing.T) { | |
| f := newFakeFactory() | |
| f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, nil) | |
| // sync update to remote node N2 | |
| f.Client.On("CreateShard", anyVal, "H2", cls, "S1").Return(nil) | |
| f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f1", anyVal).Return(nil) | |
| f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f4", anyVal).Return(nil) | |
| f.Client.On("ReInitShard", anyVal, "H2", cls, "S1").Return(nil) | |
| // sync update to remote node N3 | |
| f.Client.On("CreateShard", anyVal, "H3", cls, "S1").Return(nil) | |
| f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f1", anyVal).Return(nil) | |
| f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f4", anyVal).Return(nil) | |
| f.Client.On("ReInitShard", anyVal, "H3", cls, "S1").Return(nil) | |
| // shard doesn't exist locally | |
| f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(errAny) | |
| f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil) | |
| f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(nil) | |
| scaler := f.Scaler(dataDir) | |
| _, err := scaler.Scale(ctx, "C", old, 1, 3) | |
| assert.ErrorIs(t, err, errAny) | |
| }) | |
| t.Run("Success", func(t *testing.T) { | |
| f := newFakeFactory() | |
| f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, nil) | |
| // sync update to remote node N2 | |
| f.Client.On("CreateShard", anyVal, "H2", cls, "S1").Return(nil) | |
| f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f1", anyVal).Return(nil) | |
| f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f4", anyVal).Return(nil) | |
| f.Client.On("ReInitShard", anyVal, "H2", cls, "S1").Return(nil) | |
| // sync update to remote node N3 | |
| f.Client.On("CreateShard", anyVal, "H3", cls, "S1").Return(nil) | |
| f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f1", anyVal).Return(nil) | |
| f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f4", anyVal).Return(nil) | |
| f.Client.On("ReInitShard", anyVal, "H3", cls, "S1").Return(nil) | |
| // shard doesn't exist locally | |
| f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(nil) | |
| f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil) | |
| f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(nil) | |
| scaler := f.Scaler(dataDir) | |
| _, err := scaler.Scale(ctx, "C", old, 1, 3) | |
| assert.Nil(t, err) | |
| }) | |
| t.Run("ReleaseBackupAsync", func(t *testing.T) { | |
| f := newFakeFactory() | |
| f.Source.On("ShardsBackup", anyVal, anyVal, cls, []string{"S1"}).Return(bak, nil) | |
| // sync update to remote node N2 | |
| f.Client.On("CreateShard", anyVal, "H2", cls, "S1").Return(nil) | |
| f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f1", anyVal).Return(nil) | |
| f.Client.On("PutFile", anyVal, "H2", cls, "S1", "f4", anyVal).Return(nil) | |
| f.Client.On("ReInitShard", anyVal, "H2", cls, "S1").Return(nil) | |
| // sync update to remote node N3 | |
| f.Client.On("CreateShard", anyVal, "H3", cls, "S1").Return(nil) | |
| f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f1", anyVal).Return(nil) | |
| f.Client.On("PutFile", anyVal, "H3", cls, "S1", "f4", anyVal).Return(nil) | |
| f.Client.On("ReInitShard", anyVal, "H3", cls, "S1").Return(nil) | |
| // shard doesn't exist locally | |
| f.Client.On("IncreaseReplicationFactor", anyVal, "H3", cls, anyVal, anyVal).Return(nil) | |
| f.Client.On("IncreaseReplicationFactor", anyVal, "H4", cls, anyVal, anyVal).Return(nil) | |
| f.Source.On("ReleaseBackup", anyVal, anyVal, "C").Return(errAny) | |
| scaler := f.Scaler(dataDir) | |
| _, err := scaler.Scale(ctx, "C", old, 1, 3) | |
| assert.Nil(t, err) | |
| }) | |
| } | |