//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//

package backup

import (
	"context"
	"errors"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/sirupsen/logrus/hooks/test"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"github.com/weaviate/weaviate/entities/backup"
	"github.com/weaviate/weaviate/entities/models"
	"github.com/weaviate/weaviate/entities/modulecapabilities"
)

const (
	nodeName = "Node-1"
)

var errNotFound = backup.NewErrNotFound(errors.New("not found"))

// waitForCompletion sleeps ms milliseconds per iteration for at most n
// iterations. From the second iteration on, it returns the status of
// r.lastOp as soon as that status is non-empty. It returns "" if no
// non-empty status was observed.
func (r *backupper) waitForCompletion(n, ms int) backup.Status {
	for i := 0; i < n; i++ {
		time.Sleep(time.Millisecond * time.Duration(ms))
		if i < 1 {
			continue
		}
		if x := r.lastOp.get(); x.Status != "" {
			return x.Status
		}
	}
	return ""
}

func TestBackupStatus(t *testing.T) {
	t.Parallel()
	var (
		backendName = "s3"
		id          = "1234"
		ctx         = context.Background()
		startTime   = time.Date(2022, 1, 1, 1, 0, 0, 0, time.UTC)
		nodeHome    = id + "/" + nodeName
		path        = "bucket/backups/" + nodeHome
		rawstatus   = string(backup.Transferring)
		want        = &models.BackupCreateStatusResponse{
			ID:      id,
			Path:    path,
			Status:  &rawstatus,
			Backend: backendName,
		}
	)

	t.Run("ActiveState", func(t *testing.T) {
		m := createManager(nil, nil, nil, nil)
		m.backupper.lastOp.reqStat = reqStat{
			Starttime: startTime,
			ID:        id,
			Status:    backup.Transferring,
			Path:      path,
		}
		st, err := m.backupper.Status(ctx, backendName, id)
		assert.Nil(t, err)
		assert.Equal(t, want, st)
	})

	t.Run("GetBackupProvider", func(t *testing.T) {
		m := createManager(nil, nil, nil, ErrAny)
		_, err := m.backupper.Status(ctx, backendName, id)
		assert.NotNil(t, err)
	})

	t.Run("MetadataNotFound", func(t *testing.T) {
		backend := &fakeBackend{}
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, ErrAny)
		backend.On("GetObject", ctx, id, BackupFile).Return(nil, ErrAny)
		m := createManager(nil, nil, backend, nil)
		_, err := m.backupper.Status(ctx, backendName, id)
		assert.NotNil(t, err)
		nerr := backup.ErrNotFound{}
		if !errors.As(err, &nerr) {
			t.Errorf("error want=%v got=%v", nerr, err)
		}
	})

	t.Run("ReadFromMetadata", func(t *testing.T) {
		backend := &fakeBackend{}
		bytes := marshalMeta(backup.BackupDescriptor{Status: string(backup.Transferring)})
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil)
		backend.On("HomeDir", mock.Anything).Return(path)
		m := createManager(nil, nil, backend, nil)
		got, err := m.backupper.Status(ctx, backendName, id)
		assert.Nil(t, err)
		assert.Equal(t, want, got)
	})

	t.Run("ReadFromMetadataError", func(t *testing.T) {
		backend := &fakeBackend{}
		st := string(backup.Failed)
		bytes := marshalMeta(backup.BackupDescriptor{Status: st, Error: "error1"})
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil)
		backend.On("HomeDir", mock.Anything).Return(path)
		m := createManager(nil, nil, backend, nil)
		_, err := m.backupper.Status(ctx, backendName, id)
		assert.NotNil(t, err)
		assert.ErrorContains(t, err, "error1")
	})
}

func TestBackupOnStatus(t *testing.T) {
	t.Parallel()
	var (
		backendName = "s3"
		id          = "1234"
		ctx         = context.Background()
		startTime   = time.Date(2022, 1, 1, 1, 0, 0, 0, time.UTC)
		nodeHome    = id + "/" + nodeName
		path        = "bucket/backups/" + nodeHome
		req         = StatusRequest{
			Method:  OpCreate,
			ID:      id,
			Backend: backendName,
		}
	)

	t.Run("ActiveState", func(t *testing.T) {
		m := createManager(nil, nil, nil, nil)
		m.backupper.lastOp.reqStat = reqStat{
			Starttime: startTime,
			ID:        id,
			Status:    backup.Transferring,
			Path:      path,
		}
		want := &StatusResponse{
			Method: OpCreate,
			ID:     id,
			Status: backup.Transferring,
		}
		st := m.OnStatus(ctx, &req)
		assert.Equal(t, want, st)
	})

	t.Run("GetBackupProvider", func(t *testing.T) {
		want := &StatusResponse{
			Method: OpCreate,
			ID:     id,
			Status: backup.Failed,
		}
		m := createManager(nil, nil, nil, ErrAny)
		got := m.OnStatus(ctx, &req)
		assert.Contains(t, got.Err, req.Backend)
		want.Err = got.Err
		assert.Equal(t, want, got)
	})

	t.Run("MetadataNotFound", func(t *testing.T) {
		want := &StatusResponse{
			Method: OpCreate,
			ID:     id,
			Status: backup.Failed,
		}
		backend := &fakeBackend{}
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, ErrAny)
		backend.On("GetObject", ctx, id, BackupFile).Return(nil, ErrAny)
		m := createManager(nil, nil, backend, nil)
		got := m.OnStatus(ctx, &req)
		assert.Contains(t, got.Err, errMetaNotFound.Error())
		want.Err = got.Err
		assert.Equal(t, want, got)
	})

	t.Run("ReadFromMetadata", func(t *testing.T) {
		want := &StatusResponse{
			Method: OpCreate,
			ID:     id,
			Status: backup.Success,
		}
		backend := &fakeBackend{}
		bytes := marshalMeta(backup.BackupDescriptor{Status: string(backup.Success)})
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil)
		backend.On("HomeDir", mock.Anything).Return(path)
		m := createManager(nil, nil, backend, nil)
		got := m.OnStatus(ctx, &req)
		assert.Equal(t, want, got)
	})
}

func TestManagerCreateBackup(t *testing.T) {
	t.Parallel()
	var (
		cls         = "Class-A"
		cls2        = "Class-B"
		backendName = "gcs"
		backupID    = "1"
		ctx         = context.Background()
		nodeHome    = backupID + "/" + nodeName
		path        = "bucket/backups/" + nodeHome
		req         = BackupRequest{
			ID:      backupID,
			Include: []string{cls},
			Backend: backendName,
		}
	)

	t.Run("AnotherBackupIsInProgress", func(t *testing.T) {
		req1 := BackupRequest{
			ID:      backupID,
			Include: []string{cls},
			Backend: backendName,
		}

		sourcer := &fakeSourcer{}
		// first
		sourcer.On("Backupable", ctx, req1.Include).Return(nil)
		sourcer.On("CreateBackup", ctx, any).Return(nil, nil)
		sourcer.On("ReleaseBackup", ctx, any).Return(nil)
		var ch <-chan backup.ClassDescriptor
		sourcer.On("BackupDescriptors", any, any, any).Return(ch) // nil channel never yields: the first backup just blocks
		backend := &fakeBackend{}
		// second
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, backup.ErrNotFound{})
		backend.On("GetObject", ctx, backupID, BackupFile).Return(nil, backup.ErrNotFound{})
		backend.On("HomeDir", any).Return(path)
		sourcer.On("Backupable", any, req1.Include).Return(nil)
		backend.On("Initialize", ctx, nodeHome).Return(nil)
		sourcer.On("CreateBackup", ctx, any).Return(nil, ErrAny)
		sourcer.On("ReleaseBackup", ctx, any).Return(nil)

		m := createManager(sourcer, nil, backend, nil)
		resp1, err := m.Backup(ctx, nil, &req1)
		assert.Nil(t, err)
		status1 := string(backup.Started)
		want1 := &models.BackupCreateResponse{
			Backend: backendName,
			Classes: req1.Include,
			ID:      backupID,
			Status:  &status1,
			Path:    path,
		}
		assert.Equal(t, want1, resp1)

		resp2, err := m.Backup(ctx, nil, &req1)
		assert.NotNil(t, err)
		assert.Contains(t, err.Error(), "already in progress")
		assert.IsType(t, backup.ErrUnprocessable{}, err)
		assert.Nil(t, resp2)
	})

	t.Run("InitMetadata", func(t *testing.T) {
		classes := []string{cls}

		sourcer := &fakeSourcer{}
		sourcer.On("Backupable", ctx, classes).Return(nil)
		backend := &fakeBackend{}
		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, errNotFound)
		backend.On("GetObject", ctx, backupID, BackupFile).Return(nil, errNotFound)
		backend.On("Initialize", ctx, nodeHome).Return(errors.New("init meta failed"))
		bm := createManager(sourcer, nil, backend, nil)

		meta, err := bm.Backup(ctx, nil, &BackupRequest{
			Backend: backendName,
			ID:      backupID,
			Include: classes,
		})

		assert.Nil(t, meta)
		assert.NotNil(t, err)
		assert.Contains(t, err.Error(), "init")
		assert.IsType(t, backup.ErrUnprocessable{}, err)
	})

	t.Run("Success", func(t *testing.T) {
		var (
			classes    = []string{cls}
			sourcePath = t.TempDir()
			sourcer    = &fakeSourcer{}
			backend    = newFakeBackend()
		)
		sourcer.On("Backupable", ctx, classes).Return(nil)
		ch := fakeBackupDescriptor(genClassDescriptions(t, sourcePath, cls, cls2)...)
		sourcer.On("BackupDescriptors", any, backupID, mock.Anything).Return(ch)
		sourcer.On("ReleaseBackup", ctx, backupID, mock.Anything).Return(nil)

		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("SourceDataPath").Return(sourcePath)
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, errNotFound)
		backend.On("GetObject", ctx, backupID, BackupFile).Return(nil, errNotFound)
		backend.On("Initialize", ctx, nodeHome).Return(nil)
		backend.On("PutObject", any, nodeHome, BackupFile, mock.Anything).Return(nil).Twice()
		backend.On("Write", any, nodeHome, any, any).Return(any, nil)

		m := createManager(sourcer, nil, backend, nil)
		resp, err := m.Backup(ctx, nil, &req)
		assert.Nil(t, err)
		status1 := string(backup.Started)
		want1 := &models.BackupCreateResponse{
			Backend: backendName,
			Classes: req.Include,
			ID:      backupID,
			Status:  &status1,
			Path:    path,
		}
		assert.Equal(t, want1, resp)
		m.backupper.waitForCompletion(10, 50)
		assert.Equal(t, string(backup.Success), backend.meta.Status)
		assert.Equal(t, "", backend.meta.Error)
	})

	t.Run("PutFile", func(t *testing.T) {
		var (
			classes    = []string{cls}
			sourcePath = t.TempDir()
			sourcer    = &fakeSourcer{}
			backend    = newFakeBackend()
		)
		sourcer.On("Backupable", ctx, classes).Return(nil)
		ch := fakeBackupDescriptor(genClassDescriptions(t, sourcePath, cls, cls2)...)
		sourcer.On("BackupDescriptors", any, backupID, mock.Anything).Return(ch)
		sourcer.On("ReleaseBackup", ctx, backupID, mock.Anything).Return(nil)

		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("SourceDataPath").Return(sourcePath)
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, errNotFound)
		backend.On("GetObject", ctx, backupID, BackupFile).Return(nil, errNotFound)
		backend.On("Initialize", ctx, nodeHome).Return(nil)
		backend.On("Write", any, nodeHome, any, any).Return(any, ErrAny).Once()
		backend.On("PutObject", any, nodeHome, BackupFile, any).Return(nil).Once()

		m := createManager(sourcer, nil, backend, nil)
		resp, err := m.Backup(ctx, nil, &req)
		assert.Nil(t, err)
		status1 := string(backup.Started)
		want1 := &models.BackupCreateResponse{
			Backend: backendName,
			Classes: req.Include,
			ID:      backupID,
			Status:  &status1,
			Path:    path,
		}
		assert.Equal(t, want1, resp)
		m.backupper.waitForCompletion(10, 50)
		assert.Equal(t, string(backup.Transferring), backend.meta.Status)
		assert.Contains(t, backend.meta.Error, "pipe")
	})

	t.Run("ClassDescriptor", func(t *testing.T) {
		var (
			classes    = []string{cls}
			sourcer    = &fakeSourcer{}
			sourcePath = t.TempDir()
			backend    = newFakeBackend()
		)
		sourcer.On("Backupable", ctx, classes).Return(nil)
		cs := genClassDescriptions(t, sourcePath, cls, cls2)
		cs[1].Error = ErrAny
		ch := fakeBackupDescriptor(cs...)
		sourcer.On("BackupDescriptors", any, backupID, mock.Anything).Return(ch)
		sourcer.On("ReleaseBackup", ctx, backupID, mock.Anything).Return(nil)

		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("SourceDataPath").Return(sourcePath)
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, errNotFound)
		backend.On("GetObject", ctx, backupID, BackupFile).Return(nil, errNotFound)
		backend.On("Initialize", ctx, nodeHome).Return(nil)
		backend.On("Write", mock.Anything, nodeHome, mock.Anything, mock.Anything).Return(any, nil)
		backend.On("PutObject", mock.Anything, nodeHome, BackupFile, mock.Anything).Return(nil).Once()

		m := createManager(sourcer, nil, backend, nil)
		resp, err := m.Backup(ctx, nil, &req)
		assert.Nil(t, err)
		status1 := string(backup.Started)
		want1 := &models.BackupCreateResponse{
			Backend: backendName,
			Classes: req.Include,
			ID:      backupID,
			Status:  &status1,
			Path:    path,
		}
		assert.Equal(t, want1, resp)
		m.backupper.waitForCompletion(10, 50)
		assert.Equal(t, string(backup.Transferring), backend.meta.Status)
		assert.Equal(t, ErrAny.Error(), backend.meta.Error)
	})
}

func TestManagerCoordinatedBackup(t *testing.T) {
	t.Parallel()
	var (
		cls         = "Class-A"
		cls2        = "Class-B"
		backendName = "gcs"
		backupID    = "1"
		ctx         = context.Background()
		nodeHome    = backupID + "/" + nodeName
		path        = "bucket/backups/" + nodeHome
		req         = Request{
			Method:   OpCreate,
			ID:       backupID,
			Classes:  []string{cls, cls2},
			Backend:  backendName,
			Duration: time.Millisecond * 20,
		}
		any = mock.Anything
	)

	t.Run("BackendUnregistered", func(t *testing.T) {
		backendError := errors.New("I do not exist")
		bm := createManager(nil, nil, nil, backendError)
		ret := bm.OnCanCommit(ctx, &req)
		assert.Contains(t, ret.Err, backendName)
	})

	t.Run("ClassNotBackupable", func(t *testing.T) {
		backend := &fakeBackend{}
		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, errNotFound)
		sourcer := &fakeSourcer{}
		sourcer.On("Backupable", ctx, req.Classes).Return(ErrAny)
		bm := createManager(sourcer, nil, backend, nil)

		resp := bm.OnCanCommit(ctx, &req)
		assert.Contains(t, resp.Err, ErrAny.Error())
		assert.Equal(t, time.Duration(0), resp.Timeout)
	})

	t.Run("InitializeBackend", func(t *testing.T) {
		backend := &fakeBackend{}
		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, errNotFound)
		sourcer := &fakeSourcer{}
		sourcer.On("Backupable", ctx, req.Classes).Return(nil)
		backend.On("Initialize", ctx, nodeHome).Return(errors.New("init meta failed"))
		bm := createManager(sourcer, nil, backend, nil)

		resp := bm.OnCanCommit(ctx, &req)
		assert.Contains(t, resp.Err, "init")
		assert.Equal(t, time.Duration(0), resp.Timeout)
	})

	t.Run("AnotherBackupIsInProgress", func(t *testing.T) {
		// first
		sourcer := &fakeSourcer{}
		sourcer.On("Backupable", ctx, req.Classes).Return(nil)
		sourcer.On("CreateBackup", mock.Anything, mock.Anything).Return(nil, nil)
		sourcer.On("ReleaseBackup", mock.Anything, mock.Anything).Return(nil)
		var ch <-chan backup.ClassDescriptor
		sourcer.On("BackupDescriptors", any, any, any).Return(ch)
		backend := &fakeBackend{}
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, backup.ErrNotFound{})
		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("Initialize", ctx, mock.Anything).Return(nil)
		m := createManager(sourcer, nil, backend, nil)

		// second
		resp1 := m.OnCanCommit(ctx, &req)
		want1 := &CanCommitResponse{
			Method:  OpCreate,
			ID:      req.ID,
			Timeout: req.Duration,
		}
		assert.Equal(t, want1, resp1)
		resp := m.OnCanCommit(ctx, &req)
		assert.Contains(t, resp.Err, "already in progress")
		assert.Equal(t, time.Duration(0), resp.Timeout)
	})

	t.Run("Success", func(t *testing.T) {
		var (
			sourcePath = t.TempDir()
			sourcer    = &fakeSourcer{}
			backend    = newFakeBackend()
		)
		sourcer.On("Backupable", ctx, req.Classes).Return(nil)
		ch := fakeBackupDescriptor(genClassDescriptions(t, sourcePath, cls, cls2)...)
		sourcer.On("BackupDescriptors", any, backupID, mock.Anything).Return(ch)
		sourcer.On("ReleaseBackup", ctx, backupID, mock.Anything).Return(nil)

		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("SourceDataPath").Return(sourcePath)
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, errNotFound)
		backend.On("Initialize", ctx, nodeHome).Return(nil)
		backend.On("PutObject", mock.Anything, nodeHome, BackupFile, mock.Anything).Return(nil).Once()
		backend.On("Write", mock.Anything, nodeHome, mock.Anything, mock.Anything).Return(any, nil)

		m := createManager(sourcer, nil, backend, nil)
		req := req
		req.Duration = time.Hour
		got := m.OnCanCommit(ctx, &req)
		want := &CanCommitResponse{OpCreate, req.ID, _TimeoutShardCommit, ""}
		assert.Equal(t, want, got)
		err := m.OnCommit(ctx, &StatusRequest{OpCreate, req.ID, backendName})
		assert.Nil(t, err)
		m.backupper.waitForCompletion(20, 50)
		assert.Equal(t, string(backup.Success), backend.meta.Status)
		assert.Equal(t, "", backend.meta.Error)
	})

	t.Run("AbortBeforeCommit", func(t *testing.T) {
		var (
			sourcePath = t.TempDir()
			sourcer    = &fakeSourcer{}
			backend    = newFakeBackend()
		)
		sourcer.On("Backupable", ctx, req.Classes).Return(nil)
		ch := fakeBackupDescriptor(genClassDescriptions(t, sourcePath, cls, cls2)...)
		sourcer.On("BackupDescriptors", any, backupID, mock.Anything).Return(ch)
		sourcer.On("ReleaseBackup", ctx, backupID, mock.Anything).Return(nil)

		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("SourceDataPath").Return(sourcePath)
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, errNotFound)
		backend.On("Initialize", ctx, nodeHome).Return(nil)
		backend.On("PutObject", mock.Anything, nodeHome, BackupFile, mock.Anything).Return(nil).Once()
		backend.On("Write", mock.Anything, nodeHome, mock.Anything, mock.Anything).Return(any, nil)

		m := createManager(sourcer, nil, backend, nil)
		req := req
		req.Duration = time.Hour
		got := m.OnCanCommit(ctx, &req)
		want := &CanCommitResponse{OpCreate, req.ID, _TimeoutShardCommit, ""}
		assert.Equal(t, want, got)
		err := m.OnAbort(ctx, &AbortRequest{OpCreate, req.ID, backendName})
		assert.Nil(t, err)
		m.backupper.waitForCompletion(20, 50)
		assert.Contains(t, m.backupper.lastAsyncError.Error(), "abort")
	})

	t.Run("AbortCommit", func(t *testing.T) {
		var (
			sourcePath = t.TempDir()
			sourcer    = &fakeSourcer{}
			backend    = newFakeBackend()
			m          = createManager(sourcer, nil, backend, nil)
		)
		sourcer.On("Backupable", ctx, req.Classes).Return(nil)
		ch := fakeBackupDescriptor(genClassDescriptions(t, sourcePath, cls, cls2)...)
		sourcer.On("BackupDescriptors", any, backupID, mock.Anything).Return(ch).RunFn = func(a mock.Arguments) {
			m.OnAbort(ctx, &AbortRequest{OpCreate, req.ID, backendName})
			// give the abort request time to propagate
			time.Sleep(10 * time.Millisecond)
		}
		sourcer.On("ReleaseBackup", ctx, backupID, mock.Anything).Return(nil)

		// backend
		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("SourceDataPath").Return(sourcePath)
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, errNotFound)
		backend.On("Initialize", ctx, nodeHome).Return(nil)
		backend.On("PutObject", mock.Anything, nodeHome, BackupFile, mock.Anything).Return(nil).Once()
		backend.On("Write", mock.Anything, nodeHome, mock.Anything, mock.Anything).Return(any, nil)

		req := req
		req.Duration = time.Hour
		got := m.OnCanCommit(ctx, &req)
		want := &CanCommitResponse{OpCreate, req.ID, _TimeoutShardCommit, ""}
		assert.Equal(t, want, got)
		err := m.OnCommit(ctx, &StatusRequest{OpCreate, req.ID, backendName})
		assert.Nil(t, err)
		m.backupper.waitForCompletion(20, 50)
		errMsg := context.Canceled.Error()
		assert.Equal(t, string(backup.Transferring), backend.meta.Status)
		assert.Equal(t, errMsg, backend.meta.Error)
		assert.Contains(t, m.backupper.lastAsyncError.Error(), errMsg)
	})

	t.Run("ExpirationTimeout", func(t *testing.T) {
		var (
			sourcePath = t.TempDir()
			sourcer    = &fakeSourcer{}
			backend    = newFakeBackend()
		)
		sourcer.On("Backupable", ctx, req.Classes).Return(nil)
		ch := fakeBackupDescriptor(genClassDescriptions(t, sourcePath, cls, cls2)...)
		sourcer.On("BackupDescriptors", any, backupID, mock.Anything).Return(ch)
		sourcer.On("ReleaseBackup", ctx, backupID, mock.Anything).Return(nil)

		backend.On("HomeDir", mock.Anything).Return(path)
		backend.On("SourceDataPath").Return(sourcePath)
		backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, errNotFound)
		backend.On("Initialize", ctx, nodeHome).Return(nil)
		backend.On("PutObject", mock.Anything, nodeHome, BackupFile, mock.Anything).Return(nil).Once()
		backend.On("Write", mock.Anything, backupID, mock.Anything, mock.Anything).Return(any, nil)

		m := createManager(sourcer, nil, backend, nil)
		req := req
		req.Duration = time.Millisecond * 10
		got := m.OnCanCommit(ctx, &req)
		want := &CanCommitResponse{OpCreate, req.ID, req.Duration, ""}
		assert.Equal(t, want, got)
		m.backupper.waitForCompletion(20, 50)
		assert.Contains(t, m.backupper.lastAsyncError.Error(), "timed out")
	})
}

// genClassDescriptions writes a few small placeholder files under sourcePath
// and returns one ClassDescriptor per class name. Each descriptor has a single
// shard whose file paths (relative to sourcePath) point at those files.
func genClassDescriptions(t *testing.T, sourcePath string, classes ...string) []backup.ClassDescriptor {
	ret := make([]backup.ClassDescriptor, len(classes))
	rawbytes := []byte("raw")
	subDir := filepath.Join(sourcePath, "dir1")
	if err := os.MkdirAll(subDir, os.ModePerm); err != nil {
		t.Fatalf("create test subdirectory %s: %v", subDir, err)
	}
	files := []string{"dir1/file1", "dir1/file2", "counter.txt", "version.txt", "prop.txt"}
	for _, p := range files {
		p = filepath.Join(sourcePath, p)
		if err := os.WriteFile(p, rawbytes, os.ModePerm); err != nil {
			t.Fatalf("create test file %s: %v", p, err)
		}
	}

	for i, cls := range classes {
		ret[i] = backup.ClassDescriptor{
			Name:          cls,
			Schema:        rawbytes,
			ShardingState: rawbytes,
			Shards: []*backup.ShardDescriptor{
				{
					Name:                  "Shard1",
					Node:                  "Node-1",
					Files:                 files[0:2],
					DocIDCounterPath:      files[2],
					ShardVersionPath:      files[3],
					PropLengthTrackerPath: files[4],
					DocIDCounter:          rawbytes,
					Version:               rawbytes,
					PropLengthTracker:     rawbytes,
				},
			},
		}
	}
	return ret
}

// fakeBackupDescriptor returns a buffered channel that yields descs in order
// and is then closed. The tests use it as the return value of the mocked
// BackupDescriptors call.
func fakeBackupDescriptor(descs ...backup.ClassDescriptor) <-chan backup.ClassDescriptor {
	ch := make(chan backup.ClassDescriptor, len(descs))
	go func() {
		for _, cls := range descs {
			ch <- cls
		}
		close(ch)
	}()

	return ch
}

// createManager builds a Handler wired to test fakes. A nil sourcer or schema
// is replaced by a default fake, and backendErr is handed to the fake backend
// provider so tests can simulate a backend that cannot be resolved.
func createManager(sourcer Sourcer, schema schemaManger, backend modulecapabilities.BackupBackend, backendErr error) *Handler {
	backends := &fakeBackupBackendProvider{backend, backendErr}
	if sourcer == nil {
		sourcer = &fakeSourcer{}
	}
	if schema == nil {
		schema = &fakeSchemaManger{nodeName: nodeName}
	}

	logger, _ := test.NewNullLogger()
	return NewHandler(logger, &fakeAuthorizer{}, schema, sourcer, backends)
}