Spaces:
Running
Running
File size: 4,528 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package lsmkv
import (
"time"
"github.com/pkg/errors"
)
// BucketOption is a functional option for a Bucket: a function that receives
// the bucket to configure and returns a non-nil error when the option cannot
// be applied.
type BucketOption func(b *Bucket) error
// WithStrategy returns an option that sets the bucket's strategy.
//
// Only StrategyReplace, StrategyMapCollection, StrategySetCollection and
// StrategyRoaringSet are accepted. Any other value makes the option return an
// error and leaves the bucket untouched. The check runs when the option is
// applied, not when WithStrategy is called.
func WithStrategy(strategy string) BucketOption {
	return func(bucket *Bucket) error {
		switch strategy {
		case StrategyReplace,
			StrategyMapCollection,
			StrategySetCollection,
			StrategyRoaringSet:
			bucket.strategy = strategy
			return nil
		default:
			return errors.Errorf("unrecognized strategy %q", strategy)
		}
	}
}
// WithMemtableThreshold returns an option that stores threshold in the
// bucket's memtableThreshold field. The option itself never fails.
//
// NOTE(review): the unit of threshold is not visible here (presumably a size
// in bytes) — confirm against the code that reads memtableThreshold.
func WithMemtableThreshold(threshold uint64) BucketOption {
	set := func(bucket *Bucket) error {
		bucket.memtableThreshold = threshold
		return nil
	}
	return set
}
// WithWalThreshold returns an option that stores threshold in the bucket's
// walThreshold field. The option itself never fails.
//
// NOTE(review): the unit of threshold is not visible here (presumably a size
// in bytes) — confirm against the code that reads walThreshold.
func WithWalThreshold(threshold uint64) BucketOption {
	set := func(bucket *Bucket) error {
		bucket.walThreshold = threshold
		return nil
	}
	return set
}
// WithIdleThreshold returns an option that stores threshold in the bucket's
// flushAfterIdle field. The option itself never fails.
func WithIdleThreshold(threshold time.Duration) BucketOption {
	set := func(bucket *Bucket) error {
		bucket.flushAfterIdle = threshold
		return nil
	}
	return set
}
// WithSecondaryIndices returns an option that sets the bucket's
// secondaryIndices field to count. The option itself never fails.
func WithSecondaryIndices(count uint16) BucketOption {
	set := func(bucket *Bucket) error {
		bucket.secondaryIndices = count
		return nil
	}
	return set
}
// WithLegacyMapSorting returns an option that switches on the bucket's
// legacyMapSortingBeforeCompaction flag. There is no way to switch it off
// through this option. The option itself never fails.
func WithLegacyMapSorting() BucketOption {
	enable := func(bucket *Bucket) error {
		bucket.legacyMapSortingBeforeCompaction = true
		return nil
	}
	return enable
}
// WithPread returns an option that sets the bucket's mmapContents flag to the
// opposite of with: WithPread(true) clears mmapContents, WithPread(false)
// sets it. The option itself never fails.
func WithPread(with bool) BucketOption {
	set := func(bucket *Bucket) error {
		// The bucket stores the inverse setting: pread on means mmap off.
		bucket.mmapContents = !with
		return nil
	}
	return set
}
// WithDynamicMemtableSizing returns an option that installs a memtable size
// advisor on the bucket (field memtableResizer).
//
// initialMB and maxMB are given in megabytes and converted to bytes
// (1 MB = 1024 * 1024 bytes) for the advisor's initial and maxSize settings.
// The step size is fixed at 10 MB. minActiveSeconds and maxActiveSeconds are
// converted to time.Duration for minDuration and maxDuration. The option
// itself never fails.
func WithDynamicMemtableSizing(
	initialMB, maxMB, minActiveSeconds, maxActiveSeconds int,
) BucketOption {
	return func(bucket *Bucket) error {
		const bytesPerMB = 1024 * 1024

		var advisorCfg memtableSizeAdvisorCfg
		advisorCfg.initial = initialMB * bytesPerMB
		advisorCfg.stepSize = 10 * bytesPerMB
		advisorCfg.maxSize = maxMB * bytesPerMB
		advisorCfg.minDuration = time.Second * time.Duration(minActiveSeconds)
		advisorCfg.maxDuration = time.Second * time.Duration(maxActiveSeconds)

		bucket.memtableResizer = newMemtableSizeAdvisor(advisorCfg)
		return nil
	}
}
// secondaryIndexKeys is a list of secondary keys. Each element is one key as
// a byte slice, addressed by its position in the list.
type secondaryIndexKeys [][]byte
// SecondaryKeyOption is a function that operates on a secondaryIndexKeys
// slice and returns a non-nil error when it cannot be applied.
type SecondaryKeyOption func(s secondaryIndexKeys) error
// WithSecondaryKey returns an option that stores key at position pos of the
// secondaryIndexKeys slice it is applied to.
//
// The option returns an error, and leaves the slice unmodified, when pos is
// not a valid index of that slice, i.e. when pos is negative or pos >= len(s).
// key is stored as-is; it is not copied.
func WithSecondaryKey(pos int, key []byte) SecondaryKeyOption {
	return func(s secondaryIndexKeys) error {
		// Valid indices are 0..len(s)-1. pos == len(s) is already out of
		// range, so the upper bound must be exclusive; negative positions are
		// rejected as well. Either would otherwise panic on the assignment.
		if pos < 0 || pos >= len(s) {
			return errors.Errorf("set secondary index %d on an index of length %d",
				pos, len(s))
		}

		s[pos] = key
		return nil
	}
}
// WithMonitorCount returns an option that switches on the bucket's
// monitorCount flag.
//
// The option fails unless the bucket's strategy equals StrategyReplace at the
// moment the option is applied. Because it reads b.strategy at that moment,
// it presumably has to be applied after the strategy has been set — verify
// against the caller's option ordering.
func WithMonitorCount() BucketOption {
	return func(bucket *Bucket) error {
		if bucket.strategy == StrategyReplace {
			bucket.monitorCount = true
			return nil
		}

		return errors.Errorf("count monitoring only supported on 'replace' buckets")
	}
}
// WithKeepTombstones returns an option that sets the bucket's keepTombstones
// flag to the given value. The option itself never fails.
func WithKeepTombstones(keepTombstones bool) BucketOption {
	set := func(bucket *Bucket) error {
		bucket.keepTombstones = keepTombstones
		return nil
	}
	return set
}
// WithUseBloomFilter returns an option that sets the bucket's useBloomFilter
// flag to the given value. The option itself never fails.
func WithUseBloomFilter(useBloomFilter bool) BucketOption {
	set := func(bucket *Bucket) error {
		bucket.useBloomFilter = useBloomFilter
		return nil
	}
	return set
}
// WithCalcCountNetAdditions returns an option that sets the bucket's
// calcCountNetAdditions flag to the given value. The option itself never
// fails.
func WithCalcCountNetAdditions(calcCountNetAdditions bool) BucketOption {
	set := func(bucket *Bucket) error {
		bucket.calcCountNetAdditions = calcCountNetAdditions
		return nil
	}
	return set
}
/*
Background for this option:
We use the LSM store in two places:
1. Our existing key/value and inverted buckets.
2. As part of the new brute-force based index (to be built this week).
Brute-force index
This is a simple disk-index where we use a cursor to iterate over all objects. This is what we need the force-compaction for. The experimentation so far has shown that the cursor is much more performant on a single segment than it is on multiple segments. This is because with a single segment it’s essentially just one contiguous chunk of data on disk that we read through. But with multiple segments (and an unpredictable order) it ends up being many tiny reads (inefficient).
Existing uses of the LSM store
For existing uses, e.g. the object store, we don’t want to force-compact. This is because they can grow massive. For example, you could have a 100GB segment, then a new write leads to a new segment that is just a few bytes. If we force-compacted those two, we would rewrite 100GB every time the user sends a few bytes to Weaviate. In this case, the existing tiered compaction strategy makes more sense.
Configurability of buckets
*/
// WithForceCompation returns an option that sets the bucket's forceCompaction
// flag to opt. The option itself never fails.
//
// The spelling of the function name ("Compation") is part of the exported
// API and is kept as-is so existing callers keep compiling.
func WithForceCompation(opt bool) BucketOption {
	set := func(bucket *Bucket) error {
		bucket.forceCompaction = opt
		return nil
	}
	return set
}
|