Spaces:
Running
Running
File size: 5,029 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
//go:build integrationTest
// +build integrationTest
package hnsw
import (
"context"
"fmt"
"os"
"path"
"testing"
"time"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw/distancer"
"github.com/weaviate/weaviate/entities/cyclemanager"
enthnsw "github.com/weaviate/weaviate/entities/vectorindex/hnsw"
)
func TestBackup_Integration(t *testing.T) {
ctx := context.Background()
logger, _ := test.NewNullLogger()
dirName := t.TempDir()
indexID := "backup-integration-test"
parentCommitLoggerCallbacks := cyclemanager.NewCallbackGroup("parentCommitLogger", logger, 1)
parentCommitLoggerCycle := cyclemanager.NewManager(
cyclemanager.HnswCommitLoggerCycleTicker(),
parentCommitLoggerCallbacks.CycleCallback)
parentCommitLoggerCycle.Start()
defer parentCommitLoggerCycle.StopAndWait(ctx)
commitLoggerCallbacks := cyclemanager.NewCallbackGroup("childCommitLogger", logger, 1)
commitLoggerCallbacksCtrl := parentCommitLoggerCallbacks.Register("commitLogger", commitLoggerCallbacks.CycleCallback)
parentTombstoneCleanupCallbacks := cyclemanager.NewCallbackGroup("parentTombstoneCleanup", logger, 1)
parentTombstoneCleanupCycle := cyclemanager.NewManager(
cyclemanager.NewFixedTicker(enthnsw.DefaultCleanupIntervalSeconds*time.Second),
parentTombstoneCleanupCallbacks.CycleCallback)
parentTombstoneCleanupCycle.Start()
defer parentTombstoneCleanupCycle.StopAndWait(ctx)
tombstoneCleanupCallbacks := cyclemanager.NewCallbackGroup("childTombstoneCleanup", logger, 1)
tombstoneCleanupCallbacksCtrl := parentTombstoneCleanupCallbacks.Register("tombstoneCleanup", tombstoneCleanupCallbacks.CycleCallback)
combinedCtrl := cyclemanager.NewCombinedCallbackCtrl(2, commitLoggerCallbacksCtrl, tombstoneCleanupCallbacksCtrl)
idx, err := New(Config{
RootPath: dirName,
ID: indexID,
Logger: logger,
DistanceProvider: distancer.NewCosineDistanceProvider(),
VectorForIDThunk: testVectorForID,
MakeCommitLoggerThunk: func() (CommitLogger, error) {
return NewCommitLogger(dirName, indexID, logger, commitLoggerCallbacks)
},
}, enthnsw.NewDefaultUserConfig(), tombstoneCleanupCallbacks, cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(), nil)
require.Nil(t, err)
idx.PostStartup()
t.Run("insert vector into index", func(t *testing.T) {
for i := 0; i < 10; i++ {
inc := float32(i)
err := idx.Add(uint64(i), []float32{inc, inc + 1, inc + 2})
require.Nil(t, err)
}
})
// let the index age for a second so that
// the commitlogger filenames, which are
// based on current timestamp, can differ
time.Sleep(time.Second)
t.Run("pause maintenance", func(t *testing.T) {
err = combinedCtrl.Deactivate(ctx)
require.Nil(t, err)
})
t.Run("switch commit logs", func(t *testing.T) {
err = idx.SwitchCommitLogs(ctx)
require.Nil(t, err)
})
t.Run("list files", func(t *testing.T) {
files, err := idx.ListFiles(ctx, dirName)
require.Nil(t, err)
// by this point there should be two files in the commitlog directory.
// one is the active log file, and the other is the previous active
// log which was in use prior to `SwitchCommitLogs`. additionally,
// maintenance has been paused, so we shouldn't see any .condensed
// files either.
//
// because `ListFiles` is used within the context of backups,
// it excludes any currently active log files, which are not part
// of the backup. in this case, the only other file is the prev
// commitlog, so we should only have 1 result here.
assert.Len(t, files, 1)
t.Run("verify commitlog dir contents", func(t *testing.T) {
// checking to ensure that indeed there are only 2 files in the
// commit log directory, and that one of them is the one result
// from `ListFiles`, and that the other is not a .condensed file
ls, err := os.ReadDir(path.Join(dirName, fmt.Sprintf("%s.hnsw.commitlog.d", indexID)))
require.Nil(t, err)
assert.Len(t, ls, 2)
var prevLogFound bool
for _, info := range ls {
if path.Base(files[0]) == info.Name() {
prevLogFound = true
}
assert.Empty(t, path.Ext(info.Name()))
}
assert.True(t, prevLogFound, "previous commitlog not found in commitlog root dir")
})
})
t.Run("resume maintenance", func(t *testing.T) {
err = combinedCtrl.Activate()
require.Nil(t, err)
})
err = idx.Shutdown(ctx)
require.Nil(t, err)
err = combinedCtrl.Unregister(ctx)
require.Nil(t, err)
}
|