File size: 8,419 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//

package schema

import (
	"context"
	"errors"
	"fmt"
	"strings"

	"github.com/sirupsen/logrus"
	"github.com/weaviate/weaviate/entities/models"
)

// startupClusterSync tries to determine what - if any - schema migration is
// required at startup. If a node is the first in a cluster the assumption is
// that its state is the truth.
//
// For the n-th node (where n>1) there is a potential for conflict if the
// schemas aren't in sync:
//
// - If Node 1 has a non-nil schema, but Node 2 has a nil-schema, then we can
// consider Node 2 to be a new node that is just joining the cluster. In this
// case, we can copy the state from the existing nodes (if they agree on a
// schema)
//
// - If Node 1 and Node 2 have an identical schema, then we can assume that the
// startup was just an ordinary (re)start of the node. No action is required.
//
// - If Node 1 and Node 2 both have a schema, but they aren't in sync, the
// cluster is broken. This state cannot be automatically recovered from and
// startup needs to fail. Manual intervention would be required in this case.
func (m *Manager) startupClusterSync(ctx context.Context) error {
	nodes := m.clusterState.AllNames()
	if len(nodes) <= 1 {
		return m.startupHandleSingleNode(ctx, nodes)
	}

	// an empty local schema means we are a fresh node joining an existing
	// cluster: copy the consensus schema from the other nodes
	if m.schemaCache.isEmpty() {
		return m.startupJoinCluster(ctx)
	}

	err := m.validateSchemaCorruption(ctx)
	if err == nil {
		// schema is fine, we are done
		return nil
	}

	if m.clusterState.SchemaSyncIgnored() {
		m.logger.WithError(err).WithFields(logrusStartupSyncFields()).
			Warning("schema out of sync, but ignored because " +
				"CLUSTER_IGNORE_SCHEMA_SYNC=true")
		return nil
	}

	if m.cluster.HaveDanglingTxs(ctx, resumableTxs) {
		// Info instead of Infof: the message contains no formatting directives
		m.logger.WithFields(logrusStartupSyncFields()).
			Info("schema out of sync, but there are dangling transactions, the check will be repeated after an attempt to resume those transactions")

		m.LockGuard(func() {
			m.shouldTryToResumeTx = true
		})
		return nil
	}

	return err
}

// startupHandleSingleNode deals with the case where there is only a single
// node in the cluster. In the vast majority of cases there is nothing to do.
// An edge case would be where the cluster has size=0, or size=1 but the node's
// name is not the local name's node. This would indicate a broken cluster and
// can't be recovered from
func (m *Manager) startupHandleSingleNode(ctx context.Context,
	nodes []string,
) error {
	localName := m.clusterState.LocalName()
	if len(nodes) == 0 {
		// errors.New over fmt.Errorf: the message is a constant string with
		// no formatting directives (staticcheck S1039)
		return errors.New("corrupt cluster state: cluster has size=0")
	}

	if nodes[0] != localName {
		return fmt.Errorf("corrupt cluster state: only node in the cluster does not "+
			"match local node name: %v vs %s", nodes, localName)
	}

	m.logger.WithFields(logrusStartupSyncFields()).
		Debug("Only node in the cluster at this point. " +
			"No schema sync necessary.")

	// startup is complete
	return nil
}

// startupJoinCluster migrates the schema for a new node. The assumption is
// that other nodes have schema state and we need to migrate this schema to the
// local node transactionally. In other words, this startup process can not
// occur concurrently with a user-initiated schema update. One of those must
// fail.
//
// There is one edge case: The cluster could consist of multiple nodes which
// are empty. In this case, no migration is required.
func (m *Manager) startupJoinCluster(ctx context.Context) error {
	tx, err := m.cluster.BeginTransaction(ctx, ReadSchema, nil, DefaultTxTTL)
	if err != nil {
		// older (pre-v1.17) remote nodes cannot serve this transaction type;
		// in that case we simply skip the sync rather than fail startup
		if m.clusterSyncImpossibleBecauseRemoteNodeTooOld(err) {
			return nil
		}
		return fmt.Errorf("read schema: open transaction: %w", err)
	}

	// read-only transaction: there is nothing to abort, closing is the same
	// regardless of whether we succeed below
	defer m.cluster.CloseReadTransaction(ctx, tx)

	payload, ok := tx.Payload.(ReadSchemaPayload)
	if !ok {
		return fmt.Errorf("unrecognized tx response payload: %T", tx.Payload)
	}

	// the consensus function has already run by now, so all other nodes are
	// known to agree on this schema

	if isEmpty(payload.Schema) {
		// already in sync, nothing to do
		return nil
	}

	if err := m.saveSchema(ctx, *payload.Schema); err != nil {
		return fmt.Errorf("save schema: %w", err)
	}

	m.schemaCache.setState(*payload.Schema)

	return nil
}

// ClusterStatus reports the local node's view of cluster schema health:
// hostname, node count, whether sync mismatches are ignored, and whether the
// local schema agrees with the cluster consensus.
func (m *Manager) ClusterStatus(ctx context.Context) (*models.SchemaClusterStatus, error) {
	m.RLock()
	defer m.RUnlock()

	status := &models.SchemaClusterStatus{
		Hostname:         m.clusterState.LocalName(),
		IgnoreSchemaSync: m.clusterState.SchemaSyncIgnored(),
	}

	allNodes := m.clusterState.AllNames()
	status.NodeCount = int64(len(allNodes))

	// a cluster of fewer than two nodes can never be out of sync
	if len(allNodes) < 2 {
		status.Healthy = true
		return status, nil
	}

	if err := m.validateSchemaCorruption(ctx); err != nil {
		status.Healthy = false
		status.Error = err.Error()
		return status, err
	}

	status.Healthy = true
	return status, nil
}

// validateSchemaCorruption makes sure that - given that all nodes in the
// cluster have a schema - they are in sync. If not the cluster is considered
// broken and needs to be repaired manually
//
// It reads the cluster-consensus schema via a read-only transaction and
// compares it against the local schema cache under a read lock. On mismatch
// it either fails (if repair is disabled) or attempts an automatic repair.
func (m *Manager) validateSchemaCorruption(ctx context.Context) error {
	tx, err := m.cluster.BeginTransaction(ctx, ReadSchema, nil, DefaultTxTTL)
	if err != nil {
		// pre-v1.17 remote nodes don't support this tx type; skip the check
		if m.clusterSyncImpossibleBecauseRemoteNodeTooOld(err) {
			return nil
		}
		return fmt.Errorf("read schema: open transaction: %w", err)
	}

	// this tx is read-only, so we don't have to worry about aborting it, the
	// close should be the same on both happy and unhappy path
	if err = m.cluster.CloseReadTransaction(ctx, tx); err != nil {
		return err
	}

	pl, ok := tx.Payload.(ReadSchemaPayload)
	if !ok {
		return fmt.Errorf("unrecognized tx response payload: %T", tx.Payload)
	}
	// diff is filled in by cmp (below) while the cache lock is held, but only
	// logged after the comparison returned an error
	var diff []string
	cmp := func() error {
		if err := Equal(&m.schemaCache.State, pl.Schema); err != nil {
			diff = Diff("local", &m.schemaCache.State, "cluster", pl.Schema)
			return err
		}
		return nil
	}
	if err := m.schemaCache.RLockGuard(cmp); err != nil {
		m.logger.WithFields(logrusStartupSyncFields()).WithFields(logrus.Fields{
			"diff": diff,
		}).Warning("mismatch between local schema and remote (other nodes consensus) schema")
		if m.clusterState.SkipSchemaRepair() {
			return fmt.Errorf("corrupt cluster: other nodes have consensus on schema, "+
				"but local node has a different (non-null) schema: %w", err)
		}
		// repair succeeded -> fall through and report healthy; only a failed
		// repair surfaces both the repair error and the original sync error
		if repairErr := m.repairSchema(ctx, pl.Schema); repairErr != nil {
			return fmt.Errorf("attempted to repair and failed: %v, sync error: %w", repairErr, err)
		}
	}

	return nil
}

// logrusStartupSyncFields returns the log fields shared by all
// startup-cluster-schema-sync log entries.
func logrusStartupSyncFields() logrus.Fields {
	fields := logrus.Fields{
		"action": "startup_cluster_schema_sync",
	}
	return fields
}

// isEmpty reports whether the given schema state holds no classes at all
// (nil state, nil object schema, or an empty class list).
func isEmpty(schema *State) bool {
	switch {
	case schema == nil:
		return true
	case schema.ObjectSchema == nil:
		return true
	default:
		return len(schema.ObjectSchema.Classes) == 0
	}
}

// clusterSyncImpossibleBecauseRemoteNodeTooOld detects whether a transaction
// error means a remote node is too old (pre-v1.17) to understand the
// ReadSchema transaction type. If so, it logs an informational message and
// reports true so the caller can skip the sync instead of failing startup.
//
// String-matching on the error message isn't the cleanest way possible, but
// unfortunately there's not an easy way to find out, as this check has to
// work with whatever was already present in v1.16.x
//
// In theory we could have used the node api which also returns the versions,
// however, the node API depends on the DB which depends on the schema
// manager, so we cannot use them at schema manager startup which happens
// before db startup.
//
// Given that this workaround should only ever be required during a rolling
// update from v1.16 to v1.17, we can consider this acceptable
func (m *Manager) clusterSyncImpossibleBecauseRemoteNodeTooOld(err error) bool {
	if !strings.Contains(err.Error(), "unrecognized schema transaction type") {
		return false
	}

	m.logger.WithFields(logrusStartupSyncFields()).
		Info("skipping schema cluster sync because not all nodes in the cluster " +
			"support schema cluster sync yet. To enable schema cluster sync at startup " +
			"make sure all nodes in the cluster run at least v1.17")
	return true
}