// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//

package backup

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/entities/backup"
"github.com/weaviate/weaviate/usecases/config"
"golang.org/x/sync/errgroup"
)

// Op is the kind of backup operation
type Op string

const (
OpCreate Op = "create"
OpRestore Op = "restore"
)

var (
errNoShardFound = errors.New("no shard found")
errCannotCommit = errors.New("cannot commit")
errMetaNotFound = errors.New("metadata not found")
errUnknownOp = errors.New("unknown backup operation")
)

const (
_BookingPeriod = time.Second * 20
_TimeoutNodeDown = 7 * time.Minute
_TimeoutQueryStatus = 5 * time.Second
_TimeoutCanCommit = 8 * time.Second
_NextRoundPeriod = 10 * time.Second
_MaxNumberConns = 16
)

type nodeMap map[string]*backup.NodeDescriptor

// participantStatus tracks the status of a participant in a DBRO
type participantStatus struct {
Status backup.Status
LastTime time.Time
Reason string
}

// selector is used to select participant nodes
type selector interface {
// Shards gets all nodes on which this class is sharded
Shards(ctx context.Context, class string) ([]string, error)
// ListClasses returns a list of all existing classes
// This is needed if the user doesn't specify any classes
ListClasses(ctx context.Context) []string
// Backupable returns whether all given classes can be backed up.
Backupable(_ context.Context, classes []string) error
}

// coordinator coordinates a distributed backup and restore operation (DBRO):
//
// - It determines what request to send to which shard.
//
// - It returns an error if any shard refuses to participate in the DBRO.
//
// - It keeps all metadata needed to resume a DBRO in external storage (e.g. S3).
//
// - When it starts, it checks for any broken DBROs using its metadata.
//
// - It can resume a broken DBRO.
//
// - It marks the whole DBRO as failed if any shard fails to do its BRO.
//
// - The coordinator tries to repair previous DBROs whenever possible.
type coordinator struct {
// dependencies
selector selector
client client
log logrus.FieldLogger
nodeResolver nodeResolver
// state
Participants map[string]participantStatus
descriptor *backup.DistributedBackupDescriptor
shardSyncChan
// timeouts
timeoutNodeDown time.Duration
timeoutQueryStatus time.Duration
timeoutCanCommit time.Duration
timeoutNextRound time.Duration
}

// newCoordinator creates an instance which coordinates distributed BRO operations among many shards.
func newCoordinator(
selector selector,
client client,
log logrus.FieldLogger,
nodeResolver nodeResolver,
) *coordinator {
return &coordinator{
selector: selector,
client: client,
log: log,
nodeResolver: nodeResolver,
Participants: make(map[string]participantStatus, 16),
timeoutNodeDown: _TimeoutNodeDown,
timeoutQueryStatus: _TimeoutQueryStatus,
timeoutCanCommit: _TimeoutCanCommit,
timeoutNextRound: _NextRoundPeriod,
}
}

// Backup coordinates a distributed backup among participants
func (c *coordinator) Backup(ctx context.Context, store coordStore, req *Request) error {
req.Method = OpCreate
groups, err := c.groupByShard(ctx, req.Classes)
if err != nil {
return err
}
// make sure there is no active backup
if prevID := c.lastOp.renew(req.ID, store.HomeDir()); prevID != "" {
return fmt.Errorf("backup %s already in progress", prevID)
}
c.descriptor = &backup.DistributedBackupDescriptor{
StartedAt: time.Now().UTC(),
Status: backup.Started,
ID: req.ID,
Nodes: groups,
Version: Version,
ServerVersion: config.ServerVersion,
}
for key := range c.Participants {
delete(c.Participants, key)
}
nodes, err := c.canCommit(ctx, req)
if err != nil {
c.lastOp.reset()
return err
}
if err := store.PutMeta(ctx, GlobalBackupFile, c.descriptor); err != nil {
c.lastOp.reset()
return fmt.Errorf("cannot init meta file: %w", err)
}
statusReq := StatusRequest{
Method: OpCreate,
ID: req.ID,
Backend: req.Backend,
}
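// Run the commit phase in the background so Backup returns immediately;
// a fresh context detaches it from the caller's request lifetime.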
go func() {
defer c.lastOp.reset()
ctx := context.Background()
c.commit(ctx, &statusReq, nodes, false)
if err := store.PutMeta(ctx, GlobalBackupFile, c.descriptor); err != nil {
c.log.WithField("action", OpCreate).
WithField("backup_id", req.ID).Errorf("put_meta: %v", err)
}
}()
return nil
}

// Restore coordinates a distributed restoration among participants
func (c *coordinator) Restore(
ctx context.Context,
store coordStore,
req *Request,
desc *backup.DistributedBackupDescriptor,
) error {
req.Method = OpRestore
// make sure there is no active backup or restore
if prevID := c.lastOp.renew(desc.ID, store.HomeDir()); prevID != "" {
return fmt.Errorf("restoration %s already in progress", prevID)
}
for key := range c.Participants {
delete(c.Participants, key)
}
c.descriptor = desc.ResetStatus()
nodes, err := c.canCommit(ctx, req)
if err != nil {
c.lastOp.reset()
return err
}
// initial put so restore status is immediately available
if err := store.PutMeta(ctx, GlobalRestoreFile, c.descriptor); err != nil {
c.lastOp.reset()
req := &AbortRequest{Method: OpRestore, ID: desc.ID, Backend: req.Backend}
c.abortAll(ctx, req, nodes)
return fmt.Errorf("put initial metadata: %w", err)
}
statusReq := StatusRequest{Method: OpRestore, ID: desc.ID, Backend: req.Backend}
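// As in Backup, the commit phase runs in the background with a detached context.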
go func() {
defer c.lastOp.reset()
ctx := context.Background()
c.commit(ctx, &statusReq, nodes, true)
if err := store.PutMeta(ctx, GlobalRestoreFile, c.descriptor); err != nil {
c.log.WithField("action", OpRestore).WithField("backup_id", desc.ID).Errorf("put_meta: %v", err)
}
}()
return nil
}

func (c *coordinator) OnStatus(ctx context.Context, store coordStore, req *StatusRequest) (*Status, error) {
// check if backup is still active
st := c.lastOp.get()
if st.ID == req.ID {
return &Status{Path: st.Path, StartedAt: st.Starttime, Status: st.Status}, nil
}
filename := GlobalBackupFile
if req.Method == OpRestore {
filename = GlobalRestoreFile
}
// The backup might already have been created.
meta, err := store.Meta(ctx, filename)
if err != nil {
path := fmt.Sprintf("%s/%s", req.ID, filename)
return nil, fmt.Errorf("coordinator cannot get status: %w: %q: %v", errMetaNotFound, path, err)
}
return &Status{
Path: store.HomeDir(),
StartedAt: meta.StartedAt,
CompletedAt: meta.CompletedAt,
Status: meta.Status,
Err: meta.Error,
}, nil
}

// canCommit asks candidates whether they agree to participate in the DBRO.
// It returns an error if any candidate refuses to participate.
func (c *coordinator) canCommit(ctx context.Context, req *Request) (map[string]string, error) {
ctx, cancel := context.WithTimeout(ctx, c.timeoutCanCommit)
defer cancel()
type nodeHost struct {
node, host string
}
type pair struct {
n nodeHost
r *Request
}
id := c.descriptor.ID
nodeMapping := c.descriptor.NodeMapping
groups := c.descriptor.Nodes
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(_MaxNumberConns)
reqChan := make(chan pair)
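// Producer: resolve each participant's hostname and feed a per-node request
// into reqChan; the workers spawned below consume them, with concurrency
// capped at _MaxNumberConns by the errgroup limit.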
g.Go(func() error {
defer close(reqChan)
for node, gr := range groups {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// If the backup's node name appears in nodeMapping, replace it with the mapped name
node = c.descriptor.ToMappedNodeName(node)
host, found := c.nodeResolver.NodeHostname(node)
if !found {
return fmt.Errorf("cannot resolve hostname for %q", node)
}
reqChan <- pair{
nodeHost{node, host},
&Request{
Method: req.Method,
ID: id,
Backend: req.Backend,
Classes: gr.Classes,
Duration: _BookingPeriod,
NodeMapping: nodeMapping,
Compression: req.Compression,
},
}
}
return nil
})
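// nodes collects the participants that agreed to commit; the mutex guards
// it because multiple worker goroutines write to it concurrently.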
mutex := sync.RWMutex{}
nodes := make(map[string]string, len(groups))
for pair := range reqChan {
pair := pair
g.Go(func() error {
resp, err := c.client.CanCommit(ctx, pair.n.host, pair.r)
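// A zero Timeout in the response means the node declined to participate.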
if err == nil && resp.Timeout == 0 {
err = fmt.Errorf("%w : %v", errCannotCommit, resp.Err)
}
if err != nil {
return fmt.Errorf("node %q: %w", pair.n, err)
}
mutex.Lock()
nodes[pair.n.node] = pair.n.host
mutex.Unlock()
return nil
})
}
abortReq := &AbortRequest{Method: req.Method, ID: id, Backend: req.Backend}
if err := g.Wait(); err != nil {
c.abortAll(ctx, abortReq, nodes)
return nil, err
}
return nodes, nil
}

// commit tells each participant to commit its backup operation
// It stores the final result in the provided backend
func (c *coordinator) commit(ctx context.Context,
req *StatusRequest,
node2Addr map[string]string,
toleratePartialFailure bool,
) {
// create a new copy for commitAll and queryAll to mutate
node2Host := make(map[string]string, len(node2Addr))
for k, v := range node2Addr {
node2Host[k] = v
}
nFailures := c.commitAll(ctx, req, node2Host)
retryAfter := c.timeoutNextRound / 5 // 2s for first time
canContinue := len(node2Host) > 0 && (toleratePartialFailure || nFailures == 0)
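// Poll the remaining participants until they all reach a terminal state,
// stopping early on the first failure unless partial failures are tolerated.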
for canContinue {
<-time.After(retryAfter)
retryAfter = c.timeoutNextRound
nFailures += c.queryAll(ctx, req, node2Host)
canContinue = len(node2Host) > 0 && (toleratePartialFailure || nFailures == 0)
}
if !toleratePartialFailure && nFailures > 0 {
req := &AbortRequest{Method: req.Method, ID: req.ID, Backend: req.Backend}
c.abortAll(context.Background(), req, node2Addr)
}
c.descriptor.CompletedAt = time.Now().UTC()
status := backup.Success
reason := ""
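// Fold each participant's final status into the descriptor; any
// non-success result marks the whole DBRO as failed.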
groups := c.descriptor.Nodes
for node, p := range c.Participants {
st := groups[c.descriptor.ToOriginalNodeName(node)]
st.Status, st.Error = p.Status, p.Reason
if p.Status != backup.Success {
status = backup.Failed
reason = p.Reason
}
groups[node] = st
}
c.descriptor.Status = status
c.descriptor.Error = reason
}

// queryAll queries all participants and stores their statuses internally
//
// It returns the number of failed node backups
func (c *coordinator) queryAll(ctx context.Context, req *StatusRequest, nodes map[string]string) int {
ctx, cancel := context.WithTimeout(ctx, c.timeoutQueryStatus)
defer cancel()
rs := make([]partialStatus, len(nodes))
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(_MaxNumberConns)
i := 0
for node, hostname := range nodes {
j := i
hostname := hostname
rs[j].node = node
g.Go(func() error {
rs[j].StatusResponse, rs[j].err = c.client.Status(ctx, hostname, req)
return nil
})
i++
}
g.Wait()
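// Tally failures and prune nodes that reached a terminal state; a node
// that has not responded within timeoutNodeDown is presumed down.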
n, now := 0, time.Now()
for _, r := range rs {
st := c.Participants[r.node]
if r.err == nil {
st.LastTime, st.Status, st.Reason = now, r.Status, r.Err
if r.Status == backup.Success {
delete(nodes, r.node)
}
if r.Status == backup.Failed {
delete(nodes, r.node)
n++
}
} else if now.Sub(st.LastTime) > c.timeoutNodeDown {
n++
st.Status = backup.Failed
st.Reason = "might be down:" + r.err.Error()
delete(nodes, r.node)
}
c.Participants[r.node] = st
}
return n
}

// commitAll tells all participants to proceed with their backup operations
// It returns the number of failures
func (c *coordinator) commitAll(ctx context.Context, req *StatusRequest, nodes map[string]string) int {
type pair struct {
node string
err error
}
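// aCounter tracks outstanding goroutines so that the last one to finish
// closes errChan, which lets the consuming loop below terminate.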
errChan := make(chan pair)
aCounter := int64(len(nodes))
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(_MaxNumberConns)
for node, hostname := range nodes {
node, hostname := node, hostname
g.Go(func() error {
defer func() {
if atomic.AddInt64(&aCounter, -1) == 0 {
close(errChan)
}
}()
err := c.client.Commit(ctx, hostname, req)
if err != nil {
errChan <- pair{node, err}
}
return nil
})
}
nFailures := 0
for x := range errChan {
st := c.Participants[x.node]
st.Status = backup.Failed
st.Reason = "might be down:" + x.err.Error()
c.Participants[x.node] = st
c.log.WithField("action", req.Method).
WithField("backup_id", req.ID).
WithField("node", x.node).Error(x.err)
delete(nodes, x.node)
nFailures++
}
return nFailures
}

// abortAll tells every node to abort the transaction
func (c *coordinator) abortAll(ctx context.Context, req *AbortRequest, nodes map[string]string) {
for name, hostname := range nodes {
if err := c.client.Abort(ctx, hostname, req); err != nil {
c.log.WithField("action", req.Method).
WithField("backup_id", req.ID).
WithField("node", name).Errorf("abort %v", err)
}
}
}

// groupByShard returns classes grouped by node
func (c *coordinator) groupByShard(ctx context.Context, classes []string) (nodeMap, error) {
m := make(nodeMap, 32)
for _, cls := range classes {
nodes, err := c.selector.Shards(ctx, cls)
if err != nil {
return nil, fmt.Errorf("class %q: %w", cls, errNoShardFound)
}
for _, node := range nodes {
nd, ok := m[node]
if !ok {
nd = &backup.NodeDescriptor{Classes: make([]string, 0, 5)}
}
nd.Classes = append(nd.Classes, cls)
m[node] = nd
}
}
return m, nil
}

// partialStatus tracks the status of a single node's backup operation
type partialStatus struct {
node string
*StatusResponse
err error
}