KevinStephenson
Adding in weaviate code
b110593
raw
history blame
5.67 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package cluster
import (
"fmt"
"net"
"strings"
"github.com/hashicorp/memberlist"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type State struct {
config Config
list *memberlist.Memberlist
delegate delegate
}
type Config struct {
Hostname string `json:"hostname" yaml:"hostname"`
GossipBindPort int `json:"gossipBindPort" yaml:"gossipBindPort"`
DataBindPort int `json:"dataBindPort" yaml:"dataBindPort"`
Join string `json:"join" yaml:"join"`
IgnoreStartupSchemaSync bool `json:"ignoreStartupSchemaSync" yaml:"ignoreStartupSchemaSync"`
SkipSchemaSyncRepair bool `json:"skipSchemaSyncRepair" yaml:"skipSchemaSyncRepair"`
AuthConfig AuthConfig `json:"auth" yaml:"auth"`
}
type AuthConfig struct {
BasicAuth BasicAuth `json:"basic" yaml:"basic"`
}
type BasicAuth struct {
Username string `json:"username" yaml:"username"`
Password string `json:"password" yaml:"password"`
}
func (ba BasicAuth) Enabled() bool {
return ba.Username != "" || ba.Password != ""
}
func Init(userConfig Config, dataPath string, logger logrus.FieldLogger) (_ *State, err error) {
cfg := memberlist.DefaultLANConfig()
cfg.LogOutput = newLogParser(logger)
if userConfig.Hostname != "" {
cfg.Name = userConfig.Hostname
}
state := State{
config: userConfig,
delegate: delegate{
Name: cfg.Name,
dataPath: dataPath,
log: logger,
},
}
if err := state.delegate.init(diskSpace); err != nil {
logger.WithField("action", "init_state.delete_init").Error(err)
}
cfg.Delegate = &state.delegate
cfg.Events = events{&state.delegate}
if userConfig.GossipBindPort != 0 {
cfg.BindPort = userConfig.GossipBindPort
}
if state.list, err = memberlist.Create(cfg); err != nil {
logger.WithField("action", "memberlist_init").
WithField("hostname", userConfig.Hostname).
WithField("bind_port", userConfig.GossipBindPort).
WithError(err).
Error("memberlist not created")
return nil, errors.Wrap(err, "create member list")
}
var joinAddr []string
if userConfig.Join != "" {
joinAddr = strings.Split(userConfig.Join, ",")
}
if len(joinAddr) > 0 {
_, err := net.LookupIP(strings.Split(joinAddr[0], ":")[0])
if err != nil {
logger.WithField("action", "cluster_attempt_join").
WithField("remote_hostname", joinAddr[0]).
WithError(err).
Warn("specified hostname to join cluster cannot be resolved. This is fine" +
"if this is the first node of a new cluster, but problematic otherwise.")
} else {
_, err := state.list.Join(joinAddr)
if err != nil {
logger.WithField("action", "memberlist_init").
WithField("remote_hostname", joinAddr).
WithError(err).
Error("memberlist join not successful")
return nil, errors.Wrap(err, "join cluster")
}
}
}
return &state, nil
}
// Hostnames for all live members, except self. Use AllHostnames to include
// self, prefixes the data port.
func (s *State) Hostnames() []string {
mem := s.list.Members()
out := make([]string, len(mem))
i := 0
for _, m := range mem {
if m.Name == s.list.LocalNode().Name {
continue
}
// TODO: how can we find out the actual data port as opposed to relying on
// the convention that it's 1 higher than the gossip port
out[i] = fmt.Sprintf("%s:%d", m.Addr.String(), m.Port+1)
i++
}
return out[:i]
}
// AllHostnames for live members, including self.
func (s *State) AllHostnames() []string {
mem := s.list.Members()
out := make([]string, len(mem))
for i, m := range mem {
// TODO: how can we find out the actual data port as opposed to relying on
// the convention that it's 1 higher than the gossip port
out[i] = fmt.Sprintf("%s:%d", m.Addr.String(), m.Port+1)
}
return out
}
// All node names (not their hostnames!) for live members, including self.
func (s *State) AllNames() []string {
mem := s.list.Members()
out := make([]string, len(mem))
for i, m := range mem {
out[i] = m.Name
}
return out
}
// Candidates returns list of nodes (names) sorted by the
// free amount of disk space in descending order
func (s *State) Candidates() []string {
return s.delegate.sortCandidates(s.AllNames())
}
// All node names (not their hostnames!) for live members, including self.
func (s *State) NodeCount() int {
return s.list.NumMembers()
}
func (s *State) LocalName() string {
return s.list.LocalNode().Name
}
func (s *State) ClusterHealthScore() int {
return s.list.GetHealthScore()
}
func (s *State) NodeHostname(nodeName string) (string, bool) {
for _, mem := range s.list.Members() {
if mem.Name == nodeName {
// TODO: how can we find out the actual data port as opposed to relying on
// the convention that it's 1 higher than the gossip port
return fmt.Sprintf("%s:%d", mem.Addr.String(), mem.Port+1), true
}
}
return "", false
}
func (s *State) SchemaSyncIgnored() bool {
return s.config.IgnoreStartupSchemaSync
}
func (s *State) SkipSchemaRepair() bool {
return s.config.SkipSchemaSyncRepair
}
func (s *State) NodeInfo(node string) (NodeInfo, bool) {
return s.delegate.get(node)
}