Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package cluster | |
import ( | |
"bytes" | |
"encoding/binary" | |
"fmt" | |
"sort" | |
"sync" | |
"time" | |
"github.com/hashicorp/memberlist" | |
"github.com/sirupsen/logrus" | |
) | |
// _OpCode identifies the kind of operation carried by a cluster message.
type _OpCode uint8

const (
	// _ProtoVersion is the version of the internal message-exchange protocol.
	_ProtoVersion uint8 = 1

	// _OpCodeDisk is the operation code of a disk-space message.
	_OpCodeDisk _OpCode = 1

	// _ProtoTTL is the period used to decide when the cached disk usage
	// has to be refreshed.
	_ProtoTTL = 8 * time.Second
)
// spaceMsg is used to notify other nodes about current disk usage | |
type spaceMsg struct { | |
header | |
DiskUsage | |
NodeLen uint8 // = len(Node) is required to marshal Node | |
Node string // node space | |
} | |
// header of an operation | |
type header struct { | |
// OpCode operation code | |
OpCode _OpCode | |
// ProtoVersion protocol we will speak | |
ProtoVersion uint8 | |
} | |
// DiskUsage holds the total and the available disk space, both in bytes.
type DiskUsage struct {
	// Total is the total disk space.
	Total uint64
	// Available is the disk space that is still available.
	Available uint64
}
// NodeInfo disk space | |
type NodeInfo struct { | |
DiskUsage | |
LastTimeMilli int64 // last update time in milliseconds | |
} | |
func (d *spaceMsg) marshal() (data []byte, err error) { | |
buf := bytes.NewBuffer(make([]byte, 0, 24+len(d.Node))) | |
if err := binary.Write(buf, binary.BigEndian, d.header); err != nil { | |
return nil, err | |
} | |
if err := binary.Write(buf, binary.BigEndian, d.DiskUsage); err != nil { | |
return nil, err | |
} | |
// code node name starting by its length | |
if err := buf.WriteByte(d.NodeLen); err != nil { | |
return nil, err | |
} | |
_, err = buf.Write([]byte(d.Node)) | |
return buf.Bytes(), err | |
} | |
func (d *spaceMsg) unmarshal(data []byte) (err error) { | |
rd := bytes.NewReader(data) | |
if err = binary.Read(rd, binary.BigEndian, &d.header); err != nil { | |
return | |
} | |
if err = binary.Read(rd, binary.BigEndian, &d.DiskUsage); err != nil { | |
return | |
} | |
// decode node name start by its length | |
if d.NodeLen, err = rd.ReadByte(); err != nil { | |
return | |
} | |
begin := len(data) - rd.Len() | |
end := begin + int(d.NodeLen) | |
// make sure this version is backward compatible | |
if _ProtoVersion <= 1 && begin+int(d.NodeLen) != len(data) { | |
begin-- // since previous version doesn't encode the length | |
end = len(data) | |
d.NodeLen = uint8(end - begin) | |
} | |
d.Node = string(data[begin:end]) | |
return nil | |
} | |
// delegate implements the memberList delegate interface | |
type delegate struct { | |
Name string | |
dataPath string | |
log logrus.FieldLogger | |
sync.Mutex | |
Cache map[string]NodeInfo | |
mutex sync.Mutex | |
hostInfo NodeInfo | |
} | |
func (d *delegate) setOwnSpace(x DiskUsage) { | |
d.mutex.Lock() | |
d.hostInfo = NodeInfo{DiskUsage: x, LastTimeMilli: time.Now().UnixMilli()} | |
d.mutex.Unlock() | |
} | |
func (d *delegate) ownInfo() NodeInfo { | |
d.mutex.Lock() | |
defer d.mutex.Unlock() | |
return d.hostInfo | |
} | |
// init must be called first to initialize the cache | |
func (d *delegate) init(diskSpace func(path string) (DiskUsage, error)) error { | |
d.Cache = make(map[string]NodeInfo, 32) | |
if diskSpace == nil { | |
return fmt.Errorf("function calculating disk space cannot be empty") | |
} | |
space, err := diskSpace(d.dataPath) | |
if err != nil { | |
return fmt.Errorf("disk_space: %w", err) | |
} | |
d.setOwnSpace(space) | |
d.set(d.Name, NodeInfo{space, time.Now().UnixMilli()}) // cache | |
// delegate remains alive throughout the entire program. | |
go d.updater(_ProtoTTL, time.Second+_ProtoTTL/3, diskSpace) | |
return nil | |
} | |
// NodeMeta is used to retrieve meta-data about the current node | |
// when broadcasting an alive message. It's length is limited to | |
// the given byte size. This metadata is available in the Node structure. | |
func (d *delegate) NodeMeta(limit int) (meta []byte) { | |
return nil | |
} | |
// LocalState is used for a TCP Push/Pull. This is sent to | |
// the remote side in addition to the membership information. Any | |
// data can be sent here. See MergeRemoteState as well. The `join` | |
// boolean indicates this is for a join instead of a push/pull. | |
func (d *delegate) LocalState(join bool) []byte { | |
var ( | |
info = d.ownInfo() | |
err error | |
) | |
d.set(d.Name, info) // cache new value | |
x := spaceMsg{ | |
header{ | |
OpCode: _OpCodeDisk, | |
ProtoVersion: _ProtoVersion, | |
}, | |
info.DiskUsage, | |
uint8(len(d.Name)), | |
d.Name, | |
} | |
bytes, err := x.marshal() | |
if err != nil { | |
d.log.WithField("action", "delegate.local_state.marshal").Error(err) | |
return nil | |
} | |
return bytes | |
} | |
// MergeRemoteState is invoked after a TCP Push/Pull. This is the | |
// state received from the remote side and is the result of the | |
// remote side's LocalState call. The 'join' | |
// boolean indicates this is for a join instead of a push/pull. | |
func (d *delegate) MergeRemoteState(data []byte, join bool) { | |
// Does operation match _OpCodeDisk | |
if _OpCode(data[0]) != _OpCodeDisk { | |
return | |
} | |
var x spaceMsg | |
if err := x.unmarshal(data); err != nil || x.Node == "" { | |
d.log.WithField("action", "delegate.merge_remote.unmarshal"). | |
WithField("data", string(data)).Error(err) | |
return | |
} | |
info := NodeInfo{x.DiskUsage, time.Now().UnixMilli()} | |
d.set(x.Node, info) | |
} | |
func (d *delegate) NotifyMsg(data []byte) {} | |
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { | |
return nil | |
} | |
// get returns info about about a specific node in the cluster | |
func (d *delegate) get(node string) (NodeInfo, bool) { | |
d.Lock() | |
defer d.Unlock() | |
x, ok := d.Cache[node] | |
return x, ok | |
} | |
func (d *delegate) set(node string, x NodeInfo) { | |
d.Lock() | |
defer d.Unlock() | |
d.Cache[node] = x | |
} | |
// delete key from the cache | |
func (d *delegate) delete(node string) { | |
d.Lock() | |
defer d.Unlock() | |
delete(d.Cache, node) | |
} | |
// sortCandidates by the amount of free space in descending order | |
// | |
// Two nodes are considered equivalent if the difference between their | |
// free spaces is less than 4KB. | |
// The free space is just an rough estimate of the actual amount. | |
// The Lower bound 4KB helps to mitigate the risk of selecting same set of nodes | |
// when selections happens concurrently on different initiator nodes. | |
func (d *delegate) sortCandidates(names []string) []string { | |
d.Lock() | |
defer d.Unlock() | |
m := d.Cache | |
sort.Slice(names, func(i, j int) bool { | |
return (m[names[j]].Available >> 12) < (m[names[i]].Available >> 12) | |
}) | |
return names | |
} | |
// updater a function which updates node information periodically | |
func (d *delegate) updater(period, minPeriod time.Duration, du func(path string) (DiskUsage, error)) { | |
t := time.NewTicker(period) | |
defer t.Stop() | |
curTime := time.Now() | |
for range t.C { | |
if time.Since(curTime) < minPeriod { // too short | |
continue // wait for next cycle to avoid overwhelming the disk | |
} | |
space, err := du(d.dataPath) | |
if err != nil { | |
d.log.WithField("action", "delegate.local_state.disk_usage").Error(err) | |
} else { | |
d.setOwnSpace(space) | |
} | |
curTime = time.Now() | |
} | |
} | |
// events implement memberlist.EventDelegate interface | |
// EventDelegate is a simpler delegate that is used only to receive | |
// notifications about members joining and leaving. The methods in this | |
// delegate may be called by multiple goroutines, but never concurrently. | |
// This allows you to reason about ordering. | |
type events struct { | |
d *delegate | |
} | |
// NotifyJoin is invoked when a node is detected to have joined. | |
// The Node argument must not be modified. | |
func (e events) NotifyJoin(*memberlist.Node) {} | |
// NotifyLeave is invoked when a node is detected to have left. | |
// The Node argument must not be modified. | |
func (e events) NotifyLeave(node *memberlist.Node) { | |
e.d.delete(node.Name) | |
} | |
// NotifyUpdate is invoked when a node is detected to have | |
// updated, usually involving the meta data. The Node argument | |
// must not be modified. | |
func (e events) NotifyUpdate(*memberlist.Node) {} | |