Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package schema | |
import ( | |
"context" | |
"encoding/json" | |
"testing" | |
"github.com/stretchr/testify/assert" | |
"github.com/stretchr/testify/require" | |
"github.com/weaviate/weaviate/entities/models" | |
"github.com/weaviate/weaviate/entities/schema" | |
"github.com/weaviate/weaviate/entities/schema/test_utils" | |
"github.com/weaviate/weaviate/usecases/cluster" | |
"github.com/weaviate/weaviate/usecases/sharding" | |
) | |
func TestIncommingTxCommit(t *testing.T) { | |
type test struct { | |
name string | |
before func(t *testing.T, SM *Manager) | |
tx *cluster.Transaction | |
assertSchema func(t *testing.T, sm *Manager) | |
expectedErrContains string | |
} | |
vFalse := false | |
vTrue := true | |
propertyName := "object_prop" | |
objectProperty := &models.Property{ | |
Name: propertyName, | |
DataType: schema.DataTypeObject.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
NestedProperties: []*models.NestedProperty{ | |
{ | |
Name: "nested_int", | |
DataType: schema.DataTypeInt.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
{ | |
Name: "nested_text", | |
DataType: schema.DataTypeText.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vTrue, | |
Tokenization: models.PropertyTokenizationWord, | |
}, | |
{ | |
Name: "nested_objects", | |
DataType: schema.DataTypeObjectArray.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
NestedProperties: []*models.NestedProperty{ | |
{ | |
Name: "nested_bool_lvl2", | |
DataType: schema.DataTypeBoolean.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
{ | |
Name: "nested_numbers_lvl2", | |
DataType: schema.DataTypeNumberArray.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
}, | |
}, | |
}, | |
} | |
updatedObjectProperty := &models.Property{ | |
Name: propertyName, | |
DataType: schema.DataTypeObject.PropString(), | |
IndexFilterable: &vFalse, // different setting than existing class/prop | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
NestedProperties: []*models.NestedProperty{ | |
{ | |
Name: "nested_number", | |
DataType: schema.DataTypeNumber.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
{ | |
Name: "nested_text", | |
DataType: schema.DataTypeText.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vTrue, | |
Tokenization: models.PropertyTokenizationField, // different setting than existing class/prop | |
}, | |
{ | |
Name: "nested_objects", | |
DataType: schema.DataTypeObjectArray.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
NestedProperties: []*models.NestedProperty{ | |
{ | |
Name: "nested_date_lvl2", | |
DataType: schema.DataTypeDate.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
{ | |
Name: "nested_numbers_lvl2", | |
DataType: schema.DataTypeNumberArray.PropString(), | |
IndexFilterable: &vFalse, // different setting than existing class/prop | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
}, | |
}, | |
}, | |
} | |
expectedObjectProperty := &models.Property{ | |
Name: propertyName, | |
DataType: schema.DataTypeObject.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
NestedProperties: []*models.NestedProperty{ | |
{ | |
Name: "nested_int", | |
DataType: schema.DataTypeInt.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
{ | |
Name: "nested_number", | |
DataType: schema.DataTypeNumber.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
{ | |
Name: "nested_text", | |
DataType: schema.DataTypeText.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vTrue, | |
Tokenization: models.PropertyTokenizationWord, // from existing class/prop | |
}, | |
{ | |
Name: "nested_objects", | |
DataType: schema.DataTypeObjectArray.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
NestedProperties: []*models.NestedProperty{ | |
{ | |
Name: "nested_bool_lvl2", | |
DataType: schema.DataTypeBoolean.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
{ | |
Name: "nested_date_lvl2", | |
DataType: schema.DataTypeDate.PropString(), | |
IndexFilterable: &vTrue, | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
{ | |
Name: "nested_numbers_lvl2", | |
DataType: schema.DataTypeNumberArray.PropString(), | |
IndexFilterable: &vTrue, // from existing class/prop | |
IndexSearchable: &vFalse, | |
Tokenization: "", | |
}, | |
}, | |
}, | |
}, | |
} | |
tests := []test{ | |
{ | |
name: "successful add class", | |
tx: &cluster.Transaction{ | |
Type: AddClass, | |
Payload: AddClassPayload{ | |
Class: &models.Class{ | |
Class: "SecondClass", | |
VectorIndexType: "hnsw", | |
}, | |
State: &sharding.State{}, | |
}, | |
}, | |
assertSchema: func(t *testing.T, sm *Manager) { | |
class, err := sm.GetClass(context.Background(), nil, "SecondClass") | |
require.Nil(t, err) | |
assert.Equal(t, "SecondClass", class.Class) | |
}, | |
}, | |
{ | |
name: "add class with incorrect payload", | |
tx: &cluster.Transaction{ | |
Type: AddClass, | |
Payload: "wrong-payload", | |
}, | |
expectedErrContains: "expected commit payload to be", | |
}, | |
{ | |
name: "add class with vector parse error", | |
tx: &cluster.Transaction{ | |
Type: AddClass, | |
Payload: AddClassPayload{ | |
Class: &models.Class{ | |
Class: "SecondClass", | |
VectorIndexType: "some-weird-pq-based-index", | |
}, | |
State: &sharding.State{}, | |
}, | |
}, | |
expectedErrContains: "unsupported vector index type", | |
}, | |
{ | |
name: "add class with sharding parse error", | |
tx: &cluster.Transaction{ | |
Type: AddClass, | |
Payload: AddClassPayload{ | |
Class: &models.Class{ | |
Class: "SecondClass", | |
VectorIndexType: "hnsw", | |
ShardingConfig: "this-cant-be-a-string", | |
}, | |
State: &sharding.State{}, | |
}, | |
}, | |
expectedErrContains: "parse sharding config", | |
}, | |
{ | |
name: "successful add property", | |
tx: &cluster.Transaction{ | |
Type: AddProperty, | |
Payload: AddPropertyPayload{ | |
ClassName: "FirstClass", | |
Property: &models.Property{ | |
DataType: schema.DataTypeText.PropString(), | |
Tokenization: models.PropertyTokenizationWhitespace, | |
Name: "new_prop", | |
}, | |
}, | |
}, | |
assertSchema: func(t *testing.T, sm *Manager) { | |
class, err := sm.GetClass(context.Background(), nil, "FirstClass") | |
require.Nil(t, err) | |
assert.Equal(t, "new_prop", class.Properties[0].Name) | |
}, | |
}, | |
{ | |
name: "add property with incorrect payload", | |
tx: &cluster.Transaction{ | |
Type: AddProperty, | |
Payload: "wrong-payload", | |
}, | |
expectedErrContains: "expected commit payload to be", | |
}, | |
{ | |
name: "successful delete class", | |
tx: &cluster.Transaction{ | |
Type: DeleteClass, | |
Payload: DeleteClassPayload{ | |
ClassName: "FirstClass", | |
}, | |
}, | |
assertSchema: func(t *testing.T, sm *Manager) { | |
class, err := sm.GetClass(context.Background(), nil, "FirstClass") | |
require.Nil(t, err) | |
assert.Nil(t, class) | |
}, | |
}, | |
{ | |
name: "delete class with incorrect payload", | |
tx: &cluster.Transaction{ | |
Type: DeleteClass, | |
Payload: "wrong-payload", | |
}, | |
expectedErrContains: "expected commit payload to be", | |
}, | |
{ | |
name: "successful update class", | |
tx: &cluster.Transaction{ | |
Type: UpdateClass, | |
Payload: UpdateClassPayload{ | |
ClassName: "FirstClass", | |
Class: &models.Class{ | |
Class: "FirstClass", | |
VectorIndexType: "hnsw", | |
Properties: []*models.Property{ | |
{ | |
Name: "added_through_update", | |
DataType: []string{"int"}, | |
}, | |
}, | |
}, | |
State: &sharding.State{}, | |
}, | |
}, | |
assertSchema: func(t *testing.T, sm *Manager) { | |
class, err := sm.GetClass(context.Background(), nil, "FirstClass") | |
require.Nil(t, err) | |
assert.Equal(t, "added_through_update", class.Properties[0].Name) | |
}, | |
}, | |
{ | |
name: "update class with incorrect payload", | |
tx: &cluster.Transaction{ | |
Type: UpdateClass, | |
Payload: "wrong-payload", | |
}, | |
expectedErrContains: "expected commit payload to be", | |
}, | |
{ | |
name: "update class with invalid vector index", | |
tx: &cluster.Transaction{ | |
Type: UpdateClass, | |
Payload: UpdateClassPayload{ | |
ClassName: "FirstClass", | |
Class: &models.Class{ | |
Class: "FirstClass", | |
VectorIndexType: "nope", | |
}, | |
State: &sharding.State{}, | |
}, | |
}, | |
expectedErrContains: "parse vector index", | |
}, | |
{ | |
name: "update class with invalid sharding config", | |
tx: &cluster.Transaction{ | |
Type: UpdateClass, | |
Payload: UpdateClassPayload{ | |
ClassName: "FirstClass", | |
Class: &models.Class{ | |
Class: "FirstClass", | |
VectorIndexType: "hnsw", | |
ShardingConfig: "this-cant-be-a-string", | |
}, | |
State: &sharding.State{}, | |
}, | |
}, | |
expectedErrContains: "parse sharding config", | |
}, | |
{ | |
name: "invalid commit type", | |
tx: &cluster.Transaction{ | |
Type: "i-dont-exist", | |
}, | |
expectedErrContains: "unrecognized commit type", | |
}, | |
{ | |
name: "successfully add tenants", | |
tx: &cluster.Transaction{ | |
Type: addTenants, | |
Payload: AddTenantsPayload{ | |
Class: "FirstClass", | |
Tenants: []TenantCreate{{Name: "P1"}, {Name: "P2"}}, | |
}, | |
}, | |
assertSchema: func(t *testing.T, sm *Manager) { | |
st := sm.CopyShardingState("FirstClass") | |
require.NotNil(t, st) | |
require.Contains(t, st.Physical, "P1") | |
require.Contains(t, st.Physical, "P2") | |
}, | |
}, | |
{ | |
name: "add partition to an unknown class", | |
tx: &cluster.Transaction{ | |
Type: addTenants, | |
Payload: AddTenantsPayload{ | |
Class: "UnknownClass", | |
Tenants: []TenantCreate{{Name: "P1"}, {Name: "P2"}}, | |
}, | |
}, | |
expectedErrContains: "UnknownClass", | |
}, | |
{ | |
name: "add tenants with incorrect payload", | |
tx: &cluster.Transaction{ | |
Type: addTenants, | |
Payload: AddPropertyPayload{}, | |
}, | |
expectedErrContains: "expected commit payload to be", | |
}, | |
{ | |
name: "successfully update tenants", | |
before: func(t *testing.T, sm *Manager) { | |
err := sm.handleCommit(context.Background(), &cluster.Transaction{ | |
Type: addTenants, | |
Payload: AddTenantsPayload{ | |
Class: "FirstClass", | |
Tenants: []TenantCreate{ | |
{Name: "P1"}, | |
{Name: "P2", Status: models.TenantActivityStatusHOT}, | |
}, | |
}, | |
}) | |
require.Nil(t, err) | |
}, | |
tx: &cluster.Transaction{ | |
Type: updateTenants, | |
Payload: UpdateTenantsPayload{ | |
Class: "FirstClass", | |
Tenants: []TenantUpdate{ | |
{Name: "P1", Status: models.TenantActivityStatusCOLD}, | |
{Name: "P2", Status: models.TenantActivityStatusCOLD}, | |
}, | |
}, | |
}, | |
assertSchema: func(t *testing.T, sm *Manager) { | |
st := sm.CopyShardingState("FirstClass") | |
require.NotNil(t, st) | |
require.Contains(t, st.Physical, "P1") | |
require.Contains(t, st.Physical, "P2") | |
assert.Equal(t, st.Physical["P1"].Status, models.TenantActivityStatusCOLD) | |
assert.Equal(t, st.Physical["P2"].Status, models.TenantActivityStatusCOLD) | |
}, | |
}, | |
{ | |
name: "update tenants of unknown class", | |
tx: &cluster.Transaction{ | |
Type: updateTenants, | |
Payload: UpdateTenantsPayload{ | |
Class: "UnknownClass", | |
Tenants: []TenantUpdate{ | |
{Name: "P1", Status: models.TenantActivityStatusCOLD}, | |
{Name: "P2", Status: models.TenantActivityStatusCOLD}, | |
}, | |
}, | |
}, | |
expectedErrContains: "UnknownClass", | |
}, | |
{ | |
name: "update tenants with incorrect payload", | |
tx: &cluster.Transaction{ | |
Type: updateTenants, | |
Payload: AddPropertyPayload{}, | |
}, | |
expectedErrContains: "expected commit payload to be", | |
}, | |
{ | |
name: "merge object property of unknown class", | |
tx: &cluster.Transaction{ | |
Type: mergeObjectProperty, | |
Payload: MergeObjectPropertyPayload{ | |
ClassName: "UnknownClass", | |
Property: updatedObjectProperty, | |
}, | |
}, | |
expectedErrContains: "class not found", | |
}, | |
{ | |
name: "merge object property of unknown property", | |
tx: &cluster.Transaction{ | |
Type: mergeObjectProperty, | |
Payload: MergeObjectPropertyPayload{ | |
ClassName: "FirstClass", | |
Property: updatedObjectProperty, | |
}, | |
}, | |
expectedErrContains: "property not found", | |
}, | |
{ | |
name: "merge object property", | |
before: func(t *testing.T, sm *Manager) { | |
err := sm.handleCommit(context.Background(), &cluster.Transaction{ | |
Type: AddProperty, | |
Payload: AddPropertyPayload{ | |
ClassName: "FirstClass", | |
Property: objectProperty, | |
}, | |
}) | |
require.Nil(t, err) | |
}, | |
tx: &cluster.Transaction{ | |
Type: mergeObjectProperty, | |
Payload: MergeObjectPropertyPayload{ | |
ClassName: "FirstClass", | |
Property: updatedObjectProperty, | |
}, | |
}, | |
assertSchema: func(t *testing.T, sm *Manager) { | |
updatedClass := sm.getClassByName("FirstClass") | |
require.NotNil(t, updatedClass) | |
require.Len(t, updatedClass.Properties, 1) | |
mergedProperty := updatedClass.Properties[0] | |
require.NotNil(t, mergedProperty) | |
assert.Equal(t, expectedObjectProperty.DataType, mergedProperty.DataType) | |
assert.Equal(t, expectedObjectProperty.IndexFilterable, mergedProperty.IndexFilterable) | |
assert.Equal(t, expectedObjectProperty.IndexSearchable, mergedProperty.IndexSearchable) | |
assert.Equal(t, expectedObjectProperty.Tokenization, mergedProperty.Tokenization) | |
test_utils.AssertNestedPropsMatch(t, expectedObjectProperty.NestedProperties, mergedProperty.NestedProperties) | |
}, | |
}, | |
{ | |
name: "merge object property with invalid payload", | |
tx: &cluster.Transaction{ | |
Type: mergeObjectProperty, | |
Payload: AddPropertyPayload{ | |
ClassName: "FirstClass", | |
Property: updatedObjectProperty, | |
}, | |
}, | |
expectedErrContains: "expected commit payload to be", | |
}, | |
} | |
for _, test := range tests { | |
t.Run(test.name, func(t *testing.T) { | |
schemaBefore := &State{ | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "FirstClass", | |
VectorIndexType: "hnsw", | |
}, | |
}, | |
}, | |
} | |
sm, err := newManagerWithClusterAndTx(t, | |
&fakeClusterState{hosts: []string{"node1"}}, &fakeTxClient{}, | |
schemaBefore) | |
require.Nil(t, err) | |
if test.before != nil { | |
test.before(t, sm) | |
} | |
err = sm.handleCommit(context.Background(), test.tx) | |
if test.expectedErrContains == "" { | |
require.Nil(t, err) | |
test.assertSchema(t, sm) | |
} else { | |
require.NotNil(t, err) | |
assert.Contains(t, err.Error(), test.expectedErrContains) | |
} | |
}) | |
} | |
} | |
func TestTxResponse(t *testing.T) { | |
type test struct { | |
name string | |
tx *cluster.Transaction | |
assertTx func(t *testing.T, tx *cluster.Transaction, payload json.RawMessage) | |
} | |
tests := []test{ | |
{ | |
name: "ignore write transactions", | |
tx: &cluster.Transaction{ | |
Type: AddClass, | |
Payload: AddClassPayload{ | |
Class: &models.Class{ | |
Class: "SecondClass", | |
VectorIndexType: "hnsw", | |
}, | |
State: &sharding.State{}, | |
}, | |
}, | |
assertTx: func(t *testing.T, tx *cluster.Transaction, payload json.RawMessage) { | |
_, ok := tx.Payload.(AddClassPayload) | |
assert.True(t, ok, "write tx was not changed") | |
}, | |
}, | |
{ | |
name: "respond with schema on ReadSchema", | |
tx: &cluster.Transaction{ | |
Type: ReadSchema, | |
Payload: nil, | |
}, | |
assertTx: func(t *testing.T, tx *cluster.Transaction, payload json.RawMessage) { | |
pl, err := unmarshalRawJson[ReadSchemaPayload](payload) | |
require.Nil(t, err) | |
require.Len(t, pl.Schema.ObjectSchema.Classes, 1) | |
assert.Equal(t, "FirstClass", pl.Schema.ObjectSchema.Classes[0].Class) | |
}, | |
}, | |
} | |
for _, test := range tests { | |
t.Run(test.name, func(t *testing.T) { | |
schemaBefore := &State{ | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "FirstClass", | |
VectorIndexType: "hnsw", | |
}, | |
}, | |
}, | |
} | |
sm, err := newManagerWithClusterAndTx(t, | |
&fakeClusterState{hosts: []string{"node1"}}, &fakeTxClient{}, | |
schemaBefore) | |
require.Nil(t, err) | |
data, err := sm.handleTxResponse(context.Background(), test.tx) | |
require.Nil(t, err) | |
if test.tx.Type == ReadSchema { | |
var txRes txResponsePayload | |
err = json.Unmarshal(data, &txRes) | |
require.Nil(t, err) | |
test.assertTx(t, test.tx, txRes.Payload) | |
} | |
}) | |
} | |
} | |
type txResponsePayload struct { | |
Type cluster.TransactionType `json:"type"` | |
ID string `json:"id"` | |
Payload json.RawMessage `json:"payload"` | |
} | |