Spaces:
Sleeping
Sleeping
| // _ _ | |
| // __ _____ __ ___ ___ __ _| |_ ___ | |
| // \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
| // \ V V / __/ (_| |\ V /| | (_| | || __/ | |
| // \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
| // | |
| // Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
| // | |
| // CONTACT: [email protected] | |
| // | |
| package cluster | |
| import ( | |
| "context" | |
| "fmt" | |
| "time" | |
| "github.com/pkg/errors" | |
| "golang.org/x/sync/errgroup" | |
| ) | |
| type TxBroadcaster struct { | |
| state MemberLister | |
| client Client | |
| consensusFn ConsensusFn | |
| ideal *IdealClusterState | |
| } | |
| // The Broadcaster is the link between the the current node and all other nodes | |
| // during a tx operation. This makes it a natural place to inject a consensus | |
| // function for read transactions. How consensus is reached is completely opaque | |
| // to the broadcaster and can be controlled through custom business logic. | |
| type ConsensusFn func(ctx context.Context, | |
| in []*Transaction) (*Transaction, error) | |
| type Client interface { | |
| OpenTransaction(ctx context.Context, host string, tx *Transaction) error | |
| AbortTransaction(ctx context.Context, host string, tx *Transaction) error | |
| CommitTransaction(ctx context.Context, host string, tx *Transaction) error | |
| } | |
| type MemberLister interface { | |
| AllNames() []string | |
| Hostnames() []string | |
| } | |
| func NewTxBroadcaster(state MemberLister, client Client) *TxBroadcaster { | |
| ideal := NewIdealClusterState(state) | |
| return &TxBroadcaster{ | |
| state: state, | |
| client: client, | |
| ideal: ideal, | |
| } | |
| } | |
| func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn) { | |
| t.consensusFn = fn | |
| } | |
| func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error { | |
| if !tx.TolerateNodeFailures { | |
| if err := t.ideal.Validate(); err != nil { | |
| return fmt.Errorf("tx does not tolerate node failures: %w", err) | |
| } | |
| } | |
| hosts := t.state.Hostnames() | |
| resTx := make([]*Transaction, len(hosts)) | |
| eg := &errgroup.Group{} | |
| for i, host := range hosts { | |
| i := i // https://golang.org/doc/faq#closures_and_goroutines | |
| host := host // https://golang.org/doc/faq#closures_and_goroutines | |
| eg.Go(func() error { | |
| // make sure we don't block forever if the caller passes in an unlimited | |
| // context. If another node does not respond within the timeout, consider | |
| // the tx open attempt failed. | |
| ctx, cancel := context.WithTimeout(rootCtx, 30*time.Second) | |
| defer cancel() | |
| // the client call can mutate the tx, so we need to work with copies to | |
| // prevent a race and to be able to keep all individual results, so they | |
| // can be passed to the consensus fn | |
| resTx[i] = copyTx(tx) | |
| if err := t.client.OpenTransaction(ctx, host, resTx[i]); err != nil { | |
| return errors.Wrapf(err, "host %q", host) | |
| } | |
| return nil | |
| }) | |
| } | |
| err := eg.Wait() | |
| if err != nil { | |
| return err | |
| } | |
| if t.consensusFn != nil { | |
| merged, err := t.consensusFn(rootCtx, resTx) | |
| if err != nil { | |
| return fmt.Errorf("try to reach consenus: %w", err) | |
| } | |
| if merged != nil { | |
| tx.Payload = merged.Payload | |
| } | |
| } | |
| return nil | |
| } | |
| func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error { | |
| eg := &errgroup.Group{} | |
| for _, host := range t.state.Hostnames() { | |
| host := host // https://golang.org/doc/faq#closures_and_goroutines | |
| eg.Go(func() error { | |
| // make sure we don't block forever if the caller passes in an unlimited | |
| // context. If another node does not respond within the timeout, consider | |
| // the tx abort attempt failed. | |
| ctx, cancel := context.WithTimeout(rootCtx, 30*time.Second) | |
| defer cancel() | |
| if err := t.client.AbortTransaction(ctx, host, tx); err != nil { | |
| return errors.Wrapf(err, "host %q", host) | |
| } | |
| return nil | |
| }) | |
| } | |
| return eg.Wait() | |
| } | |
| func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error { | |
| if !tx.TolerateNodeFailures { | |
| if err := t.ideal.Validate(); err != nil { | |
| return fmt.Errorf("tx does not tolerate node failures: %w", err) | |
| } | |
| } | |
| eg := &errgroup.Group{} | |
| for _, host := range t.state.Hostnames() { | |
| // make sure we don't block forever if the caller passes in an unlimited | |
| // context. If another node does not respond within the timeout, consider | |
| // the tx commit attempt failed. | |
| ctx, cancel := context.WithTimeout(rootCtx, 30*time.Second) | |
| defer cancel() | |
| host := host // https://golang.org/doc/faq#closures_and_goroutines | |
| eg.Go(func() error { | |
| if err := t.client.CommitTransaction(ctx, host, tx); err != nil { | |
| return errors.Wrapf(err, "host %q", host) | |
| } | |
| return nil | |
| }) | |
| } | |
| return eg.Wait() | |
| } | |
| func copyTx(in *Transaction) *Transaction { | |
| out := *in | |
| return &out | |
| } | |