File size: 16,742 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//

package cluster

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"golang.org/x/exp/slices"
)

// TransactionType identifies the kind of distributed transaction. The
// concrete values are defined by the callers that use the TxManager; the
// manager itself treats the type as opaque except for allow-list checks.
type TransactionType string

var (
	// ErrConcurrentTransaction is returned when a transaction cannot be
	// opened because another transaction is already in flight.
	ErrConcurrentTransaction = errors.New("concurrent transaction")
	// ErrInvalidTransaction is returned when the referenced transaction id is
	// not the currently tracked one (and is not known to have expired).
	ErrInvalidTransaction = errors.New("invalid transaction")
	// ErrExpiredTransaction is returned when the referenced transaction once
	// existed, but was cleared because its TTL ran out.
	ErrExpiredTransaction = errors.New("transaction TTL expired")
	// ErrNotReady is returned while the node does not accept incoming
	// requests, i.e. before StartAcceptIncoming or after Shutdown.
	ErrNotReady = errors.New("server is not ready: either starting up or shutting down")
)

// Remote abstracts the network layer used to reach the other nodes in the
// cluster for the three phases of a transaction: open, abort, and commit.
type Remote interface {
	// BroadcastTransaction opens the given tx on every other node.
	BroadcastTransaction(ctx context.Context, tx *Transaction) error
	// BroadcastAbortTransaction aborts the given tx on every other node.
	BroadcastAbortTransaction(ctx context.Context, tx *Transaction) error
	// BroadcastCommitTransaction commits the given tx on every other node.
	BroadcastCommitTransaction(ctx context.Context, tx *Transaction) error
}

type (
	// CommitFn is invoked to apply a write transaction's payload to the
	// local state.
	CommitFn   func(ctx context.Context, tx *Transaction) error
	// ResponseFn is invoked for read transactions to produce this node's
	// response bytes; consensus over responses is the coordinator's job.
	ResponseFn func(ctx context.Context, tx *Transaction) ([]byte, error)
)

// TxManager coordinates cluster-wide transactions. It tracks at most one
// transaction at a time, broadcasts it via the Remote, and persists incoming
// transactions so they can be discovered and resumed after a crash. The
// embedded Mutex guards the mutable state below.
type TxManager struct {
	sync.Mutex
	logger logrus.FieldLogger

	// at most one transaction is active at a time; its context and the
	// cleanup callback installed by resetTxExpiry live alongside it
	currentTransaction        *Transaction
	currentTransactionContext context.Context
	clearTransaction          func()

	// any time we start working on a commit, we need to add to this WaitGroup.
	// It will block shutdown until the commit has completed to make sure that we
	// can't accidentally shutdown while a tx is committing.
	ongoingCommits sync.WaitGroup

	// when a shutdown signal has been received, we will no longer accept any new
	// tx's or commits
	acceptIncoming bool

	// read transactions that need to run at start can still be served, they have
	// no side-effects on the node that accepts them.
	//
	// If we disallowed them completely, then two unready nodes would be in a
	// deadlock as they each require information from the other(s) who can't
	// answer because they're not ready.
	allowUnready []TransactionType

	remote     Remote
	commitFn   CommitFn
	responseFn ResponseFn

	// keep the ids of expired transactions around. This way, we can return a
	// nicer error message to the user. Instead of just an "invalid transaction"
	// which no longer exists, they will get an explicit error message mentioning
	// the timeout.
	expiredTxIDs []string

	persistence Persistence
}

// newDummyCommitResponseFn returns a no-op commit function. It serves as the
// default commitFn so the TxManager never has to nil-check before calling.
func newDummyCommitResponseFn() func(ctx context.Context, tx *Transaction) error {
	noop := func(_ context.Context, _ *Transaction) error {
		return nil
	}
	return noop
}

// newDummyResponseFn returns a no-op response function. It serves as the
// default responseFn so the TxManager never has to nil-check before calling.
func newDummyResponseFn() func(ctx context.Context, tx *Transaction) ([]byte, error) {
	noop := func(_ context.Context, _ *Transaction) ([]byte, error) {
		return nil, nil
	}
	return noop
}

// NewTxManager constructs a TxManager that coordinates transactions through
// the given remote broadcaster and keeps open transactions durable via the
// given persistence layer.
//
// The manager starts out rejecting incoming requests; call
// StartAcceptIncoming once the server is ready to serve.
func NewTxManager(remote Remote, persistence Persistence,
	logger logrus.FieldLogger,
) *TxManager {
	m := &TxManager{
		remote:      remote,
		logger:      logger,
		persistence: persistence,

		// by setting dummy fns that do nothing on default it is possible to run
		// the tx manager with only one set of functions. For example, if the
		// specific Tx is only ever used for broadcasting writes, there is no need
		// to set a responseFn. However, if the fn was nil, we'd panic. Thus a
		// dummy function is a reasonable default - and much cleaner than a
		// nil-check on every call.
		commitFn:   newDummyCommitResponseFn(),
		responseFn: newDummyResponseFn(),

		// incoming requests are rejected until StartAcceptIncoming is called
		acceptIncoming: false,
	}

	return m
}

// StartAcceptIncoming marks the manager as ready: from now on incoming
// transactions and commits are accepted.
func (c *TxManager) StartAcceptIncoming() {
	c.Lock()
	c.acceptIncoming = true
	c.Unlock()
}

// SetAllowUnready configures which transaction types may still be served
// while the node is not (yet) accepting incoming requests.
func (c *TxManager) SetAllowUnready(types []TransactionType) {
	c.Lock()
	c.allowUnready = types
	c.Unlock()
}

// HaveDanglingTxs is a way to check if there are any uncommitted transactions
// in the durable storage. This can be used to make decisions about whether a
// failed schema check can be temporarily ignored - with the assumption that
// applying the dangling txs will fix the issue.
//
// Only transactions whose type is contained in allowedTypes are considered.
func (c *TxManager) HaveDanglingTxs(ctx context.Context,
	allowedTypes []TransactionType,
) (found bool) {
	// pass the caller's ctx through (a previous version ignored it and used
	// context.Background()), so cancellation and timeouts are respected
	err := c.persistence.IterateAll(ctx, func(tx *Transaction) {
		if slices.Contains(allowedTypes, tx.Type) {
			found = true
		}
	})
	if err != nil {
		// best-effort check: surface the failure instead of silently
		// discarding it, but keep the bool-only signature for callers
		c.logger.WithField("action", "have_dangling_txs").
			WithError(err).
			Error("iterate persisted transactions")
	}

	return
}

// TryResumeDanglingTxs loops over the existing transactions and applies them.
// It only does so if the transaction type is explicitly listed as allowed.
// This is because - at the time of creating this - we were not sure if all
// transaction commit functions are idempotent. If one would not be, then
// reapplying a tx or tx commit could potentially be dangerous, as we don't
// know if it was already applied prior to the node death.
//
// For example, think of a "add property 'foo'" tx, that does nothing but
// append the property to the schema. If this ran twice, we might now end up
// with two duplicate properties with the name 'foo' which could in turn create
// other problems. To make sure all txs are resumable (which is what we want
// because that's the only way to avoid schema issues), we need to make sure
// that every single tx is idempotent, then add them to the allow list.
//
// One other limitation is that this method currently does nothing to check if
// a tx was really committed or not. In an ideal world, the node would contact
// the other nodes and ask. However, this simpler implementation does not do
// this check. Instead [HaveDanglingTxs] is used in combination with the schema
// check. If the schema is not out of sync in the first place, no txs will be
// applied. This does not cover all edge cases, but it seems to work for now.
// This should be improved in the future.
func (c *TxManager) TryResumeDanglingTxs(ctx context.Context,
	allowedTypes []TransactionType,
) (applied bool, err error) {
	// use the caller's ctx for iteration as well (a previous version used
	// context.Background() here while committing with ctx)
	c.persistence.IterateAll(ctx, func(tx *Transaction) {
		if err != nil {
			// a previous tx already failed to apply: stop processing further
			// txs. Without this guard a later successful commit would reset
			// err to nil and silently drop the earlier failure before it is
			// returned to the caller.
			return
		}

		if !slices.Contains(allowedTypes, tx.Type) {
			c.logger.WithField("action", "resume_transaction").
				WithField("transaction_id", tx.ID).
				WithField("transaction_type", tx.Type).
				Warnf("dangling transaction %q of type %q is not known to be resumable - skipping",
					tx.ID, tx.Type)

			return
		}

		if err = c.commitFn(ctx, tx); err != nil {
			return
		}

		applied = true
		c.logger.WithField("action", "resume_transaction").
			WithField("transaction_id", tx.ID).
			WithField("transaction_type", tx.Type).
			Infof("successfully resumed dangling transaction %q of type %q",
				tx.ID, tx.Type)
	})

	return
}

// resetTxExpiry arms expiry tracking for the transaction with the given id:
// it sets the current transaction context (a never-canceled context for
// ttl == 0, i.e. "never expires"), installs c.clearTransaction as the cleanup
// callback, and spawns a goroutine that clears the tx once the ttl fires.
//
// NOTE(review): this method mutates shared fields (clearTransaction,
// currentTransactionContext) without taking the lock itself; callers appear
// to be responsible for synchronization — confirm.
func (c *TxManager) resetTxExpiry(ttl time.Duration, id string) {
	cancel := func() {}
	ctx := context.Background()
	if ttl == 0 {
		// unlimited tx: a background context never fires Done
		c.currentTransactionContext = context.Background()
	} else {
		ctx, cancel = context.WithTimeout(ctx, ttl)
		c.currentTransactionContext = ctx
	}

	// to prevent a goroutine leak for the new routine we're spawning here,
	// register a way to terminate it in case the explicit cancel is called
	// before the context's done channel fires.
	clearCancelListener := make(chan struct{}, 1)

	c.clearTransaction = func() {
		c.currentTransaction = nil
		c.currentTransactionContext = nil
		// replace ourselves with a no-op so a second call is harmless
		c.clearTransaction = func() {}

		clearCancelListener <- struct{}{}
		close(clearCancelListener)
	}

	go func(id string) {
		ctxDone := ctx.Done()
		select {
		case <-clearCancelListener:
			// the tx was cleared explicitly (e.g. commit/abort): release the
			// timer and exit without marking anything expired
			cancel()
			return
		case <-ctxDone:
			c.Lock()
			defer c.Unlock()
			// remember the id so later lookups can report "expired" instead
			// of a generic "invalid transaction"
			c.expiredTxIDs = append(c.expiredTxIDs, id)

			if c.currentTransaction == nil {
				// tx is already cleaned up, for example from a successful commit. Nothing to do for us
				return
			}

			if c.currentTransaction.ID != id {
				// tx was already cleaned up, then a new tx was started. Any action from
				// us would be destructive, as we'd accidentally destroy a perfectly valid
				// tx
				return
			}

			c.clearTransaction()
		}
	}(id)
}

// expired reports whether the given id belongs to a transaction that once
// existed but was cleared because its TTL ran out. This lets callers return
// a more meaningful error than a generic "invalid transaction".
//
// This method is not thread-safe as the assumption is that it is called from
// a thread-safe environment where a lock would already be held.
func (c *TxManager) expired(id string) bool {
	return slices.Contains(c.expiredTxIDs, id)
}

// SetCommitFn sets a function that is used in Write Transactions, you can
// read from the transaction payload and use that state to alter your local
// state.
//
// NOTE(review): unlike StartAcceptIncoming/SetAllowUnready this setter does
// not take the lock; it looks intended for single-threaded setup before the
// manager serves traffic — confirm.
func (c *TxManager) SetCommitFn(fn CommitFn) {
	c.commitFn = fn
}

// SetResponseFn sets a function that is used in Read Transactions. The
// function sets the local state (by writing it into the Tx Payload). It can
// then be sent to other nodes. Consensus is not part of the ResponseFn. The
// coordinator - who initiated the Tx - is responsible for coming up with
// consensus. Deciding on Consensus requires insights into business logic, as
// from the TX's perspective payloads are opaque.
//
// NOTE(review): this setter does not take the lock; it looks intended for
// single-threaded setup before the manager serves traffic — confirm.
func (c *TxManager) SetResponseFn(fn ResponseFn) {
	c.responseFn = fn
}

// BeginTransaction begins a Transaction with the specified type and payload.
// Transactions expire after the specified TTL. For a transaction that does
// not ever expire, pass in a ttl of 0. When choosing TTLs keep in mind that
// clocks might be slightly skewed in the cluster, therefore set your TTL for
// desiredTTL + toleratedClockSkew.
//
// Regular transactions cannot be opened if the cluster is not considered
// healthy; see BeginTransactionTolerateNodeFailures for the alternative.
func (c *TxManager) BeginTransaction(ctx context.Context, trType TransactionType,
	payload interface{}, ttl time.Duration,
) (*Transaction, error) {
	return c.beginTransaction(ctx, trType, payload, ttl, false)
}

// BeginTransactionTolerateNodeFailures begins a Transaction that does not
// require the whole cluster to be healthy. This can be used for example in
// bootstrapping situations when not all nodes are present yet, or in disaster
// recovery situations when a node needs to run a transaction in order to
// re-join a cluster.
func (c *TxManager) BeginTransactionTolerateNodeFailures(ctx context.Context, trType TransactionType,
	payload interface{}, ttl time.Duration,
) (*Transaction, error) {
	return c.beginTransaction(ctx, trType, payload, ttl, true)
}

// beginTransaction contains the shared logic behind BeginTransaction and
// BeginTransactionTolerateNodeFailures: it registers a new local transaction,
// arms its expiry, and broadcasts it to the other nodes. If the broadcast
// fails, the transaction is aborted everywhere and cleared locally.
func (c *TxManager) beginTransaction(ctx context.Context, trType TransactionType,
	payload interface{}, ttl time.Duration, tolerateNodeFailures bool,
) (*Transaction, error) {
	c.Lock()

	if c.currentTransaction != nil {
		c.Unlock()
		return nil, ErrConcurrentTransaction
	}

	tx := &Transaction{
		Type:                 trType,
		ID:                   uuid.New().String(),
		Payload:              payload,
		TolerateNodeFailures: tolerateNodeFailures,
	}
	if ttl > 0 {
		tx.Deadline = time.Now().Add(ttl)
	} else {
		// UnixTime == 0 represents unlimited
		tx.Deadline = time.UnixMilli(0)
	}
	c.currentTransaction = tx

	// arm the expiry while still holding the lock (resetTxExpiry mutates
	// shared state and IncomingBeginTransaction also calls it under the
	// lock), and use the immutable local tx.ID: the previous version released
	// the lock first and then read c.currentTransaction.ID, racing on the
	// mutable currentTransaction pointer - the same class of race as
	// https://github.com/weaviate/weaviate/issues/2625 below.
	c.resetTxExpiry(ttl, tx.ID)
	c.Unlock()

	if err := c.remote.BroadcastTransaction(ctx, tx); err != nil {
		// we could not open the transaction on every node, therefore we need to
		// abort it everywhere.

		if err := c.remote.BroadcastAbortTransaction(ctx, tx); err != nil {
			c.logger.WithFields(logrus.Fields{
				"action": "broadcast_abort_transaction",
				// before https://github.com/weaviate/weaviate/issues/2625 the next
				// line would read
				//
				// "id": c.currentTransaction.ID
				//
				// which had the potential for races. The tx itself is immutable and
				// therefore always thread-safe. However, the association between the tx
				// manager and the current tx is mutable, therefore the
				// c.currentTransaction pointer could be nil (nil pointer panic) or
				// point to another tx (incorrect log).
				"id": tx.ID,
			}).WithError(err).Errorf("broadcast tx abort failed")
		}

		c.Lock()
		c.clearTransaction()
		c.Unlock()

		return nil, errors.Wrap(err, "broadcast open transaction")
	}

	// return the immutable local tx rather than re-reading
	// c.currentTransaction, which may have been cleared (expiry) or replaced
	// concurrently by the time we get here
	return tx, nil
}

// CommitWriteTransaction is called on the coordinator to commit a previously
// opened write transaction across the cluster. No matter the outcome of the
// broadcast, the local transaction association is cleared afterwards.
func (c *TxManager) CommitWriteTransaction(ctx context.Context,
	tx *Transaction,
) error {
	c.Lock()

	if !c.acceptIncoming {
		c.Unlock()
		return ErrNotReady
	}

	// only the currently tracked tx may be committed; distinguish a tx that
	// expired (nicer error message) from one that never existed
	if c.currentTransaction == nil || c.currentTransaction.ID != tx.ID {
		expired := c.expired(tx.ID)
		c.Unlock()
		if expired {
			return ErrExpiredTransaction
		}
		return ErrInvalidTransaction
	}

	c.Unlock()

	// now that we know we are dealing with a valid transaction: no  matter the
	// outcome, after this call, we should not have a local transaction anymore
	defer func() {
		c.Lock()
		c.clearTransaction()
		c.Unlock()
	}()

	if err := c.remote.BroadcastCommitTransaction(ctx, tx); err != nil {
		// the broadcast failed, but we can't do anything about it. If we would
		// broadcast an "abort" now (as a previous version did) we'd likely run
		// into an inconsistency down the line. Network requests have variable
		// time, so there's a chance some nodes would see the abort before the
		// commit and vice-versa. Given enough nodes, we would end up with an
		// inconsistent state.
		//
		// A failed commit means the node that didn't receive the commit needs to
		// figure out itself how to get back to the correct state (e.g. by
		// recovering from a persisted tx), don't jeopardize all the other nodes as
		// a result!
		return errors.Wrap(err, "broadcast commit transaction")
	}

	return nil
}

// IncomingBeginTransaction is called on a participant node when a coordinator
// broadcasts a new transaction. It persists the tx (so it can be resumed if
// this node dies before the commit arrives), runs the responseFn to collect
// this node's response bytes, and arms the tx's expiry.
func (c *TxManager) IncomingBeginTransaction(ctx context.Context,
	tx *Transaction,
) ([]byte, error) {
	c.Lock()
	defer c.Unlock()

	// tx types listed in allowUnready may be served even before the node is
	// ready; see the comment on TxManager.allowUnready
	if !c.acceptIncoming && !slices.Contains(c.allowUnready, tx.Type) {
		return nil, ErrNotReady
	}

	if c.currentTransaction != nil && c.currentTransaction.ID != tx.ID {
		return nil, ErrConcurrentTransaction
	}

	// make the tx durable before accepting it
	if err := c.persistence.StoreTx(ctx, tx); err != nil {
		return nil, fmt.Errorf("make tx durable: %w", err)
	}

	c.currentTransaction = tx
	data, err := c.responseFn(ctx, tx)
	if err != nil {
		// NOTE(review): on a responseFn error the tx stays set as
		// currentTransaction (and persisted) with no expiry armed; it is only
		// cleaned up by an explicit abort — confirm this is intended.
		return nil, err
	}
	// a Deadline with UnixMilli == 0 marks an unlimited tx (see
	// beginTransaction), which maps to a ttl of 0 here
	var ttl time.Duration
	if tx.Deadline.UnixMilli() != 0 {
		ttl = time.Until(tx.Deadline)
	}
	c.resetTxExpiry(ttl, tx.ID)

	return data, nil
}

// IncomingAbortTransaction is called on a participant node when the
// coordinator broadcasts an abort. It is a no-op unless the given tx is the
// currently tracked one; otherwise it drops the association and deletes the
// durable copy.
func (c *TxManager) IncomingAbortTransaction(ctx context.Context,
	tx *Transaction,
) {
	c.Lock()
	defer c.Unlock()

	if c.currentTransaction == nil || c.currentTransaction.ID != tx.ID {
		// don't do anything
		return
	}

	// NOTE(review): this clears the pointer directly rather than calling
	// c.clearTransaction(), so the expiry goroutine from resetTxExpiry keeps
	// running until the ttl fires and the id is recorded as expired —
	// confirm whether that is intended.
	c.currentTransaction = nil
	if err := c.persistence.DeleteTx(ctx, tx.ID); err != nil {
		c.logger.WithError(err).Errorf("abort tx: %s", err)
	}
}

// IncomingCommitTransaction is called on a participant node when the
// coordinator broadcasts the commit for a previously opened transaction. It
// applies the tx via the commitFn and removes the durable copy on success.
// The commit registers with ongoingCommits so Shutdown waits for it.
func (c *TxManager) IncomingCommitTransaction(ctx context.Context,
	tx *Transaction,
) error {
	// block shutdown until this commit has completed
	c.ongoingCommits.Add(1)
	defer c.ongoingCommits.Done()

	c.Lock()
	defer c.Unlock()

	if !c.acceptIncoming {
		return ErrNotReady
	}

	// only the currently tracked tx may be committed; distinguish a tx that
	// expired (nicer error message) from one that never existed
	if c.currentTransaction == nil || c.currentTransaction.ID != tx.ID {
		expired := c.expired(tx.ID)
		if expired {
			return ErrExpiredTransaction
		}
		return ErrInvalidTransaction
	}

	// use transaction from cache, not passed in for two reason: a. protect
	// against the transaction being manipulated after being created, b. allow
	// an "empty" transaction that only contains the id for less network overhead
	// (we don't need to pass the payload around anymore, after it's successfully
	// opened - every node has a copy of the payload now)
	err := c.commitFn(ctx, c.currentTransaction)
	if err != nil {
		return err
	}

	// TODO: only clean up on success - does this make sense?
	c.currentTransaction = nil

	if err := c.persistence.DeleteTx(ctx, tx.ID); err != nil {
		return fmt.Errorf("close tx on disk: %w", err)
	}

	return nil
}

// Shutdown stops accepting new incoming transactions and commits, then
// blocks until all commits that are already in flight have finished.
func (c *TxManager) Shutdown() {
	func() {
		c.Lock()
		defer c.Unlock()
		c.acceptIncoming = false
	}()

	c.ongoingCommits.Wait()
}

// Transaction is the unit of coordination between nodes. The Payload is
// opaque to the TxManager; its interpretation is up to the commit/response
// functions registered for the respective TransactionType.
type Transaction struct {
	ID       string          // unique identifier, a UUID (see beginTransaction)
	Type     TransactionType // determines which commit/response logic applies
	Payload  interface{}     // opaque to the manager
	Deadline time.Time       // UnixMilli == 0 means the tx never expires

	// If TolerateNodeFailures is false (the default) a transaction cannot be
	// opened or committed if a node is confirmed dead. If a node is only
	// suspected dead, the TxManager will try, but abort unless all nodes ACK.
	TolerateNodeFailures bool
}

// Persistence is the durable storage for open transactions, so that dangling
// txs can be discovered (HaveDanglingTxs) and resumed (TryResumeDanglingTxs)
// after a node restart.
type Persistence interface {
	// StoreTx makes the given tx durable.
	StoreTx(ctx context.Context, tx *Transaction) error
	// DeleteTx removes the durable copy, e.g. after a commit or abort.
	DeleteTx(ctx context.Context, txID string) error
	// IterateAll invokes cb for every stored transaction.
	IterateAll(ctx context.Context, cb func(tx *Transaction)) error
}