Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package schema | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"testing" | |
"github.com/sirupsen/logrus" | |
"github.com/sirupsen/logrus/hooks/test" | |
"github.com/stretchr/testify/assert" | |
"github.com/stretchr/testify/require" | |
"github.com/weaviate/weaviate/entities/models" | |
"github.com/weaviate/weaviate/entities/schema" | |
ucs "github.com/weaviate/weaviate/usecases/schema" | |
"github.com/weaviate/weaviate/usecases/sharding" | |
) | |
func TestRepositoryMigrate(t *testing.T) { | |
var ( | |
ctx = context.Background() | |
logger, _ = test.NewNullLogger() | |
dirName = t.TempDir() | |
canceledCtx, cancel = context.WithCancel(ctx) | |
) | |
cancel() | |
schema := ucs.NewState(3) | |
addClass(&schema, "C1", 0, 1, 0) | |
addClass(&schema, "C2", 0, 3, 3) | |
t.Run("SaveOldSchema", func(t *testing.T) { | |
repo, _ := newRepo(dirName, 0, logger) | |
defer repo.Close() | |
if err := repo.saveSchemaV1(schema); err != nil { | |
t.Fatalf("save all schema: %v", err) | |
} | |
}) | |
t.Run("LoadOldchema", func(t *testing.T) { | |
repo, err := newRepo(dirName, -1, logger) | |
if err != nil { | |
t.Fatalf("create new repo %v", err) | |
} | |
defer repo.Close() | |
_, err = repo.Load(canceledCtx) | |
assert.ErrorIs(t, err, context.Canceled) | |
state, err := repo.Load(ctx) | |
assert.Nil(t, err) | |
assert.Equal(t, schema, state) | |
}) | |
t.Run("LoadSchema", func(t *testing.T) { | |
repo, err := newRepo(dirName, -1, logger) | |
if err != nil { | |
t.Fatalf("create new repo %v", err) | |
} | |
defer repo.Close() | |
state, err := repo.Load(ctx) | |
assert.Nil(t, err) | |
assert.Equal(t, schema, state) | |
}) | |
t.Run("LoadSchemaWithHigherVersion", func(t *testing.T) { | |
_, err := newRepo(dirName, 1, logger) | |
assert.NotNil(t, err) | |
}) | |
} | |
func TestRepositorySaveLoad(t *testing.T) { | |
var ( | |
ctx = context.Background() | |
canceledCtx, cancel = context.WithCancel(ctx) | |
logger, _ = test.NewNullLogger() | |
dirName = t.TempDir() | |
) | |
cancel() | |
repo, err := newRepo(dirName, -1, logger) | |
if err != nil { | |
t.Fatalf("create new repo: %v", err) | |
} | |
// load empty schema | |
res, err := repo.Load(ctx) | |
if err != nil { | |
t.Fatalf("loading schema from empty file: %v", err) | |
} | |
if len(res.ShardingState) != 0 || len(res.ObjectSchema.Classes) != 0 { | |
t.Fatalf("expected empty schema got %v", res) | |
} | |
// save and load non empty schema | |
schema := ucs.NewState(3) | |
addClass(&schema, "C1", 0, 1, 0) | |
addClass(&schema, "C2", 0, 3, 3) | |
err = repo.Save(canceledCtx, schema) | |
assert.ErrorIs(t, err, context.Canceled) | |
if err = repo.Save(ctx, schema); err != nil { | |
t.Fatalf("save schema: %v", err) | |
} | |
if err = repo.Save(ctx, schema); err != nil { | |
t.Fatalf("save schema: %v", err) | |
} | |
res, err = repo.Load(context.Background()) | |
if err != nil { | |
t.Fatalf("load schema: %v", err) | |
} | |
assert.Equal(t, schema, res) | |
// delete class | |
deleteClass(&schema, "C2") | |
repo.DeleteClass(ctx, "C2") // second call to test impotency | |
if err := repo.DeleteClass(ctx, "C2"); err != nil { | |
t.Errorf("delete bucket: %v", err) | |
} | |
repo.asserEqualSchema(t, schema, "delete class") | |
} | |
func TestRepositoryUpdateClass(t *testing.T) { | |
var ( | |
ctx = context.Background() | |
logger, _ = test.NewNullLogger() | |
dirName = t.TempDir() | |
) | |
repo, err := newRepo(dirName, -1, logger) | |
if err != nil { | |
t.Fatalf("create new repo: %v", err) | |
} | |
// save and load non empty schema | |
schema := ucs.NewState(3) | |
cls, ss := addClass(&schema, "C1", 0, 1, 0) | |
payload, err := ucs.CreateClassPayload(cls, ss) | |
assert.Nil(t, err) | |
if err := repo.NewClass(ctx, payload); err != nil { | |
t.Fatalf("create new class: %v", err) | |
} | |
if err := repo.NewClass(ctx, payload); err == nil { | |
t.Fatal("create new class: must fail since class already exits") | |
} | |
repo.asserEqualSchema(t, schema, "create class") | |
// update class | |
deleteClass(&schema, "C1") | |
cls, ss = addClass(&schema, "C1", 0, 2, 1) | |
payload, err = ucs.CreateClassPayload(cls, ss) | |
assert.Nil(t, err) | |
payload.Name = "C3" | |
if err := repo.UpdateClass(ctx, payload); err == nil { | |
t.Fatal("updating class by adding shards to non existing class must fail") | |
} | |
payload.Name = "C1" | |
if err := repo.UpdateClass(ctx, payload); err != nil { | |
t.Errorf("update class: %v", err) | |
} | |
repo.asserEqualSchema(t, schema, "update class") | |
// overwrite class | |
deleteClass(&schema, "C1") | |
cls, ss = addClass(&schema, "C1", 2, 2, 3) | |
payload, err = ucs.CreateClassPayload(cls, ss) | |
assert.Nil(t, err) | |
payload.ReplaceShards = true | |
if err := repo.UpdateClass(ctx, payload); err != nil { | |
t.Errorf("update class: %v", err) | |
} | |
repo.asserEqualSchema(t, schema, "overwrite class") | |
// delete class | |
deleteClass(&schema, "C1") | |
repo.DeleteClass(ctx, "C1") // second call to test impotency | |
if err := repo.DeleteClass(ctx, "C1"); err != nil { | |
t.Errorf("delete bucket: %v", err) | |
} | |
repo.asserEqualSchema(t, schema, "delete class") | |
} | |
func TestRepositoryUpdateShards(t *testing.T) { | |
var ( | |
ctx = context.Background() | |
logger, _ = test.NewNullLogger() | |
dirName = t.TempDir() | |
) | |
repo, err := newRepo(dirName, -1, logger) | |
if err != nil { | |
t.Fatalf("create new repo: %v", err) | |
} | |
schema := ucs.NewState(2) | |
cls, ss := addClass(&schema, "C1", 0, 2, 1) | |
payload, err := ucs.CreateClassPayload(cls, ss) | |
assert.Nil(t, err) | |
if err := repo.NewClass(ctx, payload); err != nil { | |
t.Errorf("update class: %v", err) | |
} | |
repo.asserEqualSchema(t, schema, "update class") | |
// add two shards | |
deleteClass(&schema, "C1") | |
_, ss = addClass(&schema, "C1", 0, 2, 5) | |
shards := serializeShards(ss.Physical) | |
if err := repo.NewShards(ctx, "C1", shards); err != nil { | |
t.Fatalf("add new shards: %v", err) | |
} | |
if err := repo.NewShards(ctx, "C3", shards); err == nil { | |
t.Fatal("add new shards to a non existing class must fail") | |
} | |
repo.asserEqualSchema(t, schema, "adding new shards") | |
t.Run("fails updating non existent shards", func(t *testing.T) { | |
nonExistentShards := createShards(4, 2, models.TenantActivityStatusCOLD) | |
nonExistentShardPairs := serializeShards(nonExistentShards) | |
err := repo.UpdateShards(ctx, "C1", nonExistentShardPairs) | |
require.NotNil(t, err) | |
assert.ErrorContains(t, err, "shard not found") | |
}) | |
existentShards := createShards(3, 2, models.TenantActivityStatusCOLD) | |
existentShardPairs := serializeShards(existentShards) | |
t.Run("fails updating shards of non existent class", func(t *testing.T) { | |
err := repo.UpdateShards(ctx, "ClassNonExistent", existentShardPairs) | |
require.NotNil(t, err) | |
assert.ErrorContains(t, err, "class not found") | |
}) | |
t.Run("succeeds updating shards", func(t *testing.T) { | |
err := repo.UpdateShards(ctx, "C1", existentShardPairs) | |
require.Nil(t, err) | |
replaceShards(ss, existentShards) | |
repo.asserEqualSchema(t, schema, "update shards") | |
}) | |
xset := removeShards(ss, []int{0, 3, 4}) | |
if err := repo.DeleteShards(ctx, "C1", xset); err != nil { | |
t.Fatalf("delete shards: %v", err) | |
} | |
repo.asserEqualSchema(t, schema, "remove shards") | |
if err := repo.DeleteShards(ctx, "C3", xset); err != nil { | |
t.Fatalf("delete shards from unknown class: %v", err) | |
} | |
} | |
func createClass(name string, start, nProps, nShards int) (models.Class, sharding.State) { | |
cls := models.Class{Class: name} | |
for i := start; i < start+nProps; i++ { | |
prop := models.Property{ | |
Name: fmt.Sprintf("property-%d", i), | |
DataType: schema.DataTypeText.PropString(), | |
Tokenization: models.PropertyTokenizationWhitespace, | |
} | |
cls.Properties = append(cls.Properties, &prop) | |
} | |
ss := sharding.State{IndexID: name} | |
ss.Physical = createShards(start, nShards, models.TenantActivityStatusHOT) | |
return cls, ss | |
} | |
func createShards(start, nShards int, activityStatus string) map[string]sharding.Physical { | |
if nShards < 1 { | |
return nil | |
} | |
shards := make(map[string]sharding.Physical, nShards) | |
for i := start; i < start+nShards; i++ { | |
name := fmt.Sprintf("shard-%d", i) | |
node := fmt.Sprintf("node-%d", i) | |
shards[name] = sharding.Physical{ | |
Name: name, | |
BelongsToNodes: []string{node}, | |
Status: activityStatus, | |
} | |
} | |
return shards | |
} | |
func replaceShards(ss *sharding.State, shards map[string]sharding.Physical) { | |
for name, shard := range shards { | |
ss.Physical[name] = shard | |
} | |
} | |
func removeShards(ss *sharding.State, shards []int) []string { | |
res := make([]string, len(shards)) | |
for i, j := range shards { | |
name := fmt.Sprintf("shard-%d", j) | |
delete(ss.Physical, name) | |
res[i] = name | |
} | |
return res | |
} | |
func addClass(schema *ucs.State, name string, start, nProps, nShards int) (*models.Class, *sharding.State) { | |
cls, ss := createClass(name, start, nProps, nShards) | |
if schema.ObjectSchema == nil { | |
schema.ObjectSchema = &models.Schema{} | |
} | |
if schema.ShardingState == nil { | |
schema.ShardingState = make(map[string]*sharding.State) | |
} | |
schema.ObjectSchema.Classes = append(schema.ObjectSchema.Classes, &cls) | |
schema.ShardingState[name] = &ss | |
return &cls, &ss | |
} | |
func deleteClass(schema *ucs.State, name string) { | |
idx := -1 | |
for i, cls := range schema.ObjectSchema.Classes { | |
if cls.Class == name { | |
idx = i | |
break | |
} | |
} | |
if idx == -1 { | |
return | |
} | |
schema.ObjectSchema.Classes = append(schema.ObjectSchema.Classes[:idx], schema.ObjectSchema.Classes[idx+1:]...) | |
delete(schema.ShardingState, name) | |
} | |
func (r *store) asserEqualSchema(t *testing.T, expected ucs.State, msg string) { | |
t.Helper() | |
actual, err := r.Load(context.Background()) | |
if err != nil { | |
t.Fatalf("load schema: %s: %v", msg, err) | |
} | |
assert.Equal(t, expected, actual) | |
} | |
func serializeShards(shards map[string]sharding.Physical) []ucs.KeyValuePair { | |
xs := make([]ucs.KeyValuePair, 0, len(shards)) | |
for k, v := range shards { | |
val, _ := json.Marshal(&v) | |
xs = append(xs, ucs.KeyValuePair{Key: k, Value: val}) | |
} | |
return xs | |
} | |
func newRepo(homeDir string, version int, logger logrus.FieldLogger) (*store, error) { | |
r := NewStore(homeDir, logger) | |
if version > -1 { | |
r.version = version | |
} | |
return r, r.Open() | |
} | |