KevinStephenson
Adding in weaviate code
b110593
raw
history blame
3.72 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package lsmkv
import (
"bufio"
"encoding/binary"
"os"
"sync/atomic"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv/roaringset"
)
type commitLogger struct {
file *os.File
writer *bufio.Writer
n atomic.Int64
path string
// e.g. when recovering from an existing log, we do not want to write into a
// new log again
paused bool
}
type CommitType uint16
const (
CommitTypeReplace CommitType = iota // replace strategy
// collection strategy - this can handle all cases as updates and deletes are
// only appends in a collection strategy
CommitTypeCollection
CommitTypeRoaringSet
)
func (ct CommitType) String() string {
switch ct {
case CommitTypeReplace:
return "replace"
case CommitTypeCollection:
return "collection"
case CommitTypeRoaringSet:
return "roaringset"
default:
return "unknown"
}
}
func (ct CommitType) Is(checkedCommitType CommitType) bool {
return ct == checkedCommitType
}
func newCommitLogger(path string) (*commitLogger, error) {
out := &commitLogger{
path: path + ".wal",
}
f, err := os.Create(out.path)
if err != nil {
return nil, err
}
out.file = f
out.writer = bufio.NewWriter(f)
return out, nil
}
func (cl *commitLogger) put(node segmentReplaceNode) error {
if cl.paused {
return nil
}
// TODO: do we need a timestamp? if so, does it need to be a vector clock?
if err := binary.Write(cl.writer, binary.LittleEndian, CommitTypeReplace); err != nil {
return err
}
n := 1
if ki, err := node.KeyIndexAndWriteTo(cl.writer); err != nil {
return err
} else {
n += ki.ValueEnd - ki.ValueStart
}
cl.n.Add(int64(n))
return nil
}
func (cl *commitLogger) append(node segmentCollectionNode) error {
if cl.paused {
return nil
}
// TODO: do we need a timestamp? if so, does it need to be a vector clock?
if err := binary.Write(cl.writer, binary.LittleEndian, CommitTypeCollection); err != nil {
return err
}
n := 1
if ki, err := node.KeyIndexAndWriteTo(cl.writer); err != nil {
return err
} else {
n += ki.ValueEnd - ki.ValueStart
}
cl.n.Add(int64(n))
return nil
}
func (cl *commitLogger) add(node *roaringset.SegmentNode) error {
if cl.paused {
return nil
}
if err := binary.Write(cl.writer, binary.LittleEndian, CommitTypeRoaringSet); err != nil {
return err
}
n := 1
if ki, err := node.KeyIndexAndWriteTo(cl.writer, 0); err != nil {
return err
} else {
n += ki.ValueEnd - ki.ValueStart
}
cl.n.Add(int64(n))
return nil
}
// Size returns the amount of data that has been written since the commit
// logger was initialized. After a flush a new logger is initialized which
// automatically resets the logger.
func (cl *commitLogger) Size() int64 {
return cl.n.Load()
}
func (cl *commitLogger) close() error {
if cl.paused {
return errors.Errorf("attempting to close a paused commit logger")
}
if err := cl.writer.Flush(); err != nil {
return err
}
return cl.file.Close()
}
func (cl *commitLogger) pause() {
cl.paused = true
}
func (cl *commitLogger) unpause() {
cl.paused = false
}
func (cl *commitLogger) delete() error {
return os.Remove(cl.path)
}
func (cl *commitLogger) flushBuffers() error {
return cl.writer.Flush()
}