Spaces:
Running
Running
File size: 6,120 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
//                            _       _
//  __      _____  __ ___   ___  __ _| |_ ___
//  \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//   \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//    \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package db
import (
"strings"
"time"
"github.com/weaviate/weaviate/entities/cyclemanager"
"github.com/weaviate/weaviate/entities/vectorindex/hnsw"
enthnsw "github.com/weaviate/weaviate/entities/vectorindex/hnsw"
)
// indexCycleCallbacks holds, for each background concern of an Index, a
// cyclemanager.CycleCallbackGroup paired with the cyclemanager.CycleManager
// built for it. In initCycleCallbacks every manager is constructed with its
// paired group's CycleCallback, so a manager's cycle presumably runs the
// callbacks held by that group (NOTE(review): confirm in cyclemanager).
// initCycleCallbacksNoop fills every field with the cyclemanager no-op
// implementations instead.
type indexCycleCallbacks struct {
	// Compaction cycle (CompactionCycleTicker) and its callback group.
	compactionCallbacks cyclemanager.CycleCallbackGroup
	compactionCycle     cyclemanager.CycleManager

	// Memtable flush cycle (MemtableFlushCycleTicker) and its callback group.
	flushCallbacks cyclemanager.CycleCallbackGroup
	flushCycle     cyclemanager.CycleManager

	// Vector index commit-logger cycle (HnswCommitLoggerCycleTicker, whose
	// interval is dynamic) and its callback group.
	vectorCommitLoggerCallbacks cyclemanager.CycleCallbackGroup
	vectorCommitLoggerCycle     cyclemanager.CycleManager

	// Vector index tombstone cleanup on a fixed interval, taken from the HNSW
	// user config when present, otherwise the HNSW package default.
	vectorTombstoneCleanupCallbacks cyclemanager.CycleCallbackGroup
	vectorTombstoneCleanupCycle     cyclemanager.CycleManager

	// Geo-property commit-logger cycle (GeoCommitLoggerCycleTicker) and its
	// callback group.
	geoPropsCommitLoggerCallbacks cyclemanager.CycleCallbackGroup
	geoPropsCommitLoggerCycle     cyclemanager.CycleManager

	// Geo-property tombstone cleanup on the fixed HNSW default interval
	// (not configurable from the user config here).
	geoPropsTombstoneCleanupCallbacks cyclemanager.CycleCallbackGroup
	geoPropsTombstoneCleanupCycle     cyclemanager.CycleManager
}
// initCycleCallbacks creates every callback-group / cycle-manager pair the
// index uses and stores them on index.cycleCallbacks. The managers are only
// constructed here; this function never calls Start on any of them.
//
// Every group is named "index/<index ID>/<concern...>" and receives
// 2*_NUMCPU as its final argument (NOTE(review): presumably the group's
// worker/routine limit — confirm against cyclemanager.NewCallbackGroup).
func (index *Index) initCycleCallbacks() {
	// Vector tombstone cleanup honours the user's HNSW config when the index
	// has one, and otherwise falls back to the package default.
	cleanupSeconds := hnsw.DefaultCleanupIntervalSeconds
	if userCfg, isHnsw := index.vectorIndexUserConfig.(hnsw.UserConfig); isHnsw {
		cleanupSeconds = userCfg.CleanupIntervalSeconds
	}

	groupName := func(parts ...string) string {
		return strings.Join(append([]string{"index", index.ID()}, parts...), "/")
	}
	routines := _NUMCPU * 2

	cb := &indexCycleCallbacks{}

	cb.compactionCallbacks = cyclemanager.NewCallbackGroup(groupName("compaction"), index.logger, routines)
	cb.compactionCycle = cyclemanager.NewManager(
		cyclemanager.CompactionCycleTicker(),
		cb.compactionCallbacks.CycleCallback)

	cb.flushCallbacks = cyclemanager.NewCallbackGroup(groupName("flush"), index.logger, routines)
	cb.flushCycle = cyclemanager.NewManager(
		cyclemanager.MemtableFlushCycleTicker(),
		cb.flushCallbacks.CycleCallback)

	// The HNSW commit logger used to be checked on a fixed 10s interval,
	// which gh-1867 lowered to 0.5s. Waiting a long time between checks buys
	// nothing. On a low-powered machine an iteration simply finds no work and
	// does nothing. On a powerful machine, several units of work can pile up
	// within 10s while only one is handled per tick. Uncombined and
	// uncondensed commit logs then accumulate and are only processed long
	// after the initial import has finished. A crash during an import also
	// leaves a lot of work for startup, because the commit logs still contain
	// many redundancies. Checking (and working) more often therefore has only
	// upsides.
	//
	// Update: the interval is now dynamic, between 500ms and 10s, to address
	// https://github.com/weaviate/weaviate/issues/2783.
	cb.vectorCommitLoggerCallbacks = cyclemanager.NewCallbackGroup(groupName("vector", "commit_logger"), index.logger, routines)
	cb.vectorCommitLoggerCycle = cyclemanager.NewManager(
		cyclemanager.HnswCommitLoggerCycleTicker(),
		cb.vectorCommitLoggerCallbacks.CycleCallback)

	cb.vectorTombstoneCleanupCallbacks = cyclemanager.NewCallbackGroup(groupName("vector", "tombstone_cleanup"), index.logger, routines)
	cb.vectorTombstoneCleanupCycle = cyclemanager.NewManager(
		cyclemanager.NewFixedTicker(time.Duration(cleanupSeconds)*time.Second),
		cb.vectorTombstoneCleanupCallbacks.CycleCallback)

	cb.geoPropsCommitLoggerCallbacks = cyclemanager.NewCallbackGroup(groupName("geo_props", "commit_logger"), index.logger, routines)
	cb.geoPropsCommitLoggerCycle = cyclemanager.NewManager(
		cyclemanager.GeoCommitLoggerCycleTicker(),
		cb.geoPropsCommitLoggerCallbacks.CycleCallback)

	// Geo-property tombstone cleanup always uses the package default interval.
	// NOTE(review): enthnsw and hnsw are two aliases for the same import path;
	// both are kept because dropping either would leave an unused import.
	cb.geoPropsTombstoneCleanupCallbacks = cyclemanager.NewCallbackGroup(groupName("geo_props", "tombstone_cleanup"), index.logger, routines)
	cb.geoPropsTombstoneCleanupCycle = cyclemanager.NewManager(
		cyclemanager.NewFixedTicker(enthnsw.DefaultCleanupIntervalSeconds*time.Second),
		cb.geoPropsTombstoneCleanupCallbacks.CycleCallback)

	index.cycleCallbacks = cb
}
// initCycleCallbacksNoop installs an indexCycleCallbacks in which every
// callback group comes from cyclemanager.NewCallbackGroupNoop and every
// manager from cyclemanager.NewManagerNoop. Presumably this is used where
// the index should not run background cycles (NOTE(review): confirm at
// the call sites).
func (index *Index) initCycleCallbacksNoop() {
	cb := &indexCycleCallbacks{}

	// Each pair is filled group-first, then manager, in declaration order.
	cb.compactionCallbacks, cb.compactionCycle = cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewManagerNoop()
	cb.flushCallbacks, cb.flushCycle = cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewManagerNoop()
	cb.vectorCommitLoggerCallbacks, cb.vectorCommitLoggerCycle = cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewManagerNoop()
	cb.vectorTombstoneCleanupCallbacks, cb.vectorTombstoneCleanupCycle = cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewManagerNoop()
	cb.geoPropsCommitLoggerCallbacks, cb.geoPropsCommitLoggerCycle = cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewManagerNoop()
	cb.geoPropsTombstoneCleanupCallbacks, cb.geoPropsTombstoneCleanupCycle = cyclemanager.NewCallbackGroupNoop(), cyclemanager.NewManagerNoop()

	index.cycleCallbacks = cb
}
|