KevinStephenson
Adding in weaviate code
b110593
raw
history blame
5.5 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package hnsw
import (
"io"
"os"
"strings"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// CommitLogCombiner merges pairs of adjacent, already condensed commit log
// files into a single file as long as their combined size stays within a
// configured threshold.
type CommitLogCombiner struct {
	// rootPath is passed, together with id, to getCommitFileNames to locate the
	// commit log files to inspect.
	rootPath string
	// id is passed to getCommitFileNames and attached to log entries.
	// NOTE(review): presumably the index/shard identifier - confirm with caller.
	id string
	// threshold is the maximum size in bytes (as reported by os.Stat) that a
	// combined file may have; pairs whose summed size exceeds it are skipped.
	threshold int64
	// logger receives an info-level entry after each successful combine.
	logger logrus.FieldLogger
}
// NewCommitLogCombiner returns a CommitLogCombiner that looks up commit log
// files via rootPath and id, refuses to produce combined files larger than
// threshold bytes, and reports successful merges to logger.
func NewCommitLogCombiner(rootPath, id string, threshold int64,
	logger logrus.FieldLogger,
) *CommitLogCombiner {
	combiner := new(CommitLogCombiner)
	combiner.rootPath = rootPath
	combiner.id = id
	combiner.threshold = threshold
	combiner.logger = logger
	return combiner
}
// Do repeatedly combines eligible pairs of condensed commit log files until
// no further pair qualifies.
//
// The returned bool reports whether at least one pair was combined. It is
// also meaningful when an error is returned: it then reflects the combines
// that succeeded before the failure.
func (c *CommitLogCombiner) Do() (bool, error) {
	combinedAny := false

	for {
		// The file list is re-read on every pass because a successful combine
		// replaces two files with one. The names are expected to already be in
		// order.
		names, err := getCommitFileNames(c.rootPath, c.id)
		if err != nil {
			return combinedAny, errors.Wrap(err, "obtain files names")
		}

		merged, err := c.combineFirstMatch(names)
		if err != nil {
			return combinedAny, err
		}
		if !merged {
			// nothing left to combine
			return combinedAny, nil
		}

		combinedAny = true
	}
}
// combineFirstMatch scans fileNames front to back and combines the first pair
// of neighbouring ".condensed" files whose summed size does not exceed the
// threshold.
//
// It returns true if a pair was combined and false if no eligible pair was
// found. At most one pair is combined per call.
func (c *CommitLogCombiner) combineFirstMatch(fileNames []string) (bool, error) {
	const condensedSuffix = ".condensed"

	lastIdx := len(fileNames) - 1

	for idx := range fileNames {
		current := fileNames[idx]

		// only files that were already condensed are candidates
		if !strings.HasSuffix(current, condensedSuffix) {
			continue
		}

		// a condensed file in the last position has no successor to merge with
		if idx == lastIdx {
			return false, nil
		}

		// the successor must be condensed as well, otherwise this pair is skipped
		next := fileNames[idx+1]
		if !strings.HasSuffix(next, condensedSuffix) {
			continue
		}

		currentInfo, err := os.Stat(current)
		if err != nil {
			return false, errors.Wrapf(err, "stat file %q", current)
		}
		// the current file alone is over the limit, so no need to stat the next
		if currentInfo.Size() > c.threshold {
			continue
		}

		nextInfo, err := os.Stat(next)
		if err != nil {
			return false, errors.Wrapf(err, "stat file %q", next)
		}
		// the merged file would be larger than allowed
		if currentInfo.Size()+nextInfo.Size() > c.threshold {
			continue
		}

		if err := c.combine(current, next); err != nil {
			return false, errors.Wrapf(err, "combine %q and %q", current, next)
		}
		return true, nil
	}

	return false, nil
}
// combine merges the contents of first and second (in that order) into a
// single file named after first, removes both inputs, and logs the result.
//
// Every output name is derived from first, so that after the merge it looks as
// if second had never existed and first had always been large enough to hold
// the contents of both.
func (c *CommitLogCombiner) combine(first, second string) error {
	base := strings.TrimSuffix(first, ".condensed")

	// The final name carries no ".condensed" suffix and therefore looks like an
	// uncondensed original commit log, so the condensor will pick it up without
	// knowing that it is a combined file.
	finalName := base

	// The temporary name clearly marks the file as "in progress", in case we
	// crash while combining and find multiple alternatives after a restart.
	tmpName := base + ".combined.tmp"

	if err := c.mergeFiles(tmpName, first, second); err != nil {
		return errors.Wrap(err, "merge files")
	}

	if err := c.renameAndCleanUp(tmpName, finalName, first, second); err != nil {
		return errors.Wrap(err, "rename and clean up files")
	}

	fields := logrus.Fields{
		"action":       "hnsw_commit_logger_combine_condensed_logs",
		"id":           c.id,
		"input_first":  first,
		"input_second": second,
		"output":       finalName,
	}
	c.logger.WithFields(fields).
		Info("successfully combined previously condensed commit log files")

	return nil
}
// mergeFiles creates (or truncates) the file outName and writes the full
// contents of first followed by the full contents of second into it.
//
// All file handles opened here are released before returning, on both the
// success and the error paths. On error the partially written outName is left
// on disk; a later call truncates it again via os.Create.
func (c *CommitLogCombiner) mergeFiles(outName, first, second string) error {
	out, err := os.Create(outName)
	if err != nil {
		return errors.Wrapf(err, "open target file %q", outName)
	}

	// Make sure the target handle is released on every early return below.
	// On the success path the file is closed explicitly so that the close
	// error can be reported; the flag prevents a second Close.
	outClosed := false
	defer func() {
		if !outClosed {
			// A more specific error is already being returned, so the close
			// error is deliberately dropped here.
			_ = out.Close()
		}
	}()

	source1, err := os.Open(first)
	if err != nil {
		return errors.Wrapf(err, "open first source file %q", first)
	}
	defer source1.Close()

	source2, err := os.Open(second)
	if err != nil {
		return errors.Wrapf(err, "open second source file %q", second)
	}
	defer source2.Close()

	if _, err := io.Copy(out, source1); err != nil {
		return errors.Wrapf(err, "copy first source (%q) into target (%q)", first,
			outName)
	}

	if _, err := io.Copy(out, source2); err != nil {
		return errors.Wrapf(err, "copy second source (%q) into target (%q)", second,
			outName)
	}

	// Close explicitly (rather than via defer) because a failed close of a
	// written file can indicate that data did not reach the disk.
	outClosed = true
	if err := out.Close(); err != nil {
		return errors.Wrapf(err, "close target file %q", outName)
	}

	return nil
}
// renameAndCleanUp moves tmpName to finalName and afterwards removes every
// file listed in toDeletes, stopping at the first removal that fails.
//
// The rename deliberately happens before the deletes: if we crash in between,
// we end up with duplicate files both with and without the ".condensed"
// suffix. The new (and complete) merged file does not carry the suffix whereas
// the sources do. To the corrupted file fixer this looks as if a condensing
// had gone wrong, and it will delete the sources.
func (c *CommitLogCombiner) renameAndCleanUp(tmpName, finalName string,
	toDeletes ...string,
) error {
	err := os.Rename(tmpName, finalName)
	if err != nil {
		return errors.Wrapf(err, "rename tmp (%q) to final (%q)", tmpName, finalName)
	}

	for i := range toDeletes {
		obsolete := toDeletes[i]
		if err := os.Remove(obsolete); err != nil {
			return errors.Wrapf(err, "clean up %q", obsolete)
		}
	}

	return nil
}