KevinStephenson
Adding in weaviate code
b110593
raw
history blame
7.78 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package cluster
import (
"fmt"
"testing"
"time"
"github.com/hashicorp/memberlist"
"github.com/pkg/errors"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)
func TestDiskSpaceMarshal(t *testing.T) {
for _, name := range []string{"", "host-12:1", "2", "00", "-jhd"} {
want := spaceMsg{
header{
ProtoVersion: uint8(1),
OpCode: _OpCode(2),
},
DiskUsage{
Total: 256,
Available: 3,
},
uint8(len(name)),
name,
}
bytes, err := want.marshal()
assert.Nil(t, err)
got := spaceMsg{}
err = got.unmarshal(bytes)
assert.Nil(t, err)
assert.Equal(t, want, got)
}
// simulate old version
x := spaceMsg{
header{
ProtoVersion: uint8(1),
OpCode: _OpCode(2),
},
DiskUsage{
Total: 256,
Available: 3,
},
uint8('0'),
"123",
}
bytes, err := x.marshal()
want := x
want.NodeLen = 4
want.Node = "0123"
assert.Nil(t, err)
got := spaceMsg{}
err = got.unmarshal(bytes)
assert.Nil(t, err)
assert.Equal(t, want, got)
}
func TestDelegateGetSet(t *testing.T) {
logger, _ := test.NewNullLogger()
now := time.Now().UnixMilli() - 1
st := State{
delegate: delegate{
Name: "ABC",
dataPath: ".",
log: logger,
Cache: make(map[string]NodeInfo, 32),
},
}
st.delegate.NotifyMsg(nil)
st.delegate.GetBroadcasts(0, 0)
st.delegate.NodeMeta(0)
spaces := make([]spaceMsg, 32)
for i := range spaces {
node := fmt.Sprintf("N-%d", i+1)
spaces[i] = spaceMsg{
header: header{
OpCode: _OpCodeDisk,
ProtoVersion: _ProtoVersion + 2,
},
DiskUsage: DiskUsage{
uint64(i + 1),
uint64(i),
},
Node: node,
NodeLen: uint8(len(node)),
}
}
done := make(chan struct{})
go func() {
for _, x := range spaces {
bytes, _ := x.marshal()
st.delegate.MergeRemoteState(bytes, false)
}
done <- struct{}{}
}()
_, ok := st.delegate.get("X")
assert.False(t, ok)
for _, x := range spaces {
space, ok := st.NodeInfo(x.Node)
if ok {
assert.Equal(t, x.DiskUsage, space.DiskUsage)
}
}
<-done
for _, x := range spaces {
info, ok := st.NodeInfo(x.Node)
assert.Greater(t, info.LastTimeMilli, now)
want := NodeInfo{x.DiskUsage, info.LastTimeMilli}
assert.Equal(t, want, info)
assert.True(t, ok)
st.delegate.delete(x.Node)
}
assert.Empty(t, st.delegate.Cache)
st.delegate.init(diskSpace)
assert.Equal(t, 1, len(st.delegate.Cache))
st.delegate.MergeRemoteState(st.delegate.LocalState(false), false)
space, ok := st.NodeInfo(st.delegate.Name)
assert.True(t, ok)
assert.Greater(t, space.Total, space.Available)
}
func TestDelegateMergeRemoteState(t *testing.T) {
logger, _ := test.NewNullLogger()
var (
node = "N1"
d = delegate{
Name: node,
dataPath: ".",
log: logger,
Cache: make(map[string]NodeInfo, 32),
}
x = spaceMsg{
header{
OpCode: _OpCodeDisk,
ProtoVersion: _ProtoVersion,
},
DiskUsage{2, 1},
uint8(len(node)),
node,
}
)
// valid operation payload
bytes, err := x.marshal()
assert.Nil(t, err)
d.MergeRemoteState(bytes, false)
_, ok := d.get(node)
assert.True(t, ok)
node = "N2"
// invalid payload => expect marshalling error
d.MergeRemoteState(bytes[:4], false)
assert.Nil(t, err)
_, ok = d.get(node)
assert.False(t, ok)
// valid payload but operation is not supported
node = "N2"
x.header.OpCode = _OpCodeDisk + 2
bytes, err = x.marshal()
d.MergeRemoteState(bytes, false)
assert.Nil(t, err)
_, ok = d.get(node)
assert.False(t, ok)
}
func TestDelegateSort(t *testing.T) {
now := time.Now().UnixMilli()
GB := uint64(1) << 30
delegate := delegate{
Name: "ABC",
dataPath: ".",
Cache: make(map[string]NodeInfo, 32),
}
delegate.set("N1", NodeInfo{DiskUsage{Available: GB}, now})
delegate.set("N2", NodeInfo{DiskUsage{Available: 3 * GB}, now})
delegate.set("N3", NodeInfo{DiskUsage{Available: 2 * GB}, now})
delegate.set("N4", NodeInfo{DiskUsage{Available: 4 * GB}, now})
got := delegate.sortCandidates([]string{"N1", "N0", "N2", "N4", "N3"})
assert.Equal(t, []string{"N4", "N2", "N3", "N1", "N0"}, got)
delegate.set("N1", NodeInfo{DiskUsage{Available: GB - 10}, now})
// insert equivalent nodes "N2" and "N3"
delegate.set("N2", NodeInfo{DiskUsage{Available: GB + 128}, now})
delegate.set("N3", NodeInfo{DiskUsage{Available: GB + 512}, now})
// one block more
delegate.set("N4", NodeInfo{DiskUsage{Available: GB + 4096}, now})
got = delegate.sortCandidates([]string{"N1", "N0", "N2", "N3", "N4"})
if got[1] == "N2" {
assert.Equal(t, []string{"N4", "N2", "N3", "N1", "N0"}, got)
} else {
assert.Equal(t, []string{"N4", "N3", "N2", "N1", "N0"}, got)
}
}
func TestDelegateCleanUp(t *testing.T) {
st := State{
delegate: delegate{
Name: "N0",
dataPath: ".",
},
}
diskSpace := func(path string) (DiskUsage, error) {
return DiskUsage{100, 50}, nil
}
st.delegate.init(diskSpace)
_, ok := st.delegate.get("N0")
assert.True(t, ok, "N0 must exist")
st.delegate.set("N1", NodeInfo{LastTimeMilli: 1})
st.delegate.set("N2", NodeInfo{LastTimeMilli: 2})
handler := events{&st.delegate}
handler.NotifyJoin(nil)
handler.NotifyUpdate(nil)
handler.NotifyLeave(&memberlist.Node{Name: "N0"})
handler.NotifyLeave(&memberlist.Node{Name: "N1"})
handler.NotifyLeave(&memberlist.Node{Name: "N2"})
assert.Empty(t, st.delegate.Cache)
}
func TestDelegateLocalState(t *testing.T) {
now := time.Now().UnixMilli() - 1
errAny := errors.New("any error")
logger, _ := test.NewNullLogger()
t.Run("FirstError", func(t *testing.T) {
d := delegate{
Name: "N0",
dataPath: ".",
log: logger,
Cache: map[string]NodeInfo{},
}
du := func(path string) (DiskUsage, error) { return DiskUsage{}, errAny }
d.init(du)
// error reading disk space
d.LocalState(true)
assert.Len(t, d.Cache, 1)
})
t.Run("Success", func(t *testing.T) {
d := delegate{
Name: "N0",
dataPath: ".",
log: logger,
Cache: map[string]NodeInfo{},
}
du := func(path string) (DiskUsage, error) { return DiskUsage{5, 1}, nil }
d.init(du)
// successful case
d.LocalState(true)
got, ok := d.get("N0")
assert.True(t, ok)
assert.Greater(t, got.LastTimeMilli, now)
assert.Equal(t, DiskUsage{5, 1}, got.DiskUsage)
})
}
func TestDelegateUpdater(t *testing.T) {
logger, _ := test.NewNullLogger()
now := time.Now().UnixMilli() - 1
d := delegate{
Name: "N0",
dataPath: ".",
log: logger,
Cache: map[string]NodeInfo{},
}
err := d.init(nil)
assert.NotNil(t, err)
doneCh := make(chan bool)
nCalls := uint64(0)
du := func(path string) (DiskUsage, error) {
nCalls++
if nCalls == 1 || nCalls == 3 {
return DiskUsage{2 * nCalls, nCalls}, nil
}
if nCalls == 2 {
return DiskUsage{}, fmt.Errorf("any")
}
if nCalls == 4 {
close(doneCh)
}
return DiskUsage{}, fmt.Errorf("any")
}
go d.updater(time.Millisecond, 5*time.Millisecond, du)
<-doneCh
// error reading disk space
d.LocalState(true)
got, ok := d.get("N0")
assert.True(t, ok)
assert.Greater(t, got.LastTimeMilli, now)
assert.Equal(t, DiskUsage{3 * 2, 3}, got.DiskUsage)
}