//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//

package replica

import (
	"context"
	"fmt"
	"sync"

	"github.com/sirupsen/logrus"
)

type (
	// readyOp asks a replica if it is ready to commit
	readyOp func(_ context.Context, host, requestID string) error

	// commitOp asks a replica to execute the actual operation
	commitOp[T any] func(_ context.Context, host, requestID string) (T, error)

	// readOp defines a generic read operation
	readOp[T any] func(_ context.Context, host string, fullRead bool) (T, error)

	// coordinator coordinates replication of write and read requests
	coordinator[T any] struct {
		Client
		Resolver *resolver // node_name -> host_address
		log      logrus.FieldLogger
		Class    string
		Shard    string
		TxID     string // transaction ID
	}
)
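
// As a hypothetical sketch (not part of the original file), a matching
// readyOp/commitOp pair could wrap two RPCs exposed by the concrete Client
// implementation, here named Prepare and Commit purely for illustration:
//
//	ask := func(ctx context.Context, host, requestID string) error {
//		return client.Prepare(ctx, host, requestID) // stage the update on host
//	}
//	com := func(ctx context.Context, host, requestID string) (string, error) {
//		return client.Commit(ctx, host, requestID) // apply the staged update
//	}
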
// newCoordinator creates a write coordinator, used by the replicator
func newCoordinator[T any](r *Replicator, shard, requestID string, l logrus.FieldLogger,
) *coordinator[T] {
	return &coordinator[T]{
		Client:   r.client,
		Resolver: r.resolver,
		log:      l,
		Class:    r.class,
		Shard:    shard,
		TxID:     requestID,
	}
}

// newReadCoordinator creates a read coordinator, used by the Finder to read objects from replicas
func newReadCoordinator[T any](f *Finder, shard string) *coordinator[T] {
	return &coordinator[T]{
		Resolver: f.resolver,
		Class:    f.class,
		Shard:    shard,
	}
}

// broadcast sends the write request to all replicas (first phase of a two-phase commit)
func (c *coordinator[T]) broadcast(ctx context.Context,
	replicas []string,
	op readyOp, level int,
) <-chan string {
	// prepare tells replicas to be ready
	prepare := func() <-chan _Result[string] {
		resChan := make(chan _Result[string], len(replicas))
		go func() { // broadcast
			defer close(resChan)
			var wg sync.WaitGroup
			wg.Add(len(replicas))
			for _, replica := range replicas {
				go func(replica string) {
					defer wg.Done()
					err := op(ctx, replica, c.TxID)
					resChan <- _Result[string]{replica, err}
				}(replica)
			}
			wg.Wait()
		}()
		return resChan
	}

	// handle responses to prepare requests
	replicaCh := make(chan string, len(replicas))
	go func(level int) {
		defer close(replicaCh)
		actives := make([]string, 0, level) // cache for active replicas
		for r := range prepare() {
			if r.Err != nil { // connection error
				c.log.WithField("op", "broadcast").Error(r.Err)
				continue
			}
			level--
			if level > 0 { // cache since level has not been reached yet
				actives = append(actives, r.Value)
				continue
			}
			if level == 0 { // consistency level has been reached
				for _, x := range actives {
					replicaCh <- x
				}
			}
			replicaCh <- r.Value
		}
		if level > 0 { // abort: nothing has been sent to the caller
			fs := logrus.Fields{"op": "broadcast", "active": len(actives), "total": len(replicas)}
			c.log.WithFields(fs).Error("abort")
			for _, node := range replicas {
				c.Abort(ctx, node, c.Class, c.Shard, c.TxID)
			}
		}
	}(level)
	return replicaCh
}

// commitAll tells replicas to commit pending updates related to a specific request
// (second phase of a two-phase commit)
func (c *coordinator[T]) commitAll(ctx context.Context,
	replicaCh <-chan string,
	op commitOp[T],
) <-chan _Result[T] {
	replyCh := make(chan _Result[T], cap(replicaCh))
	go func() { // tells active replicas to commit
		wg := sync.WaitGroup{}
		for replica := range replicaCh {
			wg.Add(1)
			go func(replica string) {
				defer wg.Done()
				resp, err := op(ctx, replica, c.TxID)
				replyCh <- _Result[T]{resp, err}
			}(replica)
		}
		wg.Wait()
		close(replyCh)
	}()
	return replyCh
}

// Push pushes updates to all replicas of a specific shard
func (c *coordinator[T]) Push(ctx context.Context,
	cl ConsistencyLevel,
	ask readyOp,
	com commitOp[T],
) (<-chan _Result[T], int, error) {
	state, err := c.Resolver.State(c.Shard, cl, "")
	if err != nil {
		return nil, 0, fmt.Errorf("%w : class %q shard %q", err, c.Class, c.Shard)
	}
	level := state.Level
	nodeCh := c.broadcast(ctx, state.Hosts, ask, level)
	// the commit phase runs with a fresh context so that a caller cancellation
	// cannot interrupt commits once replicas have already been prepared
	return c.commitAll(context.Background(), nodeCh, com), level, nil
}
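
// pushSketch is a hypothetical illustration (not part of the original file) of
// how a caller might drive a write through both phases: Push runs the prepare
// broadcast and the commit fan-out, and the caller then waits for `level`
// successful replies. The closure bodies are placeholders.
func pushSketch(ctx context.Context, c *coordinator[string], cl ConsistencyLevel) error {
	ask := func(ctx context.Context, host, requestID string) error {
		return nil // placeholder: ask host to stage the update under requestID
	}
	com := func(ctx context.Context, host, requestID string) (string, error) {
		return host, nil // placeholder: commit the staged update and return a result
	}
	replyCh, level, err := c.Push(ctx, cl, ask, com)
	if err != nil {
		return err
	}
	for i := 0; i < level; i++ {
		if r := <-replyCh; r.Err != nil {
			return r.Err // a replica failed to commit
		}
	}
	return nil
}
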
// Pull reads data from replicas, depending on the consistency level.
// It involves just as many replicas as are needed to satisfy the consistency level.
//
// directCandidate, when specified, is the node to which the direct (full) request
// is sent (it defaults to the local node).
func (c *coordinator[T]) Pull(ctx context.Context,
	cl ConsistencyLevel,
	op readOp[T], directCandidate string,
) (<-chan _Result[T], rState, error) {
	state, err := c.Resolver.State(c.Shard, cl, directCandidate)
	if err != nil {
		return nil, state, fmt.Errorf("%w : class %q shard %q", err, c.Class, c.Shard)
	}
	level := state.Level
	replyCh := make(chan _Result[T], level)
	candidates := state.Hosts[:level]                          // direct ones
	candidatePool := make(chan string, len(state.Hosts)-level) // remaining ones
	for _, replica := range state.Hosts[level:] {
		candidatePool <- replica
	}
	close(candidatePool) // pool is ready
	go func() {
		wg := sync.WaitGroup{}
		wg.Add(len(candidates))
		for i := range candidates { // ask the direct candidates first
			go func(idx int) {
				defer wg.Done()
				resp, err := op(ctx, candidates[idx], idx == 0)
				// if the node is not responding, delegate the request to a node from the pool
				for err != nil {
					delegate, ok := <-candidatePool
					if !ok {
						break // no more standby nodes left
					}
					resp, err = op(ctx, delegate, idx == 0)
				}
				replyCh <- _Result[T]{resp, err}
			}(i)
		}
		wg.Wait()
		close(replyCh)
	}()
	return replyCh, state, nil
}
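
// pullSketch is a hypothetical illustration (not part of the original file) of
// how a reader might use Pull: the first reply is the full read from the
// direct candidate (fullRead == true), while the remaining replies are cheaper
// digest reads used to check agreement. The closure body is a placeholder.
func pullSketch(ctx context.Context, c *coordinator[string], cl ConsistencyLevel) ([]string, error) {
	op := func(ctx context.Context, host string, fullRead bool) (string, error) {
		return host, nil // placeholder: full read on the direct candidate, digest otherwise
	}
	replyCh, state, err := c.Pull(ctx, cl, op, "")
	if err != nil {
		return nil, err
	}
	results := make([]string, 0, state.Level)
	for r := range replyCh {
		if r.Err != nil {
			return nil, r.Err // a replica (and all its delegates) failed
		}
		results = append(results, r.Value)
	}
	return results, nil
}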