KevinStephenson
Adding in weaviate code
b110593
raw
history blame
9.82 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
//go:build integrationTest
// +build integrationTest
package clusterintegrationtest
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"path"
"sync"
"time"
"github.com/sirupsen/logrus/hooks/test"
"github.com/weaviate/weaviate/adapters/clients"
"github.com/weaviate/weaviate/adapters/handlers/rest/clusterapi"
"github.com/weaviate/weaviate/adapters/repos/db"
"github.com/weaviate/weaviate/entities/backup"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/modulecapabilities"
"github.com/weaviate/weaviate/entities/schema"
modstgfs "github.com/weaviate/weaviate/modules/backup-filesystem"
ubak "github.com/weaviate/weaviate/usecases/backup"
"github.com/weaviate/weaviate/usecases/sharding"
)
// node bundles the components that make up one in-process cluster member in
// these integration tests. All fields except name are populated by init.
type node struct {
// name identifies the node; init also uses it as the node's sub-directory.
name string
// repo is the node's database, created by init via db.New.
repo *db.DB
// schemaManager is the fake schema getter handed to repo and the backup handler.
schemaManager *fakeSchemaManager
// backupManager is the backup handler served under the /backups/ endpoints.
backupManager *ubak.Handler
// scheduler is the backup scheduler created by init.
scheduler *ubak.Scheduler
// migrator is created by init from repo.
migrator *db.Migrator
// hostname is the Host part of the node's httptest server URL.
hostname string
}
// init assembles a complete in-process node and stores its parts on n.
//
// It creates the DB below dirName/<n.name>, installs a fake schema manager
// built from shardStateRaw, waits for the DB to start, creates the backup
// handler, backup scheduler and migrator, and finally starts an httptest
// server that exposes the /indices/ and /backups/* cluster endpoints. The
// host of that server is stored in n.hostname so that other nodes can resolve
// it through allNodes.
//
// Any setup error results in a panic.
//
// NOTE(review): the httptest server is not retained, so nothing visible here
// ever closes it.
func (n *node) init(dirName string, shardStateRaw []byte,
	allNodes *[]*node,
) {
	logger, _ := test.NewNullLogger()
	localDir := path.Join(dirName, n.name)

	resolver := &nodeResolver{
		nodes: allNodes,
		local: n.name,
	}

	shardState, err := sharding.StateFromJSON(shardStateRaw, resolver)
	if err != nil {
		panic(err)
	}

	var (
		indexClient   = clients.NewRemoteIndex(&http.Client{})
		nodesClient   = clients.NewRemoteNode(&http.Client{})
		replicaClient = clients.NewReplicationClient(&http.Client{})
	)

	cfg := db.Config{
		MemtablesFlushIdleAfter:   60,
		RootPath:                  localDir,
		QueryMaximumResults:       10000,
		MaxImportGoroutinesFactor: 1,
	}
	n.repo, err = db.New(logger, cfg, indexClient, resolver, nodesClient, replicaClient, nil)
	if err != nil {
		panic(err)
	}

	// The schema getter must be in place before waiting for startup.
	n.schemaManager = &fakeSchemaManager{
		shardState:   shardState,
		schema:       schema.Schema{Objects: &models.Schema{}},
		nodeResolver: resolver,
	}
	n.repo.SetSchemaGetter(n.schemaManager)
	if err = n.repo.WaitForStartup(context.Background()); err != nil {
		panic(err)
	}

	backendProvider := newFakeBackupBackendProvider(localDir)
	n.backupManager = ubak.NewHandler(
		logger, &fakeAuthorizer{}, n.schemaManager, n.repo, backendProvider)

	backupClient := clients.NewClusterBackups(&http.Client{})
	n.scheduler = ubak.NewScheduler(
		&fakeAuthorizer{}, backupClient, n.repo, backendProvider, resolver, logger)

	n.migrator = db.NewMigrator(n.repo, logger)

	// Expose the cluster-internal HTTP API of this node.
	mux := http.NewServeMux()

	indices := clusterapi.NewIndices(sharding.NewRemoteIndexIncoming(n.repo), n.repo, clusterapi.NewNoopAuthHandler())
	mux.Handle("/indices/", indices.Indices())

	backups := clusterapi.NewBackups(n.backupManager, clusterapi.NewNoopAuthHandler())
	mux.Handle("/backups/can-commit", backups.CanCommit())
	mux.Handle("/backups/commit", backups.Commit())
	mux.Handle("/backups/abort", backups.Abort())
	mux.Handle("/backups/status", backups.Status())

	srv := httptest.NewServer(mux)
	parsed, err := url.Parse(srv.URL)
	if err != nil {
		panic(err)
	}
	n.hostname = parsed.Host
}
// fakeNodes is a static node list used as a stand-in for cluster membership.
// The first entry is treated as the local node.
type fakeNodes struct {
	nodes []string
}

// Candidates returns the configured node names unchanged.
func (f fakeNodes) Candidates() []string {
	return f.nodes
}

// LocalName returns the first configured node name. It returns the empty
// string when no nodes are configured instead of panicking on an
// out-of-range index.
func (f fakeNodes) LocalName() string {
	if len(f.nodes) == 0 {
		return ""
	}
	return f.nodes[0]
}
// fakeSchemaManager is a test double for the schema manager. It serves a
// fixed schema and a single sharding state regardless of the class asked for.
type fakeSchemaManager struct {
// schema is returned as-is by GetSchemaSkipAuth.
schema schema.Schema
// shardState is the one sharding state used for every class.
shardState *sharding.State
// nodeResolver supplies the local node name for NodeName.
nodeResolver *nodeResolver
}
// GetSchemaSkipAuth returns the schema stored on the fake; no authorization
// is performed.
func (f *fakeSchemaManager) GetSchemaSkipAuth() schema.Schema {
	current := f.schema
	return current
}
// CopyShardingState returns the fake's sharding state. The class argument is
// ignored, and despite the name the stored pointer itself is returned rather
// than a copy.
func (f *fakeSchemaManager) CopyShardingState(class string) *sharding.State {
	state := f.shardState
	return state
}
// ShardOwner returns the first node listed for the given physical shard.
// The class argument is ignored.
//
// It returns an error if the shard is unknown, or if the shard has no nodes
// or an empty first node name.
func (f *fakeSchemaManager) ShardOwner(class, shard string) (string, error) {
	physical, found := f.shardState.Physical[shard]
	switch {
	case !found:
		return "", fmt.Errorf("shard not found")
	case len(physical.BelongsToNodes) == 0 || physical.BelongsToNodes[0] == "":
		return "", fmt.Errorf("owner node not found")
	}
	return physical.BelongsToNodes[0], nil
}
// ShardReplicas returns every node listed for the given physical shard, or an
// error if the shard is unknown. The class argument is ignored.
func (f *fakeSchemaManager) ShardReplicas(class, shard string) ([]string, error) {
	if physical, found := f.shardState.Physical[shard]; found {
		return physical.BelongsToNodes, nil
	}
	return nil, fmt.Errorf("shard not found")
}
// TenantShard echoes the tenant name back as the shard name and always
// reports the HOT activity status. The class argument is ignored.
func (f *fakeSchemaManager) TenantShard(class, tenant string) (string, string) {
	status := models.TenantActivityStatusHOT
	return tenant, status
}
// ShardFromUUID asks the sharding state which shard the given UUID bytes map
// to. The class argument is ignored; an empty tenant key is passed to Shard.
func (f *fakeSchemaManager) ShardFromUUID(class string, uuid []byte) string {
	return f.shardState.Shard("", string(uuid))
}
// RestoreClass is a no-op: the fake accepts every restore request and always
// reports success without touching its schema.
func (f *fakeSchemaManager) RestoreClass(ctx context.Context, d *backup.ClassDescriptor, nodeMapping map[string]string) error {
	// Nothing to restore in the fake.
	return nil
}
// Nodes returns a single placeholder entry; the fake does not track real
// cluster membership.
func (f *fakeSchemaManager) Nodes() []string {
	const placeholder = "NOT SET"
	return []string{placeholder}
}
// NodeName returns the local node name held by the fake's node resolver.
func (f *fakeSchemaManager) NodeName() string {
	resolver := f.nodeResolver
	return resolver.local
}
// ClusterHealthScore always returns 0.
func (f *fakeSchemaManager) ClusterHealthScore() int {
	const score = 0
	return score
}
// ResolveParentNodes is a stub: it returns a nil map and a nil error for
// every input.
func (f *fakeSchemaManager) ResolveParentNodes(_ string, shard string,
) (map[string]string, error) {
	var parents map[string]string
	return parents, nil
}
// nodeResolver resolves node names to hostnames from a shared list of test
// nodes.
type nodeResolver struct {
// nodes points at the shared node slice, so nodes added to (or initialised
// in) that slice later are visible to the resolver.
nodes *[]*node
// local is the name of the node this resolver belongs to.
local string
}
// AllNames is not supported by this fake and always panics.
func (r nodeResolver) AllNames() []string {
	// Deliberately unimplemented: fail loudly if a test starts relying on it.
	panic("node resolving not implemented yet")
}
// Candidates always returns a nil slice; the fake offers no candidates.
func (r nodeResolver) Candidates() []string {
	var none []string
	return none
}
// LocalName returns the name of the node this resolver was created for.
func (r nodeResolver) LocalName() string {
	name := r.local
	return name
}
// NodeCount returns the current length of the shared node slice.
func (r nodeResolver) NodeCount() int {
	all := *r.nodes
	return len(all)
}
// NodeHostname looks up nodeName in the shared node slice and returns the
// hostname of the first node with that name. The boolean is false when no
// node matches.
func (r nodeResolver) NodeHostname(nodeName string) (string, bool) {
	all := *r.nodes
	for i := range all {
		if candidate := all[i]; candidate.name == nodeName {
			return candidate.hostname, true
		}
	}
	return "", false
}
// newFakeBackupBackendProvider creates a provider that remembers backupsPath.
func newFakeBackupBackendProvider(backupsPath string) *fakeBackupBackendProvider {
	provider := new(fakeBackupBackendProvider)
	provider.backupsPath = backupsPath
	return provider
}
// fakeBackupBackendProvider hands out the fake backup backend to the backup
// handler and scheduler.
type fakeBackupBackendProvider struct {
// backupsPath is set by newFakeBackupBackendProvider; BackupBackend does not
// read it in the code visible here.
backupsPath string
}
// BackupBackend returns the shared fake backend, first marking it as local
// when the requested backend name equals the filesystem module's name and as
// external otherwise. It never returns an error.
//
// NOTE(review): `backend` is not declared in this part of the file; it is
// presumably a package-level *fakeBackupBackend set up by the tests — confirm.
func (f *fakeBackupBackendProvider) BackupBackend(name string) (modulecapabilities.BackupBackend, error) {
	isFilesystem := name == modstgfs.Name
	backend.setLocal(isFilesystem)
	return backend, nil
}
// fakeBackupBackend is an in-memory stand-in for a backup storage backend.
// Writes are discarded and reads return canned "SUCCESS" metadata.
type fakeBackupBackend struct {
// The embedded mutex is taken by the backend's exported methods (and setLocal)
// around access to the fields below.
sync.Mutex
// backupsPath is returned by HomeDir and SourceDataPath.
backupsPath string
// backupID is copied into the canned metadata descriptors.
backupID string
// counter counts GetObject calls; the first two calls report "not found".
counter int
// isLocal is set via setLocal; IsExternal reports its negation.
isLocal bool
// startedAt is copied into the canned metadata descriptors.
startedAt time.Time
}
// HomeDir returns the configured backups path; backupID is ignored.
func (f *fakeBackupBackend) HomeDir(backupID string) string {
	f.Lock()
	dir := f.backupsPath
	f.Unlock()
	return dir
}
// GetObject returns canned, JSON-encoded backup metadata.
//
// The first two calls since the counter was last reset return
// backup.ErrNotFound{} so that callers see the backup as not yet existing.
// Every later call returns the global (distributed) descriptor when key equals
// ubak.GlobalBackupFile and the node-local descriptor for any other key.
// ctx and backupID are ignored.
//
// An error is also returned if the descriptor cannot be marshalled, instead
// of silently returning a nil payload with a nil error.
func (f *fakeBackupBackend) GetObject(ctx context.Context, backupID, key string) ([]byte, error) {
	f.Lock()
	defer f.Unlock()

	f.counter++
	if f.counter <= 2 {
		return nil, backup.ErrNotFound{}
	}

	var meta interface{}
	if key == ubak.GlobalBackupFile {
		meta = f.successGlobalMeta()
	} else {
		meta = f.successLocalMeta()
	}

	payload, err := json.Marshal(meta)
	if err != nil {
		return nil, fmt.Errorf("marshal backup meta for key %q: %w", key, err)
	}
	return payload, nil
}
// WriteToFile is a no-op that always succeeds; nothing is written to destPath.
func (f *fakeBackupBackend) WriteToFile(ctx context.Context, backupID, key, destPath string) error {
	// Take the lock like every other backend operation, even though no state
	// is touched.
	f.Lock()
	defer f.Unlock()

	return nil
}
// Write discards the input: it closes r without reading from it and reports
// zero bytes written and no error.
func (f *fakeBackupBackend) Write(ctx context.Context, backupID, key string, r io.ReadCloser) (int64, error) {
	f.Lock()
	defer f.Unlock()
	// Deferred after Unlock, so the reader is closed while the lock is held.
	defer r.Close()

	var written int64
	return written, nil
}
// Read produces no data: it closes w without writing to it and reports zero
// bytes read and no error.
func (f *fakeBackupBackend) Read(ctx context.Context, backupID, key string, w io.WriteCloser) (int64, error) {
	f.Lock()
	defer f.Unlock()
	// Deferred after Unlock, so the writer is closed while the lock is held.
	defer w.Close()

	var read int64
	return read, nil
}
// SourceDataPath returns the configured backups path.
func (f *fakeBackupBackend) SourceDataPath() string {
	f.Lock()
	dir := f.backupsPath
	f.Unlock()
	return dir
}
// setLocal records, under the lock, whether the backend should present itself
// as a local one.
func (f *fakeBackupBackend) setLocal(v bool) {
	f.Lock()
	f.isLocal = v
	f.Unlock()
}
// IsExternal reports whether the backend is currently marked as non-local.
func (f *fakeBackupBackend) IsExternal() bool {
	f.Lock()
	local := f.isLocal
	f.Unlock()
	return !local
}
// Name returns the fixed name of the fake backend.
func (f *fakeBackupBackend) Name() string {
	const name = "fakeBackupBackend"
	return name
}
// PutFile is a no-op that always succeeds; srcPath is never read.
func (f *fakeBackupBackend) PutFile(ctx context.Context, backupID, key, srcPath string) error {
	// Take the lock like every other backend operation, even though no state
	// is touched.
	f.Lock()
	defer f.Unlock()

	return nil
}
// PutObject is a no-op that always succeeds; the given bytes are discarded.
func (f *fakeBackupBackend) PutObject(ctx context.Context, backupID, key string, byes []byte) error {
	// Take the lock like every other backend operation, even though no state
	// is touched.
	f.Lock()
	defer f.Unlock()

	return nil
}
// Initialize is a no-op that always succeeds.
func (f *fakeBackupBackend) Initialize(ctx context.Context, backupID string) error {
	// Take the lock like every other backend operation, even though no state
	// is touched.
	f.Lock()
	defer f.Unlock()

	return nil
}
// successGlobalMeta builds the canned distributed backup descriptor: a
// successful backup with a single participant, "node-0", holding
// distributedClass. ID and StartedAt are taken from the backend's fields.
//
// It does not lock; GetObject calls it while already holding the mutex.
func (f *fakeBackupBackend) successGlobalMeta() backup.DistributedBackupDescriptor {
	nodeMeta := &backup.NodeDescriptor{
		Classes: []string{distributedClass},
		Status:  "SUCCESS",
	}

	return backup.DistributedBackupDescriptor{
		StartedAt:     f.startedAt,
		ID:            f.backupID,
		Nodes:         map[string]*backup.NodeDescriptor{"node-0": nodeMeta},
		Status:        "SUCCESS",
		Version:       ubak.Version,
		ServerVersion: "x.x.x",
	}
}
// successLocalMeta builds the canned node-local backup descriptor: a
// successful backup of distributedClass with one shard, "123", on "node-0".
// ID and StartedAt are taken from the backend's fields; all other values are
// fixed placeholders.
//
// It does not lock; GetObject calls it while already holding the mutex.
func (f *fakeBackupBackend) successLocalMeta() backup.BackupDescriptor {
	shard := &backup.ShardDescriptor{
		Name:                  "123",
		Node:                  "node-0",
		Files:                 []string{"some-file.db"},
		DocIDCounter:          []byte("1"),
		DocIDCounterPath:      ".",
		Version:               []byte("1"),
		ShardVersionPath:      ".",
		PropLengthTracker:     []byte("1"),
		PropLengthTrackerPath: ".",
	}

	class := backup.ClassDescriptor{
		Name:          distributedClass,
		Shards:        []*backup.ShardDescriptor{shard},
		ShardingState: []byte("sharding state!"),
		Schema:        []byte("schema!"),
	}

	return backup.BackupDescriptor{
		ID:            f.backupID,
		Status:        "SUCCESS",
		ServerVersion: "x.x.x",
		Version:       ubak.Version,
		StartedAt:     f.startedAt,
		Classes:       []backup.ClassDescriptor{class},
	}
}
// reset zeroes the GetObject call counter so that the next two GetObject
// calls report "not found" again.
//
// The counter is read and written under the mutex in GetObject, so reset
// takes the same lock to avoid a data race with in-flight backend calls.
// It must not be called while the caller already holds the backend's lock.
func (f *fakeBackupBackend) reset() {
	f.Lock()
	defer f.Unlock()
	f.counter = 0
}
// fakeAuthorizer is an authorizer that permits everything.
type fakeAuthorizer struct{}

// Authorize always returns nil, whatever principal and strings it is given.
func (f *fakeAuthorizer) Authorize(_ *models.Principal, _, _ string) error {
	// Every request is allowed in tests.
	return nil
}