File size: 2,956 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//

package replica

import (
	"fmt"
	"sort"

	"github.com/pkg/errors"
)

// ConsistencyLevel identifies, by name, how many replicas must take part in
// an operation. The supported names are One, Quorum and All.
type ConsistencyLevel string

// Supported consistency levels; cLevel maps each of them to a replica count.
const (
	// One is satisfied by a single replica.
	One = ConsistencyLevel("ONE")
	// Quorum is satisfied by a majority of the replicas.
	Quorum = ConsistencyLevel("QUORUM")
	// All is satisfied only by every replica.
	All = ConsistencyLevel("ALL")
)

// cLevel maps a consistency level onto the minimum number of replicas, out of
// n, needed to fulfill it: all n for All, a majority (n/2 + 1) for Quorum and
// a single replica for every other value.
func cLevel(l ConsistencyLevel, n int) int {
	if l == All {
		return n
	}
	if l == Quorum {
		// integer division, so this is the smallest count above half of n
		return n/2 + 1
	}
	return 1
}

// Sentinel errors reported while resolving the replicas of a shard.
var (
	// errNoReplicaFound is returned by resolver.State when the resolved
	// node map of the shard is empty.
	errNoReplicaFound = errors.New("no replica found")
	// errUnresolvedName is wrapped by rState.ConsistencyLevel when fewer
	// hosts were resolved than the requested consistency level needs.
	errUnresolvedName = errors.New("unresolved node name")
)

// resolver finds the replicas of a shard and resolves their names
type resolver struct {
	// Schema is queried by State, through ResolveParentNodes, for the
	// replicas of a shard of Class.
	Schema shardingState
	// nodeResolver is embedded, so its methods are promoted onto resolver.
	// NOTE(review): it is not referenced by the methods visible here.
	nodeResolver
	// Class is the class name handed to Schema.ResolveParentNodes.
	Class    string
	// NodeName is what State uses as the direct candidate when the caller
	// passes none. NOTE(review): presumably the local node's name — confirm
	// where the resolver is constructed.
	NodeName string
}

// State builds the replica state of the given shard.
//
// The node-name to address map of the shard is fetched from the schema and
// stored in NodeMap. Hosts receives the address of every entry whose name and
// address are both non-empty, with the direct candidate's address first when
// it has one; an empty directCandidate falls back to the resolver's NodeName.
// The remaining hosts follow map iteration order, which is unspecified.
//
// It fails with the schema's error, with errNoReplicaFound when the node map
// is empty, or with the error of rState.ConsistencyLevel when cl cannot be
// met. The partially filled state is returned alongside any error.
func (r *resolver) State(shardName string, cl ConsistencyLevel, directCandidate string) (res rState, err error) {
	res.CLevel = cl
	nodes, err := r.Schema.ResolveParentNodes(r.Class, shardName)
	if err != nil {
		return res, err
	}
	res.NodeMap = nodes

	// size the host list after the entries that are fully resolved
	resolved := 0
	for name, addr := range nodes {
		if name == "" || addr == "" {
			continue
		}
		resolved++
	}
	hosts := make([]string, 0, resolved)

	// The candidate is expected to hold the data, so when it resolves it
	// always takes index 0; without an explicit candidate this node is used.
	first := directCandidate
	if first == "" {
		first = r.NodeName
	}
	if addr := nodes[first]; addr != "" {
		hosts = append(hosts, addr)
	}
	for name, addr := range nodes {
		if name == "" || addr == "" || name == first {
			continue
		}
		hosts = append(hosts, addr)
	}
	res.Hosts = hosts

	if res.Len() == 0 {
		return res, errNoReplicaFound
	}

	res.Level, err = res.ConsistencyLevel(cl)
	return res, err
}

// rState is the replica state of one shard as computed by resolver.State
type rState struct {
	// CLevel is the consistency level that was requested.
	CLevel  ConsistencyLevel
	// Level is the number of replicas needed to satisfy CLevel, as returned
	// by ConsistencyLevel; it is 0 when that call fails.
	Level   int
	// Hosts holds the addresses of the replicas that resolved successfully.
	// State puts the direct candidate's address first when it has one.
	Hosts   []string // addresses of successfully resolved names
	// NodeMap maps each replica's node name to its address; an empty address
	// marks a name that could not be resolved.
	NodeMap map[string]string
}

// Len reports the total number of replicas, i.e. the number of entries in
// NodeMap, whether or not their address could be resolved.
func (r *rState) Len() int {
	replicas := r.NodeMap
	return len(replicas)
}

// ConsistencyLevel returns the number of replicas required to satisfy l, or
// an error when fewer replicas than that could be resolved.
//
// The required number is derived by cLevel from the total replica count
// (Len) and compared against the number of resolved hosts (len(Hosts)). When
// the level cannot be met, 0 is returned together with an error that wraps
// errUnresolvedName and lists the names in NodeMap that have no address.
func (r *rState) ConsistencyLevel(l ConsistencyLevel) (int, error) {
	level := cLevel(l, r.Len())
	n := len(r.Hosts)
	if level <= n {
		return level, nil
	}

	unresolved := []string{}
	for name, addr := range r.NodeMap {
		if addr == "" {
			unresolved = append(unresolved, name)
		}
	}
	// Map iteration order is random; sort so that the same state always
	// yields the same error text.
	sort.Strings(unresolved)
	return 0, fmt.Errorf("consistency level (%d) > available replicas(%d): %w :%v",
		level, n, errUnresolvedName, unresolved)
}