//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//
package backup

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/sirupsen/logrus"
	"github.com/weaviate/weaviate/entities/backup"
	"github.com/weaviate/weaviate/usecases/config"
	"golang.org/x/sync/errgroup"
)

// Op is the kind of a backup operation
type Op string

const (
	OpCreate  Op = "create"
	OpRestore Op = "restore"
)

var (
	errNoShardFound = errors.New("no shard found")
	errCannotCommit = errors.New("cannot commit")
	errMetaNotFound = errors.New("metadata not found")
	errUnknownOp    = errors.New("unknown backup operation")
)
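
// Default timeouts and limits used by the coordinator below. _BookingPeriod is
// sent as the request duration during canCommit (presumably how long a
// participant keeps its booking while waiting for the commit); the remaining
// values bound the canCommit phase, each status-polling round, the interval
// between rounds, how long an unresponsive node is tolerated before it is
// marked as failed, and the maximum number of concurrent node connections.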
const (
	_BookingPeriod      = time.Second * 20
	_TimeoutNodeDown    = 7 * time.Minute
	_TimeoutQueryStatus = 5 * time.Second
	_TimeoutCanCommit   = 8 * time.Second
	_NextRoundPeriod    = 10 * time.Second
	_MaxNumberConns     = 16
)

type nodeMap map[string]*backup.NodeDescriptor

// participantStatus tracks the status of a participant in a DBRO
type participantStatus struct {
	Status   backup.Status
	LastTime time.Time
	Reason   string
}

// selector is used to select participant nodes
type selector interface {
	// Shards gets all nodes on which this class is sharded
	Shards(ctx context.Context, class string) ([]string, error)
	// ListClasses returns a list of all existing classes
	// This is needed if the user doesn't include any classes
	ListClasses(ctx context.Context) []string
	// Backupable returns whether all given classes can be backed up.
	Backupable(_ context.Context, classes []string) error
}

// coordinator coordinates a distributed backup and restore operation (DBRO):
//
// - It determines what request to send to which shard.
//
// - It returns an error if any shard refuses to participate in the DBRO.
//
// - It keeps all metadata needed to resume a DBRO in external storage (e.g. S3).
//
// - When it starts, it checks for any broken DBROs using its metadata.
//
// - It can resume a broken DBRO.
//
// - It marks the whole DBRO as failed if any shard fails to do its BRO.
//
// - The coordinator will try to repair previous DBROs whenever possible.
type coordinator struct {
	// dependencies
	selector     selector
	client       client
	log          logrus.FieldLogger
	nodeResolver nodeResolver

	// state
	Participants map[string]participantStatus
	descriptor   *backup.DistributedBackupDescriptor
	shardSyncChan

	// timeouts
	timeoutNodeDown    time.Duration
	timeoutQueryStatus time.Duration
	timeoutCanCommit   time.Duration
	timeoutNextRound   time.Duration
}

// newCoordinator creates an instance that coordinates distributed BRO operations among many shards.
func newCoordinator(
	selector selector,
	client client,
	log logrus.FieldLogger,
	nodeResolver nodeResolver,
) *coordinator {
	return &coordinator{
		selector:           selector,
		client:             client,
		log:                log,
		nodeResolver:       nodeResolver,
		Participants:       make(map[string]participantStatus, 16),
		timeoutNodeDown:    _TimeoutNodeDown,
		timeoutQueryStatus: _TimeoutQueryStatus,
		timeoutCanCommit:   _TimeoutCanCommit,
		timeoutNextRound:   _NextRoundPeriod,
	}
}
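
// A minimal usage sketch (assuming concrete selector, client, nodeResolver,
// and coordStore implementations are wired up elsewhere in this package; the
// ID and backend values are only illustrative):
//
//	c := newCoordinator(sel, cli, logger, resolver)
//	req := &Request{ID: "2024-06-01-full", Backend: "s3", Classes: classes}
//	if err := c.Backup(ctx, store, req); err != nil {
//		// another operation is in progress, a node refused to commit,
//		// or the initial metadata could not be written
//	}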

// Backup coordinates a distributed backup among participants
func (c *coordinator) Backup(ctx context.Context, store coordStore, req *Request) error {
	req.Method = OpCreate
	groups, err := c.groupByShard(ctx, req.Classes)
	if err != nil {
		return err
	}
	// make sure there is no active backup
	if prevID := c.lastOp.renew(req.ID, store.HomeDir()); prevID != "" {
		return fmt.Errorf("backup %s already in progress", prevID)
	}
	c.descriptor = &backup.DistributedBackupDescriptor{
		StartedAt:     time.Now().UTC(),
		Status:        backup.Started,
		ID:            req.ID,
		Nodes:         groups,
		Version:       Version,
		ServerVersion: config.ServerVersion,
	}
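	// clear participant statuses left over from a previous operation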
	for key := range c.Participants {
		delete(c.Participants, key)
	}
	nodes, err := c.canCommit(ctx, req)
	if err != nil {
		c.lastOp.reset()
		return err
	}
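	// persist the initial descriptor so the operation's status can be queried
	// from the backend right away (see OnStatus)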
	if err := store.PutMeta(ctx, GlobalBackupFile, c.descriptor); err != nil {
		c.lastOp.reset()
		return fmt.Errorf("cannot init meta file: %w", err)
	}
	statusReq := StatusRequest{
		Method:  OpCreate,
		ID:      req.ID,
		Backend: req.Backend,
	}
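	// run the commit phase asynchronously; the final descriptor, including
	// per-node results, is persisted once all participants have reported back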
	go func() {
		defer c.lastOp.reset()
		ctx := context.Background()
		c.commit(ctx, &statusReq, nodes, false)
		if err := store.PutMeta(ctx, GlobalBackupFile, c.descriptor); err != nil {
			c.log.WithField("action", OpCreate).
				WithField("backup_id", req.ID).Errorf("put_meta: %v", err)
		}
	}()
	return nil
}

// Restore coordinates a distributed restoration among participants
func (c *coordinator) Restore(
	ctx context.Context,
	store coordStore,
	req *Request,
	desc *backup.DistributedBackupDescriptor,
) error {
	req.Method = OpRestore
	// make sure there is no active restore or backup
	if prevID := c.lastOp.renew(desc.ID, store.HomeDir()); prevID != "" {
		return fmt.Errorf("restoration %s already in progress", prevID)
	}
	for key := range c.Participants {
		delete(c.Participants, key)
	}
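	// reuse the stored backup descriptor, with all statuses reset for this restore run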
	c.descriptor = desc.ResetStatus()
	nodes, err := c.canCommit(ctx, req)
	if err != nil {
		c.lastOp.reset()
		return err
	}
	// initial put so restore status is immediately available
	if err := store.PutMeta(ctx, GlobalRestoreFile, c.descriptor); err != nil {
		c.lastOp.reset()
		req := &AbortRequest{Method: OpRestore, ID: desc.ID, Backend: req.Backend}
		c.abortAll(ctx, req, nodes)
		return fmt.Errorf("put initial metadata: %w", err)
	}
	statusReq := StatusRequest{Method: OpRestore, ID: desc.ID, Backend: req.Backend}
	go func() {
		defer c.lastOp.reset()
		ctx := context.Background()
		c.commit(ctx, &statusReq, nodes, true)
		if err := store.PutMeta(ctx, GlobalRestoreFile, c.descriptor); err != nil {
			c.log.WithField("action", OpRestore).WithField("backup_id", desc.ID).Errorf("put_meta: %v", err)
		}
	}()
	return nil
}
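
// OnStatus returns the status of the operation identified by req, first from
// the coordinator's in-memory state and, if the operation is no longer
// active, from the metadata persisted in the backend.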
func (c *coordinator) OnStatus(ctx context.Context, store coordStore, req *StatusRequest) (*Status, error) {
	// check if the operation is still active
	st := c.lastOp.get()
	if st.ID == req.ID {
		return &Status{Path: st.Path, StartedAt: st.Starttime, Status: st.Status}, nil
	}
	filename := GlobalBackupFile
	if req.Method == OpRestore {
		filename = GlobalRestoreFile
	}
	// The operation might have already completed; fall back to the metadata stored in the backend.
	meta, err := store.Meta(ctx, filename)
	if err != nil {
		path := fmt.Sprintf("%s/%s", req.ID, filename)
		return nil, fmt.Errorf("coordinator cannot get status: %w: %q: %v", errMetaNotFound, path, err)
	}
	return &Status{
		Path:        store.HomeDir(),
		StartedAt:   meta.StartedAt,
		CompletedAt: meta.CompletedAt,
		Status:      meta.Status,
		Err:         meta.Error,
	}, nil
}

// canCommit asks candidates whether they agree to participate in the DBRO.
// It returns an error if any candidate refuses to participate.
func (c *coordinator) canCommit(ctx context.Context, req *Request) (map[string]string, error) {
	ctx, cancel := context.WithTimeout(ctx, c.timeoutCanCommit)
	defer cancel()
	type nodeHost struct {
		node, host string
	}
	type pair struct {
		n nodeHost
		r *Request
	}
	id := c.descriptor.ID
	nodeMapping := c.descriptor.NodeMapping
	groups := c.descriptor.Nodes
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(_MaxNumberConns)
	reqChan := make(chan pair)
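	// producer: build one CanCommit request per participating node and feed it
	// to the bounded set of worker goroutines below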
	g.Go(func() error {
		defer close(reqChan)
		for node, gr := range groups {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			// If the backup provides a nodeMapping, replace the original node name with its mapped counterpart
			node = c.descriptor.ToMappedNodeName(node)
			host, found := c.nodeResolver.NodeHostname(node)
			if !found {
				return fmt.Errorf("cannot resolve hostname for %q", node)
			}
			reqChan <- pair{
				nodeHost{node, host},
				&Request{
					Method:      req.Method,
					ID:          id,
					Backend:     req.Backend,
					Classes:     gr.Classes,
					Duration:    _BookingPeriod,
					NodeMapping: nodeMapping,
					Compression: req.Compression,
				},
			}
		}
		return nil
	})
	mutex := sync.RWMutex{}
	nodes := make(map[string]string, len(groups))
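	// consumers: fan out CanCommit calls; every node that agrees is recorded
	// in nodes, which later becomes the commit/polling set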
	for pair := range reqChan {
		pair := pair
		g.Go(func() error {
			resp, err := c.client.CanCommit(ctx, pair.n.host, pair.r)
			if err == nil && resp.Timeout == 0 {
				err = fmt.Errorf("%w : %v", errCannotCommit, resp.Err)
			}
			if err != nil {
				return fmt.Errorf("node %q: %w", pair.n, err)
			}
			mutex.Lock()
			nodes[pair.n.node] = pair.n.host
			mutex.Unlock()
			return nil
		})
	}
	abortReq := &AbortRequest{Method: req.Method, ID: id, Backend: req.Backend}
	if err := g.Wait(); err != nil {
		c.abortAll(ctx, abortReq, nodes)
		return nil, err
	}
	return nodes, nil
}

// commit tells each participant to commit its backup operation
// It stores the final result in the provided backend
func (c *coordinator) commit(ctx context.Context,
	req *StatusRequest,
	node2Addr map[string]string,
	toleratePartialFailure bool,
) {
	// create a new copy for commitAll and queryAll to mutate
	node2Host := make(map[string]string, len(node2Addr))
	for k, v := range node2Addr {
		node2Host[k] = v
	}
	nFailures := c.commitAll(ctx, req, node2Host)
	retryAfter := c.timeoutNextRound / 5 // 2s for first time
	canContinue := len(node2Host) > 0 && (toleratePartialFailure || nFailures == 0)
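	// poll the remaining participants until every node has reported success or
	// failure, or until a failure occurs while partial failures are not tolerated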
	for canContinue {
		<-time.After(retryAfter)
		retryAfter = c.timeoutNextRound
		nFailures += c.queryAll(ctx, req, node2Host)
		canContinue = len(node2Host) > 0 && (toleratePartialFailure || nFailures == 0)
	}
	if !toleratePartialFailure && nFailures > 0 {
		req := &AbortRequest{Method: req.Method, ID: req.ID, Backend: req.Backend}
		c.abortAll(context.Background(), req, node2Addr)
	}
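	// fold the per-participant results back into the descriptor; a single
	// failed participant marks the whole operation as failed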
	c.descriptor.CompletedAt = time.Now().UTC()
	status := backup.Success
	reason := ""
	groups := c.descriptor.Nodes
	for node, p := range c.Participants {
		st := groups[c.descriptor.ToOriginalNodeName(node)]
		st.Status, st.Error = p.Status, p.Reason
		if p.Status != backup.Success {
			status = backup.Failed
			reason = p.Reason
		}
		groups[node] = st
	}
	c.descriptor.Status = status
	c.descriptor.Error = reason
}

// queryAll queries all participants and stores their statuses internally
//
// It returns the number of failed node backups
func (c *coordinator) queryAll(ctx context.Context, req *StatusRequest, nodes map[string]string) int {
	ctx, cancel := context.WithTimeout(ctx, c.timeoutQueryStatus)
	defer cancel()
	rs := make([]partialStatus, len(nodes))
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(_MaxNumberConns)
	i := 0
	for node, hostname := range nodes {
		j := i
		hostname := hostname
		rs[j].node = node
		g.Go(func() error {
			rs[j].StatusResponse, rs[j].err = c.client.Status(ctx, hostname, req)
			return nil
		})
		i++
	}
	g.Wait()
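	// evaluate the responses: nodes that report success or failure are removed
	// from the polling set; unreachable nodes are only counted as failed once
	// they have been silent for longer than timeoutNodeDown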
	n, now := 0, time.Now()
	for _, r := range rs {
		st := c.Participants[r.node]
		if r.err == nil {
			st.LastTime, st.Status, st.Reason = now, r.Status, r.Err
			if r.Status == backup.Success {
				delete(nodes, r.node)
			}
			if r.Status == backup.Failed {
				delete(nodes, r.node)
				n++
			}
		} else if now.Sub(st.LastTime) > c.timeoutNodeDown {
			n++
			st.Status = backup.Failed
			st.Reason = "might be down:" + r.err.Error()
			delete(nodes, r.node)
		}
		c.Participants[r.node] = st
	}
	return n
}

// commitAll tells all participants to proceed with their backup operations
// It returns the number of failures
func (c *coordinator) commitAll(ctx context.Context, req *StatusRequest, nodes map[string]string) int {
	type pair struct {
		node string
		err  error
	}
	errChan := make(chan pair)
	aCounter := int64(len(nodes))
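	// aCounter counts in-flight Commit calls so that the last goroutine to
	// finish closes errChan and unblocks the collection loop below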
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(_MaxNumberConns)
	for node, hostname := range nodes {
		node, hostname := node, hostname
		g.Go(func() error {
			defer func() {
				if atomic.AddInt64(&aCounter, -1) == 0 {
					close(errChan)
				}
			}()
			err := c.client.Commit(ctx, hostname, req)
			if err != nil {
				errChan <- pair{node, err}
			}
			return nil
		})
	}
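	// collect commit errors: a node that fails to commit is marked as failed
	// and excluded from further status polling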
	nFailures := 0
	for x := range errChan {
		st := c.Participants[x.node]
		st.Status = backup.Failed
		st.Reason = "might be down:" + x.err.Error()
		c.Participants[x.node] = st
		c.log.WithField("action", req.Method).
			WithField("backup_id", req.ID).
			WithField("node", x.node).Error(x.err)
		delete(nodes, x.node)
		nFailures++
	}
	return nFailures
}

// abortAll tells every node to abort the ongoing operation
func (c *coordinator) abortAll(ctx context.Context, req *AbortRequest, nodes map[string]string) {
	for name, hostname := range nodes {
		if err := c.client.Abort(ctx, hostname, req); err != nil {
			c.log.WithField("action", req.Method).
				WithField("backup_id", req.ID).
				WithField("node", name).Errorf("abort %v", err)
		}
	}
}

// groupByShard groups the given classes by the nodes on which they are sharded
func (c *coordinator) groupByShard(ctx context.Context, classes []string) (nodeMap, error) {
	m := make(nodeMap, 32)
	for _, cls := range classes {
		nodes, err := c.selector.Shards(ctx, cls)
		if err != nil {
			return nil, fmt.Errorf("class %q: %w", cls, errNoShardFound)
		}
		for _, node := range nodes {
			nd, ok := m[node]
			if !ok {
				nd = &backup.NodeDescriptor{Classes: make([]string, 0, 5)}
			}
			nd.Classes = append(nd.Classes, cls)
			m[node] = nd
		}
	}
	return m, nil
}

// partialStatus captures the status response (or error) returned by a single node
type partialStatus struct {
	node string
	*StatusResponse
	err error
}