Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package backup | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"time" | |
"github.com/sirupsen/logrus" | |
"github.com/weaviate/weaviate/entities/backup" | |
"github.com/weaviate/weaviate/entities/models" | |
) | |
var ( | |
errLocalBackendDBRO = errors.New("local filesystem backend is not viable for backing up a node cluster, try s3 or gcs") | |
errIncludeExclude = errors.New("malformed request: 'include' and 'exclude' cannot both contain values") | |
) | |
const ( | |
errMsgHigherVersion = "unable to restore backup as it was produced by a higher version" | |
) | |
// Scheduler assigns backup operations to coordinators. | |
type Scheduler struct { | |
// deps | |
logger logrus.FieldLogger | |
authorizer authorizer | |
backupper *coordinator | |
restorer *coordinator | |
backends BackupBackendProvider | |
} | |
// NewScheduler creates a new scheduler with two coordinators | |
func NewScheduler( | |
authorizer authorizer, | |
client client, | |
sourcer selector, | |
backends BackupBackendProvider, | |
nodeResolver nodeResolver, | |
logger logrus.FieldLogger, | |
) *Scheduler { | |
m := &Scheduler{ | |
logger: logger, | |
authorizer: authorizer, | |
backends: backends, | |
backupper: newCoordinator( | |
sourcer, | |
client, | |
logger, nodeResolver), | |
restorer: newCoordinator( | |
sourcer, | |
client, | |
logger, nodeResolver), | |
} | |
return m | |
} | |
func (s *Scheduler) Backup(ctx context.Context, pr *models.Principal, req *BackupRequest, | |
) (_ *models.BackupCreateResponse, err error) { | |
defer func(begin time.Time) { | |
logOperation(s.logger, "try_backup", req.ID, req.Backend, begin, err) | |
}(time.Now()) | |
path := fmt.Sprintf("backups/%s/%s", req.Backend, req.ID) | |
if err := s.authorizer.Authorize(pr, "add", path); err != nil { | |
return nil, err | |
} | |
store, err := coordBackend(s.backends, req.Backend, req.ID) | |
if err != nil { | |
err = fmt.Errorf("no backup backend %q: %w, did you enable the right module?", req.Backend, err) | |
return nil, backup.NewErrUnprocessable(err) | |
} | |
classes, err := s.validateBackupRequest(ctx, store, req) | |
if err != nil { | |
return nil, backup.NewErrUnprocessable(err) | |
} | |
if err := store.Initialize(ctx); err != nil { | |
return nil, backup.NewErrUnprocessable(fmt.Errorf("init uploader: %w", err)) | |
} | |
breq := Request{ | |
Method: OpCreate, | |
ID: req.ID, | |
Backend: req.Backend, | |
Classes: classes, | |
Compression: req.Compression, | |
} | |
if err := s.backupper.Backup(ctx, store, &breq); err != nil { | |
return nil, backup.NewErrUnprocessable(err) | |
} else { | |
st := s.backupper.lastOp.get() | |
status := string(st.Status) | |
return &models.BackupCreateResponse{ | |
Classes: classes, | |
ID: req.ID, | |
Backend: req.Backend, | |
Status: &status, | |
Path: st.Path, | |
}, nil | |
} | |
} | |
func (s *Scheduler) Restore(ctx context.Context, pr *models.Principal, | |
req *BackupRequest, | |
) (_ *models.BackupRestoreResponse, err error) { | |
defer func(begin time.Time) { | |
logOperation(s.logger, "try_restore", req.ID, req.Backend, begin, err) | |
}(time.Now()) | |
path := fmt.Sprintf("backups/%s/%s/restore", req.Backend, req.ID) | |
if err := s.authorizer.Authorize(pr, "restore", path); err != nil { | |
return nil, err | |
} | |
store, err := coordBackend(s.backends, req.Backend, req.ID) | |
if err != nil { | |
err = fmt.Errorf("no backup backend %q: %w, did you enable the right module?", req.Backend, err) | |
return nil, backup.NewErrUnprocessable(err) | |
} | |
meta, err := s.validateRestoreRequest(ctx, store, req) | |
if err != nil { | |
if errors.Is(err, errMetaNotFound) { | |
return nil, backup.NewErrNotFound(err) | |
} | |
return nil, backup.NewErrUnprocessable(err) | |
} | |
status := string(backup.Started) | |
data := &models.BackupRestoreResponse{ | |
Backend: req.Backend, | |
ID: req.ID, | |
Path: store.HomeDir(), | |
Classes: meta.Classes(), | |
} | |
rReq := Request{ | |
Method: OpRestore, | |
ID: req.ID, | |
Backend: req.Backend, | |
Compression: req.Compression, | |
} | |
err = s.restorer.Restore(ctx, store, &rReq, meta) | |
if err != nil { | |
status = string(backup.Failed) | |
data.Error = err.Error() | |
return nil, backup.NewErrUnprocessable(err) | |
} | |
data.Status = &status | |
return data, nil | |
} | |
func (s *Scheduler) BackupStatus(ctx context.Context, principal *models.Principal, | |
backend, backupID string, | |
) (_ *Status, err error) { | |
defer func(begin time.Time) { | |
logOperation(s.logger, "backup_status", backupID, backend, begin, err) | |
}(time.Now()) | |
path := fmt.Sprintf("backups/%s/%s", backend, backupID) | |
if err := s.authorizer.Authorize(principal, "get", path); err != nil { | |
return nil, err | |
} | |
store, err := coordBackend(s.backends, backend, backupID) | |
if err != nil { | |
err = fmt.Errorf("no backup provider %q: %w, did you enable the right module?", backend, err) | |
return nil, backup.NewErrUnprocessable(err) | |
} | |
req := &StatusRequest{OpCreate, backupID, backend} | |
st, err := s.backupper.OnStatus(ctx, store, req) | |
if err != nil { | |
return nil, backup.NewErrNotFound(err) | |
} | |
return st, nil | |
} | |
func (s *Scheduler) RestorationStatus(ctx context.Context, principal *models.Principal, backend, backupID string, | |
) (_ *Status, err error) { | |
defer func(begin time.Time) { | |
logOperation(s.logger, "restoration_status", backupID, backend, time.Now(), err) | |
}(time.Now()) | |
path := fmt.Sprintf("backups/%s/%s/restore", backend, backupID) | |
if err := s.authorizer.Authorize(principal, "get", path); err != nil { | |
return nil, err | |
} | |
store, err := coordBackend(s.backends, backend, backupID) | |
if err != nil { | |
err = fmt.Errorf("no backup provider %q: %w, did you enable the right module?", backend, err) | |
return nil, backup.NewErrUnprocessable(err) | |
} | |
req := &StatusRequest{OpRestore, backupID, backend} | |
st, err := s.restorer.OnStatus(ctx, store, req) | |
if err != nil { | |
return nil, backup.NewErrNotFound(err) | |
} | |
return st, nil | |
} | |
func coordBackend(provider BackupBackendProvider, backend, id string) (coordStore, error) { | |
caps, err := provider.BackupBackend(backend) | |
if err != nil { | |
return coordStore{}, err | |
} | |
return coordStore{objStore{b: caps, BasePath: id}}, nil | |
} | |
func (s *Scheduler) validateBackupRequest(ctx context.Context, store coordStore, req *BackupRequest) ([]string, error) { | |
if !store.b.IsExternal() && s.backupper.nodeResolver.NodeCount() > 1 { | |
return nil, errLocalBackendDBRO | |
} | |
if err := validateID(req.ID); err != nil { | |
return nil, err | |
} | |
if len(req.Include) > 0 && len(req.Exclude) > 0 { | |
return nil, errIncludeExclude | |
} | |
if dup := findDuplicate(req.Include); dup != "" { | |
return nil, fmt.Errorf("class list 'include' contains duplicate: %s", dup) | |
} | |
classes := req.Include | |
if len(classes) == 0 { | |
classes = s.backupper.selector.ListClasses(ctx) | |
// no classes exist in the DB | |
if len(classes) == 0 { | |
return nil, fmt.Errorf("no available classes to backup, there's nothing to do here") | |
} | |
} | |
if classes = filterClasses(classes, req.Exclude); len(classes) == 0 { | |
return nil, fmt.Errorf("empty class list: please choose from : %v", classes) | |
} | |
if err := s.backupper.selector.Backupable(ctx, classes); err != nil { | |
return nil, err | |
} | |
destPath := store.HomeDir() | |
// there is no backup with given id on the backend, regardless of its state (valid or corrupted) | |
_, err := store.Meta(ctx, GlobalBackupFile) | |
if err == nil { | |
return nil, fmt.Errorf("backup %q already exists at %q", req.ID, destPath) | |
} | |
if _, ok := err.(backup.ErrNotFound); !ok { | |
return nil, fmt.Errorf("check if backup %q exists at %q: %w", req.ID, destPath, err) | |
} | |
return classes, nil | |
} | |
func (s *Scheduler) validateRestoreRequest(ctx context.Context, store coordStore, req *BackupRequest) (*backup.DistributedBackupDescriptor, error) { | |
if !store.b.IsExternal() && s.restorer.nodeResolver.NodeCount() > 1 { | |
return nil, errLocalBackendDBRO | |
} | |
if len(req.Include) > 0 && len(req.Exclude) > 0 { | |
return nil, errIncludeExclude | |
} | |
if dup := findDuplicate(req.Include); dup != "" { | |
return nil, fmt.Errorf("class list 'include' contains duplicate: %s", dup) | |
} | |
destPath := store.HomeDir() | |
meta, err := store.Meta(ctx, GlobalBackupFile) | |
if err != nil { | |
notFoundErr := backup.ErrNotFound{} | |
if errors.As(err, ¬FoundErr) { | |
return nil, fmt.Errorf("backup id %q does not exist: %v: %w", req.ID, notFoundErr, errMetaNotFound) | |
} | |
return nil, fmt.Errorf("find backup %s: %w", destPath, err) | |
} | |
if meta.ID != req.ID { | |
return nil, fmt.Errorf("wrong backup file: expected %q got %q", req.ID, meta.ID) | |
} | |
if meta.Status != backup.Success { | |
return nil, fmt.Errorf("invalid backup %s status: %s", destPath, meta.Status) | |
} | |
if err := meta.Validate(); err != nil { | |
return nil, fmt.Errorf("corrupted backup file: %w", err) | |
} | |
if v := meta.Version; v > Version { | |
return nil, fmt.Errorf("%s: %s > %s", errMsgHigherVersion, v, Version) | |
} | |
cs := meta.Classes() | |
if len(req.Include) > 0 { | |
if first := meta.AllExist(req.Include); first != "" { | |
err = fmt.Errorf("class %s doesn't exist in the backup, but does have %v: ", first, cs) | |
return nil, err | |
} | |
meta.Include(req.Include) | |
} else { | |
meta.Exclude(req.Exclude) | |
} | |
if meta.RemoveEmpty().Count() == 0 { | |
return nil, fmt.Errorf("nothing left to restore: please choose from : %v", cs) | |
} | |
if len(req.NodeMapping) > 0 { | |
meta.NodeMapping = req.NodeMapping | |
meta.ApplyNodeMapping() | |
} | |
return meta, nil | |
} | |
func logOperation(logger logrus.FieldLogger, name, id, backend string, begin time.Time, err error) { | |
le := logger.WithField("action", name). | |
WithField("backup_id", id).WithField("backend", backend). | |
WithField("took", time.Since(begin)) | |
if err != nil { | |
le.Error(err) | |
} else { | |
le.Info() | |
} | |
} | |
// findDuplicate returns first duplicate if it is found, and "" otherwise | |
func findDuplicate(xs []string) string { | |
m := make(map[string]struct{}, len(xs)) | |
for _, x := range xs { | |
if _, ok := m[x]; ok { | |
return x | |
} | |
m[x] = struct{}{} | |
} | |
return "" | |
} | |