KevinStephenson
Adding in weaviate code
b110593
raw
history blame
2.96 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package replica
import (
"fmt"
"github.com/pkg/errors"
)
// ConsistencyLevel is an enum of all possible consistency levels.
// cLevel maps a level to the minimum number of replicas needed to fulfill it.
type ConsistencyLevel string
// Supported consistency levels. The replica counts quoted below are the ones
// cLevel computes for a shard with n replicas.
const (
// One requires a single replica.
One ConsistencyLevel = "ONE"
// Quorum requires n/2 + 1 replicas.
Quorum ConsistencyLevel = "QUORUM"
// All requires all n replicas.
All ConsistencyLevel = "ALL"
)
// cLevel translates a consistency level into the minimum number of replicas,
// out of n, that are needed to fulfill it.
//
// All needs every replica, Quorum needs n/2 + 1 of them, and any other value
// (including One) needs a single replica.
func cLevel(l ConsistencyLevel, n int) int {
	if l == All {
		return n
	}
	if l == Quorum {
		return n/2 + 1
	}
	return 1
}
// Sentinel errors reported while resolving replicas.
var (
// errNoReplicaFound is returned by resolver.State when the node map of the
// shard is empty.
errNoReplicaFound = errors.New("no replica found")
// errUnresolvedName is wrapped by rState.ConsistencyLevel when fewer hosts
// are available than the consistency level requires.
errUnresolvedName = errors.New("unresolved node name")
)
// resolver finds replicas and resolves their names
type resolver struct {
// Schema is asked (via ResolveParentNodes) for the nodes holding a shard.
Schema shardingState
// nodeResolver is embedded; none of the code visible in this file uses it.
// NOTE(review): presumably resolves node names to host addresses — confirm
// against its declaration.
nodeResolver
// Class is the class name passed to Schema.ResolveParentNodes.
Class string
// NodeName is the direct candidate State falls back to when the caller
// does not specify one.
NodeName string
}
// State returns the replicas state of shardName for consistency level cl.
//
// The node map obtained from the schema is stored in the result unchanged.
// Hosts receives the non-empty addresses of that map: first the address of
// directCandidate (or of r.NodeName when directCandidate is empty) if it has
// one, then every other named node with an address.
//
// An error is returned when the schema lookup fails, when the node map is
// empty (errNoReplicaFound), or when cl cannot be satisfied by the resolved
// hosts.
func (r *resolver) State(shardName string, cl ConsistencyLevel, directCandidate string) (res rState, err error) {
	res.CLevel = cl
	nodes, err := r.Schema.ResolveParentNodes(r.Class, shardName)
	if err != nil {
		return res, err
	}
	res.NodeMap = nodes

	// Size the host list by the number of entries that have both a name and
	// an address.
	resolved := 0
	for name, addr := range nodes {
		if name == "" || addr == "" {
			continue
		}
		resolved++
	}
	hosts := make([]string, 0, resolved)

	// Without an explicit candidate this node is preferred, so that a locally
	// available shard is the first one to respond.
	preferred := directCandidate
	if preferred == "" {
		preferred = r.NodeName
	}
	// The preferred node, when resolvable, always sits at index 0.
	if first := nodes[preferred]; first != "" {
		hosts = append(hosts, first)
	}

	for name, addr := range nodes {
		if name == "" || addr == "" || name == preferred {
			continue
		}
		hosts = append(hosts, addr)
	}
	res.Hosts = hosts

	if res.Len() == 0 {
		return res, errNoReplicaFound
	}
	res.Level, err = res.ConsistencyLevel(cl)
	return res, err
}
// rState is the replicas state computed by resolver.State
type rState struct {
// CLevel is the consistency level the state was requested for.
CLevel ConsistencyLevel
// Level is the replica count returned by ConsistencyLevel for CLevel.
Level int
// Hosts holds the non-empty addresses taken from NodeMap, i.e. the
// successfully resolved names. State places the direct candidate's address
// first when it has one.
Hosts []string
// NodeMap maps a node name to its address; ConsistencyLevel reports names
// with an empty address as unresolved.
NodeMap map[string]string
}
// Len reports the number of replicas, i.e. the number of entries in NodeMap,
// whether or not their addresses were resolved.
func (r *rState) Len() int {
	replicas := r.NodeMap
	return len(replicas)
}
// ConsistencyLevel returns the number of replicas required by l, provided
// that at least that many hosts are available.
//
// Otherwise it returns 0 together with an error that wraps errUnresolvedName
// and lists the node names whose address is empty.
func (r *rState) ConsistencyLevel(l ConsistencyLevel) (int, error) {
	required := cLevel(l, r.Len())
	available := len(r.Hosts)
	if required <= available {
		return required, nil
	}

	// Name the nodes without an address so the error explains the shortfall.
	unresolved := make([]string, 0)
	for name := range r.NodeMap {
		if r.NodeMap[name] == "" {
			unresolved = append(unresolved, name)
		}
	}
	return 0, fmt.Errorf("consistency level (%d) > available replicas(%d): %w :%v",
		required, available, errUnresolvedName, unresolved)
}