KevinStephenson
Adding in weaviate code
b110593
raw
history blame
4.53 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package lsmkv
import (
"time"
"github.com/pkg/errors"
)
type BucketOption func(b *Bucket) error
func WithStrategy(strategy string) BucketOption {
return func(b *Bucket) error {
switch strategy {
case StrategyReplace, StrategyMapCollection, StrategySetCollection,
StrategyRoaringSet:
default:
return errors.Errorf("unrecognized strategy %q", strategy)
}
b.strategy = strategy
return nil
}
}
func WithMemtableThreshold(threshold uint64) BucketOption {
return func(b *Bucket) error {
b.memtableThreshold = threshold
return nil
}
}
func WithWalThreshold(threshold uint64) BucketOption {
return func(b *Bucket) error {
b.walThreshold = threshold
return nil
}
}
func WithIdleThreshold(threshold time.Duration) BucketOption {
return func(b *Bucket) error {
b.flushAfterIdle = threshold
return nil
}
}
func WithSecondaryIndices(count uint16) BucketOption {
return func(b *Bucket) error {
b.secondaryIndices = count
return nil
}
}
func WithLegacyMapSorting() BucketOption {
return func(b *Bucket) error {
b.legacyMapSortingBeforeCompaction = true
return nil
}
}
func WithPread(with bool) BucketOption {
return func(b *Bucket) error {
b.mmapContents = !with
return nil
}
}
func WithDynamicMemtableSizing(
initialMB, maxMB, minActiveSeconds, maxActiveSeconds int,
) BucketOption {
return func(b *Bucket) error {
mb := 1024 * 1024
cfg := memtableSizeAdvisorCfg{
initial: initialMB * mb,
stepSize: 10 * mb,
maxSize: maxMB * mb,
minDuration: time.Duration(minActiveSeconds) * time.Second,
maxDuration: time.Duration(maxActiveSeconds) * time.Second,
}
b.memtableResizer = newMemtableSizeAdvisor(cfg)
return nil
}
}
type secondaryIndexKeys [][]byte
type SecondaryKeyOption func(s secondaryIndexKeys) error
func WithSecondaryKey(pos int, key []byte) SecondaryKeyOption {
return func(s secondaryIndexKeys) error {
if pos > len(s) {
return errors.Errorf("set secondary index %d on an index of length %d",
pos, len(s))
}
s[pos] = key
return nil
}
}
func WithMonitorCount() BucketOption {
return func(b *Bucket) error {
if b.strategy != StrategyReplace {
return errors.Errorf("count monitoring only supported on 'replace' buckets")
}
b.monitorCount = true
return nil
}
}
func WithKeepTombstones(keepTombstones bool) BucketOption {
return func(b *Bucket) error {
b.keepTombstones = keepTombstones
return nil
}
}
func WithUseBloomFilter(useBloomFilter bool) BucketOption {
return func(b *Bucket) error {
b.useBloomFilter = useBloomFilter
return nil
}
}
func WithCalcCountNetAdditions(calcCountNetAdditions bool) BucketOption {
return func(b *Bucket) error {
b.calcCountNetAdditions = calcCountNetAdditions
return nil
}
}
/*
Background for this option:
We use the LSM store in two places:
Our existing key/value and inverted buckets
As part of the new brute-force based index (to be built this week).
Brute-force index
This is a simple disk-index where we use a cursor to iterate over all objects. This is what we need the force-compaction for. The experimentation so far has shown that the cursor is much more performant on a single segment than it is on multiple segments. This is because with a single segment it’s essentially just one conitiguuous chunk of data on disk that we read through. But with multiple segments (and an unpredicatable order) it ends up being many tiny reads (inefficient).
Existing uses of the LSM store
For existing uses, e.g. the object store, we don’t want to force-compact. This is because they can grow massive. For example, you could have a 100GB segment, then a new write leads to a new segment that is just a few bytes. If we would force-compact those two we would write 100GB every time the user sends a few bytes to Weaviate. In this case, the existing tiered compaction strategy makes more sense.
Configurability of buckets
*/
func WithForceCompation(opt bool) BucketOption {
return func(b *Bucket) error {
b.forceCompaction = opt
return nil
}
}