File size: 2,687 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//

package lsmkv

import (
	"context"
	"io/fs"
	"path"
	"path/filepath"

	"github.com/pkg/errors"
	"github.com/weaviate/weaviate/entities/storagestate"
)

// FlushMemtable flushes any active memtable and returns only once the memtable
// has been fully flushed and a stable state on disk has been reached.
//
// This is a preparatory stage for creating backups.
//
// Method should be run only if flushCycle is not running
// (was not started, is stopped, or noop impl is provided)
func (b *Bucket) FlushMemtable() error {
	if b.isReadOnly() {
		return errors.Wrap(storagestate.ErrStatusReadOnly, "flush memtable")
	}

	// this lock does not currently _need_ to be
	// obtained, as the only other place that
	// grabs this lock is the flush cycle, which
	// has just been stopped above.
	//
	// that being said, we will lock here anyway
	// as flushLock may be added elsewhere in the
	// future
	b.flushLock.Lock()
	if b.active == nil && b.flushing == nil {
		b.flushLock.Unlock()
		return nil
	}
	b.flushLock.Unlock()

	stat, err := b.active.commitlog.file.Stat()
	if err != nil {
		b.logger.WithField("action", "lsm_wal_stat").
			WithField("path", b.dir).
			WithError(err).
			Fatal("bucket backup memtable flush failed")
	}

	// attempting a flush&switch on when the active memtable
	// or WAL is empty results in a corrupted backup attempt
	if b.active.Size() > 0 || stat.Size() > 0 {
		if err := b.FlushAndSwitch(); err != nil {
			return err
		}
	}
	return nil
}

// ListFiles lists all files that currently exist in the Bucket. The files are only
// in a stable state if the memtable is empty, and if compactions are paused. If one
// of those conditions is not given, it errors
func (b *Bucket) ListFiles(ctx context.Context, basePath string) ([]string, error) {
	var (
		bucketRoot = b.disk.dir
		files      []string
	)

	err := filepath.WalkDir(bucketRoot, func(currPath string, d fs.DirEntry, err error) error {
		if d.IsDir() {
			return nil
		}
		// ignore .wal files because they are not immutable
		if filepath.Ext(currPath) == ".wal" {
			return nil
		}
		files = append(files, path.Join(basePath, path.Base(currPath)))
		return nil
	})
	if err != nil {
		return nil, errors.Errorf("failed to list files for bucket: %s", err)
	}

	return files, nil
}