//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//

package schema

import (
"context"
"encoding/json"
"fmt"
"testing"
testlog "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/replication"
"github.com/weaviate/weaviate/usecases/cluster"
"github.com/weaviate/weaviate/usecases/config"
"github.com/weaviate/weaviate/usecases/sharding"
)
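
// TestStartupSync covers the happy paths of the sync that runs when a node
// starts up: the node opens a read-schema transaction with the other nodes
// in the cluster and, depending on what comes back, adopts the remote
// schema, keeps its own, repairs a conflict, or skips the sync entirely.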
func TestStartupSync(t *testing.T) {
t.Run("new node joining, other nodes have schema", func(t *testing.T) {
clusterState := &fakeClusterState{
hosts: []string{"node1", "node2"},
}
txJSON, _ := json.Marshal(ReadSchemaPayload{
Schema: &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "Bongourno",
VectorIndexType: "hnsw",
},
},
},
},
})
txClient := &fakeTxClient{
openInjectPayload: json.RawMessage(txJSON),
}
sm, err := newManagerWithClusterAndTx(t, clusterState, txClient, nil)
require.Nil(t, err)
localSchema := sm.GetSchemaSkipAuth()
assert.Equal(t, "Bongourno", localSchema.FindClassByName("Bongourno").Class)
st, _ := sm.ClusterStatus(context.Background())
assert.False(t, st.IgnoreSchemaSync, "sync is indicated as not skipped")
assert.True(t, st.Healthy, "cluster is deemed healthy")
assert.Len(t, st.Error, 0, "no error is shown")
})
t.Run("new node joining, other nodes have no schema", func(t *testing.T) {
clusterState := &fakeClusterState{
hosts: []string{"node1", "node2"},
}
txJSON, _ := json.Marshal(ReadSchemaPayload{
Schema: &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{},
},
},
})
txClient := &fakeTxClient{
openInjectPayload: json.RawMessage(txJSON),
}
sm, err := newManagerWithClusterAndTx(t, clusterState, txClient, nil)
require.Nil(t, err)
localSchema := sm.GetSchemaSkipAuth()
assert.Len(t, localSchema.Objects.Classes, 0)
st, _ := sm.ClusterStatus(context.Background())
assert.False(t, st.IgnoreSchemaSync, "sync is indicated as not skipped")
assert.True(t, st.Healthy, "cluster is deemed healthy")
assert.Len(t, st.Error, 0, "no error is shown")
})
t.Run("new node joining, conflict in schema between nodes", func(t *testing.T) {
clusterState := &fakeClusterState{
hosts: []string{"node1", "node2"},
skipRepair: true,
}
txJSON, _ := json.Marshal(ReadSchemaPayload{
Schema: &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "Bongourno",
VectorIndexType: "hnsw",
},
},
},
},
})
txClient := &fakeTxClient{
openInjectPayload: json.RawMessage(txJSON),
}
_, err := newManagerWithClusterAndTx(t, clusterState, txClient, &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "Hola",
VectorIndexType: "hnsw",
},
},
},
})
require.NotNil(t, err)
assert.Contains(t, err.Error(), "corrupt")
})
t.Run("conflict, but schema repaired", func(t *testing.T) {
clusterState := &fakeClusterState{
hosts: []string{"node1", "node2"},
}
txJSON, _ := json.Marshal(ReadSchemaPayload{
Schema: &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "Bongourno",
VectorIndexType: "hnsw",
},
},
},
},
})
txClient := &fakeTxClient{
openInjectPayload: json.RawMessage(txJSON),
}
mgr, err := newManagerWithClusterAndTx(t, clusterState, txClient, &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "Hola",
VectorIndexType: "hnsw",
},
},
},
})
// Check err before touching mgr: if err were non-nil, mgr could be nil.
require.Nil(t, err, "expected nil err, got: %v", err)
assert.Len(t, mgr.ObjectSchema.Classes, 2)
})
t.Run("conflict, but sync skipped -> no error", func(t *testing.T) {
clusterState := &fakeClusterState{
hosts: []string{"node1", "node2"},
syncIgnored: true,
skipRepair: true,
}
txJSON, _ := json.Marshal(ReadSchemaPayload{
Schema: &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "Bongourno",
VectorIndexType: "hnsw",
},
},
},
},
})
txClient := &fakeTxClient{
openInjectPayload: json.RawMessage(txJSON),
}
m, err := newManagerWithClusterAndTx(t, clusterState, txClient, &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "Hola",
VectorIndexType: "hnsw",
},
},
},
})
require.Nil(t, err)
st, _ := m.ClusterStatus(context.Background())
assert.True(t, st.IgnoreSchemaSync, "sync is indicated as skipped")
assert.False(t, st.Healthy, "cluster is not deemed healthy")
assert.True(t, len(st.Error) > 0, "the error is shown")
})
t.Run("new node joining, agreement between all", func(t *testing.T) {
clusterState := &fakeClusterState{
hosts: []string{"node1", "node2"},
}
txJSON, _ := json.Marshal(ReadSchemaPayload{
Schema: &State{
ShardingState: map[string]*sharding.State{
"GutenTag": {},
},
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "GutenTag",
VectorIndexType: "hnsw",
},
},
},
},
})
txClient := &fakeTxClient{
openInjectPayload: json.RawMessage(txJSON),
}
sm, err := newManagerWithClusterAndTx(t, clusterState, txClient, &State{
ShardingState: map[string]*sharding.State{
"GutenTag": {},
},
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "GutenTag",
VectorIndexType: "hnsw",
},
},
},
})
require.Nil(t, err)
localSchema := sm.GetSchemaSkipAuth()
assert.Equal(t, "GutenTag", localSchema.FindClassByName("GutenTag").Class)
})
t.Run("new node joining, other nodes include an outdated version", func(t *testing.T) {
clusterState := &fakeClusterState{
hosts: []string{"node1", "node2"},
}
txClient := &fakeTxClient{
openErr: fmt.Errorf("unrecognized schema transaction type"),
}
sm, err := newManagerWithClusterAndTx(t, clusterState, txClient, nil)
require.Nil(t, err) // no error, sync was skipped
schema := sm.GetSchemaSkipAuth()
assert.Len(t, schema.Objects.Classes, 0, "schema is still empty")
})
t.Run("node with data (re-)joining, but other nodes are too old", func(t *testing.T) {
// we expect that sync would be skipped because the other nodes can't take
// part in the sync
clusterState := &fakeClusterState{
hosts: []string{"node1", "node2"},
}
txClient := &fakeTxClient{
openErr: fmt.Errorf("unrecognized schema transaction type"),
}
sm, err := newManagerWithClusterAndTx(t, clusterState, txClient, &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "Hola",
VectorIndexType: "hnsw",
},
},
},
})
require.Nil(t, err) // startup sync was skipped, no error
schema := sm.GetSchemaSkipAuth()
require.Len(t, schema.Objects.Classes, 1, "schema is still the local schema")
assert.Equal(t, "Hola", schema.Objects.Classes[0].Class)
})
t.Run("new node joining, schema identical, but other nodes have already been migrated", func(t *testing.T) {
// Migration refers to the change that happens when a node first starts
// up with v1.17. It reads the `belongsToNode` from the sharding config and
// writes the content into the new `belongsToNodes[]` array type.
//
// The timing of the migration vs the sync matters: the remote nodes have
// already completed startup, therefore they have been migrated. If the
// local schema is not migrated yet, it could fail the checks even though
// it is logically identical.
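//
// A minimal sketch of what that startup migration conceptually does (the
// field names match sharding.Physical as used below; the real migration
// code lives elsewhere in the schema package, so treat this as an
// illustration, not the actual implementation):
//
//	for name, shard := range shardingState.Physical {
//		if len(shard.BelongsToNodes) == 0 && shard.LegacyBelongsToNodeForBackwardCompat != "" {
//			shard.BelongsToNodes = []string{shard.LegacyBelongsToNodeForBackwardCompat}
//			shardingState.Physical[name] = shard
//		}
//	}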
clusterState := &fakeClusterState{
hosts: []string{"node1", "node2"},
}
txJSON, _ := json.Marshal(ReadSchemaPayload{
Schema: &State{
ShardingState: map[string]*sharding.State{
"GutenTag": {
IndexID: "GutenTag",
Physical: map[string]sharding.Physical{
"a-shard-of-beauty": {
Name: "a-shard-of-beauty",
BelongsToNodes: []string{"node-0"}, // Note the usage of the new field (!)
},
},
},
},
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "GutenTag",
VectorIndexType: "hnsw",
},
},
},
},
})
txClient := &fakeTxClient{
openInjectPayload: json.RawMessage(txJSON),
}
sm, err := newManagerWithClusterAndTx(t, clusterState, txClient, &State{
ShardingState: map[string]*sharding.State{
"GutenTag": {
IndexID: "GutenTag",
Physical: map[string]sharding.Physical{
"a-shard-of-beauty": {
Name: "a-shard-of-beauty",
LegacyBelongsToNodeForBackwardCompat: "node-0", // Note the usage of the old field (!)
},
},
},
},
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "GutenTag",
VectorIndexType: "hnsw",
},
},
},
})
require.Nil(t, err)
localSchema := sm.GetSchemaSkipAuth()
assert.Equal(t, "GutenTag", localSchema.FindClassByName("GutenTag").Class)
})
}
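
// TestStartupSyncUnhappyPaths covers the failure modes of the startup sync:
// a corrupt cluster state, transactions that fail to open, and transaction
// payloads that cannot be unmarshalled.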
func TestStartupSyncUnhappyPaths(t *testing.T) {
type test struct {
name string
nodes []string
errContains string
txPayload interface{}
txOpenErr error
initialSchema *State
}
tests := []test{
{
name: "corrupt cluster state: no nodes",
nodes: []string{},
errContains: "cluster has size=0",
},
{
name: "corrupt cluster state: name mismatch",
nodes: []string{"the-wrong-one"},
errContains: "only node in the cluster does not match local",
},
{
name: "open tx fails on empty node",
nodes: []string{"node1", "node2"},
txOpenErr: cluster.ErrConcurrentTransaction,
errContains: "concurrent transaction",
},
{
name: "open tx fails on populated node",
initialSchema: &State{ObjectSchema: &models.Schema{
Classes: []*models.Class{{Class: "Foo", VectorIndexType: "hnsw"}},
}},
nodes: []string{"node1", "node2"},
txOpenErr: cluster.ErrConcurrentTransaction,
errContains: "concurrent transaction",
},
{
name: "wrong tx payload",
nodes: []string{"node1", "node2"},
txPayload: "foo",
errContains: "unmarshal tx",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
clusterState := &fakeClusterState{
hosts: test.nodes,
}
if test.txPayload == nil {
test.txPayload = ReadSchemaPayload{
Schema: &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{
{
Class: "Bongourno",
VectorIndexType: "hnsw",
},
},
},
},
}
}
txJSON, _ := json.Marshal(test.txPayload)
txClient := &fakeTxClient{
openInjectPayload: json.RawMessage(txJSON),
openErr: test.txOpenErr,
}
_, err := newManagerWithClusterAndTx(t, clusterState, txClient, test.initialSchema)
require.NotNil(t, err)
assert.Contains(t, err.Error(), test.errContains)
})
}
}
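
// newManagerWithClusterAndTx builds a schema Manager wired to the given
// (fake) cluster state and tx client. If initialSchema is nil, the manager
// starts from a fresh empty state, mimicking a node that joins the cluster
// without any prior schema on disk.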
func newManagerWithClusterAndTx(t *testing.T, clusterState clusterState,
txClient cluster.Client, initialSchema *State,
) (*Manager, error) {
logger, _ := testlog.NewNullLogger()
repo := newFakeRepo()
if initialSchema == nil {
initState := NewState(1)
initialSchema = &initState
}
repo.schema = *initialSchema
sm, err := NewManager(&NilMigrator{}, repo, logger, &fakeAuthorizer{},
config.Config{
DefaultVectorizerModule: config.VectorizerModuleNone,
Replication: replication.GlobalConfig{MinimumFactor: 1},
},
dummyParseVectorConfig, // only option for now
&fakeVectorizerValidator{}, dummyValidateInvertedConfig,
&fakeModuleConfig{}, clusterState, txClient, &fakeTxPersistence{}, &fakeScaleOutManager{},
)
return sm, err
}