Spaces:
Running
Running
File size: 2,956 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package replica
import (
	"fmt"
	"sort"

	"github.com/pkg/errors"
)
// ConsistencyLevel is an enum of all possible consistency levels.
// cLevel translates a level into the number of replicas it requires.
type ConsistencyLevel string

const (
	// One is satisfied by a single replica.
	One ConsistencyLevel = "ONE"
	// Quorum is satisfied by a strict majority of the replicas (n/2 + 1).
	Quorum ConsistencyLevel = "QUORUM"
	// All is satisfied only when every replica takes part.
	All ConsistencyLevel = "ALL"
)
// cLevel translates consistency level l into the minimum number of replicas,
// out of n, that must take part to fulfill it: every replica for All, a strict
// majority for Quorum, and a single replica for any other value (including One).
func cLevel(l ConsistencyLevel, n int) int {
	if l == All {
		return n
	}
	if l == Quorum {
		return n/2 + 1
	}
	return 1
}
var (
	// errNoReplicaFound is returned by resolver.State when the resolved
	// node map for a shard is empty.
	errNoReplicaFound = errors.New("no replica found")
	// errUnresolvedName is wrapped by rState.ConsistencyLevel when fewer
	// hosts are available than the consistency level requires.
	errUnresolvedName = errors.New("unresolved node name")
)
// resolver finds replicas and resolves their names
type resolver struct {
	// Schema is asked (via ResolveParentNodes) for the nodes holding a shard.
	Schema shardingState
	// nodeResolver is embedded; it is declared elsewhere in the package and
	// is not used by the methods visible in this file.
	nodeResolver
	// Class is the class name passed to Schema when resolving a shard's nodes.
	Class string
	// NodeName is the node State uses as the direct candidate when the
	// caller does not specify one.
	NodeName string
}
// State resolves the replicas of shardName and checks them against the
// consistency level cl.
//
// The returned rState carries the requested level, the full node map, the
// addresses of the resolved nodes and the number of replicas required. When
// directCandidate is empty, r.NodeName takes its place; if that node has a
// non-empty address it is always Hosts[0]. The order of the remaining hosts
// follows map iteration and is therefore unspecified.
//
// Errors: whatever Schema.ResolveParentNodes returns, errNoReplicaFound when
// the node map is empty, or the error from rState.ConsistencyLevel when too
// few hosts are available.
func (r *resolver) State(shardName string, cl ConsistencyLevel, directCandidate string) (res rState, err error) {
	res.CLevel = cl

	nodes, err := r.Schema.ResolveParentNodes(r.Class, shardName)
	if err != nil {
		return res, err
	}
	res.NodeMap = nodes

	// Pre-size the host list for the entries that have both a name and an address.
	resolved := 0
	for name, addr := range nodes {
		if name == "" || addr == "" {
			continue
		}
		resolved++
	}
	hosts := make([]string, 0, resolved)

	// Without an explicit candidate, prefer this node so that a locally
	// available shard is the first one to respond.
	first := directCandidate
	if first == "" {
		first = r.NodeName
	}
	if addr := nodes[first]; addr != "" {
		hosts = append(hosts, addr)
	}

	// Every other resolved node follows the preferred one.
	for name, addr := range nodes {
		if name == "" || addr == "" || name == first {
			continue
		}
		hosts = append(hosts, addr)
	}
	res.Hosts = hosts

	if res.Len() == 0 {
		return res, errNoReplicaFound
	}

	res.Level, err = res.ConsistencyLevel(cl)
	return res, err
}
// rState describes the replicas of a shard as resolved by resolver.State.
type rState struct {
	// CLevel is the consistency level requested by the caller.
	CLevel ConsistencyLevel
	// Level is the number of replicas required to satisfy CLevel; it is 0
	// when the level could not be satisfied.
	Level int
	// Hosts holds the addresses of the successfully resolved nodes.
	Hosts []string
	// NodeMap maps each replica's node name to its address; an empty
	// address marks a node whose name could not be resolved.
	NodeMap map[string]string
}
// Len reports the number of replicas, counting every entry of NodeMap
// whether or not its address is set.
func (r *rState) Len() int {
	replicas := r.NodeMap
	return len(replicas)
}
// ConsistencyLevel returns the number of replicas required by consistency
// level l, provided enough hosts are available to satisfy it.
//
// The requirement is computed by cLevel over all known replicas (Len) and
// compared with the number of resolved hosts. When there are too few hosts it
// returns 0 and an error wrapping errUnresolvedName; the error lists, in
// sorted order, the node names whose address is empty.
func (r *rState) ConsistencyLevel(l ConsistencyLevel) (int, error) {
	level := cLevel(l, r.Len())
	n := len(r.Hosts)
	if level <= n {
		return level, nil
	}

	// Report the nodes that have no address.
	var nodes []string
	for name, addr := range r.NodeMap {
		if addr == "" {
			nodes = append(nodes, name)
		}
	}
	// Map iteration order is random; sort so the same state always yields
	// the same error message.
	sort.Strings(nodes)

	return 0, fmt.Errorf("consistency level (%d) > available replicas(%d): %w :%v",
		level, n, errUnresolvedName, nodes)
}
|