// SemanticSearchPOC/adapters/repos/db/lsmkv/compactor_replace.go
// (scrape residue from GitHub UI — author KevinStephenson, commit b110593,
// "Adding in weaviate code", 6.18 kB — converted to a comment so the file
// remains valid Go)
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package lsmkv
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv/segmentindex"
"github.com/weaviate/weaviate/entities/lsmkv"
)
// compactorReplace merges two replace-strategy segments into a single new
// segment written to w. See do() for the overall write sequence.
type compactorReplace struct {
	// c1 is always the older segment, so when there is a conflict c2 wins
	// (because of the replace strategy)
	c1 *segmentCursorReplace
	c2 *segmentCursorReplace

	// the level matching those of the cursors
	currentLevel uint16

	// Tells if tombstones or keys without corresponding values
	// can be removed from merged segment.
	// (left segment is root (1st) one, keepTombstones is off for bucket)
	cleanupTombstones bool

	// number of secondary indexes per node; copied into every written node
	// and into the final header
	secondaryIndexCount uint16

	// w is the final destination; bufw buffers writes into it. Seeking is
	// done directly on w, but only after bufw has been flushed (see do()).
	w    io.WriteSeeker
	bufw *bufio.Writer

	// scratch space used while building the segment index (see writeIndices)
	scratchSpacePath string
}
// newCompactorReplace builds a compactor that merges the segments behind c1
// (older) and c2 (newer) into w, buffering writes with a 256 KiB writer.
func newCompactorReplace(w io.WriteSeeker,
	c1, c2 *segmentCursorReplace, level, secondaryIndexCount uint16,
	scratchSpacePath string, cleanupTombstones bool,
) *compactorReplace {
	compactor := &compactorReplace{
		w:                   w,
		bufw:                bufio.NewWriterSize(w, 256*1024),
		c1:                  c1,
		c2:                  c2,
		currentLevel:        level,
		secondaryIndexCount: secondaryIndexCount,
		cleanupTombstones:   cleanupTombstones,
		scratchSpacePath:    scratchSpacePath,
	}

	return compactor
}
// do runs the full compaction: reserve header space, write the merged
// key/value data, append the index, flush, then overwrite the reserved
// space with the real header.
func (c *compactorReplace) do() error {
	if err := c.init(); err != nil {
		return fmt.Errorf("init: %w", err)
	}

	keyIndexes, err := c.writeKeys()
	if err != nil {
		return fmt.Errorf("write keys: %w", err)
	}

	if err = c.writeIndices(keyIndexes); err != nil {
		return fmt.Errorf("write indices: %w", err)
	}

	// flush buffered, so we can safely seek on underlying writer
	if err = c.bufw.Flush(); err != nil {
		return fmt.Errorf("flush buffered: %w", err)
	}

	// dataEnd doubles as the index start; with no keys the data section is
	// empty and the index begins right after the header
	dataEnd := uint64(segmentindex.HeaderSize)
	if n := len(keyIndexes); n > 0 {
		dataEnd = uint64(keyIndexes[n-1].ValueEnd)
	}

	if err = c.writeHeader(c.currentLevel, 0, c.secondaryIndexCount, dataEnd); err != nil {
		return fmt.Errorf("write header: %w", err)
	}

	return nil
}
// init reserves room for the segment header by writing zero bytes; the real
// header contents are only known at the end, when writeHeader seeks back and
// overwrites this region.
func (c *compactorReplace) init() error {
	placeholder := make([]byte, segmentindex.HeaderSize)
	if _, err := c.bufw.Write(placeholder); err != nil {
		return fmt.Errorf("write empty header: %w", err)
	}

	return nil
}
// writeKeys merge-iterates both cursors in key order and writes every
// surviving node through the buffered writer. On equal keys the newer
// segment (c2) wins, per the replace strategy. Tombstoned entries are
// skipped entirely when cleanupTombstones is set; otherwise they are written
// with the tombstone flag. Returns the key-index entries needed to build the
// segment index afterwards.
func (c *compactorReplace) writeKeys() ([]segmentindex.Key, error) {
	res1, err1 := c.c1.firstWithAllKeys()
	res2, err2 := c.c2.firstWithAllKeys()

	// the (dummy) header was already written, this is our initial offset
	offset := segmentindex.HeaderSize

	var kis []segmentindex.Key

	for {
		// a nil primaryKey means the cursor is exhausted
		if res1.primaryKey == nil && res2.primaryKey == nil {
			break
		}

		if bytes.Equal(res1.primaryKey, res2.primaryKey) {
			// conflict: c2 is the newer segment, so its value wins
			if !(c.cleanupTombstones && errors.Is(err2, lsmkv.Deleted)) {
				ki, err := c.writeIndividualNode(offset, res2.primaryKey, res2.value,
					res2.secondaryKeys, errors.Is(err2, lsmkv.Deleted))
				if err != nil {
					return nil, fmt.Errorf("write individual node (equal keys): %w", err)
				}

				offset = ki.ValueEnd
				kis = append(kis, ki)
			}

			// advance both!
			res1, err1 = c.c1.nextWithAllKeys()
			res2, err2 = c.c2.nextWithAllKeys()
			continue
		}

		if (res1.primaryKey != nil && bytes.Compare(res1.primaryKey, res2.primaryKey) == -1) || res2.primaryKey == nil {
			// key 1 is smaller
			if !(c.cleanupTombstones && errors.Is(err1, lsmkv.Deleted)) {
				ki, err := c.writeIndividualNode(offset, res1.primaryKey, res1.value,
					res1.secondaryKeys, errors.Is(err1, lsmkv.Deleted))
				if err != nil {
					// fix: previously this branch dropped the underlying error
					// (no %w verb and err unused); wrap it like the siblings
					return nil, fmt.Errorf("write individual node (res1.primaryKey smaller): %w", err)
				}

				offset = ki.ValueEnd
				kis = append(kis, ki)
			}

			res1, err1 = c.c1.nextWithAllKeys()
		} else {
			// key 2 is smaller
			if !(c.cleanupTombstones && errors.Is(err2, lsmkv.Deleted)) {
				ki, err := c.writeIndividualNode(offset, res2.primaryKey, res2.value,
					res2.secondaryKeys, errors.Is(err2, lsmkv.Deleted))
				if err != nil {
					return nil, fmt.Errorf("write individual node (res2.primaryKey smaller): %w", err)
				}

				offset = ki.ValueEnd
				kis = append(kis, ki)
			}

			res2, err2 = c.c2.nextWithAllKeys()
		}
	}

	return kis, nil
}
// writeIndividualNode serializes one replace node at the given byte offset
// and returns its key-index entry (which records where its value ends).
func (c *compactorReplace) writeIndividualNode(offset int, key, value []byte,
	secondaryKeys [][]byte, tombstone bool,
) (segmentindex.Key, error) {
	node := segmentReplaceNode{
		offset:              offset,
		primaryKey:          key,
		secondaryKeys:       secondaryKeys,
		secondaryIndexCount: c.secondaryIndexCount,
		value:               value,
		tombstone:           tombstone,
	}

	return node.KeyIndexAndWriteTo(c.bufw)
}
// writeIndices builds the segment index from the collected key entries and
// appends it to the buffered writer, using the configured scratch space for
// any temporary state.
func (c *compactorReplace) writeIndices(keys []segmentindex.Key) error {
	idx := segmentindex.Indexes{
		Keys:                keys,
		SecondaryIndexCount: c.secondaryIndexCount,
		ScratchSpacePath:    c.scratchSpacePath,
	}

	if _, err := idx.WriteTo(c.bufw); err != nil {
		return err
	}

	return nil
}
// writeHeader assumes that everything has been written to the underlying
// writer and it is now safe to seek to the beginning and override the initial
// (dummy) header with the real one.
func (c *compactorReplace) writeHeader(level, version, secondaryIndices uint16,
	startOfIndex uint64,
) error {
	if _, err := c.w.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("seek to beginning to write header: %w", err)
	}

	header := segmentindex.Header{
		Strategy:         segmentindex.StrategyReplace,
		Level:            level,
		Version:          version,
		SecondaryIndices: secondaryIndices,
		IndexStart:       startOfIndex,
	}

	_, err := header.WriteTo(c.w)
	return err
}