KevinStephenson
Adding in weaviate code
b110593
raw
history blame
9.63 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package db
import (
"context"
"fmt"
"sync"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/entities/backup"
"github.com/weaviate/weaviate/entities/schema"
)
// BackupState describes the backup that currently occupies an index.
// initBackup stores a pointer to one of these in Index.lastBackup and
// resetBackupState clears it again.
type BackupState struct {
// BackupID is the identifier of the backup that claimed the index.
BackupID string
// InProgress is set to true by initBackup when the state is created.
InProgress bool
}
// Backupable checks that every class in classes can be backed up.
//
// A class qualifies when the DB holds an index for it and that index is
// configured with exactly the requested class name. The first class that
// fails the check is reported in the returned error; nil means all classes
// passed. ctx is not consulted.
func (db *DB) Backupable(ctx context.Context, classes []string) error {
	for _, name := range classes {
		want := schema.ClassName(name)
		if index := db.GetIndex(want); index != nil && index.Config.ClassName == want {
			continue
		}
		return fmt.Errorf("class %v doesn't exist", name)
	}
	return nil
}
// ListBackupable returns the class names of all indices currently held by
// the DB, i.e. every class that can be backed up. The order of the result
// follows map iteration and is therefore unspecified.
//
// db.indices is only accessed while indexLock is read-held. That includes the
// len() call used to pre-size the result, which previously ran before the
// lock was taken and was an unsynchronised read of the map.
func (db *DB) ListBackupable() []string {
	db.indexLock.RLock()
	defer db.indexLock.RUnlock()

	cs := make([]string, 0, len(db.indices))
	for _, idx := range db.indices {
		cs = append(cs, string(idx.Config.ClassName))
	}
	return cs
}
// BackupDescriptors produces one class descriptor per entry of classes and
// delivers them, in order, on the returned channel.
//
// A class descriptor records everything needed to restore a class. The work
// runs in a background goroutine; the channel is buffered for len(classes)
// items, so the producer never blocks on a slow consumer. As soon as a
// descriptor carries an error (missing class, failed descriptor build, or a
// done context) it is sent as the final item and the channel is closed.
func (db *DB) BackupDescriptors(ctx context.Context, bakid string, classes []string,
) <-chan backup.ClassDescriptor {
	out := make(chan backup.ClassDescriptor, len(classes))

	// describe builds the descriptor for a single class, storing any failure
	// in its Error field.
	describe := func(name string) backup.ClassDescriptor {
		d := backup.ClassDescriptor{Name: name}
		index := db.GetIndex(schema.ClassName(name))
		if index == nil {
			d.Error = fmt.Errorf("class %v doesn't exist any more", name)
			return d
		}
		if err := index.descriptor(ctx, bakid, &d); err != nil {
			d.Error = fmt.Errorf("backup class %v descriptor: %w", name, err)
			return d
		}
		// Surface a cancellation that happened while the descriptor was built.
		d.Error = ctx.Err()
		return d
	}

	go func() {
		defer close(out)
		for _, name := range classes {
			d := describe(name)
			out <- d
			if d.Error != nil {
				return
			}
		}
	}()
	return out
}
// ShardsBackup prepares the given shards of class for backup bakID and
// returns a class descriptor listing the backup files of each shard.
//
// It claims the index for bakID via initBackup, resolves every shard name,
// and then, while holding the index backupMutex so nothing is written during
// metadata collection, calls BeginBackup and ListBackupFiles on each shard.
// If any step after the claim fails, the index backup is released in a
// separate goroutine and the error is returned together with the descriptor
// as populated so far.
func (db *DB) ShardsBackup(
	ctx context.Context, bakID, class string, shards []string,
) (_ backup.ClassDescriptor, err error) {
	desc := backup.ClassDescriptor{Name: class}

	index := db.GetIndex(schema.ClassName(class))
	if index == nil {
		return desc, fmt.Errorf("no index for class %q", class)
	}

	if initErr := index.initBackup(bakID); initErr != nil {
		return desc, fmt.Errorf("init backup state for class %q: %w", class, initErr)
	}
	// From here on the index is claimed; undo the claim on any failure.
	defer func() {
		if err == nil {
			return
		}
		go index.ReleaseBackup(ctx, bakID)
	}()

	selected := make(map[string]ShardLike, len(shards))
	for _, name := range shards {
		s := index.shards.Load(name)
		if s == nil {
			return desc, fmt.Errorf("no shard %q for class %q", name, class)
		}
		selected[name] = s
	}

	// prevent writing into the index during collection of metadata
	index.backupMutex.Lock()
	defer index.backupMutex.Unlock()

	for name, s := range selected {
		if beginErr := s.BeginBackup(ctx); beginErr != nil {
			return desc, fmt.Errorf("class %q: shard %q: begin backup: %w", class, name, beginErr)
		}
		files := backup.ShardDescriptor{Name: name}
		if listErr := s.ListBackupFiles(ctx, &files); listErr != nil {
			return desc, fmt.Errorf("class %q: shard %q: list backup files: %w", class, name, listErr)
		}
		desc.Shards = append(desc.Shards, &files)
	}
	return desc, nil
}
// ReleaseBackup releases the resources the index of class acquired for
// backup bakID.
//
// The call is logged at debug level on entry and on successful exit (with the
// elapsed time); a failure is logged at error level instead. A class without
// an index is not an error: there is nothing to release and nil is returned.
func (db *DB) ReleaseBackup(ctx context.Context, bakID, class string) (err error) {
	logFields := logrus.Fields{
		"op":    "release_backup",
		"class": class,
		"id":    bakID,
	}
	db.logger.WithFields(logFields).Debug("starting")

	started := time.Now()
	defer func() {
		entry := db.logger.WithFields(logFields).WithField("took", time.Since(started))
		if err == nil {
			entry.Debug("finish")
			return
		}
		entry.Error(err)
	}()

	index := db.GetIndex(schema.ClassName(class))
	if index == nil {
		return nil
	}
	return index.ReleaseBackup(ctx, bakID)
}
// ClassExists converts name to a schema.ClassName and returns the result of
// IndexExists for it.
func (db *DB) ClassExists(name string) bool {
	className := schema.ClassName(name)
	return db.IndexExists(className)
}
// Shards returns the distinct names of the nodes that hold at least one
// physical shard of class, in unspecified order.
//
// If the sharding state has no physical shards, an empty non-nil list is
// returned. If shards exist but none of them names a node, an error is
// returned. ctx is not consulted.
func (db *DB) Shards(ctx context.Context, class string) ([]string, error) {
	state := db.schemaGetter.CopyShardingState(class)
	if len(state.Physical) == 0 {
		return []string{}, nil
	}

	// Collect node names into a set to drop duplicates across shards.
	seen := make(map[string]struct{})
	for _, physical := range state.Physical {
		for _, node := range physical.BelongsToNodes {
			seen[node] = struct{}{}
		}
	}
	if len(seen) == 0 {
		return nil, fmt.Errorf("found %v shards, but has 0 nodes", len(state.Physical))
	}

	nodes := make([]string, 0, len(seen))
	for node := range seen {
		nodes = append(nodes, node)
	}
	return nodes, nil
}
// ListClasses returns the names of all classes in the schema obtained from
// schemaGetter.GetSchemaSkipAuth, in schema order. The result is never nil.
// ctx is not consulted.
func (db *DB) ListClasses(ctx context.Context) []string {
	all := db.schemaGetter.GetSchemaSkipAuth().Objects.Classes
	names := make([]string, 0, len(all))
	for _, c := range all {
		names = append(names, c.Class)
	}
	return names
}
// descriptor fills desc with everything needed to restore the class of this
// index: one shard descriptor per shard, the marshalled sharding state and
// the marshalled class schema.
//
// The index is first claimed for backupID via initBackup. If anything fails
// after that, the claim is released in a separate goroutine and the error is
// returned; desc may then be partially populated.
func (i *Index) descriptor(ctx context.Context, backupID string, desc *backup.ClassDescriptor) (err error) {
	if initErr := i.initBackup(backupID); initErr != nil {
		return initErr
	}
	defer func() {
		if err == nil {
			return
		}
		go i.ReleaseBackup(ctx, backupID)
	}()

	// prevent writing into the index during collection of metadata
	i.backupMutex.Lock()
	defer i.backupMutex.Unlock()

	// collect prepares a single shard for backup and records its files.
	collect := func(name string, s ShardLike) error {
		if beginErr := s.BeginBackup(ctx); beginErr != nil {
			return fmt.Errorf("pause compaction and flush: %w", beginErr)
		}
		var files backup.ShardDescriptor
		if listErr := s.ListBackupFiles(ctx, &files); listErr != nil {
			return fmt.Errorf("list shard %v files: %w", s.Name(), listErr)
		}
		desc.Shards = append(desc.Shards, &files)
		return nil
	}
	if err = i.ForEachShard(collect); err != nil {
		return err
	}

	desc.ShardingState, err = i.marshalShardingState()
	if err != nil {
		return fmt.Errorf("marshal sharding state %w", err)
	}

	desc.Schema, err = i.marshalSchema()
	if err != nil {
		return fmt.Errorf("marshal schema %w", err)
	}
	return nil
}
// ReleaseBackup ends the index's current backup: it asks every shard to
// resume its maintenance cycles and afterwards clears the stored backup state
// so that a new backup can be initialised.
//
// The id argument is currently unused. The state is cleared unconditionally,
// whether or not id matches the active backup and whether or not any backup
// is active, so the method does NOT report an error for an unknown or already
// released backup. The returned error is the one reported by
// resumeMaintenanceCycles; the backup state is reset even when it is non-nil.
func (i *Index) ReleaseBackup(ctx context.Context, id string) error {
	// Deferred so the state is cleared only after the shards were asked to
	// resume, regardless of the outcome.
	defer i.resetBackupState()
	return i.resumeMaintenanceCycles(ctx)
}
// initBackup claims the index for the backup with the given id.
//
// The claim is an atomic compare-and-swap of lastBackup from nil to a fresh
// in-progress BackupState, so at most one backup can hold the index at a
// time. If another backup still holds it, an error naming that backup is
// returned; the name is left empty if the holder is released between the
// failed swap and the subsequent load.
func (i *Index) initBackup(id string) error {
	claim := &BackupState{
		BackupID:   id,
		InProgress: true,
	}
	if i.lastBackup.CompareAndSwap(nil, claim) {
		return nil
	}

	var holder string
	if current := i.lastBackup.Load(); current != nil {
		holder = current.BackupID
	}
	return errors.Errorf(
		"cannot create new backup, backup ‘%s’ is not yet released, this "+
			"means its contents have not yet been fully copied to its destination, "+
			"try again later", holder)
}
// resetBackupState clears the stored backup state by storing nil in
// lastBackup, which lets a later initBackup compare-and-swap against nil
// succeed again.
func (i *Index) resetBackupState() {
i.lastBackup.Store(nil)
}
// resumeMaintenanceCycles asks every shard of the index to resume its
// maintenance cycles.
//
// A failing shard does not stop the iteration: the error is logged with the
// shard name and remembered, and the last such error is returned once all
// shards were visited. The callback pauses for 10ms after each shard.
func (i *Index) resumeMaintenanceCycles(ctx context.Context) (lastErr error) {
	resume := func(name string, shard ShardLike) error {
		err := shard.resumeMaintenanceCycles(ctx)
		if err != nil {
			lastErr = err
			entry := i.logger.WithField("shard", name).WithField("op", "resume_maintenance")
			entry.Error(err)
		}
		time.Sleep(10 * time.Millisecond)
		// Always nil so that the remaining shards are still visited.
		return nil
	}
	i.ForEachShard(resume)
	return lastErr
}
// marshalShardingState returns the JSON encoding of a copy of the sharding
// state of this index's class, or a wrapped error if encoding fails.
func (i *Index) marshalShardingState() ([]byte, error) {
	className := i.Config.ClassName.String()
	state := i.getSchema.CopyShardingState(className)

	raw, err := state.JSON()
	if err != nil {
		return nil, errors.Wrap(err, "marshal sharding state")
	}
	return raw, nil
}
// marshalSchema looks up this index's class in the schema returned by
// GetSchemaSkipAuth and returns the result of its MarshalBinary, or a wrapped
// error if marshalling fails.
func (i *Index) marshalSchema() ([]byte, error) {
	current := i.getSchema.GetSchemaSkipAuth()
	class := current.GetClass(i.Config.ClassName)

	raw, err := class.MarshalBinary()
	if err != nil {
		return nil, errors.Wrap(err, "marshal schema")
	}
	return raw, nil
}
// Timing constants for backupMutex.
// NOTE(review): presumably these are the values assigned to
// backupMutex.retryDuration and backupMutex.notifyDuration where the mutex is
// constructed; that construction is not visible here — confirm.
const (
// mutexRetryDuration is 500ms.
mutexRetryDuration = time.Millisecond * 500
// mutexNotifyDuration is 20s.
mutexNotifyDuration = 20 * time.Second
)
// backupMutex is an adapter built around sync.RWMutex that facilitates
// cooperative blocking between write and read locks: the write lock can be
// acquired by polling with a context (LockWithContext) and read-side work can
// be run under the read lock (RLockGuard).
type backupMutex struct {
// Embedded lock; its Lock/RLock/TryLock methods are promoted.
sync.RWMutex
// log receives an info message while lock() keeps waiting.
log logrus.FieldLogger
// retryDuration is the ticker interval between TryLock attempts in lock().
retryDuration time.Duration
// notifyDuration is how long lock() waits between "still waiting" messages.
notifyDuration time.Duration
}
// LockWithContext acquires the write lock, giving up when ctx is done.
// It returns nil once the lock is held (the caller must then Unlock it) and
// ctx.Err() if the context finished before the lock could be taken.
func (m *backupMutex) LockWithContext(ctx context.Context) error {
	acquire := m.TryLock
	return m.lock(ctx, acquire)
}
// lock repeatedly calls tryLock until it succeeds or ctx is done.
//
// One attempt is made immediately. After that, an attempt is made on every
// tick of a retryDuration ticker. Whenever more than notifyDuration has
// passed since the wait began (or since the previous message), an info
// message is logged. Returns nil once tryLock succeeded, ctx.Err() otherwise.
func (m *backupMutex) lock(ctx context.Context, tryLock func() bool) error {
	// Fast path: uncontended lock, no ticker needed.
	if tryLock() {
		return nil
	}

	lastNotice := time.Now()
	ticker := time.NewTicker(m.retryDuration)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if tryLock() {
				return nil
			}
			if time.Since(lastNotice) <= m.notifyDuration {
				continue
			}
			lastNotice = time.Now()
			m.log.Info("backup process waiting for ongoing writes to finish")
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// RLockGuard runs reader while holding the read lock and returns reader's
// error unchanged. The read lock is released when reader returns, including
// when it panics.
func (m *backupMutex) RLockGuard(reader func() error) error {
	m.RLock()
	defer m.RUnlock()
	return reader()
}