File size: 5,029 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//

//go:build integrationTest
// +build integrationTest

package hnsw

import (
	"context"
	"fmt"
	"os"
	"path"
	"testing"
	"time"

	"github.com/sirupsen/logrus/hooks/test"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw/distancer"
	"github.com/weaviate/weaviate/entities/cyclemanager"
	enthnsw "github.com/weaviate/weaviate/entities/vectorindex/hnsw"
)

// TestBackup_Integration walks an hnsw index through the sequence used when
// taking a backup: vectors are inserted, the commit-logger and
// tombstone-cleanup callbacks are deactivated, the commit log is switched,
// and the files reported by ListFiles are compared against the on-disk
// contents of the commit log directory. Afterwards the callbacks are
// re-activated, the index is shut down and the callbacks are unregistered.
func TestBackup_Integration(t *testing.T) {
	ctx := context.Background()
	logger, _ := test.NewNullLogger()

	dirName := t.TempDir()
	indexID := "backup-integration-test"

	// Commit logger cycle: a parent callback group driven by its own cycle
	// manager, with the child group registered as one of its callbacks.
	parentCommitLoggerCallbacks := cyclemanager.NewCallbackGroup("parentCommitLogger", logger, 1)
	parentCommitLoggerCycle := cyclemanager.NewManager(
		cyclemanager.HnswCommitLoggerCycleTicker(),
		parentCommitLoggerCallbacks.CycleCallback)
	parentCommitLoggerCycle.Start()
	defer parentCommitLoggerCycle.StopAndWait(ctx)
	commitLoggerCallbacks := cyclemanager.NewCallbackGroup("childCommitLogger", logger, 1)
	commitLoggerCallbacksCtrl := parentCommitLoggerCallbacks.Register("commitLogger", commitLoggerCallbacks.CycleCallback)

	// Tombstone cleanup cycle: same parent/child layout as above, ticking at
	// the default cleanup interval.
	parentTombstoneCleanupCallbacks := cyclemanager.NewCallbackGroup("parentTombstoneCleanup", logger, 1)
	parentTombstoneCleanupCycle := cyclemanager.NewManager(
		cyclemanager.NewFixedTicker(enthnsw.DefaultCleanupIntervalSeconds*time.Second),
		parentTombstoneCleanupCallbacks.CycleCallback)
	parentTombstoneCleanupCycle.Start()
	defer parentTombstoneCleanupCycle.StopAndWait(ctx)
	tombstoneCleanupCallbacks := cyclemanager.NewCallbackGroup("childTombstoneCleanup", logger, 1)
	tombstoneCleanupCallbacksCtrl := parentTombstoneCleanupCallbacks.Register("tombstoneCleanup", tombstoneCleanupCallbacks.CycleCallback)

	// A single control handle so both maintenance callbacks can be paused,
	// resumed and unregistered together.
	combinedCtrl := cyclemanager.NewCombinedCallbackCtrl(2, commitLoggerCallbacksCtrl, tombstoneCleanupCallbacksCtrl)

	idx, err := New(Config{
		RootPath:         dirName,
		ID:               indexID,
		Logger:           logger,
		DistanceProvider: distancer.NewCosineDistanceProvider(),
		VectorForIDThunk: testVectorForID,
		MakeCommitLoggerThunk: func() (CommitLogger, error) {
			return NewCommitLogger(dirName, indexID, logger, commitLoggerCallbacks)
		},
	}, enthnsw.NewDefaultUserConfig(), tombstoneCleanupCallbacks, cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewCallbackGroupNoop(), nil)
	require.NoError(t, err)
	idx.PostStartup()

	t.Run("insert vector into index", func(t *testing.T) {
		for i := 0; i < 10; i++ {
			inc := float32(i)
			require.NoError(t, idx.Add(uint64(i), []float32{inc, inc + 1, inc + 2}))
		}
	})

	// let the index age for a second so that
	// the commitlogger filenames, which are
	// based on current timestamp, can differ
	time.Sleep(time.Second)

	t.Run("pause maintenance", func(t *testing.T) {
		require.NoError(t, combinedCtrl.Deactivate(ctx))
	})

	t.Run("switch commit logs", func(t *testing.T) {
		require.NoError(t, idx.SwitchCommitLogs(ctx))
	})

	t.Run("list files", func(t *testing.T) {
		files, err := idx.ListFiles(ctx, dirName)
		require.NoError(t, err)

		// by this point there should be two files in the commitlog directory.
		// one is the active log file, and the other is the previous active
		// log which was in use prior to `SwitchCommitLogs`. additionally,
		// maintenance has been paused, so we shouldn't see any .condensed
		// files either.
		//
		// because `ListFiles` is used within the context of backups,
		// it excludes any currently active log files, which are not part
		// of the backup. in this case, the only other file is the prev
		// commitlog, so we should only have 1 result here.
		//
		// require (not assert) is used on purpose: the nested subtest below
		// indexes files[0], which would panic on an empty result instead of
		// reporting a regular test failure.
		require.Len(t, files, 1)

		t.Run("verify commitlog dir contents", func(t *testing.T) {
			// checking to ensure that indeed there are only 2 files in the
			// commit log directory, and that one of them is the one result
			// from `ListFiles`, and that the other is not a .condensed file
			commitLogDir := path.Join(dirName, fmt.Sprintf("%s.hnsw.commitlog.d", indexID))
			ls, err := os.ReadDir(commitLogDir)
			require.NoError(t, err)
			assert.Len(t, ls, 2)

			wantName := path.Base(files[0])
			var prevLogFound bool
			for _, info := range ls {
				if info.Name() == wantName {
					prevLogFound = true
				}

				// neither file may carry an extension (e.g. .condensed)
				assert.Empty(t, path.Ext(info.Name()))
			}
			assert.True(t, prevLogFound, "previous commitlog not found in commitlog root dir")
		})
	})

	t.Run("resume maintenance", func(t *testing.T) {
		require.NoError(t, combinedCtrl.Activate())
	})

	err = idx.Shutdown(ctx)
	require.NoError(t, err)

	err = combinedCtrl.Unregister(ctx)
	require.NoError(t, err)
}