Spaces:
Running
Running
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//
package lsmkv | |
import ( | |
"bufio" | |
"encoding/binary" | |
"os" | |
"sync/atomic" | |
"github.com/pkg/errors" | |
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv/roaringset" | |
) | |
// commitLogger appends entries to a write-ahead log file through a buffered
// writer and keeps a running total of how much it has logged.
type commitLogger struct {
	// file is the handle of the log file created by newCommitLogger.
	file *os.File

	// writer buffers everything that is written to file; it has to be
	// flushed before the data is guaranteed to have reached the file.
	writer *bufio.Writer

	// n is the running total reported by Size.
	n atomic.Int64

	// path is the location of the log file, including its ".wal" suffix.
	path string

	// paused turns the write methods into no-ops, e.g. when recovering from
	// an existing log, we do not want to write into a new log again.
	paused bool
}
// CommitType is the marker written in front of every commit log entry to
// identify which strategy produced the entry.
type CommitType uint16

// The numeric values are spelled out because they are written into the log;
// they must stay stable.
const (
	// CommitTypeReplace marks an entry of the replace strategy.
	CommitTypeReplace CommitType = 0

	// CommitTypeCollection marks an entry of the collection strategy - this
	// can handle all cases as updates and deletes are only appends in a
	// collection strategy.
	CommitTypeCollection CommitType = 1

	// CommitTypeRoaringSet marks an entry of the roaring set strategy.
	CommitTypeRoaringSet CommitType = 2
)
func (ct CommitType) String() string { | |
switch ct { | |
case CommitTypeReplace: | |
return "replace" | |
case CommitTypeCollection: | |
return "collection" | |
case CommitTypeRoaringSet: | |
return "roaringset" | |
default: | |
return "unknown" | |
} | |
} | |
func (ct CommitType) Is(checkedCommitType CommitType) bool { | |
return ct == checkedCommitType | |
} | |
func newCommitLogger(path string) (*commitLogger, error) { | |
out := &commitLogger{ | |
path: path + ".wal", | |
} | |
f, err := os.Create(out.path) | |
if err != nil { | |
return nil, err | |
} | |
out.file = f | |
out.writer = bufio.NewWriter(f) | |
return out, nil | |
} | |
func (cl *commitLogger) put(node segmentReplaceNode) error { | |
if cl.paused { | |
return nil | |
} | |
// TODO: do we need a timestamp? if so, does it need to be a vector clock? | |
if err := binary.Write(cl.writer, binary.LittleEndian, CommitTypeReplace); err != nil { | |
return err | |
} | |
n := 1 | |
if ki, err := node.KeyIndexAndWriteTo(cl.writer); err != nil { | |
return err | |
} else { | |
n += ki.ValueEnd - ki.ValueStart | |
} | |
cl.n.Add(int64(n)) | |
return nil | |
} | |
func (cl *commitLogger) append(node segmentCollectionNode) error { | |
if cl.paused { | |
return nil | |
} | |
// TODO: do we need a timestamp? if so, does it need to be a vector clock? | |
if err := binary.Write(cl.writer, binary.LittleEndian, CommitTypeCollection); err != nil { | |
return err | |
} | |
n := 1 | |
if ki, err := node.KeyIndexAndWriteTo(cl.writer); err != nil { | |
return err | |
} else { | |
n += ki.ValueEnd - ki.ValueStart | |
} | |
cl.n.Add(int64(n)) | |
return nil | |
} | |
func (cl *commitLogger) add(node *roaringset.SegmentNode) error { | |
if cl.paused { | |
return nil | |
} | |
if err := binary.Write(cl.writer, binary.LittleEndian, CommitTypeRoaringSet); err != nil { | |
return err | |
} | |
n := 1 | |
if ki, err := node.KeyIndexAndWriteTo(cl.writer, 0); err != nil { | |
return err | |
} else { | |
n += ki.ValueEnd - ki.ValueStart | |
} | |
cl.n.Add(int64(n)) | |
return nil | |
} | |
// Size returns the amount of data that has been written since the commit | |
// logger was initialized. After a flush a new logger is initialized which | |
// automatically resets the logger. | |
func (cl *commitLogger) Size() int64 { | |
return cl.n.Load() | |
} | |
func (cl *commitLogger) close() error { | |
if cl.paused { | |
return errors.Errorf("attempting to close a paused commit logger") | |
} | |
if err := cl.writer.Flush(); err != nil { | |
return err | |
} | |
return cl.file.Close() | |
} | |
func (cl *commitLogger) pause() { | |
cl.paused = true | |
} | |
func (cl *commitLogger) unpause() { | |
cl.paused = false | |
} | |
func (cl *commitLogger) delete() error { | |
return os.Remove(cl.path) | |
} | |
func (cl *commitLogger) flushBuffers() error { | |
return cl.writer.Flush() | |
} | |