KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.49 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package backup
import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/entities/backup"
"github.com/weaviate/weaviate/usecases/monitoring"
)
type restorer struct {
node string // node name
logger logrus.FieldLogger
sourcer Sourcer
backends BackupBackendProvider
schema schemaManger
shardSyncChan
// TODO: keeping status in memory after restore has been done
// is not a proper solution for communicating status to the user.
// On app crash or restart this data will be lost
// This should be regarded as workaround and should be fixed asap
restoreStatusMap sync.Map
}
func newRestorer(node string, logger logrus.FieldLogger,
sourcer Sourcer,
backends BackupBackendProvider,
schema schemaManger,
) *restorer {
return &restorer{
node: node,
logger: logger,
sourcer: sourcer,
backends: backends,
schema: schema,
shardSyncChan: shardSyncChan{coordChan: make(chan interface{}, 5)},
}
}
func (r *restorer) restore(ctx context.Context,
req *Request,
desc *backup.BackupDescriptor,
store nodeStore,
) (CanCommitResponse, error) {
expiration := req.Duration
if expiration > _TimeoutShardCommit {
expiration = _TimeoutShardCommit
}
ret := CanCommitResponse{
Method: OpCreate,
ID: req.ID,
Timeout: expiration,
}
destPath := store.HomeDir()
// make sure there is no active restore
if prevID := r.lastOp.renew(req.ID, destPath); prevID != "" {
err := fmt.Errorf("restore %s already in progress", prevID)
return ret, err
}
r.waitingForCoordinatorToCommit.Store(true) // is set to false by wait()
go func() {
var err error
status := Status{
Path: destPath,
StartedAt: time.Now().UTC(),
Status: backup.Transferring,
}
defer func() {
status.CompletedAt = time.Now().UTC()
if err == nil {
status.Status = backup.Success
} else {
status.Err = err.Error()
status.Status = backup.Failed
}
r.restoreStatusMap.Store(basePath(req.Backend, req.ID), status)
r.lastOp.reset()
}()
if err = r.waitForCoordinator(expiration, req.ID); err != nil {
r.logger.WithField("action", "create_backup").
Error(err)
r.lastAsyncError = err
return
}
err = r.restoreAll(context.Background(), desc, req.CPUPercentage, store, req.NodeMapping)
if err != nil {
r.logger.WithField("action", "restore").WithField("backup_id", desc.ID).Error(err)
}
}()
return ret, nil
}
func (r *restorer) restoreAll(ctx context.Context,
desc *backup.BackupDescriptor, cpuPercentage int,
store nodeStore, nodeMapping map[string]string,
) (err error) {
compressed := desc.Version > version1
r.lastOp.set(backup.Transferring)
for _, cdesc := range desc.Classes {
if err := r.restoreOne(ctx, desc.ID, &cdesc, compressed, cpuPercentage, store, nodeMapping); err != nil {
return fmt.Errorf("restore class %s: %w", cdesc.Name, err)
}
r.logger.WithField("action", "restore").
WithField("backup_id", desc.ID).
WithField("class", cdesc.Name).Info("successfully restored")
}
return nil
}
func getType(myvar interface{}) string {
if t := reflect.TypeOf(myvar); t.Kind() == reflect.Ptr {
return "*" + t.Elem().Name()
} else {
return t.Name()
}
}
func (r *restorer) restoreOne(ctx context.Context,
backupID string, desc *backup.ClassDescriptor,
compressed bool, cpuPercentage int, store nodeStore, nodeMapping map[string]string,
) (err error) {
metric, err := monitoring.GetMetrics().BackupRestoreDurations.GetMetricWithLabelValues(getType(store.b), desc.Name)
if err != nil {
timer := prometheus.NewTimer(metric)
defer timer.ObserveDuration()
}
if r.sourcer.ClassExists(desc.Name) {
return fmt.Errorf("already exists")
}
fw := newFileWriter(r.sourcer, store, backupID, compressed).
WithPoolPercentage(cpuPercentage)
rollback, err := fw.Write(ctx, desc)
if err != nil {
return fmt.Errorf("write files: %w", err)
}
if err := r.schema.RestoreClass(ctx, desc, nodeMapping); err != nil {
if rerr := rollback(); rerr != nil {
r.logger.WithField("className", desc.Name).WithField("action", "rollback").Error(rerr)
}
return fmt.Errorf("restore schema: %w", err)
}
return nil
}
func (r *restorer) status(backend, ID string) (Status, error) {
if st := r.lastOp.get(); st.ID == ID {
return Status{
Path: st.Path,
StartedAt: st.Starttime,
Status: st.Status,
}, nil
}
ref := basePath(backend, ID)
istatus, ok := r.restoreStatusMap.Load(ref)
if !ok {
err := fmt.Errorf("status not found: %s", ref)
return Status{}, backup.NewErrNotFound(err)
}
return istatus.(Status), nil
}
func (r *restorer) validate(ctx context.Context, store *nodeStore, req *Request) (*backup.BackupDescriptor, []string, error) {
destPath := store.HomeDir()
meta, err := store.Meta(ctx, req.ID, true)
if err != nil {
nerr := backup.ErrNotFound{}
if errors.As(err, &nerr) {
return nil, nil, fmt.Errorf("restorer cannot validate: %w: %q (%w)", errMetaNotFound, destPath, err)
}
return nil, nil, fmt.Errorf("find backup %s: %w", destPath, err)
}
if meta.ID != req.ID {
return nil, nil, fmt.Errorf("wrong backup file: expected %q got %q", req.ID, meta.ID)
}
if meta.Status != string(backup.Success) {
err = fmt.Errorf("invalid backup %s status: %s", destPath, meta.Status)
return nil, nil, err
}
if err := meta.Validate(meta.Version > version1); err != nil {
return nil, nil, fmt.Errorf("corrupted backup file: %w", err)
}
if v := meta.Version; v > Version {
return nil, nil, fmt.Errorf("%s: %s > %s", errMsgHigherVersion, v, Version)
}
cs := meta.List()
if len(req.Classes) > 0 {
if first := meta.AllExist(req.Classes); first != "" {
err = fmt.Errorf("class %s doesn't exist in the backup, but does have %v: ", first, cs)
return nil, cs, err
}
meta.Include(req.Classes)
}
return meta, cs, nil
}