SemanticSearchPOC / adapters /repos /db /backup_integration_test.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
9.77 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
//go:build integrationTest
// +build integrationTest
package db
import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"regexp"
"testing"
"time"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/entities/storobj"
enthnsw "github.com/weaviate/weaviate/entities/vectorindex/hnsw"
)
func TestBackup_DBLevel(t *testing.T) {
t.Run("successful backup creation", func(t *testing.T) {
ctx := testCtx()
dirName := t.TempDir()
className := "DBLevelBackupClass"
backupID := "backup1"
now := time.Now()
db := setupTestDB(t, dirName, makeTestClass(className))
defer func() {
require.Nil(t, db.Shutdown(context.Background()))
}()
t.Run("insert data", func(t *testing.T) {
require.Nil(t, db.PutObject(ctx, &models.Object{
Class: className,
CreationTimeUnix: now.UnixNano(),
ID: "ff9fcae5-57b8-431c-b8e2-986fd78f5809",
LastUpdateTimeUnix: now.UnixNano(),
Vector: []float32{1, 2, 3},
VectorWeights: nil,
}, []float32{1, 2, 3}, nil))
})
expectedNodeName := "node1"
expectedShardName := db.schemaGetter.
CopyShardingState(className).
AllPhysicalShards()[0]
testShd := db.GetIndex(schema.ClassName(className)).
shards.Load(expectedShardName)
expectedCounterPath, _ := filepath.Rel(testShd.Index().Config.RootPath, testShd.Counter().FileName())
expectedCounter, err := os.ReadFile(testShd.Counter().FileName())
require.Nil(t, err)
expectedPropLengthPath, _ := filepath.Rel(testShd.Index().Config.RootPath, testShd.GetPropertyLengthTracker().FileName())
expectedShardVersionPath, _ := filepath.Rel(testShd.Index().Config.RootPath, testShd.Versioner().path)
expectedShardVersion, err := os.ReadFile(testShd.Versioner().path)
require.Nil(t, err)
expectedPropLength, err := os.ReadFile(testShd.GetPropertyLengthTracker().FileName())
require.Nil(t, err)
expectedShardState, err := testShd.Index().getSchema.CopyShardingState(className).JSON()
require.Nil(t, err)
expectedSchema, err := testShd.Index().getSchema.GetSchemaSkipAuth().
Objects.Classes[0].MarshalBinary()
require.Nil(t, err)
classes := db.ListBackupable()
t.Run("doesn't fail on casing permutation of existing class", func(t *testing.T) {
err := db.Backupable(ctx, []string{"DBLeVELBackupClass"})
require.NotNil(t, err)
require.Equal(t, "class DBLeVELBackupClass doesn't exist", err.Error())
})
t.Run("create backup", func(t *testing.T) {
err := db.Backupable(ctx, classes)
assert.Nil(t, err)
ch := db.BackupDescriptors(ctx, backupID, classes)
for d := range ch {
assert.Equal(t, className, d.Name)
assert.Len(t, d.Shards, len(classes))
for _, shd := range d.Shards {
assert.Equal(t, expectedShardName, shd.Name)
assert.Equal(t, expectedNodeName, shd.Node)
assert.NotEmpty(t, shd.Files)
for _, f := range shd.Files {
assert.NotEmpty(t, f)
}
assert.Equal(t, expectedCounterPath, shd.DocIDCounterPath)
assert.Equal(t, expectedCounter, shd.DocIDCounter)
assert.Equal(t, expectedPropLengthPath, shd.PropLengthTrackerPath)
assert.Equal(t, expectedPropLength, shd.PropLengthTracker)
assert.Equal(t, expectedShardVersionPath, shd.ShardVersionPath)
assert.Equal(t, expectedShardVersion, shd.Version)
}
assert.Equal(t, expectedShardState, d.ShardingState)
assert.Equal(t, expectedSchema, d.Schema)
}
})
t.Run("release backup", func(t *testing.T) {
for _, class := range classes {
err := db.ReleaseBackup(ctx, backupID, class)
assert.Nil(t, err)
}
})
t.Run("node names from shards", func(t *testing.T) {
res, err := db.Shards(ctx, className)
assert.NoError(t, err)
assert.Len(t, res, 1)
assert.Equal(t, "node1", res[0])
})
t.Run("get all classes", func(t *testing.T) {
res := db.ListClasses(ctx)
assert.Len(t, res, 1)
assert.Equal(t, className, res[0])
})
})
t.Run("failed backup creation from expired context", func(t *testing.T) {
ctx := testCtx()
dirName := t.TempDir()
className := "DBLevelBackupClass"
backupID := "backup1"
now := time.Now()
db := setupTestDB(t, dirName, makeTestClass(className))
defer func() {
require.Nil(t, db.Shutdown(context.Background()))
}()
t.Run("insert data", func(t *testing.T) {
require.Nil(t, db.PutObject(ctx, &models.Object{
Class: className,
CreationTimeUnix: now.UnixNano(),
ID: "ff9fcae5-57b8-431c-b8e2-986fd78f5809",
LastUpdateTimeUnix: now.UnixNano(),
Vector: []float32{1, 2, 3},
VectorWeights: nil,
}, []float32{1, 2, 3}, nil))
})
t.Run("fail with expired context", func(t *testing.T) {
classes := db.ListBackupable()
err := db.Backupable(ctx, classes)
assert.Nil(t, err)
timeoutCtx, cancel := context.WithTimeout(context.Background(), 0)
defer cancel()
ch := db.BackupDescriptors(timeoutCtx, backupID, classes)
for d := range ch {
require.NotNil(t, d.Error)
assert.Contains(t, d.Error.Error(), "context deadline exceeded")
}
})
})
}
func TestBackup_BucketLevel(t *testing.T) {
ctx := testCtx()
className := "BucketLevelBackup"
shard, _ := testShard(t, ctx, className)
t.Run("insert data", func(t *testing.T) {
err := shard.PutObject(ctx, &storobj.Object{
MarshallerVersion: 1,
Object: models.Object{
ID: "8c29da7a-600a-43dc-85fb-83ab2b08c294",
Class: className,
Properties: map[string]interface{}{
"stringField": "somevalue",
},
},
},
)
require.Nil(t, err)
})
t.Run("perform backup sequence", func(t *testing.T) {
objBucket := shard.Store().Bucket("objects")
require.NotNil(t, objBucket)
err := shard.Store().PauseCompaction(ctx)
require.Nil(t, err)
err = objBucket.FlushMemtable()
require.Nil(t, err)
files, err := objBucket.ListFiles(ctx, shard.Index().Config.RootPath)
require.Nil(t, err)
t.Run("check ListFiles, results", func(t *testing.T) {
assert.Len(t, files, 4)
// build regex to get very close approximation to the expected
// contents of the ListFiles result. the only thing we can't
// know for sure is the actual name of the segment group, hence
// the `.*`
re := path.Clean(fmt.Sprintf("%s\\/.*\\.(wal|db|bloom|cna)", shard.Index().Config.RootPath))
// we expect to see only four files inside the bucket at this point:
// 1. a *.db file - the segment itself
// 2. a *.bloom file - the segments' bloom filter (only since v1.17)
// 3. a *.secondary.0.bloom file - the bloom filter for the secondary index at pos 0 (only since v1.17)
// 4. a *.cna file - th segment's count net additions (only since v1.17)
//
// These files are created when the memtable is flushed, and the new
// segment is initialized. Both happens as a result of calling
// FlushMemtable().
for i := range files {
isMatch, err := regexp.MatchString(re, files[i])
assert.Nil(t, err)
assert.True(t, isMatch, files[i])
}
// check that we have one of each: *.db
exts := make([]string, 4)
for i, file := range files {
exts[i] = filepath.Ext(file)
}
assert.Contains(t, exts, ".db") // the main segment
assert.Contains(t, exts, ".cna") // the segment's count net additions
assert.Contains(t, exts, ".bloom") // matches both bloom filters (primary+secondary)
})
err = shard.Store().ResumeCompaction(ctx)
require.Nil(t, err)
})
t.Run("cleanup", func(t *testing.T) {
require.Nil(t, shard.Shutdown(ctx))
require.Nil(t, os.RemoveAll(shard.Index().Config.RootPath))
})
}
func setupTestDB(t *testing.T, rootDir string, classes ...*models.Class) *DB {
logger, _ := test.NewNullLogger()
schemaGetter := &fakeSchemaGetter{
schema: schema.Schema{Objects: &models.Schema{Classes: nil}},
shardState: singleShardState(),
}
db, err := New(logger, Config{
MemtablesFlushIdleAfter: 60,
RootPath: rootDir,
QueryMaximumResults: 10,
MaxImportGoroutinesFactor: 1,
}, &fakeRemoteClient{}, &fakeNodeResolver{}, &fakeRemoteNodeClient{}, &fakeReplicationClient{}, nil)
require.Nil(t, err)
db.SetSchemaGetter(schemaGetter)
require.Nil(t, db.WaitForStartup(testCtx()))
migrator := NewMigrator(db, logger)
for _, class := range classes {
require.Nil(t,
migrator.AddClass(context.Background(), class, schemaGetter.shardState))
}
// update schema getter so it's in sync with class
schemaGetter.schema = schema.Schema{
Objects: &models.Schema{
Classes: classes,
},
}
return db
}
func makeTestClass(className string) *models.Class {
return &models.Class{
VectorIndexConfig: enthnsw.NewDefaultUserConfig(),
InvertedIndexConfig: invertedConfig(),
Class: className,
Properties: []*models.Property{
{
Name: "stringProp",
DataType: schema.DataTypeText.PropString(),
Tokenization: models.PropertyTokenizationWhitespace,
},
},
}
}