Spaces:
Running
Running
File size: 4,802 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package cluster
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// TxBroadcaster fans the transaction lifecycle calls (open, abort, commit) out
// to every host reported by its MemberLister, issuing the per-host calls
// through its Client.
type TxBroadcaster struct {
// state supplies the hostnames each broadcast is sent to.
state MemberLister
// client performs the actual per-host open/abort/commit call.
client Client
// consensusFn is optional: it stays nil until SetConsensusFunction is
// called. When set, BroadcastTransaction hands it the per-host results.
consensusFn ConsensusFn
// ideal is validated before open and commit broadcasts of transactions that
// do not tolerate node failures.
ideal *IdealClusterState
}
// ConsensusFn reduces the per-host transaction results of a broadcast to a
// single transaction.
//
// The Broadcaster is the link between the current node and all other nodes
// during a tx operation. This makes it a natural place to inject a consensus
// function for read transactions. How consensus is reached is completely opaque
// to the broadcaster and can be controlled through custom business logic.
//
// BroadcastTransaction calls the function with one entry per host once every
// host has opened the transaction. A returned error fails the broadcast; a
// non-nil returned transaction has its Payload copied onto the broadcast tx,
// while a nil transaction leaves the tx untouched.
type ConsensusFn func(ctx context.Context,
in []*Transaction) (*Transaction, error)
// Client issues a single transaction lifecycle call against one host. The
// broadcaster invokes it once per host, concurrently, with a context that
// carries a 30 second timeout.
type Client interface {
// OpenTransaction opens tx on host. The call may mutate tx, which is why
// BroadcastTransaction hands every host its own copy.
OpenTransaction(ctx context.Context, host string, tx *Transaction) error
// AbortTransaction aborts tx on host. The broadcaster passes the same tx
// pointer to all hosts.
AbortTransaction(ctx context.Context, host string, tx *Transaction) error
// CommitTransaction commits tx on host. The broadcaster passes the same tx
// pointer to all hosts.
CommitTransaction(ctx context.Context, host string, tx *Transaction) error
}
// MemberLister is the view of cluster membership that the broadcaster (and the
// IdealClusterState built from it) depends on.
type MemberLister interface {
// AllNames is not called directly by the broadcaster.
// NOTE(review): presumably the node names of all cluster members; confirm
// against the implementation.
AllNames() []string
// Hostnames lists the hosts that every broadcast is sent to.
Hostnames() []string
}
// NewTxBroadcaster returns a TxBroadcaster that broadcasts to the hosts listed
// by state, using client for the per-host calls. An IdealClusterState is
// derived from the same state. No consensus function is configured; install
// one with SetConsensusFunction if needed.
func NewTxBroadcaster(state MemberLister, client Client) *TxBroadcaster {
	broadcaster := &TxBroadcaster{state: state, client: client}
	broadcaster.ideal = NewIdealClusterState(state)
	return broadcaster
}
// SetConsensusFunction installs fn as the function BroadcastTransaction uses to
// merge the per-host results. Passing nil disables the consensus step. The
// field is written without any locking.
func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn) {
t.consensusFn = fn
}
// BroadcastTransaction opens tx on every host reported by the member lister,
// using one goroutine per host, and waits for all of them to finish.
//
// Unless tx.TolerateNodeFailures is set, the ideal cluster state is validated
// first; a validation error is returned before any host is contacted. Every
// host receives its own copy of tx and has at most 30 seconds (less if rootCtx
// ends sooner) to respond. If any host fails, the first error, annotated with
// the host name, is returned and the consensus step is skipped.
//
// When all hosts succeed and a consensus function is configured, it is called
// with the per-host results. A non-nil merged transaction has its Payload
// written back onto tx.
func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error {
	if !tx.TolerateNodeFailures {
		if err := t.ideal.Validate(); err != nil {
			return fmt.Errorf("tx does not tolerate node failures: %w", err)
		}
	}

	hostnames := t.state.Hostnames()
	// One slot per host; each goroutine writes only to its own index.
	perHost := make([]*Transaction, len(hostnames))

	var eg errgroup.Group
	for idx, hostname := range hostnames {
		idx, hostname := idx, hostname // https://golang.org/doc/faq#closures_and_goroutines
		eg.Go(func() error {
			// Never block forever when the caller hands us a context without
			// a deadline: a host that does not answer within the timeout
			// counts as a failed open attempt.
			openCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second)
			defer cancel()

			// The client call may mutate the tx. Working on a private copy
			// avoids a race between hosts and preserves each individual
			// result for the consensus function.
			hostTx := copyTx(tx)
			perHost[idx] = hostTx

			err := t.client.OpenTransaction(openCtx, hostname, hostTx)
			if err == nil {
				return nil
			}
			return errors.Wrapf(err, "host %q", hostname)
		})
	}
	if err := eg.Wait(); err != nil {
		return err
	}

	if t.consensusFn == nil {
		return nil
	}

	merged, err := t.consensusFn(rootCtx, perHost)
	if err != nil {
		return fmt.Errorf("try to reach consenus: %w", err)
	}
	if merged != nil {
		tx.Payload = merged.Payload
	}
	return nil
}
// BroadcastAbortTransaction asks every host reported by the member lister to
// abort tx, using one goroutine per host, and waits for all of them.
//
// Unlike the open and commit broadcasts, no ideal-cluster-state validation is
// performed here. All hosts are handed the same tx pointer. Each host has at
// most 30 seconds (less if rootCtx ends sooner) to respond. The first error,
// annotated with the host name, is returned; nil means every host aborted.
func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error {
	var eg errgroup.Group
	for _, hostname := range t.state.Hostnames() {
		hostname := hostname // https://golang.org/doc/faq#closures_and_goroutines
		eg.Go(func() error {
			// Never block forever when the caller hands us a context without
			// a deadline: a host that does not answer within the timeout
			// counts as a failed abort attempt.
			abortCtx, cancel := context.WithTimeout(rootCtx, 30*time.Second)
			defer cancel()

			err := t.client.AbortTransaction(abortCtx, hostname, tx)
			if err == nil {
				return nil
			}
			return errors.Wrapf(err, "host %q", hostname)
		})
	}
	return eg.Wait()
}
// BroadcastCommitTransaction asks every host reported by the member lister to
// commit tx, using one goroutine per host, and waits for all of them.
//
// Unless tx.TolerateNodeFailures is set, the ideal cluster state is validated
// first; a validation error is returned before any host is contacted. All
// hosts are handed the same tx pointer. Each host has at most 30 seconds (less
// if rootCtx ends sooner) to respond. The first error, annotated with the host
// name, is returned; nil means every host committed.
func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error {
	if !tx.TolerateNodeFailures {
		if err := t.ideal.Validate(); err != nil {
			return fmt.Errorf("tx does not tolerate node failures: %w", err)
		}
	}

	eg := &errgroup.Group{}
	for _, host := range t.state.Hostnames() {
		host := host // https://golang.org/doc/faq#closures_and_goroutines
		eg.Go(func() error {
			// make sure we don't block forever if the caller passes in an
			// unlimited context. If another node does not respond within the
			// timeout, consider the tx commit attempt failed.
			//
			// The context is created inside the goroutine, not in the loop
			// body: a defer in the loop would keep every per-host context and
			// its timer alive until the whole broadcast returns, and would
			// start the timeout before the goroutine is even scheduled.
			ctx, cancel := context.WithTimeout(rootCtx, 30*time.Second)
			defer cancel()

			if err := t.client.CommitTransaction(ctx, host, tx); err != nil {
				return errors.Wrapf(err, "host %q", host)
			}
			return nil
		})
	}
	return eg.Wait()
}
// copyTx returns a pointer to a new Transaction holding a field-by-field copy
// of *in. The copy is shallow: any pointer, slice, map or interface value
// stored in a field still refers to the same underlying data as the original.
// in must not be nil.
func copyTx(in *Transaction) *Transaction {
	dup := new(Transaction)
	*dup = *in
	return dup
}
|