Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package schema | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"testing" | |
logrustest "github.com/sirupsen/logrus/hooks/test" | |
"github.com/stretchr/testify/assert" | |
"github.com/stretchr/testify/require" | |
"github.com/weaviate/weaviate/entities/models" | |
"github.com/weaviate/weaviate/usecases/cluster" | |
"github.com/weaviate/weaviate/usecases/sharding" | |
) | |
func TestReadConsensus(t *testing.T) { | |
type test struct { | |
name string | |
in []*cluster.Transaction | |
expectedResult *cluster.Transaction | |
expectError bool | |
parser parserFn | |
} | |
tests := []test{ | |
{ | |
name: "different (unrelated) tx type (no consensus required)", | |
in: []*cluster.Transaction{ | |
{ | |
Type: AddClass, | |
ID: "id1", | |
}, | |
}, | |
expectError: false, | |
}, | |
{ | |
name: "single schema with content", | |
in: []*cluster.Transaction{ | |
wrapSchemaAsRawReadTx(&State{ | |
ShardingState: map[string]*sharding.State{ | |
"Foo": { | |
IndexID: "1234", | |
}, | |
}, | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "Foo", | |
}, | |
}, | |
}, | |
}), | |
}, | |
expectedResult: wrapSchemaAsReadTx(&State{ | |
ShardingState: map[string]*sharding.State{ | |
"Foo": { | |
IndexID: "1234", | |
}, | |
}, | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "Foo", | |
}, | |
}, | |
}, | |
}), | |
}, | |
{ | |
name: "two identical empty schemas", | |
in: []*cluster.Transaction{ | |
wrapSchemaAsRawReadTx(newSchema()), | |
wrapSchemaAsRawReadTx(newSchema()), | |
}, | |
expectedResult: wrapSchemaAsReadTx(newSchema()), | |
}, | |
{ | |
name: "two identical filled schemas", | |
in: []*cluster.Transaction{ | |
wrapSchemaAsRawReadTx(&State{ | |
ShardingState: map[string]*sharding.State{ | |
"Foo": { | |
IndexID: "1234", | |
}, | |
}, | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "Foo", | |
}, | |
}, | |
}, | |
}), | |
wrapSchemaAsRawReadTx(&State{ | |
ShardingState: map[string]*sharding.State{ | |
"Foo": { | |
IndexID: "1234", | |
}, | |
}, | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "Foo", | |
}, | |
}, | |
}, | |
}), | |
}, | |
expectedResult: wrapSchemaAsReadTx(&State{ | |
ShardingState: map[string]*sharding.State{ | |
"Foo": { | |
IndexID: "1234", | |
}, | |
}, | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "Foo", | |
}, | |
}, | |
}, | |
}), | |
}, | |
{ | |
name: "3 responses with a conflict", | |
in: []*cluster.Transaction{ | |
wrapSchemaAsRawReadTx(&State{ | |
ShardingState: map[string]*sharding.State{ | |
"Foo": { | |
IndexID: "1234", | |
}, | |
}, | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "Foo", | |
}, | |
}, | |
}, | |
}), | |
wrapSchemaAsRawReadTx(&State{ | |
ShardingState: map[string]*sharding.State{ | |
"Foo": { | |
IndexID: "1234", | |
}, | |
}, | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "Foo", | |
}, | |
}, | |
}, | |
}), | |
wrapSchemaAsRawReadTx(&State{ | |
ShardingState: map[string]*sharding.State{ | |
"Foo": { | |
IndexID: "1234", | |
}, | |
}, | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "Foo", | |
// the other classes don't have the vector index set: | |
VectorIndexType: "La-Forca-de-Bruta", | |
}, | |
}, | |
}, | |
}), | |
}, | |
expectError: true, | |
}, | |
{ | |
name: "tx id mismatch", | |
in: []*cluster.Transaction{ | |
{ | |
Type: ReadSchema, | |
Payload: json.RawMessage("null"), | |
ID: "id1", | |
}, | |
{ | |
Type: ReadSchema, | |
Payload: json.RawMessage("null"), | |
ID: "id2", | |
}, | |
}, | |
expectError: true, | |
}, | |
{ | |
name: "invalid payload json", | |
in: []*cluster.Transaction{ | |
{ | |
Type: ReadSchema, | |
Payload: json.RawMessage("----<@#()$*--"), | |
ID: "id1", | |
}, | |
}, | |
expectError: true, | |
}, | |
{ | |
name: "invalid payload json", | |
in: []*cluster.Transaction{ | |
{ | |
Type: ReadSchema, | |
Payload: json.RawMessage("----<@#()$*--"), | |
ID: "id1", | |
}, | |
}, | |
expectError: true, | |
}, | |
{ | |
name: "schema parse error", | |
parser: func(ctx context.Context, s *State) error { | |
return fmt.Errorf("not so fast there, Mister Schema!") | |
}, | |
in: []*cluster.Transaction{ | |
wrapSchemaAsRawReadTx(&State{ | |
ShardingState: map[string]*sharding.State{ | |
"Foo": { | |
IndexID: "1234", | |
}, | |
}, | |
ObjectSchema: &models.Schema{ | |
Classes: []*models.Class{ | |
{ | |
Class: "Foo", | |
}, | |
}, | |
}, | |
}), | |
}, | |
expectError: true, | |
}, | |
} | |
for _, test := range tests { | |
t.Run(test.name, func(t *testing.T) { | |
parser := dummyParser | |
if test.parser != nil { | |
parser = test.parser | |
} | |
logger, _ := logrustest.NewNullLogger() | |
out, err := newReadConsensus(parser, logger)(context.Background(), test.in) | |
if test.expectError { | |
require.NotNil(t, err, "must error") | |
} else { | |
require.Nil(t, err) | |
} | |
assert.Equal(t, test.expectedResult, out) | |
}) | |
} | |
} | |
// raw = before unmarshalling the payload | |
func wrapSchemaAsRawReadTx(s *State) *cluster.Transaction { | |
payloadJSON, _ := json.Marshal(ReadSchemaPayload{ | |
Schema: s, | |
}) | |
return &cluster.Transaction{ | |
Type: ReadSchema, | |
Payload: json.RawMessage(payloadJSON), | |
ID: "best-tx", | |
} | |
} | |
func wrapSchemaAsReadTx(s *State) *cluster.Transaction { | |
return &cluster.Transaction{ | |
Type: ReadSchema, | |
Payload: ReadSchemaPayload{Schema: s}, | |
ID: "best-tx", | |
} | |
} | |
func dummyParser(ctx context.Context, schema *State) error { | |
return nil | |
} | |