SemanticSearchPOC / usecases /cluster /transactions_broadcast.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
4.8 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package cluster
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
type TxBroadcaster struct {
state MemberLister
client Client
consensusFn ConsensusFn
ideal *IdealClusterState
}
// The Broadcaster is the link between the the current node and all other nodes
// during a tx operation. This makes it a natural place to inject a consensus
// function for read transactions. How consensus is reached is completely opaque
// to the broadcaster and can be controlled through custom business logic.
type ConsensusFn func(ctx context.Context,
in []*Transaction) (*Transaction, error)
type Client interface {
OpenTransaction(ctx context.Context, host string, tx *Transaction) error
AbortTransaction(ctx context.Context, host string, tx *Transaction) error
CommitTransaction(ctx context.Context, host string, tx *Transaction) error
}
type MemberLister interface {
AllNames() []string
Hostnames() []string
}
func NewTxBroadcaster(state MemberLister, client Client) *TxBroadcaster {
ideal := NewIdealClusterState(state)
return &TxBroadcaster{
state: state,
client: client,
ideal: ideal,
}
}
func (t *TxBroadcaster) SetConsensusFunction(fn ConsensusFn) {
t.consensusFn = fn
}
func (t *TxBroadcaster) BroadcastTransaction(rootCtx context.Context, tx *Transaction) error {
if !tx.TolerateNodeFailures {
if err := t.ideal.Validate(); err != nil {
return fmt.Errorf("tx does not tolerate node failures: %w", err)
}
}
hosts := t.state.Hostnames()
resTx := make([]*Transaction, len(hosts))
eg := &errgroup.Group{}
for i, host := range hosts {
i := i // https://golang.org/doc/faq#closures_and_goroutines
host := host // https://golang.org/doc/faq#closures_and_goroutines
eg.Go(func() error {
// make sure we don't block forever if the caller passes in an unlimited
// context. If another node does not respond within the timeout, consider
// the tx open attempt failed.
ctx, cancel := context.WithTimeout(rootCtx, 30*time.Second)
defer cancel()
// the client call can mutate the tx, so we need to work with copies to
// prevent a race and to be able to keep all individual results, so they
// can be passed to the consensus fn
resTx[i] = copyTx(tx)
if err := t.client.OpenTransaction(ctx, host, resTx[i]); err != nil {
return errors.Wrapf(err, "host %q", host)
}
return nil
})
}
err := eg.Wait()
if err != nil {
return err
}
if t.consensusFn != nil {
merged, err := t.consensusFn(rootCtx, resTx)
if err != nil {
return fmt.Errorf("try to reach consenus: %w", err)
}
if merged != nil {
tx.Payload = merged.Payload
}
}
return nil
}
func (t *TxBroadcaster) BroadcastAbortTransaction(rootCtx context.Context, tx *Transaction) error {
eg := &errgroup.Group{}
for _, host := range t.state.Hostnames() {
host := host // https://golang.org/doc/faq#closures_and_goroutines
eg.Go(func() error {
// make sure we don't block forever if the caller passes in an unlimited
// context. If another node does not respond within the timeout, consider
// the tx abort attempt failed.
ctx, cancel := context.WithTimeout(rootCtx, 30*time.Second)
defer cancel()
if err := t.client.AbortTransaction(ctx, host, tx); err != nil {
return errors.Wrapf(err, "host %q", host)
}
return nil
})
}
return eg.Wait()
}
func (t *TxBroadcaster) BroadcastCommitTransaction(rootCtx context.Context, tx *Transaction) error {
if !tx.TolerateNodeFailures {
if err := t.ideal.Validate(); err != nil {
return fmt.Errorf("tx does not tolerate node failures: %w", err)
}
}
eg := &errgroup.Group{}
for _, host := range t.state.Hostnames() {
// make sure we don't block forever if the caller passes in an unlimited
// context. If another node does not respond within the timeout, consider
// the tx commit attempt failed.
ctx, cancel := context.WithTimeout(rootCtx, 30*time.Second)
defer cancel()
host := host // https://golang.org/doc/faq#closures_and_goroutines
eg.Go(func() error {
if err := t.client.CommitTransaction(ctx, host, tx); err != nil {
return errors.Wrapf(err, "host %q", host)
}
return nil
})
}
return eg.Wait()
}
func copyTx(in *Transaction) *Transaction {
out := *in
return &out
}