//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//

package backup

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path"
	"runtime"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"github.com/weaviate/weaviate/entities/backup"
	"github.com/weaviate/weaviate/entities/modulecapabilities"
	"github.com/weaviate/weaviate/usecases/monitoring"
	"golang.org/x/sync/errgroup"
)

// TODO adjust or make configurable
const (
	storeTimeout = 24 * time.Hour
	metaTimeout  = 20 * time.Minute

	// DefaultChunkSize is used if the chunk size is not specified
	DefaultChunkSize = 1 << 27 // 128MB

	// maxChunkSize is the upper bound on the chunk size
	maxChunkSize = 1 << 29 // 512MB

	// minChunkSize is the lower bound on the chunk size
	minChunkSize = 1 << 21 // 2MB

	// maxCPUPercentage is the maximum CPU percentage that may be consumed by the file writer
	maxCPUPercentage = 80

	// DefaultCPUPercentage is the default CPU percentage that may be consumed by the file writer
	DefaultCPUPercentage = 50
)

const (
	// BackupFile is used by a node to store its metadata
	BackupFile = "backup.json"
	// GlobalBackupFile is used by the coordinator to store its metadata
	GlobalBackupFile  = "backup_config.json"
	GlobalRestoreFile = "restore_config.json"
	_TempDirectory    = ".backup.tmp"
)

// _NUMCPU is the number of logical CPUs, captured once and used to size worker pools
var _NUMCPU = runtime.NumCPU()

// objStore wraps a backup backend and scopes all of its operations to BasePath
type objStore struct {
	b        modulecapabilities.BackupBackend
	BasePath string
}

func (s *objStore) HomeDir() string {
	return s.b.HomeDir(s.BasePath)
}

func (s *objStore) WriteToFile(ctx context.Context, key, destPath string) error {
	return s.b.WriteToFile(ctx, s.BasePath, key, destPath)
}

// SourceDataPath is the data path of all source files
func (s *objStore) SourceDataPath() string {
	return s.b.SourceDataPath()
}

func (s *objStore) Write(ctx context.Context, key string, r io.ReadCloser) (int64, error) {
	return s.b.Write(ctx, s.BasePath, key, r)
}

func (s *objStore) Read(ctx context.Context, key string, w io.WriteCloser) (int64, error) {
	return s.b.Read(ctx, s.BasePath, key, w)
}

func (s *objStore) Initialize(ctx context.Context) error {
	return s.b.Initialize(ctx, s.BasePath)
}

// putMeta marshals and uploads metadata
func (s *objStore) putMeta(ctx context.Context, key string, desc interface{}) error {
	bytes, err := json.Marshal(desc)
	if err != nil {
		return fmt.Errorf("marshal meta file %q: %w", key, err)
	}
	ctx, cancel := context.WithTimeout(ctx, metaTimeout)
	defer cancel()
	if err := s.b.PutObject(ctx, s.BasePath, key, bytes); err != nil {
		return fmt.Errorf("upload meta file %q: %w", key, err)
	}
	return nil
}

// meta downloads and unmarshals metadata
func (s *objStore) meta(ctx context.Context, key string, dest interface{}) error {
	bytes, err := s.b.GetObject(ctx, s.BasePath, key)
	if err != nil {
		return err
	}
	err = json.Unmarshal(bytes, dest)
	if err != nil {
		return fmt.Errorf("unmarshal meta file %q: %w", key, err)
	}
	return nil
}

type nodeStore struct {
	objStore
}

// Meta gets meta data using standard path or deprecated old path
//
// adjustBasePath: sets the base path to the old path if the backup has been created prior to v1.17.
func (s *nodeStore) Meta(ctx context.Context, backupID string, adjustBasePath bool) (*backup.BackupDescriptor, error) {
	var result backup.BackupDescriptor
	err := s.meta(ctx, BackupFile, &result)
	if err != nil {
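		// Fallback for backups taken before v1.17: those stored the node's
		// backup.json under the backup ID itself rather than under the current
		// base path, so the read is retried with BasePath set to backupID.
		// (This reading follows the doc comment above and the retry below; the
		// exact on-disk layout depends on the backend.)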
		cs := &objStore{s.b, backupID} // for backward compatibility
		if err := cs.meta(ctx, BackupFile, &result); err == nil {
			if adjustBasePath {
				s.objStore.BasePath = backupID
			}
			return &result, nil
		}
	}
	return &result, err
}

// PutMeta marshals and uploads the node's metadata
func (s *nodeStore) PutMeta(ctx context.Context, desc *backup.BackupDescriptor) error {
	return s.putMeta(ctx, BackupFile, desc)
}

type coordStore struct {
	objStore
}

// PutMeta puts coordinator's global metadata into object store
func (s *coordStore) PutMeta(ctx context.Context, filename string, desc *backup.DistributedBackupDescriptor) error {
	return s.putMeta(ctx, filename, desc)
}

// Meta gets coordinator's global metadata from object store
func (s *coordStore) Meta(ctx context.Context, filename string) (*backup.DistributedBackupDescriptor, error) {
	var result backup.DistributedBackupDescriptor
	err := s.meta(ctx, filename, &result)
	if err != nil && filename == GlobalBackupFile {
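		// Fallback: if the global metadata file is missing, look for a legacy
		// node-level backup.json, presumably written by versions that predate
		// the coordinated (global) descriptor, and convert it via ToDistributed.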
		var oldBackup backup.BackupDescriptor
		if err := s.meta(ctx, BackupFile, &oldBackup); err == nil {
			return oldBackup.ToDistributed(), nil
		}
	}
	return &result, err
}

// uploader uploads backup artifacts. This includes db files and metadata
type uploader struct {
	sourcer  Sourcer
	backend  nodeStore
	backupID string
	zipConfig
	setStatus func(st backup.Status)
	log       logrus.FieldLogger
}

func newUploader(sourcer Sourcer, backend nodeStore,
	backupID string, setstatus func(st backup.Status), l logrus.FieldLogger,
) *uploader {
	return &uploader{
		sourcer, backend,
		backupID,
		newZipConfig(Compression{
			Level:         DefaultCompression,
			CPUPercentage: DefaultCPUPercentage,
			ChunkSize:     DefaultChunkSize,
		}),
		setstatus,
		l,
	}
}

// withCompression replaces the default zip configuration
func (u *uploader) withCompression(cfg zipConfig) *uploader {
	u.zipConfig = cfg
	return u
}
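
// A hedged usage sketch (not part of the package API surface): the caller is
// assumed to hold a Sourcer, a nodeStore, a status callback, and a logger; the
// backup ID below is hypothetical.
//
//	u := newUploader(sourcer, store, "backup-123", setStatus, logger).
//		withCompression(newZipConfig(Compression{
//			Level:         DefaultCompression,
//			CPUPercentage: DefaultCPUPercentage,
//			ChunkSize:     DefaultChunkSize,
//		}))
//
// The Compression values shown are the package defaults already applied by
// newUploader; withCompression is only needed to deviate from them.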

// all uploads all files in addition to the metadata file
func (u *uploader) all(ctx context.Context, classes []string, desc *backup.BackupDescriptor) (err error) {
	u.setStatus(backup.Transferring)
	desc.Status = string(backup.Transferring)
	ch := u.sourcer.BackupDescriptors(ctx, desc.ID, classes)
	defer func() {
		// make sure context is not cancelled when uploading metadata
		ctx := context.Background()
		if err != nil {
			desc.Error = err.Error()
			err = fmt.Errorf("upload %w: %v", err, u.backend.PutMeta(ctx, desc))
		} else {
			u.log.Info("start uploading meta data")
			if err = u.backend.PutMeta(ctx, desc); err != nil {
				desc.Status = string(backup.Transferred)
			}
			u.setStatus(backup.Success)
			u.log.Info("finish uploading meta data")
		}
	}()
Loop:
	for {
		select {
		case cdesc, ok := <-ch:
			if !ok {
				break Loop // we are done
			}
			if cdesc.Error != nil {
				return cdesc.Error
			}
			u.log.WithField("class", cdesc.Name).Info("start uploading files")
			if err := u.class(ctx, desc.ID, &cdesc); err != nil {
				return err
			}
			desc.Classes = append(desc.Classes, cdesc)
			u.log.WithField("class", cdesc.Name).Info("finish uploading files")
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	u.setStatus(backup.Transferred)
	desc.Status = string(backup.Success)
	return nil
}

// class uploads one class
func (u *uploader) class(ctx context.Context, id string, desc *backup.ClassDescriptor) (err error) {
	metric, err := monitoring.GetMetrics().BackupStoreDurations.GetMetricWithLabelValues(getType(u.backend.b), desc.Name)
	if err == nil {
		timer := prometheus.NewTimer(metric)
		defer timer.ObserveDuration()
	}
	defer func() {
		// backups need to be released anyway
		go u.sourcer.ReleaseBackup(context.Background(), id, desc.Name)
	}()
	ctx, cancel := context.WithTimeout(ctx, storeTimeout)
	defer cancel()
	nShards := len(desc.Shards)
	if nShards == 0 {
		return nil
	}

	desc.Chunks = make(map[int32][]string, 1+nShards/2)
	var (
		hasJobs   atomic.Bool
		lastChunk = int32(0)
		nWorker   = u.GoPoolSize
	)
	if nWorker > nShards {
		nWorker = nShards
	}
	hasJobs.Store(nShards > 0)
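
	// Fan-out sketch: jobs streams the shard descriptors to a bounded pool of
	// workers. Each worker claims the next chunk number with atomic.AddInt32
	// and keeps packing shards into that chunk (see compress) until the chunk
	// is roughly full, then claims another. hasJobs lets workers exit once the
	// producer has drained all shards.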

	// jobs produces work for the processor
	jobs := func(xs []*backup.ShardDescriptor) <-chan *backup.ShardDescriptor {
		sendCh := make(chan *backup.ShardDescriptor)
		go func() {
			defer close(sendCh)
			defer hasJobs.Store(false)
			for _, shard := range xs {
				select {
				case sendCh <- shard:
					// cancellation will happen for two reasons:
					//  - 1. if the whole operation has been aborted,
					//  - 2. or if the processor routine returns an error
				case <-ctx.Done():
					return
				}
			}
		}()
		return sendCh
	}

	// processor fans shard compression out to nWorker goroutines
	processor := func(nWorker int, sender <-chan *backup.ShardDescriptor) <-chan chuckShards {
		eg, ctx := errgroup.WithContext(ctx)
		eg.SetLimit(nWorker)
		recvCh := make(chan chuckShards, nWorker)
		go func() {
			defer close(recvCh)
			for i := 0; i < nWorker; i++ {
				eg.Go(func() error {
					// operation might have been aborted, see comment above
					if err := ctx.Err(); err != nil {
						return err
					}
					for hasJobs.Load() {
						chunk := atomic.AddInt32(&lastChunk, 1)
						shards, err := u.compress(ctx, desc.Name, chunk, sender)
						if err != nil {
							return err
						}
						if m := int32(len(shards)); m > 0 {
							recvCh <- chuckShards{chunk, shards}
						}
					}
					return err
				})
			}
			err = eg.Wait()
		}()
		return recvCh
	}

	for x := range processor(nWorker, jobs(desc.Shards)) {
		desc.Chunks[x.chunk] = x.shards
	}
	return
}

// chuckShards pairs a chunk id with the names of the shards packed into it
type chuckShards struct {
	chunk  int32
	shards []string
}

func (u *uploader) compress(ctx context.Context,
	class string, // class name
	chunk int32, // chunk index
	ch <-chan *backup.ShardDescriptor, // chan of shards
) ([]string, error) {
	var (
		chunkKey = chunkKey(class, chunk)
		shards   = make([]string, 0, 10)
		// add tolerance to enable better optimization of the chunk size
		maxSize = int64(u.ChunkSize + u.ChunkSize/20) // size + 5%
	)
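
	// How this is sketched to work: NewZip returns an archive writer together
	// with a reader exposing the compressed stream (presumably backed by a
	// pipe). The producer below packs shards into the archive while a consumer
	// goroutine concurrently streams the same bytes to the backend under
	// chunkKey. The 5% tolerance above means that with the default 128 MiB
	// chunk size a chunk may grow to roughly 134 MiB before the producer stops
	// adding shards.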
	zip, reader := NewZip(u.backend.SourceDataPath(), u.Level)
	producer := func() error {
		defer zip.Close()
		lastShardSize := int64(0)
		for shard := range ch {
			if _, err := zip.WriteShard(ctx, shard); err != nil {
				return err
			}
			shard.Chunk = chunk
			shards = append(shards, shard.Name)
			shard.ClearTemporary()

			zip.gzw.Flush() // flush new shard
			lastShardSize = zip.lastWritten() - lastShardSize
			if zip.lastWritten()+lastShardSize > maxSize {
				break
			}
		}
		return nil
	}

	// consumer
	var eg errgroup.Group
	eg.Go(func() error {
		if _, err := u.backend.Write(ctx, chunkKey, reader); err != nil {
			return err
		}
		return nil
	})

	if err := producer(); err != nil {
		return shards, err
	}
	// wait for the consumer to finish
	return shards, eg.Wait()
}

// fileWriter downloads files from the object store and writes them to the destination folder destDir
type fileWriter struct {
	sourcer    Sourcer
	backend    nodeStore
	tempDir    string
	destDir    string
	movedFiles []string // files successfully moved to destination folder
	compressed bool
	GoPoolSize int
}

func newFileWriter(sourcer Sourcer, backend nodeStore,
	backupID string, compressed bool,
) *fileWriter {
	destDir := backend.SourceDataPath()
	return &fileWriter{
		sourcer:    sourcer,
		backend:    backend,
		destDir:    destDir,
		tempDir:    path.Join(destDir, _TempDirectory),
		movedFiles: make([]string, 0, 64),
		compressed: compressed,
		GoPoolSize: routinePoolSize(50),
	}
}
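
// WithPoolPercentage sets the size of the worker pool used when writing
// restored files, as a percentage of the available CPU cores. The default,
// set in newFileWriter, is routinePoolSize(50), i.e. roughly half of the
// cores; routinePoolSize caps the value at maxCPUPercentage.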
func (fw *fileWriter) WithPoolPercentage(p int) *fileWriter {
	fw.GoPoolSize = routinePoolSize(p)
	return fw
}

// Write downloads files and puts them in the destination directory
func (fw *fileWriter) Write(ctx context.Context, desc *backup.ClassDescriptor) (rollback func() error, err error) {
	if len(desc.Shards) == 0 { // nothing to copy
		return func() error { return nil }, nil
	}
	classTempDir := path.Join(fw.tempDir, desc.Name)
	defer func() {
		if err != nil {
			if rerr := fw.rollBack(classTempDir); rerr != nil {
				err = fmt.Errorf("%w: %v", err, rerr)
			}
		}
		os.RemoveAll(classTempDir)
	}()

	if err := fw.writeTempFiles(ctx, classTempDir, desc); err != nil {
		return nil, fmt.Errorf("get files: %w", err)
	}
	if err := fw.moveAll(classTempDir); err != nil {
		return nil, fmt.Errorf("move files to destination: %w", err)
	}
	return func() error { return fw.rollBack(classTempDir) }, nil
}

// writeTempFiles writes class files into a temporary directory
// temporary directory path = fw.tempDir/className
// The function makes sure that created files will be removed in case of an error
func (fw *fileWriter) writeTempFiles(ctx context.Context, classTempDir string, desc *backup.ClassDescriptor) (err error) {
	if err := os.RemoveAll(classTempDir); err != nil {
		return fmt.Errorf("remove %s: %w", classTempDir, err)
	}
	if err := os.MkdirAll(classTempDir, os.ModePerm); err != nil {
		return fmt.Errorf("create temp class folder %s: %w", classTempDir, err)
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// no compression: restore shard files individually, as before
	eg, ctx := errgroup.WithContext(ctx)
	if !fw.compressed {
		eg.SetLimit(2 * _NUMCPU)
		for _, shard := range desc.Shards {
			shard := shard
			eg.Go(func() error { return fw.writeTempShard(ctx, shard, classTempDir) })
		}
		return eg.Wait()
	}

	// source files are compressed
	eg.SetLimit(fw.GoPoolSize)
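
	// Compressed restore sketch: one goroutine per chunk. NewUnzip returns an
	// unzipper paired with a WriteCloser; backend.Read streams the chunk into
	// that writer while ReadChunk extracts the files into classTempDir. The
	// return value of backend.Read is discarded here; a failed or cancelled
	// read is assumed to surface as an error from ReadChunk once the stream is
	// closed early.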
	for k := range desc.Chunks {
		chunk := chunkKey(desc.Name, k)
		eg.Go(func() error {
			uz, w := NewUnzip(classTempDir)
			go func() {
				fw.backend.Read(ctx, chunk, w)
			}()
			_, err := uz.ReadChunk()
			return err
		})
	}
	return eg.Wait()
}

func (fw *fileWriter) writeTempShard(ctx context.Context, sd *backup.ShardDescriptor, classTempDir string) error {
	for _, key := range sd.Files {
		destPath := path.Join(classTempDir, key)
		destDir := path.Dir(destPath)
		if err := os.MkdirAll(destDir, os.ModePerm); err != nil {
			return fmt.Errorf("create folder %s: %w", destDir, err)
		}
		if err := fw.backend.WriteToFile(ctx, key, destPath); err != nil {
			return fmt.Errorf("write file %s: %w", destPath, err)
		}
	}
	destPath := path.Join(classTempDir, sd.DocIDCounterPath)
	if err := os.WriteFile(destPath, sd.DocIDCounter, os.ModePerm); err != nil {
		return fmt.Errorf("write counter file %s: %w", destPath, err)
	}
	destPath = path.Join(classTempDir, sd.PropLengthTrackerPath)
	if err := os.WriteFile(destPath, sd.PropLengthTracker, os.ModePerm); err != nil {
		return fmt.Errorf("write prop file %s: %w", destPath, err)
	}
	destPath = path.Join(classTempDir, sd.ShardVersionPath)
	if err := os.WriteFile(destPath, sd.Version, os.ModePerm); err != nil {
		return fmt.Errorf("write version file %s: %w", destPath, err)
	}
	return nil
}

// moveAll moves all files to the destination
func (fw *fileWriter) moveAll(classTempDir string) (err error) {
	files, err := os.ReadDir(classTempDir)
	if err != nil {
		return fmt.Errorf("read %s: %w", classTempDir, err)
	}
	destDir := fw.destDir
	for _, key := range files {
		from := path.Join(classTempDir, key.Name())
		to := path.Join(destDir, key.Name())
		if err := os.Rename(from, to); err != nil {
			return fmt.Errorf("move %s %s: %w", from, to, err)
		}
		fw.movedFiles = append(fw.movedFiles, to)
	}
	return nil
}

// rollBack removes files that were successfully moved to the destination folder
func (fw *fileWriter) rollBack(classTempDir string) (err error) {
	// rollback successfully moved files
	for _, fpath := range fw.movedFiles {
		if rerr := os.RemoveAll(fpath); rerr != nil && err == nil {
			err = fmt.Errorf("rollback %s: %w", fpath, rerr)
		}
	}
	return err
}
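
// chunkKey builds the object-store key under which a chunk is uploaded and
// read back. For a hypothetical class "Article" and chunk id 7 the resulting
// key would be "Article/chunk-7".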
func chunkKey(class string, id int32) string {
	return fmt.Sprintf("%s/chunk-%d", class, id)
}

// routinePoolSize converts a CPU percentage into a number of worker goroutines
func routinePoolSize(percentage int) int {
	if percentage == 0 { // default value
		percentage = DefaultCPUPercentage
	} else if percentage > maxCPUPercentage {
		percentage = maxCPUPercentage
	}
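
	// Worked example (assuming an 8-core machine, i.e. _NUMCPU == 8): the
	// default of 50% yields (8*50)/100 = 4 workers, a request for 90% is
	// clamped to 80% and yields (8*80)/100 = 6, and anything that would round
	// down to zero falls back to a single worker.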
	if x := (_NUMCPU * percentage) / 100; x > 0 {
		return x
	}
	return 1
}