SemanticSearchPOC / adapters /repos /db /lsmkv /cursor_bucket_set.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.31 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package lsmkv
import (
"bytes"
"errors"
"fmt"
"github.com/weaviate/weaviate/entities/lsmkv"
)
type CursorSet struct {
innerCursors []innerCursorCollection
state []cursorStateCollection
unlock func()
keyOnly bool
}
type innerCursorCollection interface {
first() ([]byte, []value, error)
next() ([]byte, []value, error)
seek([]byte) ([]byte, []value, error)
}
type cursorStateCollection struct {
key []byte
value []value
err error
}
// SetCursor holds a RLock for the flushing state. It needs to be closed using the
// .Close() methods or otherwise the lock will never be released
func (b *Bucket) SetCursor() *CursorSet {
b.flushLock.RLock()
if b.strategy != StrategySetCollection {
panic("SetCursor() called on strategy other than 'set'")
}
innerCursors, unlockSegmentGroup := b.disk.newCollectionCursors()
// we have a flush-RLock, so we have the guarantee that the flushing state
// will not change for the lifetime of the cursor, thus there can only be two
// states: either a flushing memtable currently exists - or it doesn't
if b.flushing != nil {
innerCursors = append(innerCursors, b.flushing.newCollectionCursor())
}
innerCursors = append(innerCursors, b.active.newCollectionCursor())
return &CursorSet{
unlock: func() {
unlockSegmentGroup()
b.flushLock.RUnlock()
},
// cursor are in order from oldest to newest, with the memtable cursor
// being at the very top
innerCursors: innerCursors,
}
}
// SetCursorKeyOnly returns nil for all values. It has no control over the
// underlying "inner" cursors which may still retrieve a value which is then
// discarded. It does however, omit any handling of values, such as decoding,
// making this considerably more efficient if only keys are required.
//
// The same locking rules as for SetCursor apply.
func (b *Bucket) SetCursorKeyOnly() *CursorSet {
c := b.SetCursor()
c.keyOnly = true
return c
}
func (c *CursorSet) Seek(key []byte) ([]byte, [][]byte) {
c.seekAll(key)
return c.serveCurrentStateAndAdvance()
}
func (c *CursorSet) Next() ([]byte, [][]byte) {
return c.serveCurrentStateAndAdvance()
}
func (c *CursorSet) First() ([]byte, [][]byte) {
c.firstAll()
return c.serveCurrentStateAndAdvance()
}
func (c *CursorSet) Close() {
c.unlock()
}
func (c *CursorSet) seekAll(target []byte) {
state := make([]cursorStateCollection, len(c.innerCursors))
for i, cur := range c.innerCursors {
key, value, err := cur.seek(target)
if errors.Is(err, lsmkv.NotFound) {
state[i].err = err
continue
}
if err != nil {
panic(fmt.Errorf("unexpected error in seek: %w", err))
}
state[i].key = key
if !c.keyOnly {
state[i].value = value
}
}
c.state = state
}
func (c *CursorSet) firstAll() {
state := make([]cursorStateCollection, len(c.innerCursors))
for i, cur := range c.innerCursors {
key, value, err := cur.first()
if errors.Is(err, lsmkv.NotFound) {
state[i].err = err
continue
}
if err != nil {
panic(fmt.Errorf("unexpected error in seek: %w", err))
}
state[i].key = key
if !c.keyOnly {
state[i].value = value
}
}
c.state = state
}
func (c *CursorSet) serveCurrentStateAndAdvance() ([]byte, [][]byte) {
id, err := c.cursorWithLowestKey()
if err != nil {
if errors.Is(err, lsmkv.NotFound) {
return nil, nil
}
}
// check if this is a duplicate key before checking for the remaining errors,
// as cases such as 'entities.Deleted' can be better handled inside
// mergeDuplicatesInCurrentStateAndAdvance where we can be sure to act on
// segments in the correct order
if ids, ok := c.haveDuplicatesInState(id); ok {
return c.mergeDuplicatesInCurrentStateAndAdvance(ids)
} else {
return c.mergeDuplicatesInCurrentStateAndAdvance([]int{id})
}
}
func (c *CursorSet) cursorWithLowestKey() (int, error) {
err := lsmkv.NotFound
pos := -1
var lowest []byte
for i, res := range c.state {
if errors.Is(res.err, lsmkv.NotFound) {
continue
}
if lowest == nil || bytes.Compare(res.key, lowest) <= 0 {
pos = i
err = res.err
lowest = res.key
}
}
if err != nil {
return pos, err
}
return pos, nil
}
func (c *CursorSet) haveDuplicatesInState(idWithLowestKey int) ([]int, bool) {
key := c.state[idWithLowestKey].key
var idsFound []int
for i, cur := range c.state {
if i == idWithLowestKey {
idsFound = append(idsFound, i)
continue
}
if bytes.Equal(key, cur.key) {
idsFound = append(idsFound, i)
}
}
return idsFound, len(idsFound) > 1
}
// if there are no duplicates present it will still work as returning the
// latest result is the same as returning the only result
func (c *CursorSet) mergeDuplicatesInCurrentStateAndAdvance(ids []int) ([]byte, [][]byte) {
// take the key from any of the results, we have the guarantee that they're
// all the same
key := c.state[ids[0]].key
var raw []value
for _, id := range ids {
raw = append(raw, c.state[id].value...)
c.advanceInner(id)
}
values := newSetDecoder().Do(raw)
if len(values) == 0 {
// all values deleted, skip key
return c.Next()
}
// TODO remove keyOnly option, not used anyway
if !c.keyOnly {
return key, values
} else {
return key, nil
}
}
func (c *CursorSet) advanceInner(id int) {
k, v, err := c.innerCursors[id].next()
if errors.Is(err, lsmkv.NotFound) {
c.state[id].err = err
c.state[id].key = nil
if !c.keyOnly {
c.state[id].value = nil
}
return
}
if errors.Is(err, lsmkv.Deleted) {
c.state[id].err = err
c.state[id].key = k
c.state[id].value = nil
return
}
if err != nil {
panic(fmt.Errorf("unexpected error in advance: %w", err))
}
c.state[id].key = k
if !c.keyOnly {
c.state[id].value = v
}
c.state[id].err = nil
}