Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: hello@weaviate.io
// | |
package backup | |
import ( | |
"context" | |
"encoding/json" | |
"errors" | |
"strings" | |
"testing" | |
"time" | |
"github.com/stretchr/testify/assert" | |
"github.com/stretchr/testify/mock" | |
"github.com/weaviate/weaviate/entities/backup" | |
"github.com/weaviate/weaviate/entities/models" | |
) | |
// ErrAny represent a random error | |
var ( | |
ErrAny = errors.New("any error") | |
any = mock.Anything | |
) | |
func (r *restorer) Restore(ctx context.Context, | |
req *Request, | |
desc *backup.BackupDescriptor, | |
store nodeStore, | |
) (*models.BackupRestoreResponse, error) { | |
status := string(backup.Started) | |
returnData := &models.BackupRestoreResponse{ | |
Classes: req.Classes, | |
ID: req.ID, | |
Backend: req.Backend, | |
Status: &status, | |
Path: store.HomeDir(), | |
} | |
if _, err := r.restore(ctx, req, desc, store); err != nil { | |
return nil, err | |
} | |
return returnData, nil | |
} | |
func (r *restorer) waitForCompletion(backend, id string, n, ms int) Status { | |
for i := 0; i < n; i++ { | |
delay := time.Millisecond * time.Duration(ms) | |
time.Sleep(delay) | |
status, err := r.status(backend, id) | |
if err != nil { | |
continue | |
} | |
if status.Status == backup.Success || status.Status == backup.Failed { | |
return status | |
} | |
} | |
return Status{} | |
} | |
func TestRestoreStatus(t *testing.T) { | |
t.Parallel() | |
var ( | |
backendType = "s3" | |
id = "1234" | |
m = createManager(nil, nil, nil, nil) | |
starTime = time.Now().UTC() | |
nodeHome = id + "/" + nodeName | |
path = "bucket/backups/" + nodeHome | |
) | |
// initial state | |
_, err := m.restorer.status(backendType, id) | |
if err == nil || !strings.Contains(err.Error(), "not found") { | |
t.Errorf("must return an error if backup doesn't exist") | |
} | |
// active state | |
m.restorer.lastOp.reqStat = reqStat{ | |
Starttime: starTime, | |
ID: id, | |
Status: backup.Transferring, | |
Path: path, | |
} | |
st, err := m.restorer.status(backendType, id) | |
if err != nil { | |
t.Errorf("get active status: %v", err) | |
} | |
expected := Status{Path: path, StartedAt: starTime, Status: backup.Transferring} | |
if expected != st { | |
t.Errorf("get active status: got=%v want=%v", st, expected) | |
} | |
// cached status | |
m.restorer.lastOp.reset() | |
st.CompletedAt = starTime | |
m.restorer.restoreStatusMap.Store("s3/"+id, st) | |
st, err = m.restorer.status(backendType, id) | |
if err != nil { | |
t.Errorf("fetch status from map: %v", err) | |
} | |
expected.CompletedAt = starTime | |
if expected != st { | |
t.Errorf("fetch status from map got=%v want=%v", st, expected) | |
} | |
} | |
func TestRestoreRequestValidation(t *testing.T) { | |
var ( | |
cls = "MyClass" | |
backendName = "s3" | |
rawbytes = []byte("hello") | |
id = "1234" | |
timept = time.Now().UTC() | |
ctx = context.Background() | |
nodeHome = id + "/" + nodeName | |
path = "bucket/backups/" + nodeHome | |
req = &BackupRequest{ | |
Backend: backendName, | |
ID: id, | |
Include: []string{cls}, | |
Exclude: []string{}, | |
} | |
) | |
meta := backup.BackupDescriptor{ | |
ID: id, | |
StartedAt: timept, | |
Version: "1", | |
ServerVersion: "1", | |
Status: string(backup.Success), | |
Classes: []backup.ClassDescriptor{{ | |
Name: cls, Schema: rawbytes, ShardingState: rawbytes, | |
}}, | |
} | |
t.Run("BackendFailure", func(t *testing.T) { // backend provider fails | |
backend := newFakeBackend() | |
m2 := createManager(nil, nil, backend, ErrAny) | |
_, err := m2.Restore(ctx, nil, &BackupRequest{ | |
Backend: backendName, | |
ID: id, | |
Include: []string{cls}, | |
Exclude: []string{}, | |
}) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), backendName) | |
}) | |
t.Run("GetMetadataFile", func(t *testing.T) { | |
backend := newFakeBackend() | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, ErrAny) | |
backend.On("GetObject", ctx, req.ID, BackupFile).Return(nil, ErrAny) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
m2 := createManager(nil, nil, backend, nil) | |
_, err := m2.Restore(ctx, nil, req) | |
if err == nil || !strings.Contains(err.Error(), "find") { | |
t.Errorf("must return an error if it fails to get meta data: %v", err) | |
} | |
// meta data not found | |
backend = newFakeBackend() | |
backend.On("HomeDir", mock.Anything).Return(path) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, backup.ErrNotFound{}) | |
backend.On("GetObject", ctx, req.ID, BackupFile).Return(nil, backup.ErrNotFound{}) | |
m3 := createManager(nil, nil, backend, nil) | |
_, err = m3.Restore(ctx, nil, req) | |
if _, ok := err.(backup.ErrNotFound); !ok { | |
t.Errorf("must return an error if meta data doesn't exist: %v", err) | |
} | |
}) | |
t.Run("FailedBackup", func(t *testing.T) { | |
backend := newFakeBackend() | |
bytes := marshalMeta(backup.BackupDescriptor{ID: id, Status: string(backup.Failed)}) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
m2 := createManager(nil, nil, backend, nil) | |
_, err := m2.Restore(ctx, nil, req) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), backup.Failed) | |
assert.IsType(t, backup.ErrUnprocessable{}, err) | |
}) | |
t.Run("BackupWithHigherVersion", func(t *testing.T) { | |
var ( | |
backend = newFakeBackend() | |
meta = backup.BackupDescriptor{ | |
ID: id, | |
StartedAt: timept, | |
Version: "3.0", | |
ServerVersion: "1", | |
Status: string(backup.Success), | |
Classes: []backup.ClassDescriptor{{ | |
Name: cls, Schema: rawbytes, ShardingState: rawbytes, | |
}}, | |
} | |
bytes = marshalMeta(meta) | |
) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
m2 := createManager(nil, nil, backend, nil) | |
_, err := m2.Restore(ctx, nil, req) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), errMsgHigherVersion) | |
assert.IsType(t, backup.ErrUnprocessable{}, err) | |
}) | |
t.Run("FailedOldBackup", func(t *testing.T) { | |
backend := newFakeBackend() | |
bytes := marshalMeta(backup.BackupDescriptor{ID: id, Status: string(backup.Failed)}) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, ErrAny) | |
backend.On("GetObject", ctx, id, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
m2 := createManager(nil, nil, backend, nil) | |
_, err := m2.Restore(ctx, nil, req) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), backup.Failed) | |
assert.IsType(t, backup.ErrUnprocessable{}, err) | |
}) | |
t.Run("CorruptedBackupFile", func(t *testing.T) { | |
backend := newFakeBackend() | |
bytes := marshalMeta(backup.BackupDescriptor{ID: id, Status: string(backup.Success)}) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
m2 := createManager(nil, nil, backend, nil) | |
_, err := m2.Restore(ctx, nil, req) | |
assert.NotNil(t, err) | |
assert.IsType(t, backup.ErrUnprocessable{}, err) | |
assert.Contains(t, err.Error(), "corrupted") | |
}) | |
t.Run("WrongBackupFile", func(t *testing.T) { | |
backend := newFakeBackend() | |
bytes := marshalMeta(backup.BackupDescriptor{ID: "123", Status: string(backup.Success)}) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
m2 := createManager(nil, nil, backend, nil) | |
_, err := m2.Restore(ctx, nil, req) | |
assert.NotNil(t, err) | |
assert.IsType(t, backup.ErrUnprocessable{}, err) | |
assert.Contains(t, err.Error(), "wrong backup file") | |
}) | |
t.Run("UnknownClass", func(t *testing.T) { | |
backend := newFakeBackend() | |
bytes := marshalMeta(meta) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
m2 := createManager(nil, nil, backend, nil) | |
_, err := m2.Restore(ctx, nil, &BackupRequest{ID: id, Include: []string{"unknown"}}) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "unknown") | |
}) | |
t.Run("EmptyResultClassList", func(t *testing.T) { // backup was successful but class list is empty | |
backend := newFakeBackend() | |
bytes := marshalMeta(meta) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
m2 := createManager(nil, nil, backend, nil) | |
_, err := m2.Restore(ctx, nil, &BackupRequest{ID: id, Exclude: []string{cls}}) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "empty") | |
}) | |
} | |
func TestManagerRestoreBackup(t *testing.T) { | |
var ( | |
cls = "DemoClass" | |
backendName = "gcs" | |
backupID = "1" | |
rawbytes = []byte("hello") | |
timept = time.Now().UTC() | |
ctx = context.Background() | |
nodeHome = backupID + "/" + nodeName | |
path = "bucket/backups/" + nodeHome | |
) | |
meta2 := backup.BackupDescriptor{ | |
ID: backupID, | |
StartedAt: timept, | |
Version: Version, | |
ServerVersion: "1", | |
Status: string(backup.Success), | |
Classes: []backup.ClassDescriptor{{ | |
Name: cls, Schema: rawbytes, ShardingState: rawbytes, | |
Chunks: map[int32][]string{1: {"Shard1"}}, | |
Shards: []*backup.ShardDescriptor{ | |
{ | |
Name: "Shard1", Node: "Node-1", | |
Chunk: 1, | |
}, | |
}, | |
}}, | |
} | |
meta1 := backup.BackupDescriptor{ | |
ID: backupID, | |
StartedAt: timept, | |
Version: version1, | |
ServerVersion: "1", | |
Status: string(backup.Success), | |
Classes: []backup.ClassDescriptor{{ | |
Name: cls, Schema: rawbytes, ShardingState: rawbytes, | |
Chunks: map[int32][]string{1: {"Shard1"}}, | |
Shards: []*backup.ShardDescriptor{ | |
{ | |
Name: "Shard1", Node: "Node-1", | |
Files: []string{"dir1/file1", "dir2/file2"}, | |
DocIDCounterPath: "counter.txt", | |
ShardVersionPath: "version.txt", | |
PropLengthTrackerPath: "prop.txt", | |
DocIDCounter: rawbytes, | |
Version: rawbytes, | |
PropLengthTracker: rawbytes, | |
Chunk: 1, | |
}, | |
}, | |
}}, | |
} | |
t.Run("ClassAlreadyExists", func(t *testing.T) { | |
var ( | |
req1 = BackupRequest{ | |
ID: backupID, | |
Include: []string{cls}, | |
Backend: backendName, | |
} | |
backend = newFakeBackend() | |
sourcer = &fakeSourcer{} | |
dataPath = t.TempDir() | |
) | |
sourcer.On("ClassExists", cls).Return(true) | |
bytes := marshalMeta(meta2) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
backend.On("SourceDataPath").Return(dataPath) | |
backend.On("Read", any, nodeHome, mock.Anything, mock.Anything).Return(any, nil) | |
m := createManager(sourcer, nil, backend, nil) | |
resp1, err := m.Restore(ctx, nil, &req1) | |
assert.Nil(t, err) | |
status1 := string(backup.Started) | |
want1 := &models.BackupRestoreResponse{ | |
Backend: backendName, | |
Classes: req1.Include, | |
ID: backupID, | |
Status: &status1, | |
Path: path, | |
} | |
assert.Equal(t, resp1, want1) | |
lastStatus := m.restorer.waitForCompletion(req1.Backend, req1.ID, 10, 50) | |
assert.Nil(t, err) | |
assert.Equal(t, backup.Failed, lastStatus.Status) | |
}) | |
t.Run("AnotherBackupIsInProgress", func(t *testing.T) { | |
req1 := BackupRequest{ | |
ID: backupID, | |
Include: []string{cls}, | |
Backend: backendName, | |
} | |
backend := newFakeBackend() | |
sourcer := &fakeSourcer{} | |
sourcer.On("ClassExists", cls).Return(false) | |
bytes := marshalMeta(meta2) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
// simulate work by delaying return of SourceDataPath() | |
backend.On("SourceDataPath").Return(t.TempDir()).After(time.Hour) | |
m2 := createManager(sourcer, nil, backend, nil) | |
_, err := m2.Restore(ctx, nil, &BackupRequest{ID: backupID}) | |
assert.Nil(t, err) | |
m := createManager(sourcer, nil, backend, nil) | |
resp1, err := m.Restore(ctx, nil, &req1) | |
assert.Nil(t, err) | |
status1 := string(backup.Started) | |
want1 := &models.BackupRestoreResponse{ | |
Backend: backendName, | |
Classes: req1.Include, | |
ID: backupID, | |
Status: &status1, | |
Path: path, | |
} | |
assert.Equal(t, resp1, want1) | |
// another caller | |
resp2, err := m.Restore(ctx, nil, &req1) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "already in progress") | |
assert.IsType(t, backup.ErrUnprocessable{}, err) | |
assert.Nil(t, resp2) | |
}) | |
t.Run("Success", func(t *testing.T) { | |
var ( | |
req1 = BackupRequest{ | |
ID: backupID, | |
Include: []string{cls}, | |
Backend: backendName, | |
} | |
backend = newFakeBackend() | |
sourcer = &fakeSourcer{} | |
dataPath = t.TempDir() | |
) | |
sourcer.On("ClassExists", cls).Return(false) | |
bytes := marshalMeta(meta2) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
backend.On("SourceDataPath").Return(dataPath) | |
backend.On("Read", any, nodeHome, mock.Anything, mock.Anything).Return(any, nil) | |
m := createManager(sourcer, nil, backend, nil) | |
resp1, err := m.Restore(ctx, nil, &req1) | |
assert.Nil(t, err) | |
status1 := string(backup.Started) | |
want1 := &models.BackupRestoreResponse{ | |
Backend: backendName, | |
Classes: req1.Include, | |
ID: backupID, | |
Status: &status1, | |
Path: path, | |
} | |
assert.Equal(t, resp1, want1) | |
assert.Nil(t, err) | |
lastStatus := m.restorer.waitForCompletion(req1.Backend, req1.ID, 10, 50) | |
assert.Equal(t, backup.Success, lastStatus.Status) | |
}) | |
readSourceFile := func(t *testing.T, newVerion bool) { | |
req1 := BackupRequest{ | |
ID: backupID, | |
Include: []string{cls}, | |
Backend: backendName, | |
} | |
backend := newFakeBackend() | |
sourcer := &fakeSourcer{} | |
sourcer.On("ClassExists", cls).Return(false) | |
var bytes []byte | |
if newVerion { | |
bytes = marshalMeta(meta2) | |
backend.On("Read", any, nodeHome, mock.Anything, mock.Anything).Return(any, ErrAny) | |
} else { | |
bytes = marshalMeta(meta1) | |
backend.On("WriteToFile", any, nodeHome, mock.Anything, mock.Anything).Return(ErrAny) | |
} | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
backend.On("SourceDataPath").Return(t.TempDir()) | |
m := createManager(sourcer, nil, backend, nil) | |
resp1, err := m.Restore(ctx, nil, &req1) | |
assert.Nil(t, err) | |
status1 := string(backup.Started) | |
want1 := &models.BackupRestoreResponse{ | |
Backend: backendName, | |
Classes: req1.Include, | |
ID: backupID, | |
Status: &status1, | |
Path: path, | |
} | |
assert.Equal(t, resp1, want1) | |
lastStatus := m.restorer.waitForCompletion(req1.Backend, req1.ID, 10, 50) | |
assert.Nil(t, err) | |
assert.Equal(t, backup.Failed, lastStatus.Status) | |
// | |
} | |
t.Run("ReadSourceFile", func(t *testing.T) { | |
readSourceFile(t, true) | |
}) | |
t.Run("ReadSourceFileV1", func(t *testing.T) { | |
readSourceFile(t, false) | |
}) | |
t.Run("RestoreClassFails", func(t *testing.T) { | |
req1 := BackupRequest{ | |
ID: backupID, | |
Include: []string{cls}, | |
Backend: backendName, | |
} | |
backend := newFakeBackend() | |
sourcer := &fakeSourcer{} | |
schema := fakeSchemaManger{errRestoreClass: ErrAny, nodeName: nodeName} | |
sourcer.On("ClassExists", cls).Return(false) | |
bytes := marshalMeta(meta2) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
backend.On("SourceDataPath").Return(t.TempDir()) | |
backend.On("Read", any, nodeHome, mock.Anything, mock.Anything).Return(any, nil) | |
m := createManager(sourcer, &schema, backend, nil) | |
resp1, err := m.Restore(ctx, nil, &req1) | |
assert.Nil(t, err) | |
status1 := string(backup.Started) | |
want1 := &models.BackupRestoreResponse{ | |
Backend: backendName, | |
Classes: req1.Include, | |
ID: backupID, | |
Status: &status1, | |
Path: path, | |
} | |
assert.Equal(t, resp1, want1) | |
lastStatus := m.restorer.waitForCompletion(req1.Backend, req1.ID, 10, 50) | |
assert.Nil(t, err) | |
assert.Equal(t, lastStatus.Status, backup.Failed) | |
}) | |
} | |
func TestManagerCoordinatedRestore(t *testing.T) { | |
var ( | |
backendName = "gcs" | |
rawbytes = []byte("hello") | |
timept = time.Now().UTC() | |
cls = "Class-A" | |
backupID = "2" | |
ctx = context.Background() | |
nodeHome = backupID + "/" + nodeName | |
path = "bucket/backups/" + nodeHome | |
req = Request{ | |
Method: OpRestore, | |
ID: backupID, | |
Classes: []string{cls}, | |
Backend: backendName, | |
Duration: time.Millisecond * 20, | |
} | |
) | |
metadata := backup.BackupDescriptor{ | |
ID: backupID, | |
StartedAt: timept, | |
Version: "1", | |
ServerVersion: "1", | |
Status: string(backup.Success), | |
Classes: []backup.ClassDescriptor{{ | |
Name: cls, Schema: rawbytes, ShardingState: rawbytes, | |
Shards: []*backup.ShardDescriptor{ | |
{ | |
Name: "Shard1", Node: "Node-1", | |
Files: []string{"dir1/file1", "dir2/file2"}, | |
DocIDCounterPath: "counter.txt", | |
ShardVersionPath: "version.txt", | |
PropLengthTrackerPath: "prop.txt", | |
DocIDCounter: rawbytes, | |
Version: rawbytes, | |
PropLengthTracker: rawbytes, | |
}, | |
}, | |
}}, | |
} | |
t.Run("GetMetadataFile", func(t *testing.T) { | |
backend := newFakeBackend() | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(nil, backup.ErrNotFound{}) | |
backend.On("GetObject", ctx, backupID, BackupFile).Return(nil, backup.ErrNotFound{}) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
bm := createManager(nil, nil, backend, nil) | |
resp := bm.OnCanCommit(ctx, &req) | |
assert.Contains(t, resp.Err, errMetaNotFound.Error()) | |
assert.Equal(t, resp.Timeout, time.Duration(0)) | |
}) | |
t.Run("AnotherBackupIsInProgress", func(t *testing.T) { | |
backend := newFakeBackend() | |
sourcer := &fakeSourcer{} | |
sourcer.On("ClassExists", cls).Return(false) | |
bytes := marshalMeta(metadata) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
// simulate work by delaying return of SourceDataPath() | |
backend.On("SourceDataPath").Return(t.TempDir()).After(time.Minute * 2) | |
m := createManager(sourcer, nil, backend, nil) | |
resp := m.OnCanCommit(ctx, &req) | |
assert.Equal(t, resp.Err, "") | |
resp = m.OnCanCommit(ctx, &req) | |
assert.Contains(t, resp.Err, "already in progress") | |
assert.Equal(t, time.Duration(0), resp.Timeout) | |
}) | |
t.Run("Success", func(t *testing.T) { | |
req := req | |
req.Duration = time.Hour | |
backend := newFakeBackend() | |
sourcer := &fakeSourcer{} | |
sourcer.On("ClassExists", cls).Return(false) | |
bytes := marshalMeta(metadata) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
backend.On("SourceDataPath").Return(t.TempDir()) | |
backend.On("WriteToFile", any, nodeHome, mock.Anything, mock.Anything).Return(nil) | |
m := createManager(sourcer, nil, backend, nil) | |
resp1 := m.OnCanCommit(ctx, &req) | |
want1 := &CanCommitResponse{ | |
Method: OpRestore, | |
ID: req.ID, | |
Timeout: _TimeoutShardCommit, | |
} | |
assert.Equal(t, want1, resp1) | |
err := m.OnCommit(ctx, &StatusRequest{Method: OpRestore, ID: req.ID, Backend: req.Backend}) | |
assert.Nil(t, err) | |
lastStatus := m.restorer.waitForCompletion(req.Backend, req.ID, 10, 50) | |
assert.Nil(t, err) | |
assert.Equal(t, lastStatus.Status, backup.Success) | |
}) | |
t.Run("Abort", func(t *testing.T) { | |
req := req | |
req.Duration = time.Hour | |
backend := newFakeBackend() | |
sourcer := &fakeSourcer{} | |
sourcer.On("ClassExists", cls).Return(false) | |
bytes := marshalMeta(metadata) | |
backend.On("GetObject", ctx, nodeHome, BackupFile).Return(bytes, nil) | |
backend.On("HomeDir", mock.Anything).Return(path) | |
backend.On("SourceDataPath").Return(t.TempDir()) | |
backend.On("WriteToFile", any, nodeHome, mock.Anything, mock.Anything).Return(nil) | |
m := createManager(sourcer, nil, backend, nil) | |
resp1 := m.OnCanCommit(ctx, &req) | |
want1 := &CanCommitResponse{ | |
Method: OpRestore, | |
ID: req.ID, | |
Timeout: _TimeoutShardCommit, | |
} | |
assert.Equal(t, want1, resp1) | |
err := m.OnAbort(ctx, &AbortRequest{Method: OpRestore, ID: req.ID}) | |
assert.Nil(t, err) | |
lastStatus := m.restorer.waitForCompletion(req.Backend, req.ID, 10, 50) | |
assert.Nil(t, err) | |
assert.Equal(t, lastStatus.Status, backup.Failed) | |
}) | |
} | |
func TestRestoreOnStatus(t *testing.T) { | |
t.Parallel() | |
var ( | |
backendType = "s3" | |
id = "1234" | |
m = createManager(nil, nil, nil, nil) | |
ctx = context.Background() | |
starTime = time.Now().UTC() | |
nodeHome = id + "/" + nodeName | |
path = "bucket/backups/" + nodeHome | |
req = StatusRequest{ | |
Method: OpRestore, | |
ID: id, | |
Backend: backendType, | |
} | |
) | |
// initial state | |
got := m.OnStatus(ctx, &req) | |
if !strings.Contains(got.Err, "not found") { | |
t.Errorf("must return an error if backup doesn't exist") | |
} | |
// active state | |
m.restorer.lastOp.reqStat = reqStat{ | |
Starttime: starTime, | |
ID: id, | |
Status: backup.Transferring, | |
Path: path, | |
} | |
got = m.OnStatus(ctx, &req) | |
expected := StatusResponse{Method: OpRestore, ID: req.ID, Status: backup.Transferring} | |
if expected != *got { | |
t.Errorf("get active status: got=%v want=%v", got, expected) | |
} | |
// cached status | |
m.restorer.lastOp.reset() | |
st := Status{Path: path, StartedAt: starTime, Status: backup.Transferring, CompletedAt: starTime} | |
m.restorer.restoreStatusMap.Store("s3/"+id, st) | |
got = m.OnStatus(ctx, &req) | |
if expected != *got { | |
t.Errorf("fetch status from map got=%v want=%v", st, expected) | |
} | |
} | |
func marshalMeta(m backup.BackupDescriptor) []byte { | |
bytes, _ := json.MarshalIndent(m, "", "") | |
return bytes | |
} | |