Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package backup | |
import ( | |
"context" | |
"errors" | |
"fmt" | |
"reflect" | |
"sync" | |
"time" | |
"github.com/prometheus/client_golang/prometheus" | |
"github.com/sirupsen/logrus" | |
"github.com/weaviate/weaviate/entities/backup" | |
"github.com/weaviate/weaviate/usecases/monitoring" | |
) | |
type restorer struct { | |
node string // node name | |
logger logrus.FieldLogger | |
sourcer Sourcer | |
backends BackupBackendProvider | |
schema schemaManger | |
shardSyncChan | |
// TODO: keeping status in memory after restore has been done | |
// is not a proper solution for communicating status to the user. | |
// On app crash or restart this data will be lost | |
// This should be regarded as workaround and should be fixed asap | |
restoreStatusMap sync.Map | |
} | |
func newRestorer(node string, logger logrus.FieldLogger, | |
sourcer Sourcer, | |
backends BackupBackendProvider, | |
schema schemaManger, | |
) *restorer { | |
return &restorer{ | |
node: node, | |
logger: logger, | |
sourcer: sourcer, | |
backends: backends, | |
schema: schema, | |
shardSyncChan: shardSyncChan{coordChan: make(chan interface{}, 5)}, | |
} | |
} | |
func (r *restorer) restore(ctx context.Context, | |
req *Request, | |
desc *backup.BackupDescriptor, | |
store nodeStore, | |
) (CanCommitResponse, error) { | |
expiration := req.Duration | |
if expiration > _TimeoutShardCommit { | |
expiration = _TimeoutShardCommit | |
} | |
ret := CanCommitResponse{ | |
Method: OpCreate, | |
ID: req.ID, | |
Timeout: expiration, | |
} | |
destPath := store.HomeDir() | |
// make sure there is no active restore | |
if prevID := r.lastOp.renew(req.ID, destPath); prevID != "" { | |
err := fmt.Errorf("restore %s already in progress", prevID) | |
return ret, err | |
} | |
r.waitingForCoordinatorToCommit.Store(true) // is set to false by wait() | |
go func() { | |
var err error | |
status := Status{ | |
Path: destPath, | |
StartedAt: time.Now().UTC(), | |
Status: backup.Transferring, | |
} | |
defer func() { | |
status.CompletedAt = time.Now().UTC() | |
if err == nil { | |
status.Status = backup.Success | |
} else { | |
status.Err = err.Error() | |
status.Status = backup.Failed | |
} | |
r.restoreStatusMap.Store(basePath(req.Backend, req.ID), status) | |
r.lastOp.reset() | |
}() | |
if err = r.waitForCoordinator(expiration, req.ID); err != nil { | |
r.logger.WithField("action", "create_backup"). | |
Error(err) | |
r.lastAsyncError = err | |
return | |
} | |
err = r.restoreAll(context.Background(), desc, req.CPUPercentage, store, req.NodeMapping) | |
if err != nil { | |
r.logger.WithField("action", "restore").WithField("backup_id", desc.ID).Error(err) | |
} | |
}() | |
return ret, nil | |
} | |
func (r *restorer) restoreAll(ctx context.Context, | |
desc *backup.BackupDescriptor, cpuPercentage int, | |
store nodeStore, nodeMapping map[string]string, | |
) (err error) { | |
compressed := desc.Version > version1 | |
r.lastOp.set(backup.Transferring) | |
for _, cdesc := range desc.Classes { | |
if err := r.restoreOne(ctx, desc.ID, &cdesc, compressed, cpuPercentage, store, nodeMapping); err != nil { | |
return fmt.Errorf("restore class %s: %w", cdesc.Name, err) | |
} | |
r.logger.WithField("action", "restore"). | |
WithField("backup_id", desc.ID). | |
WithField("class", cdesc.Name).Info("successfully restored") | |
} | |
return nil | |
} | |
func getType(myvar interface{}) string { | |
if t := reflect.TypeOf(myvar); t.Kind() == reflect.Ptr { | |
return "*" + t.Elem().Name() | |
} else { | |
return t.Name() | |
} | |
} | |
func (r *restorer) restoreOne(ctx context.Context, | |
backupID string, desc *backup.ClassDescriptor, | |
compressed bool, cpuPercentage int, store nodeStore, nodeMapping map[string]string, | |
) (err error) { | |
metric, err := monitoring.GetMetrics().BackupRestoreDurations.GetMetricWithLabelValues(getType(store.b), desc.Name) | |
if err != nil { | |
timer := prometheus.NewTimer(metric) | |
defer timer.ObserveDuration() | |
} | |
if r.sourcer.ClassExists(desc.Name) { | |
return fmt.Errorf("already exists") | |
} | |
fw := newFileWriter(r.sourcer, store, backupID, compressed). | |
WithPoolPercentage(cpuPercentage) | |
rollback, err := fw.Write(ctx, desc) | |
if err != nil { | |
return fmt.Errorf("write files: %w", err) | |
} | |
if err := r.schema.RestoreClass(ctx, desc, nodeMapping); err != nil { | |
if rerr := rollback(); rerr != nil { | |
r.logger.WithField("className", desc.Name).WithField("action", "rollback").Error(rerr) | |
} | |
return fmt.Errorf("restore schema: %w", err) | |
} | |
return nil | |
} | |
func (r *restorer) status(backend, ID string) (Status, error) { | |
if st := r.lastOp.get(); st.ID == ID { | |
return Status{ | |
Path: st.Path, | |
StartedAt: st.Starttime, | |
Status: st.Status, | |
}, nil | |
} | |
ref := basePath(backend, ID) | |
istatus, ok := r.restoreStatusMap.Load(ref) | |
if !ok { | |
err := fmt.Errorf("status not found: %s", ref) | |
return Status{}, backup.NewErrNotFound(err) | |
} | |
return istatus.(Status), nil | |
} | |
func (r *restorer) validate(ctx context.Context, store *nodeStore, req *Request) (*backup.BackupDescriptor, []string, error) { | |
destPath := store.HomeDir() | |
meta, err := store.Meta(ctx, req.ID, true) | |
if err != nil { | |
nerr := backup.ErrNotFound{} | |
if errors.As(err, &nerr) { | |
return nil, nil, fmt.Errorf("restorer cannot validate: %w: %q (%w)", errMetaNotFound, destPath, err) | |
} | |
return nil, nil, fmt.Errorf("find backup %s: %w", destPath, err) | |
} | |
if meta.ID != req.ID { | |
return nil, nil, fmt.Errorf("wrong backup file: expected %q got %q", req.ID, meta.ID) | |
} | |
if meta.Status != string(backup.Success) { | |
err = fmt.Errorf("invalid backup %s status: %s", destPath, meta.Status) | |
return nil, nil, err | |
} | |
if err := meta.Validate(meta.Version > version1); err != nil { | |
return nil, nil, fmt.Errorf("corrupted backup file: %w", err) | |
} | |
if v := meta.Version; v > Version { | |
return nil, nil, fmt.Errorf("%s: %s > %s", errMsgHigherVersion, v, Version) | |
} | |
cs := meta.List() | |
if len(req.Classes) > 0 { | |
if first := meta.AllExist(req.Classes); first != "" { | |
err = fmt.Errorf("class %s doesn't exist in the backup, but does have %v: ", first, cs) | |
return nil, cs, err | |
} | |
meta.Include(req.Classes) | |
} | |
return meta, cs, nil | |
} | |