KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.58 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package inverted
import (
"bytes"
"context"
"fmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv"
"github.com/weaviate/weaviate/entities/filters"
)
// RowReader reads one or many row(s) depending on the specified operator.
// It is created through NewRowReader and driven by calling Read.
type RowReader struct {
// value is the filter value; depending on the operator it is used as the
// exact row key, as the seek/boundary key of a cursor, or as the like pattern
value []byte
// bucket is the source of the rows, read either directly or through a cursor
bucket *lsmkv.Bucket
// operator selects which read strategy Read dispatches to
operator filters.Operator
// keyOnly makes newCursor request a key-only cursor instead of a regular one
keyOnly bool
}
// NewRowReader creates a RowReader which reads rows from the given bucket,
// using value as the filter value and operator to pick the read strategy.
//
// If keyOnly is set, the RowReader will request key-only cursors wherever
// cursors are used. In that case the values argument passed to the ReadFn
// will always be nil.
func NewRowReader(bucket *lsmkv.Bucket, value []byte,
	operator filters.Operator, keyOnly bool,
) *RowReader {
	reader := new(RowReader)
	reader.value = value
	reader.bucket = bucket
	reader.operator = operator
	reader.keyOnly = keyOnly
	return reader
}
// ReadFn is the callback through which a RowReader hands rows to its caller.
// It will be called 1..n times per match. This means it will also be called
// on a non-match, in this case values == nil.
// It is up to the caller to decide if that is an error case or not.
//
// Note that because what we are parsing is an inverted index row, it can
// sometimes become confusing what a key and value actually represent. The
// arguments k and values are the literal row key and row value. So this
// means, the data-value as in "less than 17" where 17 would be the "value" is
// in the key argument "k". The values argument will contain the docCount,
// hash and list of pointers (with optional frequency) to the docIDs.
//
// The boolean return argument is a way to stop iteration (e.g. when a limit is
// reached) without producing an error. In normal operation always return true,
// if false is returned once, the loop is broken.
type ReadFn func(k []byte, values [][]byte) (bool, error)
// Read dispatches to the read strategy that belongs to the operator this
// RowReader was created with and feeds every resulting row to readFn. An
// operator without a strategy results in an error.
//
// If the RowReader was created with keyOnly==true, the values argument in the
// readFn will always be nil on all requests involving cursors.
func (rr *RowReader) Read(ctx context.Context, readFn ReadFn) error {
	switch op := rr.operator; op {
	case filters.OperatorEqual, filters.OperatorIsNull:
		// IsNull needs to fetch a row with a given value (there is only nil and
		// !nil), so it can reuse equal to get the correct row
		return rr.equal(ctx, readFn)
	case filters.OperatorNotEqual:
		return rr.notEqual(ctx, readFn)
	case filters.OperatorGreaterThan, filters.OperatorGreaterThanEqual:
		return rr.greaterThan(ctx, readFn, op == filters.OperatorGreaterThanEqual)
	case filters.OperatorLessThan, filters.OperatorLessThanEqual:
		return rr.lessThan(ctx, readFn, op == filters.OperatorLessThanEqual)
	case filters.OperatorLike:
		return rr.like(ctx, readFn)
	default:
		return fmt.Errorf("operator %v not supported", op)
	}
}
// equal does not need a cursor: it looks up the single row stored under
// rr.value and passes it to readFn exactly once. An already cancelled or
// expired context is reported before the bucket is touched.
func (rr *RowReader) equal(ctx context.Context, readFn ReadFn) error {
	if ctxErr := ctx.Err(); ctxErr != nil {
		return ctxErr
	}

	row, err := rr.bucket.SetList(rr.value)
	if err != nil {
		return err
	}

	// there is no iteration to stop, so the "continue" flag of readFn is
	// irrelevant here
	if _, err := readFn(rr.value, row); err != nil {
		return err
	}
	return nil
}
// greaterThan seeks to rr.value and reads from there until the cursor is
// exhausted. The row whose key equals rr.value is only handed to readFn if
// allowEqual==true, otherwise reading starts with the next row. The context
// is checked before every row.
func (rr *RowReader) greaterThan(ctx context.Context, readFn ReadFn,
	allowEqual bool,
) error {
	cursor := rr.newCursor()
	defer cursor.Close()

	key, values := cursor.Seek(rr.value)
	for ; key != nil; key, values = cursor.Next() {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		if !allowEqual && bytes.Equal(key, rr.value) {
			// strict comparison, skip the boundary row itself
			continue
		}

		more, err := readFn(key, values)
		if err != nil {
			return err
		}
		if !more {
			// the callback asked to stop early
			return nil
		}
	}

	return nil
}
// lessThan reads from the very beginning up to the specified value. The row
// whose key equals rr.value is only handed to readFn if allowEqual==true,
// otherwise reading ends one row prior to that. The context is checked before
// every row that is within range.
func (rr *RowReader) lessThan(ctx context.Context, readFn ReadFn,
	allowEqual bool,
) error {
	cursor := rr.newCursor()
	defer cursor.Close()

	for key, values := cursor.First(); key != nil; key, values = cursor.Next() {
		order := bytes.Compare(key, rr.value)
		if order > 0 {
			// this key is greater than the upper bound, nothing left to read
			return nil
		}

		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		if order == 0 && !allowEqual {
			// strict comparison, skip the boundary row itself
			continue
		}

		more, err := readFn(key, values)
		if err != nil {
			return err
		}
		if !more {
			// the callback asked to stop early
			return nil
		}
	}

	return nil
}
// notEqual is the opposite of equal: it walks the cursor from the first row
// to the last one and hands every row to readFn, except for the one whose key
// equals rr.value. The context is checked before every row.
func (rr *RowReader) notEqual(ctx context.Context, readFn ReadFn) error {
	cursor := rr.newCursor()
	defer cursor.Close()

	key, values := cursor.First()
	for key != nil {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		if !bytes.Equal(key, rr.value) {
			more, err := readFn(key, values)
			if err != nil {
				return err
			}
			if !more {
				// the callback asked to stop early
				return nil
			}
		}

		key, values = cursor.Next()
	}

	return nil
}
// like parses rr.value as a like pattern and hands every row whose key matches
// the resulting regexp to readFn. If the parsed pattern is marked as
// optimizable, the cursor starts at the pattern's min key instead of the first
// row and reading is aborted as soon as the keys have moved past min. The
// context is checked before every row.
func (rr *RowReader) like(ctx context.Context, readFn ReadFn) error {
	pattern, err := parseLikeRegexp(rr.value)
	if err != nil {
		return errors.Wrapf(err, "parse like value")
	}

	cursor := rr.newCursor()
	defer cursor.Close()

	var (
		key    []byte
		values [][]byte
	)
	if pattern.optimizable {
		key, values = cursor.Seek(pattern.min)
	} else {
		key, values = cursor.First()
	}

	for ; key != nil; key, values = cursor.Next() {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		if pattern.optimizable {
			// if the query is optimizable, i.e. it doesn't start with a wildcard,
			// we can abort once we've moved past the point where the fixed
			// characters no longer match. The length check comes first so that
			// slicing the key can never go out of range.
			fixedLen := len(pattern.min)
			if len(key) < fixedLen || bytes.Compare(pattern.min, key[:fixedLen]) < 0 {
				break
			}
		}

		if pattern.regexp.Match(key) {
			more, err := readFn(key, values)
			if err != nil {
				return err
			}
			if !more {
				// the callback asked to stop early
				break
			}
		}
	}

	return nil
}
// newCursor opens a set cursor on the bucket: a regular one by default, or a
// key-only one if the RowReader was created with keyOnly==true.
func (rr *RowReader) newCursor() *lsmkv.CursorSet {
	if !rr.keyOnly {
		return rr.bucket.SetCursor()
	}
	return rr.bucket.SetCursorKeyOnly()
}