KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.38 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package clusterapi_test
import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/adapters/clients"
"github.com/weaviate/weaviate/adapters/handlers/rest/clusterapi"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/usecases/cluster"
"github.com/weaviate/weaviate/usecases/config"
"github.com/weaviate/weaviate/usecases/scaler"
schemauc "github.com/weaviate/weaviate/usecases/schema"
"github.com/weaviate/weaviate/usecases/sharding"
)
// This is a cross-package test that tests the schema manager in a distributed
// settings including some of its dependencies, such as the REST API and
// clients. This setup pretends that replication was one-way for simplicity
// sake, but uses the same components on either side to make sure that it
// would work in both directions.
func TestComponentCluster(t *testing.T) {
t.Run("add class", func(t *testing.T) {
localManager, remoteManager := setupManagers(t)
ctx := context.Background()
err := localManager.AddClass(ctx, nil, testClass())
require.Nil(t, err)
localClass, err := localManager.GetClass(ctx, nil, testClass().Class)
require.Nil(t, err)
remoteClass, err := remoteManager.GetClass(ctx, nil, testClass().Class)
require.Nil(t, err)
assert.Equal(t, localClass, remoteClass)
})
t.Run("add class and extend property", func(t *testing.T) {
localManager, remoteManager := setupManagers(t)
ctx := context.Background()
err := localManager.AddClass(ctx, nil, testClass())
require.Nil(t, err)
err = localManager.AddClassProperty(ctx, nil, testClass().Class, testProperty())
require.Nil(t, err)
localClass, err := localManager.GetClass(ctx, nil, testClass().Class)
require.Nil(t, err)
remoteClass, err := remoteManager.GetClass(ctx, nil, testClass().Class)
require.Nil(t, err)
assert.Equal(t, localClass, remoteClass)
})
t.Run("delete class", func(t *testing.T) {
localManager, remoteManager := setupManagers(t)
ctx := context.Background()
err := localManager.AddClass(ctx, nil, testClass())
require.Nil(t, err)
err = localManager.DeleteClass(ctx, nil, testClass().Class)
require.Nil(t, err)
localSchema, err := localManager.GetSchema(nil)
require.Nil(t, err)
remoteSchema, err := remoteManager.GetSchema(nil)
require.Nil(t, err)
assert.Equal(t, localSchema, remoteSchema)
})
t.Run("add class update config", func(t *testing.T) {
localManager, remoteManager := setupManagers(t)
ctx := context.Background()
err := localManager.AddClass(ctx, nil, testClass())
require.Nil(t, err)
updated := testClass()
updated.VectorIndexConfig.(map[string]interface{})["secondKey"] = "added"
err = localManager.UpdateClass(ctx, nil, testClass().Class, updated)
require.Nil(t, err)
localClass, err := localManager.GetClass(ctx, nil, testClass().Class)
require.Nil(t, err)
remoteClass, err := remoteManager.GetClass(ctx, nil, testClass().Class)
require.Nil(t, err)
assert.Equal(t, localClass, remoteClass)
})
}
// setupManagers builds the two schema managers used by the component tests
// and returns them as (local, remote).
//
// The remote manager is created without a cluster client; its transaction
// handlers are served by an httptest server under /schema/transactions/. The
// local manager is given a real cluster schema client and a cluster state
// whose only host is that test server, so its transactions are sent over HTTP
// to the remote manager.
//
// The test server is shut down automatically when t (the calling test or
// subtest) finishes.
func setupManagers(t *testing.T) (*schemauc.Manager, *schemauc.Manager) {
	t.Helper()

	remoteManager := newSchemaManagerWithClusterStateAndClient(
		&fakeClusterState{hosts: []string{"node1"}}, nil)

	schemaHandlers := clusterapi.NewSchema(remoteManager.TxManager(), clusterapi.NewNoopAuthHandler())
	mux := http.NewServeMux()
	mux.Handle("/schema/transactions/", http.StripPrefix("/schema/transactions/",
		schemaHandlers.Transactions()))

	server := httptest.NewServer(mux)
	// Without this every call would leak a listener plus its goroutines, since
	// each subtest spins up its own server.
	t.Cleanup(server.Close)

	client := clients.NewClusterSchema(&http.Client{})

	// The cluster state expects bare host:port entries, not full URLs.
	parsedURL, err := url.Parse(server.URL)
	require.Nil(t, err)

	state := &fakeClusterState{hosts: []string{parsedURL.Host}}
	localManager := newSchemaManagerWithClusterStateAndClient(state, client)

	// this will also mark the tx managers as ready
	localManager.StartServing(context.Background())
	remoteManager.StartServing(context.Background())

	return localManager, remoteManager
}
// testClass returns a freshly allocated fixture class named "MyClass" with a
// single text property and a one-entry vector index config. Every call builds
// new values, so callers may mutate the result without affecting other calls.
func testClass() *models.Class {
	propOne := &models.Property{
		Name:     "propOne",
		DataType: []string{"text"},
	}
	indexConfig := map[string]interface{}{"foo": "bar"}

	class := new(models.Class)
	class.Class = "MyClass"
	class.Properties = []*models.Property{propOne}
	class.VectorIndexConfig = indexConfig
	return class
}
// testProperty returns a freshly allocated int property named "propTwo",
// used to extend the fixture class.
func testProperty() *models.Property {
	prop := new(models.Property)
	prop.Name = "propTwo"
	prop.DataType = []string{"int"}
	return prop
}
// newSchemaManagerWithClusterStateAndClient builds a *schemauc.Manager for
// tests from the given cluster state and cluster client. Every other
// dependency is filled in with a fake, dummy or no-op implementation. It
// panics with the error message if the manager cannot be constructed.
func newSchemaManagerWithClusterStateAndClient(clusterState *fakeClusterState,
	client cluster.Client,
) *schemauc.Manager {
	// The hook returned alongside the logger is not needed here.
	nullLogger, _ := test.NewNullLogger()
	cfg := config.Config{DefaultVectorizerModule: config.VectorizerModuleNone}
	validator := &fakeVectorizerValidator{
		valid: []string{"text2vec-contextionary", "model1", "model2"},
	}

	manager, err := schemauc.NewManager(
		&NilMigrator{},
		newFakeRepo(),
		nullLogger,
		&fakeAuthorizer{},
		cfg,
		dummyParseVectorConfig, // only option for now
		validator,
		dummyValidateInvertedConfig,
		&fakeModuleConfig{},
		clusterState,
		client,
		&fakeTxPersistence{},
		&fakeScaleOutManager{},
	)
	if err == nil {
		return manager
	}
	panic(err.Error())
}
// fakeScaleOutManager is a stateless stand-in for the scale-out manager that
// the test setup hands to schemauc.NewManager. Both methods do nothing.
type fakeScaleOutManager struct{}

// Scale ignores all arguments and reports neither a sharding state nor an
// error.
func (*fakeScaleOutManager) Scale(ctx context.Context, className string,
	updated sharding.Config, _, _ int64,
) (*sharding.State, error) {
	return nil, nil
}

// SetSchemaManager discards the given schema manager.
func (*fakeScaleOutManager) SetSchemaManager(sm scaler.SchemaManager) {}
// fakeTxPersistence is a transaction persistence stub. It does nothing, as
// this component test does not involve crashes. Every method succeeds without
// storing, deleting or iterating over anything.
type fakeTxPersistence struct{}

// StoreTx drops the transaction and returns nil.
func (*fakeTxPersistence) StoreTx(ctx context.Context, tx *cluster.Transaction) error {
	return nil
}

// DeleteTx ignores the transaction ID and returns nil.
func (*fakeTxPersistence) DeleteTx(ctx context.Context, txID string) error {
	return nil
}

// IterateAll never invokes cb and returns nil.
func (*fakeTxPersistence) IterateAll(ctx context.Context, cb func(tx *cluster.Transaction)) error {
	return nil
}