// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//

package schema

import (
	"context"
	"fmt"
	"strings"

	"github.com/sirupsen/logrus"
	"github.com/weaviate/weaviate/entities/models"
)

// startupClusterSync tries to determine what - if any - schema migration is
// required at startup. If a node is the first in a cluster, the assumption is
// that its state is the truth.
//
// For the n-th node (where n>1) there is a potential for conflict if the
// schemas aren't in sync:
//
// - If Node 1 has a non-nil schema, but Node 2 has a nil-schema, then we can
// consider Node 2 to be a new node that is just joining the cluster. In this
// case, we can copy the state from the existing nodes (if they agree on a
// schema)
//
// - If Node 1 and Node 2 have an identical schema, then we can assume that the
// startup was just an ordinary (re)start of the node. No action is required.
//
// - If Node 1 and Node 2 both have a schema, but they aren't in sync, the
// cluster is broken. This state cannot be automatically recovered from and
// startup needs to fail. Manual intervention would be required in this case.
func (m *Manager) startupClusterSync(ctx context.Context) error {
nodes := m.clusterState.AllNames()
if len(nodes) <= 1 {
return m.startupHandleSingleNode(ctx, nodes)
}
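	// an empty local schema means this node is new and just joining the
	// cluster: copy the consensus schema from the existing nodes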
if m.schemaCache.isEmpty() {
return m.startupJoinCluster(ctx)
}
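	// both the local node and the rest of the cluster have a schema; verify
	// that they are in sync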
err := m.validateSchemaCorruption(ctx)
if err == nil {
// schema is fine, we are done
return nil
}
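	// the schemas are out of sync; check whether the operator explicitly chose
	// to ignore the mismatch via CLUSTER_IGNORE_SCHEMA_SYNC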
if m.clusterState.SchemaSyncIgnored() {
m.logger.WithError(err).WithFields(logrusStartupSyncFields()).
Warning("schema out of sync, but ignored because " +
"CLUSTER_IGNORE_SCHEMA_SYNC=true")
return nil
}
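	// dangling transactions may explain the mismatch; flag them for resumption
	// and repeat the check afterwards instead of failing now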
if m.cluster.HaveDanglingTxs(ctx, resumableTxs) {
m.logger.WithFields(logrusStartupSyncFields()).
Infof("schema out of sync, but there are dangling transactions, the check will be repeated after an attempt to resume those transactions")
m.LockGuard(func() {
m.shouldTryToResumeTx = true
})
return nil
}
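	// out of sync with no automatic recovery path: fail startup so that an
	// operator can intervene manually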
return err
}

// startupHandleSingleNode deals with the case where there is only a single
// node in the cluster. In the vast majority of cases there is nothing to do.
// An edge case would be a cluster of size=0, or of size=1 where the only
// node's name does not match the local node's name. Either would indicate a
// broken cluster state that cannot be recovered from.
func (m *Manager) startupHandleSingleNode(ctx context.Context,
nodes []string,
) error {
localName := m.clusterState.LocalName()
if len(nodes) == 0 {
return fmt.Errorf("corrupt cluster state: cluster has size=0")
}
if nodes[0] != localName {
return fmt.Errorf("corrupt cluster state: only node in the cluster does not "+
"match local node name: %v vs %s", nodes, localName)
}
m.logger.WithFields(logrusStartupSyncFields()).
Debug("Only node in the cluster at this point. " +
"No schema sync necessary.")
// startup is complete
return nil
}

// startupJoinCluster migrates the schema for a new node. The assumption is
// that other nodes have schema state and we need to migrate this schema to the
// local node transactionally. In other words, this startup process cannot
// occur concurrently with a user-initiated schema update. One of those must
// fail.
//
// There is one edge case: The cluster could consist of multiple nodes which
// are empty. In this case, no migration is required.
func (m *Manager) startupJoinCluster(ctx context.Context) error {
tx, err := m.cluster.BeginTransaction(ctx, ReadSchema, nil, DefaultTxTTL)
if err != nil {
if m.clusterSyncImpossibleBecauseRemoteNodeTooOld(err) {
return nil
}
return fmt.Errorf("read schema: open transaction: %w", err)
}
	// this tx is read-only, so we don't have to worry about aborting it; the
	// close is the same on both the happy and unhappy paths
defer m.cluster.CloseReadTransaction(ctx, tx)
pl, ok := tx.Payload.(ReadSchemaPayload)
if !ok {
return fmt.Errorf("unrecognized tx response payload: %T", tx.Payload)
}
// by the time we're here the consensus function has run, so we can be sure
// that all other nodes agree on this schema.
if isEmpty(pl.Schema) {
// already in sync, nothing to do
return nil
}
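	// persist the consensus schema locally, then make it visible through the
	// in-memory cache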
if err := m.saveSchema(ctx, *pl.Schema); err != nil {
return fmt.Errorf("save schema: %w", err)
}
m.schemaCache.setState(*pl.Schema)
return nil
}

func (m *Manager) ClusterStatus(ctx context.Context) (*models.SchemaClusterStatus, error) {
m.RLock()
defer m.RUnlock()
out := &models.SchemaClusterStatus{
Hostname: m.clusterState.LocalName(),
IgnoreSchemaSync: m.clusterState.SchemaSyncIgnored(),
}
nodes := m.clusterState.AllNames()
out.NodeCount = int64(len(nodes))
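	// a cluster of size 0 or 1 has no peers to disagree with, so it is
	// trivially healthy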
if len(nodes) < 2 {
out.Healthy = true
return out, nil
}
err := m.validateSchemaCorruption(ctx)
if err != nil {
out.Error = err.Error()
out.Healthy = false
return out, err
}
out.Healthy = true
return out, nil
}

// validateSchemaCorruption makes sure that - given that all nodes in the
// cluster have a schema - they are in sync. If they are not, the cluster is
// considered broken; an automatic repair is attempted, and if repair is
// disabled or fails, the cluster must be repaired manually.
func (m *Manager) validateSchemaCorruption(ctx context.Context) error {
tx, err := m.cluster.BeginTransaction(ctx, ReadSchema, nil, DefaultTxTTL)
if err != nil {
if m.clusterSyncImpossibleBecauseRemoteNodeTooOld(err) {
return nil
}
return fmt.Errorf("read schema: open transaction: %w", err)
}
	// this tx is read-only, so we don't have to worry about aborting it; the
	// close is the same on both the happy and unhappy paths
if err = m.cluster.CloseReadTransaction(ctx, tx); err != nil {
return err
}
pl, ok := tx.Payload.(ReadSchemaPayload)
if !ok {
return fmt.Errorf("unrecognized tx response payload: %T", tx.Payload)
}
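	// compare the local schema against the cluster consensus under a read
	// lock, capturing a human-readable diff for the log if they differ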
var diff []string
cmp := func() error {
if err := Equal(&m.schemaCache.State, pl.Schema); err != nil {
diff = Diff("local", &m.schemaCache.State, "cluster", pl.Schema)
return err
}
return nil
}
if err := m.schemaCache.RLockGuard(cmp); err != nil {
m.logger.WithFields(logrusStartupSyncFields()).WithFields(logrus.Fields{
"diff": diff,
}).Warning("mismatch between local schema and remote (other nodes consensus) schema")
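		// fail hard if automatic repair is disabled; otherwise try to adopt
		// the consensus schema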
if m.clusterState.SkipSchemaRepair() {
return fmt.Errorf("corrupt cluster: other nodes have consensus on schema, "+
"but local node has a different (non-null) schema: %w", err)
}
if repairErr := m.repairSchema(ctx, pl.Schema); repairErr != nil {
return fmt.Errorf("attempted to repair and failed: %v, sync error: %w", repairErr, err)
}
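		// repair succeeded: the local schema now matches the cluster consensus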
}
return nil
}

func logrusStartupSyncFields() logrus.Fields {
return logrus.Fields{"action": "startup_cluster_schema_sync"}
}

func isEmpty(schema *State) bool {
return schema == nil || schema.ObjectSchema == nil || len(schema.ObjectSchema.Classes) == 0
}

func (m *Manager) clusterSyncImpossibleBecauseRemoteNodeTooOld(err error) bool {
	// string-matching on the error message isn't the cleanest way possible, but
	// unfortunately there's no easy way to determine the remote node's version,
	// as this check has to work with whatever was already present in v1.16.x.
	//
	// in theory we could have used the node API, which also returns the
	// versions. However, the node API depends on the DB, which depends on the
	// schema manager, so we cannot use it at schema manager startup, which
	// happens before DB startup.
	//
	// Given that this workaround should only ever be required during a rolling
	// update from v1.16 to v1.17, we can consider this acceptable.
if strings.Contains(err.Error(), "unrecognized schema transaction type") {
m.logger.WithFields(logrusStartupSyncFields()).
Info("skipping schema cluster sync because not all nodes in the cluster " +
"support schema cluster sync yet. To enable schema cluster sync at startup " +
"make sure all nodes in the cluster run at least v1.17")
return true
}
return false
}