File size: 4,802 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//

package cluster

import (
	"context"
	"fmt"
	"time"

	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"
)

// TxBroadcaster fans transaction operations (open, abort, commit) out to the
// hosts reported by its MemberLister, using the Client for the per-host calls.
type TxBroadcaster struct {
	// state provides the hostnames that every broadcast is sent to.
	state MemberLister
	// client performs the actual open/abort/commit call against a single host.
	client Client
	// consensusFn is optional and nil until SetConsensusFunction is called. If
	// set, it is invoked after a successful open broadcast with the per-host
	// transaction copies.
	consensusFn ConsensusFn
	// ideal is created from state in NewTxBroadcaster and is validated before
	// open and commit broadcasts of transactions that do not tolerate node
	// failures.
	ideal *IdealClusterState
}

// ConsensusFn receives the per-node transactions collected while opening a
// transaction and returns a single resulting transaction or an error.
//
// The Broadcaster is the link between the current node and all other nodes
// during a tx operation. This makes it a natural place to inject a consensus
// function for read transactions. How consensus is reached is completely
// opaque to the broadcaster and can be controlled through custom business
// logic.
type ConsensusFn func(ctx context.Context, in []*Transaction) (*Transaction, error)

// Client is the per-host transport used by the TxBroadcaster. Each method
// targets a single host and reports failure through its error return.
type Client interface {
	// OpenTransaction asks host to open tx. The broadcaster hands each host its
	// own copy of the transaction because this call can mutate tx.
	OpenTransaction(ctx context.Context, host string, tx *Transaction) error
	// AbortTransaction asks host to abort tx.
	AbortTransaction(ctx context.Context, host string, tx *Transaction) error
	// CommitTransaction asks host to commit tx.
	CommitTransaction(ctx context.Context, host string, tx *Transaction) error
}

// MemberLister exposes the cluster membership information the TxBroadcaster
// depends on. It is also handed to NewIdealClusterState.
type MemberLister interface {
	// AllNames returns a list of names.
	// NOTE(review): presumably the node names of all cluster members — confirm
	// against the implementation.
	AllNames() []string
	// Hostnames returns the hosts that every broadcast is sent to.
	Hostnames() []string
}

// NewTxBroadcaster builds a TxBroadcaster that discovers hosts through state
// and talks to them through client. An IdealClusterState is derived from state
// at construction time. No consensus function is configured initially; use
// SetConsensusFunction to install one.
func NewTxBroadcaster(state MemberLister, client Client) *TxBroadcaster {
	broadcaster := new(TxBroadcaster)
	broadcaster.ideal = NewIdealClusterState(state)
	broadcaster.state = state
	broadcaster.client = client
	return broadcaster
}

// SetConsensusFunction installs fn as the consensus function. Once a non-nil
// fn is set, BroadcastTransaction calls it with the per-host transaction
// copies after every host has opened the transaction successfully. The field
// is assigned without any synchronization.
func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn) {
	t.consensusFn = fn
}

// BroadcastTransaction opens tx on every host returned by the member lister,
// contacting all hosts concurrently.
//
// If tx does not tolerate node failures, the ideal cluster state is validated
// first and a validation error aborts the broadcast before any host is
// contacted. Every host receives its own copy of tx. If any host fails, the
// error from the errgroup is returned (wrapped with the host name). When all
// hosts succeed and a consensus function is configured, it is called with the
// per-host copies; a non-nil result replaces tx.Payload.
func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error {
	if !tx.TolerateNodeFailures {
		if err := t.ideal.Validate(); err != nil {
			return fmt.Errorf("tx does not tolerate node failures: %w", err)
		}
	}

	hostnames := t.state.Hostnames()
	perHost := make([]*Transaction, len(hostnames))

	var group errgroup.Group
	for idx := range hostnames {
		// Fresh per-iteration variables so each goroutine sees its own values,
		// see https://golang.org/doc/faq#closures_and_goroutines
		pos, hostname := idx, hostnames[idx]

		group.Go(func() error {
			// Bound the call so an unlimited caller context cannot block us
			// forever. A node that does not answer within the timeout counts
			// as a failed open attempt.
			openCtx, done := context.WithTimeout(rootCtx, 30*time.Second)
			defer done()

			// The client call can mutate the tx. Working on a private copy
			// avoids a race and preserves each individual result for the
			// consensus function.
			local := copyTx(tx)
			perHost[pos] = local

			err := t.client.OpenTransaction(openCtx, hostname, local)
			if err == nil {
				return nil
			}
			return errors.Wrapf(err, "host %q", hostname)
		})
	}

	if err := group.Wait(); err != nil {
		return err
	}

	// Without a consensus function there is nothing left to do.
	if t.consensusFn == nil {
		return nil
	}

	merged, err := t.consensusFn(rootCtx, perHost)
	if err != nil {
		return fmt.Errorf("try to reach consenus: %w", err)
	}
	if merged != nil {
		tx.Payload = merged.Payload
	}

	return nil
}

// BroadcastAbortTransaction asks every host returned by the member lister to
// abort tx, contacting all hosts concurrently. The same tx pointer is passed
// to every host. The result of the errgroup is returned; a failing host's
// error is wrapped with the host name.
func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error {
	var group errgroup.Group

	hostnames := t.state.Hostnames()
	for idx := range hostnames {
		// Declared inside the loop body so each goroutine captures its own
		// value, see https://golang.org/doc/faq#closures_and_goroutines
		hostname := hostnames[idx]

		group.Go(func() error {
			// Bound the call so an unlimited caller context cannot block us
			// forever. A node that does not answer within the timeout counts
			// as a failed abort attempt.
			abortCtx, done := context.WithTimeout(rootCtx, 30*time.Second)
			defer done()

			err := t.client.AbortTransaction(abortCtx, hostname, tx)
			if err == nil {
				return nil
			}
			return errors.Wrapf(err, "host %q", hostname)
		})
	}

	return group.Wait()
}

// BroadcastCommitTransaction asks every host returned by the member lister to
// commit tx, contacting all hosts concurrently.
//
// If tx does not tolerate node failures, the ideal cluster state is validated
// first and a validation error is returned before any host is contacted. The
// same tx pointer is passed to every host. The result of the errgroup is
// returned; a failing host's error is wrapped with the host name.
func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error {
	if !tx.TolerateNodeFailures {
		if err := t.ideal.Validate(); err != nil {
			return fmt.Errorf("tx does not tolerate node failures: %w", err)
		}
	}

	eg := &errgroup.Group{}
	for _, host := range t.state.Hostnames() {
		host := host // https://golang.org/doc/faq#closures_and_goroutines
		eg.Go(func() error {
			// make sure we don't block forever if the caller passes in an unlimited
			// context. If another node does not respond within the timeout, consider
			// the tx commit attempt failed.
			//
			// The timeout context is created (and cancelled) inside the goroutine,
			// matching the open and abort broadcasts. Deferring the cancel in the
			// loop body instead would keep every per-host context and its timer
			// alive until the whole broadcast returns.
			ctx, cancel := context.WithTimeout(rootCtx, 30*time.Second)
			defer cancel()

			if err := t.client.CommitTransaction(ctx, host, tx); err != nil {
				return errors.Wrapf(err, "host %q", host)
			}

			return nil
		})
	}

	return eg.Wait()
}

// copyTx returns a pointer to a newly allocated shallow copy of *in. Fields
// are copied by plain struct assignment, so any reference-typed fields still
// point at the same underlying data as the original.
func copyTx(in *Transaction) *Transaction {
	dup := new(Transaction)
	*dup = *in
	return dup
}