Spaces:
				
			
			
	
			
			
		Sleeping
		
	
	
	
			
			
	
	
	
	
		
		
		Sleeping
		
	File size: 2,956 Bytes
			
			b110593  | 
								1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122  | 
								//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//
package replica
import (
	"fmt"
	"github.com/pkg/errors"
)
// ConsistencyLevel is an enum of all possible consistency level
type ConsistencyLevel string
const (
	One    ConsistencyLevel = "ONE"
	Quorum ConsistencyLevel = "QUORUM"
	All    ConsistencyLevel = "ALL"
)
// cLevel returns min number of replicas to fulfill the consistency level
func cLevel(l ConsistencyLevel, n int) int {
	switch l {
	case All:
		return n
	case Quorum:
		return n/2 + 1
	default:
		return 1
	}
}
var (
	errNoReplicaFound = errors.New("no replica found")
	errUnresolvedName = errors.New("unresolved node name")
)
// resolver finds replicas and resolves theirs names
type resolver struct {
	Schema shardingState
	nodeResolver
	Class    string
	NodeName string
}
// State returns replicas state
func (r *resolver) State(shardName string, cl ConsistencyLevel, directCandidate string) (res rState, err error) {
	res.CLevel = cl
	m, err := r.Schema.ResolveParentNodes(r.Class, shardName)
	if err != nil {
		return res, err
	}
	res.NodeMap = m
	// count number of valid addr
	n := 0
	for name, addr := range m {
		if name != "" && addr != "" {
			n++
		}
	}
	res.Hosts = make([]string, 0, n)
	// We must hold the data if candidate is specified hence it must exist
	// if specified the direct candidate is always at index 0
	if directCandidate == "" {
		directCandidate = r.NodeName
	}
	// This node should be the first to respond in case if the shard is locally available
	if addr := m[directCandidate]; addr != "" {
		res.Hosts = append(res.Hosts, addr)
	}
	for name, addr := range m {
		if name != "" && addr != "" && name != directCandidate {
			res.Hosts = append(res.Hosts, addr)
		}
	}
	if res.Len() == 0 {
		return res, errNoReplicaFound
	}
	res.Level, err = res.ConsistencyLevel(cl)
	return res, err
}
// rState replicas state
type rState struct {
	CLevel  ConsistencyLevel
	Level   int
	Hosts   []string // successfully resolved names
	NodeMap map[string]string
}
// Len returns the number of replicas
func (r *rState) Len() int {
	return len(r.NodeMap)
}
// ConsistencyLevel returns consistency level if it is satisfied
func (r *rState) ConsistencyLevel(l ConsistencyLevel) (int, error) {
	level := cLevel(l, r.Len())
	if n := len(r.Hosts); level > n {
		nodes := []string{}
		for k, addr := range r.NodeMap {
			if addr == "" {
				nodes = append(nodes, k)
			}
		}
		return 0, fmt.Errorf("consistency level (%d) > available replicas(%d): %w :%v",
			level, n, errUnresolvedName, nodes)
	}
	return level, nil
}
 |