Spaces:
Running
Running
File size: 8,419 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package schema
import (
"context"
"fmt"
"strings"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/entities/models"
)
// startupClusterSync works out which - if any - schema reconciliation has to
// happen when this node starts. A node that is alone in the cluster is
// assumed to hold the truth; see startupHandleSingleNode.
//
// When there is more than one node, the local schema may disagree with the
// rest of the cluster. The cases are:
//
//   - The local schema cache is empty: the node is treated as a newcomer and
//     the schema the other nodes agree on is copied over, see
//     startupJoinCluster.
//
//   - The local schema matches the cluster: this was an ordinary (re)start
//     and nothing needs to be done.
//
//   - The local schema differs from the cluster: validateSchemaCorruption
//     reports an error. That error fails the startup, with two exceptions:
//     the operator opted out of the check (a warning is logged and startup
//     continues), or there are dangling transactions, in which case
//     shouldTryToResumeTx is set so that the check is repeated after an
//     attempt to resume them.
func (m *Manager) startupClusterSync(ctx context.Context) error {
	allNodes := m.clusterState.AllNames()
	if len(allNodes) < 2 {
		return m.startupHandleSingleNode(ctx, allNodes)
	}

	if m.schemaCache.isEmpty() {
		return m.startupJoinCluster(ctx)
	}

	syncErr := m.validateSchemaCorruption(ctx)

	// The cases are evaluated top to bottom and stop at the first match, so
	// the opt-out is consulted before dangling transactions are looked up.
	switch {
	case syncErr == nil:
		// local and cluster schema agree, nothing left to do
		return nil

	case m.clusterState.SchemaSyncIgnored():
		m.logger.WithError(syncErr).WithFields(logrusStartupSyncFields()).
			Warning("schema out of sync, but ignored because CLUSTER_IGNORE_SCHEMA_SYNC=true")
		return nil

	case m.cluster.HaveDanglingTxs(ctx, resumableTxs):
		m.logger.WithFields(logrusStartupSyncFields()).
			Infof("schema out of sync, but there are dangling transactions, the check will be repeated after an attempt to resume those transactions")
		m.LockGuard(func() { m.shouldTryToResumeTx = true })
		return nil

	default:
		return syncErr
	}
}
// startupHandleSingleNode covers the startup path for a cluster that has at
// most one member. Almost always there is nothing to do. The two exceptions
// are a cluster of size 0, and a cluster of size 1 whose only member is not
// the local node. Both indicate a broken cluster that cannot be recovered
// from, so an error is returned.
func (m *Manager) startupHandleSingleNode(ctx context.Context,
	nodes []string,
) error {
	self := m.clusterState.LocalName()

	switch {
	case len(nodes) == 0:
		return fmt.Errorf("corrupt cluster state: cluster has size=0")
	case nodes[0] != self:
		return fmt.Errorf("corrupt cluster state: only node in the cluster does not "+
			"match local node name: %v vs %s", nodes, self)
	}

	// the local node is the sole member, so startup is complete
	m.logger.WithFields(logrusStartupSyncFields()).
		Debug("Only node in the cluster at this point. No schema sync necessary.")
	return nil
}
// startupJoinCluster brings a node with an empty schema up to date. It
// assumes the other nodes hold schema state, which is fetched through a
// ReadSchema cluster transaction and then stored locally. Because the copy
// is transactional, it cannot run concurrently with a user-initiated schema
// update; one of the two must fail.
//
// Edge cases:
//   - If the transaction cannot be opened because a remote node is too old
//     to support it, the sync is skipped and nil is returned.
//   - If the cluster schema is empty as well (e.g. several empty nodes),
//     nothing is migrated.
func (m *Manager) startupJoinCluster(ctx context.Context) error {
	readTx, err := m.cluster.BeginTransaction(ctx, ReadSchema, nil, DefaultTxTTL)
	if err != nil {
		if !m.clusterSyncImpossibleBecauseRemoteNodeTooOld(err) {
			return fmt.Errorf("read schema: open transaction: %w", err)
		}
		return nil
	}

	// The transaction only reads, so there is nothing to abort: closing it is
	// the right cleanup on the happy and the unhappy path alike.
	defer m.cluster.CloseReadTransaction(ctx, readTx)

	payload, isReadSchema := readTx.Payload.(ReadSchemaPayload)
	if !isReadSchema {
		return fmt.Errorf("unrecognized tx response payload: %T", readTx.Payload)
	}

	// The consensus function has already run at this point, so this is the
	// schema all other nodes agree on.
	remote := payload.Schema
	if isEmpty(remote) {
		// the cluster has no schema either, so we are already in sync
		return nil
	}

	if err := m.saveSchema(ctx, *remote); err != nil {
		return fmt.Errorf("save schema: %w", err)
	}
	m.schemaCache.setState(*remote)

	return nil
}
// ClusterStatus reports whether the schema of this node is in sync with the
// rest of the cluster. The manager's read lock is held for the whole call.
//
// With fewer than two nodes the status is healthy and no validation runs.
// Otherwise validateSchemaCorruption is consulted. If it fails, the returned
// status is marked unhealthy and carries the error message, and the error is
// returned alongside the status.
func (m *Manager) ClusterStatus(ctx context.Context) (*models.SchemaClusterStatus, error) {
	m.RLock()
	defer m.RUnlock()

	status := &models.SchemaClusterStatus{
		Hostname:         m.clusterState.LocalName(),
		IgnoreSchemaSync: m.clusterState.SchemaSyncIgnored(),
	}

	nodeNames := m.clusterState.AllNames()
	status.NodeCount = int64(len(nodeNames))

	// only a cluster with at least two members can be out of sync
	if len(nodeNames) >= 2 {
		if err := m.validateSchemaCorruption(ctx); err != nil {
			status.Error = err.Error()
			status.Healthy = false
			return status, err
		}
	}

	status.Healthy = true
	return status, nil
}
// validateSchemaCorruption checks that the local schema equals the schema
// the other nodes agree on, given that all nodes in the cluster have one.
//
// On a mismatch the diff is logged. After that, either an error is returned
// right away (when schema repair is configured to be skipped), or a repair
// with the cluster schema is attempted. A failed repair is returned as an
// error that also wraps the original mismatch; a successful repair yields
// nil. If the read transaction cannot be opened because a remote node is too
// old to support it, the check is skipped and nil is returned.
func (m *Manager) validateSchemaCorruption(ctx context.Context) error {
	readTx, err := m.cluster.BeginTransaction(ctx, ReadSchema, nil, DefaultTxTTL)
	if err != nil {
		if m.clusterSyncImpossibleBecauseRemoteNodeTooOld(err) {
			return nil
		}
		return fmt.Errorf("read schema: open transaction: %w", err)
	}

	// The transaction only reads, so there is nothing to abort: closing it is
	// the right cleanup on the happy and the unhappy path alike.
	if closeErr := m.cluster.CloseReadTransaction(ctx, readTx); closeErr != nil {
		return closeErr
	}

	payload, isReadSchema := readTx.Payload.(ReadSchemaPayload)
	if !isReadSchema {
		return fmt.Errorf("unrecognized tx response payload: %T", readTx.Payload)
	}

	// Compare under the schema cache's read lock; the diff is only computed
	// when the two schemas actually differ.
	var schemaDiff []string
	mismatch := m.schemaCache.RLockGuard(func() error {
		cmpErr := Equal(&m.schemaCache.State, payload.Schema)
		if cmpErr == nil {
			return nil
		}
		schemaDiff = Diff("local", &m.schemaCache.State, "cluster", payload.Schema)
		return cmpErr
	})
	if mismatch == nil {
		return nil
	}

	m.logger.
		WithFields(logrusStartupSyncFields()).
		WithFields(logrus.Fields{"diff": schemaDiff}).
		Warning("mismatch between local schema and remote (other nodes consensus) schema")

	if m.clusterState.SkipSchemaRepair() {
		return fmt.Errorf("corrupt cluster: other nodes have consensus on schema, "+
			"but local node has a different (non-null) schema: %w", mismatch)
	}

	if repairErr := m.repairSchema(ctx, payload.Schema); repairErr != nil {
		return fmt.Errorf("attempted to repair and failed: %v, sync error: %w", repairErr, mismatch)
	}

	return nil
}
// logrusStartupSyncFields returns the log fields that tag an entry as part
// of the startup cluster schema sync.
func logrusStartupSyncFields() logrus.Fields {
	fields := logrus.Fields{}
	fields["action"] = "startup_cluster_schema_sync"
	return fields
}
// isEmpty reports whether schema holds no classes. A nil schema and a schema
// with a nil ObjectSchema both count as empty.
func isEmpty(schema *State) bool {
	if schema == nil {
		return true
	}

	objects := schema.ObjectSchema
	if objects == nil {
		return true
	}

	return len(objects.Classes) == 0
}
// clusterSyncImpossibleBecauseRemoteNodeTooOld reports whether err indicates
// that a remote node does not know the schema sync transaction type. If so,
// an info message is logged and true is returned.
//
// Matching on the error message is not the cleanest approach, but this check
// has to work with whatever was already present in v1.16.x, which leaves no
// easy alternative.
//
// The node API also returns versions and could in theory have been used.
// However, it depends on the DB, which depends on the schema manager, so it
// is not available at schema manager startup, which happens before DB
// startup.
//
// This workaround should only ever be needed during a rolling update from
// v1.16 to v1.17, which makes it acceptable.
func (m *Manager) clusterSyncImpossibleBecauseRemoteNodeTooOld(err error) bool {
	const tooOldMarker = "unrecognized schema transaction type"

	if !strings.Contains(err.Error(), tooOldMarker) {
		return false
	}

	m.logger.WithFields(logrusStartupSyncFields()).
		Info("skipping schema cluster sync because not all nodes in the cluster " +
			"support schema cluster sync yet. To enable schema cluster sync at startup " +
			"make sure all nodes in the cluster run at least v1.17")

	return true
}
|