Spaces:
Running
Running
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//
package test | |
import ( | |
"context" | |
"fmt" | |
"testing" | |
"time" | |
"github.com/go-openapi/strfmt" | |
"github.com/stretchr/testify/assert" | |
"github.com/stretchr/testify/require" | |
"github.com/weaviate/weaviate/client/batch" | |
"github.com/weaviate/weaviate/client/meta" | |
"github.com/weaviate/weaviate/client/nodes" | |
"github.com/weaviate/weaviate/entities/models" | |
"github.com/weaviate/weaviate/entities/verbosity" | |
"github.com/weaviate/weaviate/test/docker" | |
"github.com/weaviate/weaviate/test/helper" | |
"github.com/weaviate/weaviate/test/helper/sample-schema/books" | |
"github.com/weaviate/weaviate/test/helper/sample-schema/documents" | |
"github.com/weaviate/weaviate/test/helper/sample-schema/multishard" | |
) | |
func Test_NodesAPI(t *testing.T) { | |
t.Run("empty DB", func(t *testing.T) { | |
meta, err := helper.Client(t).Meta.MetaGet(meta.NewMetaGetParams(), nil) | |
require.Nil(t, err) | |
assert.NotNil(t, meta.GetPayload()) | |
assertions := func(t *testing.T, nodeStatus *models.NodeStatus) { | |
require.NotNil(t, nodeStatus) | |
assert.Equal(t, models.NodeStatusStatusHEALTHY, *nodeStatus.Status) | |
assert.True(t, len(nodeStatus.Name) > 0) | |
assert.True(t, nodeStatus.GitHash != "" && nodeStatus.GitHash != "unknown") | |
assert.Equal(t, meta.Payload.Version, nodeStatus.Version) | |
assert.Empty(t, nodeStatus.Shards) | |
require.NotNil(t, nodeStatus.Stats) | |
assert.Equal(t, int64(0), nodeStatus.Stats.ObjectCount) | |
assert.Equal(t, int64(0), nodeStatus.Stats.ShardCount) | |
} | |
testStatusResponse(t, assertions, nil, "") | |
}) | |
t.Run("DB with Books (1 class ,1 shard configuration, 1 node)", func(t *testing.T) { | |
booksClass := books.ClassContextionaryVectorizer() | |
helper.CreateClass(t, booksClass) | |
defer helper.DeleteClass(t, booksClass.Class) | |
for _, book := range books.Objects() { | |
helper.CreateObject(t, book) | |
helper.AssertGetObjectEventually(t, book.Class, book.ID) | |
} | |
minimalAssertions := func(t *testing.T, nodeStatus *models.NodeStatus) { | |
require.NotNil(t, nodeStatus) | |
assert.Equal(t, models.NodeStatusStatusHEALTHY, *nodeStatus.Status) | |
assert.True(t, len(nodeStatus.Name) > 0) | |
assert.True(t, nodeStatus.GitHash != "" && nodeStatus.GitHash != "unknown") | |
require.NotNil(t, nodeStatus.Stats) | |
assert.Equal(t, int64(3), nodeStatus.Stats.ObjectCount) | |
assert.Equal(t, int64(1), nodeStatus.Stats.ShardCount) | |
} | |
verboseAssertions := func(t *testing.T, nodeStatus *models.NodeStatus) { | |
require.Len(t, nodeStatus.Shards, 1) | |
shard := nodeStatus.Shards[0] | |
assert.True(t, len(shard.Name) > 0) | |
assert.Equal(t, booksClass.Class, shard.Class) | |
assert.Equal(t, int64(3), shard.ObjectCount) | |
} | |
testStatusResponse(t, minimalAssertions, verboseAssertions, "") | |
}) | |
t.Run("DB with MultiShard (1 class, 2 shards configuration, 1 node)", func(t *testing.T) { | |
multiShardClass := multishard.ClassContextionaryVectorizer() | |
helper.CreateClass(t, multiShardClass) | |
defer helper.DeleteClass(t, multiShardClass.Class) | |
for _, multiShard := range multishard.Objects() { | |
helper.CreateObject(t, multiShard) | |
helper.AssertGetObjectEventually(t, multiShard.Class, multiShard.ID) | |
} | |
minimalAssertions := func(t *testing.T, nodeStatus *models.NodeStatus) { | |
require.NotNil(t, nodeStatus) | |
assert.Equal(t, models.NodeStatusStatusHEALTHY, *nodeStatus.Status) | |
assert.True(t, len(nodeStatus.Name) > 0) | |
assert.True(t, nodeStatus.GitHash != "" && nodeStatus.GitHash != "unknown") | |
require.NotNil(t, nodeStatus.Stats) | |
assert.Equal(t, int64(3), nodeStatus.Stats.ObjectCount) | |
assert.Equal(t, int64(2), nodeStatus.Stats.ShardCount) | |
} | |
verboseAsssertions := func(t *testing.T, nodeStatus *models.NodeStatus) { | |
assert.Len(t, nodeStatus.Shards, 2) | |
for _, shard := range nodeStatus.Shards { | |
assert.True(t, len(shard.Name) > 0) | |
assert.Equal(t, multiShardClass.Class, shard.Class) | |
assert.GreaterOrEqual(t, shard.ObjectCount, int64(0)) | |
} | |
} | |
testStatusResponse(t, minimalAssertions, verboseAsssertions, "") | |
}) | |
t.Run("with class name: DB with Books and Documents, 1 shard, 1 node", func(t *testing.T) { | |
booksClass := books.ClassContextionaryVectorizer() | |
helper.CreateClass(t, booksClass) | |
defer helper.DeleteClass(t, booksClass.Class) | |
t.Run("insert and check books", func(t *testing.T) { | |
for _, book := range books.Objects() { | |
helper.CreateObject(t, book) | |
helper.AssertGetObjectEventually(t, book.Class, book.ID) | |
} | |
assertions := func(t *testing.T, nodeStatus *models.NodeStatus) { | |
require.NotNil(t, nodeStatus.Stats) | |
assert.Equal(t, int64(3), nodeStatus.Stats.ObjectCount) | |
assert.Equal(t, int64(1), nodeStatus.Stats.ShardCount) | |
} | |
testStatusResponse(t, assertions, nil, "") | |
}) | |
t.Run("insert and check documents", func(t *testing.T) { | |
docsClasses := documents.ClassesContextionaryVectorizer(false) | |
helper.CreateClass(t, docsClasses[0]) | |
helper.CreateClass(t, docsClasses[1]) | |
defer helper.DeleteClass(t, docsClasses[0].Class) | |
defer helper.DeleteClass(t, docsClasses[1].Class) | |
for _, doc := range documents.Objects() { | |
helper.CreateObject(t, doc) | |
helper.AssertGetObjectEventually(t, doc.Class, doc.ID) | |
} | |
docsClass := docsClasses[0] | |
minimalAssertions := func(t *testing.T, nodeStatus *models.NodeStatus) { | |
assert.Equal(t, models.NodeStatusStatusHEALTHY, *nodeStatus.Status) | |
assert.True(t, len(nodeStatus.Name) > 0) | |
assert.True(t, nodeStatus.GitHash != "" && nodeStatus.GitHash != "unknown") | |
require.NotNil(t, nodeStatus.Stats) | |
assert.Equal(t, int64(2), nodeStatus.Stats.ObjectCount) | |
assert.Equal(t, int64(1), nodeStatus.Stats.ShardCount) | |
} | |
verboseAssertions := func(t *testing.T, nodeStatus *models.NodeStatus) { | |
assert.Len(t, nodeStatus.Shards, 1) | |
shard := nodeStatus.Shards[0] | |
assert.True(t, len(shard.Name) > 0) | |
assert.Equal(t, docsClass.Class, shard.Class) | |
assert.Equal(t, int64(2), shard.ObjectCount) | |
} | |
testStatusResponse(t, minimalAssertions, verboseAssertions, docsClass.Class) | |
}) | |
}) | |
// This test prevents a regression of | |
// https://github.com/weaviate/weaviate/issues/2454 | |
t.Run("validate count with updates", func(t *testing.T) { | |
booksClass := books.ClassContextionaryVectorizer() | |
helper.CreateClass(t, booksClass) | |
defer helper.DeleteClass(t, booksClass.Class) | |
_, err := helper.BatchClient(t).BatchObjectsCreate( | |
batch.NewBatchObjectsCreateParams().WithBody(batch.BatchObjectsCreateBody{ | |
Objects: []*models.Object{ | |
{ | |
ID: strfmt.UUID("2D0D3E3B-54B2-48D4-BFE0-4BE2C060110E"), | |
Class: booksClass.Class, | |
Properties: map[string]interface{}{ | |
"title": "A book that changes", | |
"description": "First iteration", | |
}, | |
}, | |
}, | |
}), nil) | |
require.Nil(t, err) | |
// Note that this is the same ID as before, so this is an update!! | |
_, err = helper.BatchClient(t).BatchObjectsCreate( | |
batch.NewBatchObjectsCreateParams().WithBody(batch.BatchObjectsCreateBody{ | |
Objects: []*models.Object{ | |
{ | |
ID: strfmt.UUID("2D0D3E3B-54B2-48D4-BFE0-4BE2C060110E"), | |
Class: booksClass.Class, | |
Properties: map[string]interface{}{ | |
"title": "A book that changes", | |
"description": "A new (second) iteration", | |
}, | |
}, | |
}, | |
}), nil) | |
require.Nil(t, err) | |
assertions := func(t *testing.T, nodeStatus *models.NodeStatus) { | |
require.NotNil(t, nodeStatus.Stats) | |
assert.Equal(t, int64(1), nodeStatus.Stats.ObjectCount) | |
} | |
testStatusResponse(t, assertions, nil, "") | |
}) | |
} | |
func TestNodesApi_Compression_AsyncIndexing(t *testing.T) { | |
ctx := context.Background() | |
compose, err := docker.New(). | |
WithWeaviate(). | |
WithText2VecContextionary(). | |
WithWeaviateEnv("ASYNC_INDEXING", "true"). | |
Start(ctx) | |
require.NoError(t, err) | |
defer func() { | |
require.NoError(t, compose.Terminate(ctx)) | |
}() | |
defer helper.SetupClient(fmt.Sprintf("%s:%s", helper.ServerHost, helper.ServerPort)) | |
helper.SetupClient(compose.GetWeaviate().URI()) | |
t.Run("validate flat compression status", func(t *testing.T) { | |
booksClass := books.ClassContextionaryVectorizer() | |
booksClass.VectorIndexType = "flat" | |
booksClass.VectorIndexConfig = map[string]interface{}{ | |
"bq": map[string]interface{}{ | |
"enabled": true, | |
}, | |
} | |
helper.CreateClass(t, booksClass) | |
defer helper.DeleteClass(t, booksClass.Class) | |
t.Run("check compressed true", func(t *testing.T) { | |
verbose := "verbose" | |
params := nodes.NewNodesGetParams().WithOutput(&verbose) | |
resp, err := helper.Client(t).Nodes.NodesGet(params, nil) | |
require.Nil(t, err) | |
nodeStatusResp := resp.GetPayload() | |
require.NotNil(t, nodeStatusResp) | |
nodes := nodeStatusResp.Nodes | |
require.NotNil(t, nodes) | |
require.Len(t, nodes, 1) | |
nodeStatus := nodes[0] | |
require.NotNil(t, nodeStatus) | |
require.True(t, nodeStatus.Shards[0].Compressed) | |
}) | |
}) | |
t.Run("validate hnsw pq async compression", func(t *testing.T) { | |
booksClass := books.ClassContextionaryVectorizer() | |
booksClass.VectorIndexConfig = map[string]interface{}{ | |
"pq": map[string]interface{}{ | |
"trainingLimit": 256, | |
"enabled": true, | |
"segments": 1, | |
}, | |
} | |
helper.CreateClass(t, booksClass) | |
defer helper.DeleteClass(t, booksClass.Class) | |
t.Run("check compressed initially false", func(t *testing.T) { | |
verbose := "verbose" | |
params := nodes.NewNodesGetParams().WithOutput(&verbose) | |
resp, err := helper.Client(t).Nodes.NodesGet(params, nil) | |
require.Nil(t, err) | |
nodeStatusResp := resp.GetPayload() | |
require.NotNil(t, nodeStatusResp) | |
nodes := nodeStatusResp.Nodes | |
require.NotNil(t, nodes) | |
require.Len(t, nodes, 1) | |
nodeStatus := nodes[0] | |
require.NotNil(t, nodeStatus) | |
require.False(t, nodeStatus.Shards[0].Compressed) | |
}) | |
t.Run("load data for pq", func(t *testing.T) { | |
num := 1024 | |
objects := make([]*models.Object, num) | |
for i := 0; i < num; i++ { | |
objects[i] = &models.Object{ | |
Class: booksClass.Class, | |
Vector: []float32{float32(i % 32), float32(i), 3.0, 4.0}, | |
} | |
} | |
_, err := helper.BatchClient(t).BatchObjectsCreate( | |
batch.NewBatchObjectsCreateParams().WithBody(batch.BatchObjectsCreateBody{ | |
Objects: objects, | |
}, | |
), nil) | |
require.Nil(t, err) | |
}) | |
t.Run("check eventually compressed if async enabled", func(t *testing.T) { | |
checkThunk := func() interface{} { | |
verbose := "verbose" | |
params := nodes.NewNodesGetParams().WithOutput(&verbose) | |
resp, err := helper.Client(t).Nodes.NodesGet(params, nil) | |
require.Nil(t, err) | |
nodeStatusResp := resp.GetPayload() | |
require.NotNil(t, nodeStatusResp) | |
nodes := nodeStatusResp.Nodes | |
require.NotNil(t, nodes) | |
require.Len(t, nodes, 1) | |
nodeStatus := nodes[0] | |
require.NotNil(t, nodeStatus) | |
return nodeStatus.Shards[0].Compressed | |
} | |
helper.AssertEventuallyEqualWithFrequencyAndTimeout(t, true, checkThunk, 100*time.Millisecond, 10*time.Second) | |
}) | |
}) | |
} | |
func TestNodesApi_Compression_SyncIndexing(t *testing.T) { | |
t.Run("validate flat compression status", func(t *testing.T) { | |
booksClass := books.ClassContextionaryVectorizer() | |
booksClass.VectorIndexType = "flat" | |
booksClass.VectorIndexConfig = map[string]interface{}{ | |
"bq": map[string]interface{}{ | |
"enabled": true, | |
}, | |
} | |
helper.CreateClass(t, booksClass) | |
defer helper.DeleteClass(t, booksClass.Class) | |
t.Run("check compressed true", func(t *testing.T) { | |
verbose := "verbose" | |
params := nodes.NewNodesGetParams().WithOutput(&verbose) | |
resp, err := helper.Client(t).Nodes.NodesGet(params, nil) | |
require.Nil(t, err) | |
nodeStatusResp := resp.GetPayload() | |
require.NotNil(t, nodeStatusResp) | |
nodes := nodeStatusResp.Nodes | |
require.NotNil(t, nodes) | |
require.Len(t, nodes, 1) | |
nodeStatus := nodes[0] | |
require.NotNil(t, nodeStatus) | |
require.True(t, nodeStatus.Shards[0].Compressed) | |
}) | |
}) | |
} | |
func testStatusResponse(t *testing.T, minimalAssertions, verboseAssertions func(*testing.T, *models.NodeStatus), | |
class string, | |
) { | |
minimal, verbose := verbosity.OutputMinimal, verbosity.OutputVerbose | |
commonTests := func(resp *nodes.NodesGetOK) { | |
require.NotNil(t, resp.Payload) | |
nodes := resp.Payload.Nodes | |
require.NotNil(t, nodes) | |
require.Len(t, nodes, 1) | |
minimalAssertions(t, nodes[0]) | |
} | |
t.Run("minimal", func(t *testing.T) { | |
payload, err := getNodesStatus(t, minimal, class) | |
require.Nil(t, err) | |
commonTests(&nodes.NodesGetOK{Payload: payload}) | |
}) | |
if verboseAssertions != nil { | |
t.Run("verbose", func(t *testing.T) { | |
payload, err := getNodesStatus(t, verbose, class) | |
require.Nil(t, err) | |
commonTests(&nodes.NodesGetOK{Payload: payload}) | |
// If commonTests pass, resp.Nodes[0] != nil | |
verboseAssertions(t, payload.Nodes[0]) | |
}) | |
} | |
} | |
func getNodesStatus(t *testing.T, output, class string) (payload *models.NodesStatusResponse, err error) { | |
if class != "" { | |
params := nodes.NewNodesGetClassParams().WithOutput(&output).WithClassName(class) | |
body, clientErr := helper.Client(t).Nodes.NodesGetClass(params, nil) | |
payload, err = body.Payload, clientErr | |
} else { | |
params := nodes.NewNodesGetParams().WithOutput(&output) | |
body, clientErr := helper.Client(t).Nodes.NodesGet(params, nil) | |
payload, err = body.Payload, clientErr | |
} | |
return | |
} | |