Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package hnsw | |
import ( | |
"context" | |
"fmt" | |
"os" | |
"path/filepath" | |
"sort" | |
"strconv" | |
"strings" | |
"sync" | |
"time" | |
"github.com/pkg/errors" | |
"github.com/sirupsen/logrus" | |
"github.com/weaviate/weaviate/adapters/repos/db/vector/compressionhelpers" | |
"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw/commitlog" | |
"github.com/weaviate/weaviate/entities/cyclemanager" | |
"github.com/weaviate/weaviate/entities/errorcompounder" | |
) | |
// defaultCommitLogSize is the default combining threshold of a commit logger
// (500 * 2^20, i.e. 500 MiB when interpreted as bytes). NewCommitLogger also
// derives the default per-file switch threshold from it (one fifth of this
// value).
const defaultCommitLogSize = 500 << 20
func commitLogFileName(rootPath, indexName, fileName string) string { | |
return fmt.Sprintf("%s/%s", commitLogDirectory(rootPath, indexName), fileName) | |
} | |
// commitLogDirectory returns "<rootPath>/<name>.hnsw.commitlog.d", the
// directory holding all commit log files of the index called name. Components
// are joined with a literal "/" (no path cleaning).
func commitLogDirectory(rootPath, name string) string {
	dirName := fmt.Sprintf("%s.hnsw.commitlog.d", name)
	return fmt.Sprintf("%s/%s", rootPath, dirName)
}
func NewCommitLogger(rootPath, name string, logger logrus.FieldLogger, | |
maintenanceCallbacks cyclemanager.CycleCallbackGroup, opts ...CommitlogOption, | |
) (*hnswCommitLogger, error) { | |
l := &hnswCommitLogger{ | |
rootPath: rootPath, | |
id: name, | |
condensor: NewMemoryCondensor(logger), | |
logger: logger, | |
// both can be overwritten using functional options | |
maxSizeIndividual: defaultCommitLogSize / 5, | |
maxSizeCombining: defaultCommitLogSize, | |
} | |
for _, o := range opts { | |
if err := o(l); err != nil { | |
return nil, err | |
} | |
} | |
fd, err := getLatestCommitFileOrCreate(rootPath, name) | |
if err != nil { | |
return nil, err | |
} | |
id := func(elems ...string) string { | |
elems = append([]string{"commit_logger"}, elems...) | |
elems = append(elems, l.id) | |
return strings.Join(elems, "/") | |
} | |
l.commitLogger = commitlog.NewLoggerWithFile(fd) | |
l.switchLogsCallbackCtrl = maintenanceCallbacks.Register(id("switch_logs"), l.startSwitchLogs) | |
l.condenseLogsCallbackCtrl = maintenanceCallbacks.Register(id("condense_logs"), l.startCombineAndCondenseLogs) | |
return l, nil | |
} | |
func getLatestCommitFileOrCreate(rootPath, name string) (*os.File, error) { | |
dir := commitLogDirectory(rootPath, name) | |
err := os.MkdirAll(dir, os.ModePerm) | |
if err != nil { | |
return nil, errors.Wrap(err, "create commit logger directory") | |
} | |
fileName, ok, err := getCurrentCommitLogFileName(dir) | |
if err != nil { | |
return nil, errors.Wrap(err, "find commit logger file in directory") | |
} | |
if !ok { | |
// this is a new commit log, initialize with the current time stamp | |
fileName = fmt.Sprintf("%d", time.Now().Unix()) | |
} | |
fd, err := os.OpenFile(commitLogFileName(rootPath, name, fileName), | |
os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o666) | |
if err != nil { | |
return nil, errors.Wrap(err, "create commit log file") | |
} | |
return fd, nil | |
} | |
// getCommitFileNames in order, from old to new | |
func getCommitFileNames(rootPath, name string) ([]string, error) { | |
dir := commitLogDirectory(rootPath, name) | |
err := os.MkdirAll(dir, os.ModePerm) | |
if err != nil { | |
return nil, errors.Wrap(err, "create commit logger directory") | |
} | |
files, err := os.ReadDir(dir) | |
if err != nil { | |
return nil, errors.Wrap(err, "browse commit logger directory") | |
} | |
files = removeTmpScratchOrHiddenFiles(files) | |
files, err = removeTmpCombiningFiles(dir, files) | |
if err != nil { | |
return nil, errors.Wrap(err, "remove temporary files") | |
} | |
if len(files) == 0 { | |
return nil, nil | |
} | |
ec := &errorcompounder.ErrorCompounder{} | |
sort.Slice(files, func(a, b int) bool { | |
ts1, err := asTimeStamp(files[a].Name()) | |
if err != nil { | |
ec.Add(err) | |
} | |
ts2, err := asTimeStamp(files[b].Name()) | |
if err != nil { | |
ec.Add(err) | |
} | |
return ts1 < ts2 | |
}) | |
if err := ec.ToError(); err != nil { | |
return nil, err | |
} | |
out := make([]string, len(files)) | |
for i, file := range files { | |
out[i] = commitLogFileName(rootPath, name, file.Name()) | |
} | |
return out, nil | |
} | |
// getCurrentCommitLogFileName returns the fileName and true if a file was | |
// present. If no file was present, the second arg is false. | |
func getCurrentCommitLogFileName(dirPath string) (string, bool, error) { | |
files, err := os.ReadDir(dirPath) | |
if err != nil { | |
return "", false, errors.Wrap(err, "browse commit logger directory") | |
} | |
if len(files) == 0 { | |
return "", false, nil | |
} | |
files = removeTmpScratchOrHiddenFiles(files) | |
files, err = removeTmpCombiningFiles(dirPath, files) | |
if err != nil { | |
return "", false, errors.Wrap(err, "clean up tmp combining files") | |
} | |
ec := &errorcompounder.ErrorCompounder{} | |
sort.Slice(files, func(a, b int) bool { | |
ts1, err := asTimeStamp(files[a].Name()) | |
if err != nil { | |
ec.Add(err) | |
} | |
ts2, err := asTimeStamp(files[b].Name()) | |
if err != nil { | |
ec.Add(err) | |
} | |
return ts1 > ts2 | |
}) | |
if err := ec.ToError(); err != nil { | |
return "", false, err | |
} | |
return files[0].Name(), true, nil | |
} | |
// removeTmpScratchOrHiddenFiles returns the entries of in, in their original
// order, without hidden files (name starts with ".") and scratch files (name
// ends with ".scratch.tmp"). Nothing is deleted from disk, and in itself is
// not modified.
func removeTmpScratchOrHiddenFiles(in []os.DirEntry) []os.DirEntry {
	kept := make([]os.DirEntry, 0, len(in))
	for _, entry := range in {
		n := entry.Name()
		if strings.HasPrefix(n, ".") || strings.HasSuffix(n, ".scratch.tmp") {
			continue
		}
		kept = append(kept, entry)
	}
	return kept
}
func removeTmpCombiningFiles(dirPath string, | |
in []os.DirEntry, | |
) ([]os.DirEntry, error) { | |
out := make([]os.DirEntry, len(in)) | |
i := 0 | |
for _, info := range in { | |
if strings.HasSuffix(info.Name(), ".combined.tmp") { | |
// a temporary combining file was found which means that the combining | |
// process never completed, this file is thus considered corrupt (too | |
// short) and must be deleted. The original sources still exist (because | |
// the only get deleted after the .tmp file is removed), so it's safe to | |
// delete this without data loss. | |
if err := os.Remove(filepath.Join(dirPath, info.Name())); err != nil { | |
return out, errors.Wrap(err, "remove tmp combining file") | |
} | |
continue | |
} | |
out[i] = info | |
i++ | |
} | |
return out[:i], nil | |
} | |
// asTimeStamp parses a commit log file name into its base-10 int64 timestamp.
// A trailing ".condensed" suffix is ignored, so "1700000000.condensed" and
// "1700000000" both yield 1700000000. Any other non-numeric name is an error.
func asTimeStamp(in string) (int64, error) {
	digits := strings.TrimSuffix(in, ".condensed")
	return strconv.ParseInt(digits, 10, 64)
}
// condensor rewrites a single commit log file into its condensed form.
// condenseOldLogs calls Do with the full path of an older commit log file
// (never the newest one, and never one whose name already ends in
// ".condensed").
type condensor interface {
	Do(fileName string) error
}
// hnswCommitLogger writes HNSW index mutations to a series of commit log files
// in "<rootPath>/<id>.hnsw.commitlog.d". Files are opened with O_APPEND. The
// logger also maintains those files from maintenance callbacks: it switches
// to a new file once the active one gets too large, and it combines and
// condenses older files.
type hnswCommitLogger struct {
	// protect against concurrent attempts to write in the underlying file or
	// buffer. switchCommitLogs also holds it while replacing commitLogger, so
	// writers never see a half-switched logger.
	sync.Mutex

	// rootPath is the parent directory of the commit log directory.
	rootPath string
	// id is the index name. It is used in the commit log directory name and in
	// the maintenance callback ids.
	id string
	// condensor is applied by condenseOldLogs to older, not yet condensed files.
	condensor condensor
	logger    logrus.FieldLogger
	// maxSizeIndividual: a non-forced switch happens only once the active
	// file's FileSize() exceeds this value.
	maxSizeIndividual int64
	// maxSizeCombining is the base of the combining threshold (scaled by 1.75
	// in combineLogs).
	maxSizeCombining int64
	// commitLogger writes to the currently active file. It is replaced on every
	// switch.
	commitLogger *commitlog.Logger
	// handles of the maintenance callbacks registered in NewCommitLogger;
	// released by Shutdown
	switchLogsCallbackCtrl   cyclemanager.CycleCallbackCtrl
	condenseLogsCallbackCtrl cyclemanager.CycleCallbackCtrl
}
// HnswCommitType identifies the kind of an entry in an HNSW commit log.
type HnswCommitType uint8 // 256 options, plenty of room for future extensions

// Commit log entry types. Values are assigned by iota in declaration order
// (shown in the trailing comments).
//
// NOTE(review): these values presumably end up in the on-disk commit log
// format (the version notes below suggest so). If so, existing constants must
// never be reordered or removed, and new types must be appended at the end.
// Confirm against the commitlog encoder.
const (
	AddNode HnswCommitType = iota // 0
	SetEntryPointMaxLevel         // 1
	AddLinkAtLevel                // 2
	ReplaceLinksAtLevel           // 3
	AddTombstone                  // 4
	RemoveTombstone               // 5
	ClearLinks                    // 6
	DeleteNode                    // 7
	ResetIndex                    // 8
	ClearLinksAtLevel             // 9; added in v1.8.0-rc.1, see https://github.com/weaviate/weaviate/issues/1701
	AddLinksAtLevel               // 10; added in v1.8.0-rc.1, see https://github.com/weaviate/weaviate/issues/1705
	AddPQ                         // 11; product quantizer data
)
func (t HnswCommitType) String() string { | |
switch t { | |
case AddNode: | |
return "AddNode" | |
case SetEntryPointMaxLevel: | |
return "SetEntryPointWithMaxLayer" | |
case AddLinkAtLevel: | |
return "AddLinkAtLevel" | |
case AddLinksAtLevel: | |
return "AddLinksAtLevel" | |
case ReplaceLinksAtLevel: | |
return "ReplaceLinksAtLevel" | |
case AddTombstone: | |
return "AddTombstone" | |
case RemoveTombstone: | |
return "RemoveTombstone" | |
case ClearLinks: | |
return "ClearLinks" | |
case DeleteNode: | |
return "DeleteNode" | |
case ResetIndex: | |
return "ResetIndex" | |
case ClearLinksAtLevel: | |
return "ClearLinksAtLevel" | |
case AddPQ: | |
return "AddProductQuantizer" | |
} | |
return "unknown commit type" | |
} | |
func (l *hnswCommitLogger) ID() string { | |
return l.id | |
} | |
func (l *hnswCommitLogger) AddPQ(data compressionhelpers.PQData) error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.AddPQ(data) | |
} | |
// AddNode adds an empty node | |
func (l *hnswCommitLogger) AddNode(node *vertex) error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.AddNode(node.id, node.level) | |
} | |
func (l *hnswCommitLogger) SetEntryPointWithMaxLayer(id uint64, level int) error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.SetEntryPointWithMaxLayer(id, level) | |
} | |
func (l *hnswCommitLogger) ReplaceLinksAtLevel(nodeid uint64, level int, targets []uint64) error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.ReplaceLinksAtLevel(nodeid, level, targets) | |
} | |
func (l *hnswCommitLogger) AddLinkAtLevel(nodeid uint64, level int, | |
target uint64, | |
) error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.AddLinkAtLevel(nodeid, level, target) | |
} | |
func (l *hnswCommitLogger) AddTombstone(nodeid uint64) error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.AddTombstone(nodeid) | |
} | |
func (l *hnswCommitLogger) RemoveTombstone(nodeid uint64) error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.RemoveTombstone(nodeid) | |
} | |
func (l *hnswCommitLogger) ClearLinks(nodeid uint64) error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.ClearLinks(nodeid) | |
} | |
func (l *hnswCommitLogger) ClearLinksAtLevel(nodeid uint64, level uint16) error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.ClearLinksAtLevel(nodeid, level) | |
} | |
func (l *hnswCommitLogger) DeleteNode(nodeid uint64) error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.DeleteNode(nodeid) | |
} | |
func (l *hnswCommitLogger) Reset() error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.Reset() | |
} | |
// Shutdown waits for ongoing maintenance processes to stop, then cancels their | |
// scheduling. The caller can be sure that state on disk is immutable after | |
// calling Shutdown(). | |
func (l *hnswCommitLogger) Shutdown(ctx context.Context) error { | |
if err := l.switchLogsCallbackCtrl.Unregister(ctx); err != nil { | |
return errors.Wrap(err, "failed to unregister commitlog switch from maintenance cycle") | |
} | |
if err := l.condenseLogsCallbackCtrl.Unregister(ctx); err != nil { | |
return errors.Wrap(err, "failed to unregister commitlog condense from maintenance cycle") | |
} | |
return nil | |
} | |
func (l *hnswCommitLogger) RootPath() string { | |
return l.rootPath | |
} | |
func (l *hnswCommitLogger) startSwitchLogs(shouldAbort cyclemanager.ShouldAbortCallback) bool { | |
executed, err := l.switchCommitLogs(false) | |
if err != nil { | |
l.logger.WithError(err). | |
WithField("action", "hnsw_commit_log_maintenance"). | |
Error("hnsw commit log maintenance failed") | |
} | |
return executed | |
} | |
func (l *hnswCommitLogger) startCombineAndCondenseLogs(shouldAbort cyclemanager.ShouldAbortCallback) bool { | |
executed1, err := l.combineLogs() | |
if err != nil { | |
l.logger.WithError(err). | |
WithField("action", "hnsw_commit_log_combining"). | |
Error("hnsw commit log maintenance (combining) failed") | |
} | |
executed2, err := l.condenseOldLogs() | |
if err != nil { | |
l.logger.WithError(err). | |
WithField("action", "hnsw_commit_log_condensing"). | |
Error("hnsw commit log maintenance (condensing) failed") | |
} | |
return executed1 || executed2 | |
} | |
func (l *hnswCommitLogger) SwitchCommitLogs(force bool) error { | |
_, err := l.switchCommitLogs(force) | |
return err | |
} | |
func (l *hnswCommitLogger) switchCommitLogs(force bool) (bool, error) { | |
l.Lock() | |
defer l.Unlock() | |
size, err := l.commitLogger.FileSize() | |
if err != nil { | |
return false, err | |
} | |
if size <= l.maxSizeIndividual && !force { | |
return false, nil | |
} | |
oldFileName, err := l.commitLogger.FileName() | |
if err != nil { | |
return false, err | |
} | |
if err := l.commitLogger.Close(); err != nil { | |
return true, err | |
} | |
// this is a new commit log, initialize with the current time stamp | |
fileName := fmt.Sprintf("%d", time.Now().Unix()) | |
if force { | |
l.logger.WithField("action", "commit_log_file_switched"). | |
WithField("id", l.id). | |
WithField("old_file_name", oldFileName). | |
WithField("old_file_size", size). | |
WithField("new_file_name", fileName). | |
Debug("commit log switched forced") | |
} else { | |
l.logger.WithField("action", "commit_log_file_switched"). | |
WithField("id", l.id). | |
WithField("old_file_name", oldFileName). | |
WithField("old_file_size", size). | |
WithField("new_file_name", fileName). | |
Info("commit log size crossed threshold, switching to new file") | |
} | |
fd, err := os.OpenFile(commitLogFileName(l.rootPath, l.id, fileName), | |
os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0o666) | |
if err != nil { | |
return true, errors.Wrap(err, "create commit log file") | |
} | |
l.commitLogger = commitlog.NewLoggerWithFile(fd) | |
return true, nil | |
} | |
func (l *hnswCommitLogger) condenseOldLogs() (bool, error) { | |
files, err := getCommitFileNames(l.rootPath, l.id) | |
if err != nil { | |
return false, err | |
} | |
if len(files) <= 1 { | |
// if there are no files there is nothing to do | |
// if there is only a single file, it must still be in use, we can't do | |
// anything yet | |
return false, nil | |
} | |
// cut off last element, as that's never a candidate | |
candidates := files[:len(files)-1] | |
for _, candidate := range candidates { | |
if strings.HasSuffix(candidate, ".condensed") { | |
// don't attempt to condense logs which are already condensed | |
continue | |
} | |
return true, l.condensor.Do(candidate) | |
} | |
return false, nil | |
} | |
func (l *hnswCommitLogger) combineLogs() (bool, error) { | |
// maxSize is the desired final size, since we assume a lot of redundancy we | |
// can set the combining threshold higher than the final threshold under the | |
// assumption that the combined file will be considerably smaller than the | |
// sum of both input files | |
threshold := int64(float64(l.maxSizeCombining) * 1.75) | |
return NewCommitLogCombiner(l.rootPath, l.id, threshold, l.logger).Do() | |
} | |
func (l *hnswCommitLogger) Drop(ctx context.Context) error { | |
if err := l.commitLogger.Close(); err != nil { | |
return errors.Wrap(err, "close hnsw commit logger prior to delete") | |
} | |
// stop all goroutines | |
if err := l.Shutdown(ctx); err != nil { | |
return errors.Wrap(err, "drop commitlog") | |
} | |
// remove commit log directory if exists | |
dir := commitLogDirectory(l.rootPath, l.id) | |
if _, err := os.Stat(dir); err == nil { | |
err := os.RemoveAll(dir) | |
if err != nil { | |
return errors.Wrap(err, "delete commit files directory") | |
} | |
} | |
return nil | |
} | |
func (l *hnswCommitLogger) Flush() error { | |
l.Lock() | |
defer l.Unlock() | |
return l.commitLogger.Flush() | |
} | |