//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//
package lsmkv | |
import ( | |
"bufio" | |
"bytes" | |
"io" | |
"sort" | |
"github.com/pkg/errors" | |
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv/segmentindex" | |
) | |
// compactorMap merges two map-strategy segments into a single new segment,
// reading from two cursors and writing the combined result to w.
type compactorMap struct {
	// c1 is always the older segment, so when there is a conflict c2 wins
	// (because of the replace strategy)
	c1 *segmentCursorCollectionReusable
	c2 *segmentCursorCollectionReusable

	// the level matching those of the cursors
	currentLevel        uint16
	secondaryIndexCount uint16

	// Tells if tombstones or keys without corresponding values
	// can be removed from merged segment.
	// (left segment is root (1st) one, keepTombstones is off for bucket)
	cleanupTombstones bool

	// w must support seeking so the placeholder header written in init()
	// can be overwritten at the very end (see writeHeader).
	w io.WriteSeeker
	// bufw buffers writes to w; it must be flushed before seeking on w.
	bufw *bufio.Writer

	// scratchSpacePath is handed to the index writer as temp space.
	scratchSpacePath string

	// for backward-compatibility with states where the disk state for maps was
	// not guaranteed to be sorted yet
	requiresSorting bool
}
func newCompactorMapCollection(w io.WriteSeeker, | |
c1, c2 *segmentCursorCollectionReusable, level, secondaryIndexCount uint16, | |
scratchSpacePath string, requiresSorting bool, cleanupTombstones bool, | |
) *compactorMap { | |
return &compactorMap{ | |
c1: c1, | |
c2: c2, | |
w: w, | |
bufw: bufio.NewWriterSize(w, 256*1024), | |
currentLevel: level, | |
cleanupTombstones: cleanupTombstones, | |
secondaryIndexCount: secondaryIndexCount, | |
scratchSpacePath: scratchSpacePath, | |
requiresSorting: requiresSorting, | |
} | |
} | |
func (c *compactorMap) do() error { | |
if err := c.init(); err != nil { | |
return errors.Wrap(err, "init") | |
} | |
kis, err := c.writeKeys() | |
if err != nil { | |
return errors.Wrap(err, "write keys") | |
} | |
if err := c.writeIndices(kis); err != nil { | |
return errors.Wrap(err, "write index") | |
} | |
// flush buffered, so we can safely seek on underlying writer | |
if err := c.bufw.Flush(); err != nil { | |
return errors.Wrap(err, "flush buffered") | |
} | |
var dataEnd uint64 = segmentindex.HeaderSize | |
if len(kis) > 0 { | |
dataEnd = uint64(kis[len(kis)-1].ValueEnd) | |
} | |
if err := c.writeHeader(c.currentLevel, 0, c.secondaryIndexCount, | |
dataEnd); err != nil { | |
return errors.Wrap(err, "write header") | |
} | |
return nil | |
} | |
func (c *compactorMap) init() error { | |
// write a dummy header, we don't know the contents of the actual header yet, | |
// we will seek to the beginning and overwrite the actual header at the very | |
// end | |
if _, err := c.bufw.Write(make([]byte, segmentindex.HeaderSize)); err != nil { | |
return errors.Wrap(err, "write empty header") | |
} | |
return nil | |
} | |
// writeKeys performs the actual merge: it walks both cursors in sorted key
// order, resolves key collisions (merging the two value lists, with c2
// taking precedence per the struct's contract), optionally strips
// tombstoned values, and writes one collection node per surviving key.
// It returns the index entries (one per written key) used later by
// writeIndices.
func (c *compactorMap) writeKeys() ([]segmentindex.Key, error) {
	key1, value1, _ := c.c1.first()
	key2, value2, _ := c.c2.first()

	// the (dummy) header was already written, this is our initial offset
	offset := segmentindex.HeaderSize

	var kis []segmentindex.Key

	// reusable scratch structures so the loop does not allocate per key
	pairs := newReusableMapPairs()
	me := newMapEncoder()
	ssm := newSortedMapMerger()

	for {
		// both cursors exhausted -> merge complete
		if key1 == nil && key2 == nil {
			break
		}

		if bytes.Equal(key1, key2) {
			// Same key present in both segments: decode both raw value lists
			// into MapPair scratch buffers, merge them, re-encode, write once.
			pairs.ResizeLeft(len(value1))
			pairs.ResizeRight(len(value2))

			for i, v := range value1 {
				if err := pairs.left[i].FromBytes(v.value, false); err != nil {
					return nil, err
				}
				pairs.left[i].Tombstone = v.tombstone
			}
			for i, v := range value2 {
				if err := pairs.right[i].FromBytes(v.value, false); err != nil {
					return nil, err
				}
				pairs.right[i].Tombstone = v.tombstone
			}

			// legacy segments may not be sorted on disk yet; the merger
			// requires sorted inputs, so sort both sides first
			if c.requiresSorting {
				sort.Slice(pairs.left, func(a, b int) bool {
					return bytes.Compare(pairs.left[a].Key, pairs.left[b].Key) < 0
				})
				sort.Slice(pairs.right, func(a, b int) bool {
					return bytes.Compare(pairs.right[a].Key, pairs.right[b].Key) < 0
				})
			}

			ssm.reset([][]MapPair{pairs.left, pairs.right})
			mergedPairs, err := ssm.
				doKeepTombstonesReusable()
			if err != nil {
				return nil, err
			}

			mergedEncoded, err := me.DoMultiReusable(mergedPairs)
			if err != nil {
				return nil, err
			}

			// skip==true means every value was tombstoned away and the key
			// can be dropped from the new segment entirely
			if values, skip := c.cleanupValues(mergedEncoded); !skip {
				ki, err := c.writeIndividualNode(offset, key2, values)
				if err != nil {
					return nil, errors.Wrap(err, "write individual node (equal keys)")
				}
				offset = ki.ValueEnd
				kis = append(kis, ki)
			}

			// advance both!
			key1, value1, _ = c.c1.next()
			key2, value2, _ = c.c2.next()
			continue
		}

		if (key1 != nil && bytes.Compare(key1, key2) == -1) || key2 == nil {
			// key 1 is smaller
			if values, skip := c.cleanupValues(value1); !skip {
				ki, err := c.writeIndividualNode(offset, key1, values)
				if err != nil {
					return nil, errors.Wrap(err, "write individual node (key1 smaller)")
				}
				offset = ki.ValueEnd
				kis = append(kis, ki)
			}
			key1, value1, _ = c.c1.next()
		} else {
			// key 2 is smaller
			if values, skip := c.cleanupValues(value2); !skip {
				ki, err := c.writeIndividualNode(offset, key2, values)
				if err != nil {
					return nil, errors.Wrap(err, "write individual node (key2 smaller)")
				}
				offset = ki.ValueEnd
				kis = append(kis, ki)
			}
			key2, value2, _ = c.c2.next()
		}
	}

	return kis, nil
}
func (c *compactorMap) writeIndividualNode(offset int, key []byte, | |
values []value, | |
) (segmentindex.Key, error) { | |
// NOTE: There are no guarantees in the cursor logic that any memory is valid | |
// for more than a single iteration. Every time you call next() to advance | |
// the cursor, any memory might be reused. | |
// | |
// This includes the key buffer which was the cause of | |
// https://github.com/weaviate/weaviate/issues/3517 | |
// | |
// A previous logic created a new assignment in each iteration, but thatwas | |
// not an explicit guarantee. A change in v1.21 (for pread/mmap) added a | |
// reusable buffer for the key which surfaced this bug. | |
keyCopy := make([]byte, len(key)) | |
copy(keyCopy, key) | |
return segmentCollectionNode{ | |
values: values, | |
primaryKey: keyCopy, | |
offset: offset, | |
}.KeyIndexAndWriteTo(c.bufw) | |
} | |
func (c *compactorMap) writeIndices(keys []segmentindex.Key) error { | |
indices := segmentindex.Indexes{ | |
Keys: keys, | |
SecondaryIndexCount: c.secondaryIndexCount, | |
ScratchSpacePath: c.scratchSpacePath, | |
} | |
_, err := indices.WriteTo(c.bufw) | |
return err | |
} | |
// writeHeader assumes that everything has been written to the underlying | |
// writer and it is now safe to seek to the beginning and override the initial | |
// header | |
func (c *compactorMap) writeHeader(level, version, secondaryIndices uint16, | |
startOfIndex uint64, | |
) error { | |
if _, err := c.w.Seek(0, io.SeekStart); err != nil { | |
return errors.Wrap(err, "seek to beginning to write header") | |
} | |
h := &segmentindex.Header{ | |
Level: level, | |
Version: version, | |
SecondaryIndices: secondaryIndices, | |
Strategy: segmentindex.StrategyMapCollection, | |
IndexStart: startOfIndex, | |
} | |
if _, err := h.WriteTo(c.w); err != nil { | |
return err | |
} | |
return nil | |
} | |
// Removes values with tombstone set from input slice. Output slice may be smaller than input one. | |
// Returned skip of true means there are no values left (key can be omitted in segment) | |
// WARN: method can alter input slice by swapping its elements and reducing length (not capacity) | |
func (c *compactorMap) cleanupValues(values []value) (vals []value, skip bool) { | |
if !c.cleanupTombstones { | |
return values, false | |
} | |
// Reuse input slice not to allocate new memory | |
// Rearrange slice in a way that tombstoned values are moved to the end | |
// and reduce slice's length. | |
last := 0 | |
for i := 0; i < len(values); i++ { | |
if !values[i].tombstone { | |
// Swap both elements instead overwritting `last` by `i`. | |
// Overwrite would result in `values[last].value` pointing to the same slice | |
// as `values[i].value`. | |
// If `values` slice is reused by multiple nodes (as it happens for map cursors | |
// `segmentCursorCollectionReusable` using `segmentCollectionNode` as buffer) | |
// populating slice `values[i].value` would overwrite slice `values[last].value`. | |
// Swaps makes sure `values[i].value` and `values[last].value` point to different slices. | |
values[last], values[i] = values[i], values[last] | |
last++ | |
} | |
} | |
if last == 0 { | |
return nil, true | |
} | |
return values[:last], false | |
} | |