Spaces:
Running
Running
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//
package db | |
import ( | |
"fmt" | |
"math/rand" | |
"os" | |
"path" | |
"strings" | |
"testing" | |
"github.com/sirupsen/logrus/hooks/test" | |
"github.com/stretchr/testify/assert" | |
"github.com/stretchr/testify/require" | |
"github.com/weaviate/weaviate/adapters/repos/db/helpers" | |
"github.com/weaviate/weaviate/entities/models" | |
"github.com/weaviate/weaviate/entities/schema" | |
"github.com/weaviate/weaviate/usecases/sharding" | |
) | |
// Dimensions of the generated fixture and the node that owns every shard.
const (
	// numClasses is how many random classes (indexes) the test creates.
	numClasses = 100
	// numShards is how many random shards are created for each class.
	numShards = 10
	// localNode is the node name assigned to every generated shard.
	localNode = "node1"
)

// Alphabets used when generating random class and shard names.
const (
	uppercase = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
	lowercase = "abcdefghijklmnopqrstuvwxyz"
	digits    = "0123456789"
	// chars is the full alphanumeric alphabet: upper case, lower case, digits.
	chars = uppercase + lowercase + digits
)
var (
	// rootFiles are the files written directly into the db root before the
	// migration runs.
	rootFiles = []string{"classifications.db", "modules.db", "schema.db"}

	// indexDirExts are the suffixes appended to "<class>_<shard>" to name the
	// per-shard directories of the flat layout.
	indexDirExts = []string{".hnsw.commitlog.d", "_someGeoProp.hnsw.commitlog.d", "_lsm"}

	// indexFileExts are the suffixes appended to "<class>_<shard>" to name the
	// per-shard files of the flat layout.
	indexFileExts = []string{".indexcount", ".proplengths", ".version"}

	// migratedRootFiles is rootFiles plus the marker file expected in the db
	// root after the migration. It is built from a copy so the two slices
	// never share a backing array.
	migratedRootFiles = append(append([]string{}, rootFiles...), "migration1.22.fs.hierarchy")
)
func TestFileStructureMigration(t *testing.T) { | |
shardsByClass := make(map[string][]string, numClasses) | |
t.Run("generate index and shard names", func(t *testing.T) { | |
for i := 0; i < numClasses; i++ { | |
c := randClassName() | |
shardsByClass[c] = make([]string, numShards) | |
for j := 0; j < numShards; j++ { | |
s := randShardName() | |
shardsByClass[c][j] = s | |
} | |
} | |
}) | |
root := t.TempDir() | |
t.Run("write test db files", func(t *testing.T) { | |
for _, f := range rootFiles { | |
require.Nil(t, os.WriteFile(path.Join(root, f), nil, os.ModePerm)) | |
} | |
for class, shards := range shardsByClass { | |
for _, shard := range shards { | |
idx := path.Join(root, fmt.Sprintf("%s_%s", strings.ToLower(class), shard)) | |
for _, ext := range indexDirExts { | |
require.Nil(t, os.MkdirAll(idx+ext, os.ModePerm)) | |
} | |
for _, ext := range indexFileExts { | |
require.Nil(t, os.WriteFile(idx+ext, nil, os.ModePerm)) | |
} | |
pqDir := path.Join(root, class, shard, "compressed_objects") | |
require.Nil(t, os.MkdirAll(pqDir, os.ModePerm)) | |
} | |
} | |
}) | |
files, err := os.ReadDir(root) | |
require.Nil(t, err) | |
t.Run("assert expected flat contents length", func(t *testing.T) { | |
// Flat structure root contains: | |
// - (3 dirs + 3 files) per shard per index | |
// - dirs: main commilog, geo prop commitlog, lsm store | |
// - files: indexcount, proplengths, version | |
// - 1 dir per index; shards dirs are nested | |
// - pq store | |
// - 3 root db files | |
expectedLen := numClasses*(numShards*(len(indexDirExts)+len(indexFileExts))+1) + len(rootFiles) | |
require.Len(t, files, expectedLen) | |
}) | |
t.Run("migrate the db", func(t *testing.T) { | |
classes := make([]*models.Class, numClasses) | |
states := make(map[string]*sharding.State, numClasses) | |
i := 0 | |
for class, shards := range shardsByClass { | |
classes[i] = &models.Class{ | |
Class: class, | |
Properties: []*models.Property{{ | |
Name: "someGeoProp", | |
DataType: schema.DataTypeGeoCoordinates.PropString(), | |
}}, | |
} | |
states[class] = &sharding.State{ | |
Physical: make(map[string]sharding.Physical), | |
} | |
states[class].SetLocalName(localNode) | |
for _, shard := range shards { | |
states[class].Physical[shard] = sharding.Physical{ | |
Name: shard, | |
BelongsToNodes: []string{localNode}, | |
} | |
} | |
i++ | |
} | |
db := testDB(root, classes, states) | |
require.Nil(t, db.migrateFileStructureIfNecessary()) | |
}) | |
files, err = os.ReadDir(root) | |
require.Nil(t, err) | |
t.Run("assert expected hierarchical contents length", func(t *testing.T) { | |
// After migration, the hierarchical structure root contains: | |
// - one dir per index | |
// - 3 original root db files, and one additional which is the FS migration indicator | |
expectedLen := numClasses + len(migratedRootFiles) | |
require.Len(t, files, expectedLen) | |
}) | |
t.Run("assert all db files were migrated", func(t *testing.T) { | |
var foundRootFiles []string | |
for _, f := range files { | |
if f.IsDir() { | |
idx := f | |
shardsRoot, err := os.ReadDir(path.Join(root, idx.Name())) | |
require.Nil(t, err) | |
for _, shard := range shardsRoot { | |
assertShardRootContents(t, shardsByClass, root, idx, shard) | |
} | |
} else { | |
foundRootFiles = append(foundRootFiles, f.Name()) | |
} | |
} | |
assert.ElementsMatch(t, migratedRootFiles, foundRootFiles) | |
}) | |
} | |
func assertShardRootContents(t *testing.T, shardsByClass map[string][]string, root string, idx, shard os.DirEntry) { | |
assert.True(t, shard.IsDir()) | |
// Whatever we find in this shard directory, it should be able to | |
// be mapped back to the original flat structure root contents | |
lowercasedClasses := make(map[string]string, len(shardsByClass)) | |
for class := range shardsByClass { | |
lowercasedClasses[strings.ToLower(class)] = class | |
} | |
require.Contains(t, lowercasedClasses, idx.Name()) | |
assert.Contains(t, shardsByClass[lowercasedClasses[idx.Name()]], shard.Name()) | |
// Now we will get a set of all expected files within the shard dir. | |
// Check to see if all of these files are found. | |
expected := expectedShardContents() | |
shardFiles, err := os.ReadDir(path.Join(root, idx.Name(), shard.Name())) | |
require.Nil(t, err) | |
for _, sf := range shardFiles { | |
expected[sf.Name()] = true | |
} | |
expected.assert(t) | |
// Check if pq store was migrated to main store as "vectors_compressed" subdir | |
pqDir := path.Join(root, idx.Name(), shard.Name(), "lsm", helpers.VectorsCompressedBucketLSM) | |
info, err := os.Stat(pqDir) | |
require.NoError(t, err) | |
assert.True(t, info.IsDir()) | |
} | |
func testDB(root string, classes []*models.Class, states map[string]*sharding.State) *DB { | |
logger, _ := test.NewNullLogger() | |
return &DB{ | |
config: Config{RootPath: root}, | |
logger: logger, | |
schemaGetter: &fakeMigrationSchemaGetter{ | |
sch: schema.Schema{Objects: &models.Schema{Classes: classes}}, | |
states: states, | |
}, | |
} | |
} | |
func randClassName() string { | |
return randStringBytes(16) | |
} | |
func randShardName() string { | |
return randStringBytes(8) | |
} | |
func randStringBytes(n int) string { | |
b := make([]byte, n) | |
for i := range b { | |
switch { | |
case i == 0: | |
b[i] = randChar(uppercase) | |
case i == n/2: | |
b[i] = []byte("_")[0] | |
default: | |
b[i] = randChar(chars) | |
} | |
} | |
return string(b) | |
} | |
// randChar returns the byte at a pseudo-random index of str, chosen with
// rand.Intn(len(str)).
func randChar(str string) byte {
	pick := rand.Intn(len(str))
	return str[pick]
}
// shardContents records, per entry name, whether that entry has been seen.
type shardContents map[string]bool

// expectedShardContents returns a fresh shardContents holding every entry name
// expected inside a migrated shard directory, each marked as not yet found.
func expectedShardContents() shardContents {
	names := [...]string{
		"main.hnsw.commitlog.d",
		"geo.someGeoProp.hnsw.commitlog.d",
		"lsm",
		"indexcount",
		"proplengths",
		"version",
	}

	contents := make(shardContents, len(names))
	for _, name := range names {
		contents[name] = false
	}
	return contents
}
func (c shardContents) assert(t *testing.T) { | |
for name, found := range c { | |
assert.True(t, found, "didn't find %q in shard contents", name) | |
} | |
} | |
type fakeMigrationSchemaGetter struct { | |
sch schema.Schema | |
states map[string]*sharding.State | |
} | |
func (sg *fakeMigrationSchemaGetter) GetSchemaSkipAuth() schema.Schema { | |
return sg.sch | |
} | |
func (sg *fakeMigrationSchemaGetter) Nodes() []string { | |
return nil | |
} | |
func (sg *fakeMigrationSchemaGetter) NodeName() string { | |
return "" | |
} | |
func (sg *fakeMigrationSchemaGetter) ClusterHealthScore() int { | |
return 0 | |
} | |
func (sg *fakeMigrationSchemaGetter) ResolveParentNodes(string, string) (map[string]string, error) { | |
return nil, nil | |
} | |
func (sg *fakeMigrationSchemaGetter) CopyShardingState(class string) *sharding.State { | |
return sg.states[class] | |
} | |
func (sg *fakeMigrationSchemaGetter) ShardOwner(class, shard string) (string, error) { | |
return "", nil | |
} | |
func (sg *fakeMigrationSchemaGetter) TenantShard(class, tenant string) (string, string) { | |
return "", "" | |
} | |
func (sg *fakeMigrationSchemaGetter) ShardFromUUID(class string, uuid []byte) string { | |
return "" | |
} | |
func (sg *fakeMigrationSchemaGetter) ShardReplicas(class, shard string) ([]string, error) { | |
return nil, nil | |
} | |