SemanticSearchPOC / usecases /cluster /ideal_node_list.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
2.09 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package cluster
import (
"fmt"
"sort"
"strings"
"sync"
"time"
)
type IdealClusterState struct {
memberNames []string
currentState MemberLister
lock sync.Mutex
}
func NewIdealClusterState(s MemberLister) *IdealClusterState {
ics := &IdealClusterState{currentState: s}
go ics.startPolling()
return ics
}
// Validate returns an error if the actual state does not match the assumed
// ideal state, e.g. because a node has died, or left unexpectedly.
func (ics *IdealClusterState) Validate() error {
ics.lock.Lock()
defer ics.lock.Unlock()
actual := map[string]struct{}{}
for _, name := range ics.currentState.AllNames() {
actual[name] = struct{}{}
}
var missing []string
for _, name := range ics.memberNames {
if _, ok := actual[name]; !ok {
missing = append(missing, name)
}
}
if len(missing) > 0 {
return fmt.Errorf("node(s) %s unhealthy or unavailable",
strings.Join(missing, ", "))
}
return nil
}
func (ics *IdealClusterState) Members() []string {
ics.lock.Lock()
defer ics.lock.Unlock()
return ics.memberNames
}
func (ics *IdealClusterState) startPolling() {
t := time.NewTicker(1 * time.Second)
for {
<-t.C
current := ics.currentState.AllNames()
ics.extendList(current)
}
}
func (ics *IdealClusterState) extendList(current []string) {
ics.lock.Lock()
defer ics.lock.Unlock()
var unknown []string
known := map[string]struct{}{}
for _, name := range ics.memberNames {
known[name] = struct{}{}
}
for _, name := range current {
if _, ok := known[name]; !ok {
unknown = append(unknown, name)
}
}
ics.memberNames = append(ics.memberNames, unknown...)
sort.Strings(ics.memberNames)
}