Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package clusterapi_test | |
import ( | |
"context" | |
"net/http" | |
"net/http/httptest" | |
"net/url" | |
"testing" | |
"github.com/sirupsen/logrus/hooks/test" | |
"github.com/stretchr/testify/assert" | |
"github.com/stretchr/testify/require" | |
"github.com/weaviate/weaviate/adapters/clients" | |
"github.com/weaviate/weaviate/adapters/handlers/rest/clusterapi" | |
"github.com/weaviate/weaviate/entities/models" | |
"github.com/weaviate/weaviate/usecases/cluster" | |
"github.com/weaviate/weaviate/usecases/config" | |
"github.com/weaviate/weaviate/usecases/scaler" | |
schemauc "github.com/weaviate/weaviate/usecases/schema" | |
"github.com/weaviate/weaviate/usecases/sharding" | |
) | |
// This is a cross-package test that tests the schema manager in a distributed | |
// settings including some of its dependencies, such as the REST API and | |
// clients. This setup pretends that replication was one-way for simplicity | |
// sake, but uses the same components on either side to make sure that it | |
// would work in both directions. | |
func TestComponentCluster(t *testing.T) { | |
t.Run("add class", func(t *testing.T) { | |
localManager, remoteManager := setupManagers(t) | |
ctx := context.Background() | |
err := localManager.AddClass(ctx, nil, testClass()) | |
require.Nil(t, err) | |
localClass, err := localManager.GetClass(ctx, nil, testClass().Class) | |
require.Nil(t, err) | |
remoteClass, err := remoteManager.GetClass(ctx, nil, testClass().Class) | |
require.Nil(t, err) | |
assert.Equal(t, localClass, remoteClass) | |
}) | |
t.Run("add class and extend property", func(t *testing.T) { | |
localManager, remoteManager := setupManagers(t) | |
ctx := context.Background() | |
err := localManager.AddClass(ctx, nil, testClass()) | |
require.Nil(t, err) | |
err = localManager.AddClassProperty(ctx, nil, testClass().Class, testProperty()) | |
require.Nil(t, err) | |
localClass, err := localManager.GetClass(ctx, nil, testClass().Class) | |
require.Nil(t, err) | |
remoteClass, err := remoteManager.GetClass(ctx, nil, testClass().Class) | |
require.Nil(t, err) | |
assert.Equal(t, localClass, remoteClass) | |
}) | |
t.Run("delete class", func(t *testing.T) { | |
localManager, remoteManager := setupManagers(t) | |
ctx := context.Background() | |
err := localManager.AddClass(ctx, nil, testClass()) | |
require.Nil(t, err) | |
err = localManager.DeleteClass(ctx, nil, testClass().Class) | |
require.Nil(t, err) | |
localSchema, err := localManager.GetSchema(nil) | |
require.Nil(t, err) | |
remoteSchema, err := remoteManager.GetSchema(nil) | |
require.Nil(t, err) | |
assert.Equal(t, localSchema, remoteSchema) | |
}) | |
t.Run("add class update config", func(t *testing.T) { | |
localManager, remoteManager := setupManagers(t) | |
ctx := context.Background() | |
err := localManager.AddClass(ctx, nil, testClass()) | |
require.Nil(t, err) | |
updated := testClass() | |
updated.VectorIndexConfig.(map[string]interface{})["secondKey"] = "added" | |
err = localManager.UpdateClass(ctx, nil, testClass().Class, updated) | |
require.Nil(t, err) | |
localClass, err := localManager.GetClass(ctx, nil, testClass().Class) | |
require.Nil(t, err) | |
remoteClass, err := remoteManager.GetClass(ctx, nil, testClass().Class) | |
require.Nil(t, err) | |
assert.Equal(t, localClass, remoteClass) | |
}) | |
} | |
func setupManagers(t *testing.T) (*schemauc.Manager, *schemauc.Manager) { | |
remoteManager := newSchemaManagerWithClusterStateAndClient( | |
&fakeClusterState{hosts: []string{"node1"}}, nil) | |
schemaHandlers := clusterapi.NewSchema(remoteManager.TxManager(), clusterapi.NewNoopAuthHandler()) | |
mux := http.NewServeMux() | |
mux.Handle("/schema/transactions/", http.StripPrefix("/schema/transactions/", | |
schemaHandlers.Transactions())) | |
server := httptest.NewServer(mux) | |
client := clients.NewClusterSchema(&http.Client{}) | |
parsedURL, err := url.Parse(server.URL) | |
require.Nil(t, err) | |
state := &fakeClusterState{hosts: []string{parsedURL.Host}} | |
localManager := newSchemaManagerWithClusterStateAndClient(state, client) | |
// this will also mark the tx managers as ready | |
localManager.StartServing(context.Background()) | |
remoteManager.StartServing(context.Background()) | |
return localManager, remoteManager | |
} | |
func testClass() *models.Class { | |
return &models.Class{ | |
Class: "MyClass", | |
Properties: []*models.Property{ | |
{ | |
Name: "propOne", DataType: []string{"text"}, | |
}, | |
}, | |
VectorIndexConfig: map[string]interface{}{ | |
"foo": "bar", | |
}, | |
} | |
} | |
func testProperty() *models.Property { | |
return &models.Property{ | |
Name: "propTwo", DataType: []string{"int"}, | |
} | |
} | |
// New Local Schema *Manager | |
func newSchemaManagerWithClusterStateAndClient(clusterState *fakeClusterState, | |
client cluster.Client, | |
) *schemauc.Manager { | |
logger, _ := test.NewNullLogger() | |
vectorizerValidator := &fakeVectorizerValidator{ | |
valid: []string{"text2vec-contextionary", "model1", "model2"}, | |
} | |
sm, err := schemauc.NewManager(&NilMigrator{}, newFakeRepo(), logger, &fakeAuthorizer{}, | |
config.Config{DefaultVectorizerModule: config.VectorizerModuleNone}, | |
dummyParseVectorConfig, // only option for now | |
vectorizerValidator, dummyValidateInvertedConfig, | |
&fakeModuleConfig{}, clusterState, client, &fakeTxPersistence{}, | |
&fakeScaleOutManager{}, | |
) | |
if err != nil { | |
panic(err.Error()) | |
} | |
return sm | |
} | |
type fakeScaleOutManager struct{} | |
func (f *fakeScaleOutManager) Scale(ctx context.Context, | |
className string, updated sharding.Config, _, _ int64, | |
) (*sharding.State, error) { | |
return nil, nil | |
} | |
func (f *fakeScaleOutManager) SetSchemaManager(sm scaler.SchemaManager) { | |
} | |
// does nothing as this component test does not involve crashes | |
type fakeTxPersistence struct{} | |
func (f *fakeTxPersistence) StoreTx(ctx context.Context, | |
tx *cluster.Transaction, | |
) error { | |
return nil | |
} | |
func (f *fakeTxPersistence) DeleteTx(ctx context.Context, | |
txID string, | |
) error { | |
return nil | |
} | |
func (f *fakeTxPersistence) IterateAll(ctx context.Context, | |
cb func(tx *cluster.Transaction), | |
) error { | |
return nil | |
} | |