Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
//go:build integrationTest | |
// +build integrationTest | |
package hnsw | |
import ( | |
"context" | |
"fmt" | |
"os" | |
"path" | |
"testing" | |
"time" | |
"github.com/sirupsen/logrus/hooks/test" | |
"github.com/stretchr/testify/assert" | |
"github.com/stretchr/testify/require" | |
"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw/distancer" | |
"github.com/weaviate/weaviate/entities/cyclemanager" | |
enthnsw "github.com/weaviate/weaviate/entities/vectorindex/hnsw" | |
) | |
func TestBackup_Integration(t *testing.T) { | |
ctx := context.Background() | |
logger, _ := test.NewNullLogger() | |
dirName := t.TempDir() | |
indexID := "backup-integration-test" | |
parentCommitLoggerCallbacks := cyclemanager.NewCallbackGroup("parentCommitLogger", logger, 1) | |
parentCommitLoggerCycle := cyclemanager.NewManager( | |
cyclemanager.HnswCommitLoggerCycleTicker(), | |
parentCommitLoggerCallbacks.CycleCallback) | |
parentCommitLoggerCycle.Start() | |
defer parentCommitLoggerCycle.StopAndWait(ctx) | |
commitLoggerCallbacks := cyclemanager.NewCallbackGroup("childCommitLogger", logger, 1) | |
commitLoggerCallbacksCtrl := parentCommitLoggerCallbacks.Register("commitLogger", commitLoggerCallbacks.CycleCallback) | |
parentTombstoneCleanupCallbacks := cyclemanager.NewCallbackGroup("parentTombstoneCleanup", logger, 1) | |
parentTombstoneCleanupCycle := cyclemanager.NewManager( | |
cyclemanager.NewFixedTicker(enthnsw.DefaultCleanupIntervalSeconds*time.Second), | |
parentTombstoneCleanupCallbacks.CycleCallback) | |
parentTombstoneCleanupCycle.Start() | |
defer parentTombstoneCleanupCycle.StopAndWait(ctx) | |
tombstoneCleanupCallbacks := cyclemanager.NewCallbackGroup("childTombstoneCleanup", logger, 1) | |
tombstoneCleanupCallbacksCtrl := parentTombstoneCleanupCallbacks.Register("tombstoneCleanup", tombstoneCleanupCallbacks.CycleCallback) | |
combinedCtrl := cyclemanager.NewCombinedCallbackCtrl(2, commitLoggerCallbacksCtrl, tombstoneCleanupCallbacksCtrl) | |
idx, err := New(Config{ | |
RootPath: dirName, | |
ID: indexID, | |
Logger: logger, | |
DistanceProvider: distancer.NewCosineDistanceProvider(), | |
VectorForIDThunk: testVectorForID, | |
MakeCommitLoggerThunk: func() (CommitLogger, error) { | |
return NewCommitLogger(dirName, indexID, logger, commitLoggerCallbacks) | |
}, | |
}, enthnsw.NewDefaultUserConfig(), tombstoneCleanupCallbacks, cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(), nil) | |
require.Nil(t, err) | |
idx.PostStartup() | |
t.Run("insert vector into index", func(t *testing.T) { | |
for i := 0; i < 10; i++ { | |
inc := float32(i) | |
err := idx.Add(uint64(i), []float32{inc, inc + 1, inc + 2}) | |
require.Nil(t, err) | |
} | |
}) | |
// let the index age for a second so that | |
// the commitlogger filenames, which are | |
// based on current timestamp, can differ | |
time.Sleep(time.Second) | |
t.Run("pause maintenance", func(t *testing.T) { | |
err = combinedCtrl.Deactivate(ctx) | |
require.Nil(t, err) | |
}) | |
t.Run("switch commit logs", func(t *testing.T) { | |
err = idx.SwitchCommitLogs(ctx) | |
require.Nil(t, err) | |
}) | |
t.Run("list files", func(t *testing.T) { | |
files, err := idx.ListFiles(ctx, dirName) | |
require.Nil(t, err) | |
// by this point there should be two files in the commitlog directory. | |
// one is the active log file, and the other is the previous active | |
// log which was in use prior to `SwitchCommitLogs`. additionally, | |
// maintenance has been paused, so we shouldn't see any .condensed | |
// files either. | |
// | |
// because `ListFiles` is used within the context of backups, | |
// it excludes any currently active log files, which are not part | |
// of the backup. in this case, the only other file is the prev | |
// commitlog, so we should only have 1 result here. | |
assert.Len(t, files, 1) | |
t.Run("verify commitlog dir contents", func(t *testing.T) { | |
// checking to ensure that indeed there are only 2 files in the | |
// commit log directory, and that one of them is the one result | |
// from `ListFiles`, and that the other is not a .condensed file | |
ls, err := os.ReadDir(path.Join(dirName, fmt.Sprintf("%s.hnsw.commitlog.d", indexID))) | |
require.Nil(t, err) | |
assert.Len(t, ls, 2) | |
var prevLogFound bool | |
for _, info := range ls { | |
if path.Base(files[0]) == info.Name() { | |
prevLogFound = true | |
} | |
assert.Empty(t, path.Ext(info.Name())) | |
} | |
assert.True(t, prevLogFound, "previous commitlog not found in commitlog root dir") | |
}) | |
}) | |
t.Run("resume maintenance", func(t *testing.T) { | |
err = combinedCtrl.Activate() | |
require.Nil(t, err) | |
}) | |
err = idx.Shutdown(ctx) | |
require.Nil(t, err) | |
err = combinedCtrl.Unregister(ctx) | |
require.Nil(t, err) | |
} | |