SemanticSearchPOC / adapters /repos /db /inverted /row_reader_frequency.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
7.1 kB
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package inverted
import (
"bytes"
"context"
"fmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv"
"github.com/weaviate/weaviate/entities/filters"
)
// RowReaderFrequency reads one or many row(s) depending on the specified operator
type RowReaderFrequency struct {
// value is the raw filter value. Depending on the operator it is used as the
// lookup key, the seek target, the comparison bound or the source of the
// Like pattern.
value []byte
// bucket is the lsmkv bucket the rows are read from, either directly via
// MapList or through a map cursor.
bucket *lsmkv.Bucket
// operator selects the read strategy in Read.
operator filters.Operator
// keyOnly makes newCursor return a key-only cursor. The single-row equal
// path does not use a cursor and therefore does not consult this flag.
keyOnly bool
// shardVersion values below 2 cause the legacy-sorting option to be passed
// to every bucket read.
shardVersion uint16
}
// NewRowReaderFrequency builds a RowReaderFrequency that reads from bucket
// using value as the filter value and operator as the read strategy. keyOnly
// and shardVersion are stored as-is and influence how cursors and bucket reads
// are configured later on.
func NewRowReaderFrequency(bucket *lsmkv.Bucket, value []byte,
	operator filters.Operator, keyOnly bool, shardVersion uint16,
) *RowReaderFrequency {
	rr := new(RowReaderFrequency)
	rr.value = value
	rr.bucket = bucket
	rr.operator = operator
	rr.keyOnly = keyOnly
	rr.shardVersion = shardVersion
	return rr
}
// ReadFnFrequency is the callback invoked by Read for every row it visits.
// For the Equal operator it is called exactly once, even when no row exists
// for the requested value; in that case values is whatever the bucket's
// MapList returned (presumably nil or empty - confirm against lsmkv).
// It is up to the caller to decide if that is an error case or not.
//
// Note that because what we are parsing is an inverted index row, it can
// sometimes become confusing what a key and value actually resembles. The
// arguments k and values are the literal row key and row content. So this
// means, the data-value as in "less than 17" where 17 would be the "value" is
// in the key argument "k". The values argument holds the row's map pairs.
// NOTE(review): an earlier wording described the row content as docCount,
// hash and list of pointers (with optional frequency) to the docIDs; the exact
// content of each lsmkv.MapPair is not visible here - confirm.
//
// The boolean return argument is a way to stop iteration (e.g. when a limit is
// reached) without producing an error. In normal operation always return true,
// if false is returned once, the loop is broken.
type ReadFnFrequency func(k []byte, values []lsmkv.MapPair) (bool, error)
// Read dispatches to the read strategy that matches the operator the reader
// was constructed with and hands every visited row to readFn.
//
// It returns an error if the operator is none of Equal, NotEqual,
// GreaterThan, GreaterThanEqual, LessThan, LessThanEqual or Like. Otherwise it
// returns whatever the selected strategy returns, which includes context
// errors and errors produced by readFn.
func (rr *RowReaderFrequency) Read(ctx context.Context, readFn ReadFnFrequency) error {
	switch rr.operator {
	case filters.OperatorEqual:
		return rr.equal(ctx, readFn)
	case filters.OperatorNotEqual:
		return rr.notEqual(ctx, readFn)
	case filters.OperatorGreaterThan:
		return rr.greaterThan(ctx, readFn, false)
	case filters.OperatorGreaterThanEqual:
		return rr.greaterThan(ctx, readFn, true)
	case filters.OperatorLessThan:
		return rr.lessThan(ctx, readFn, false)
	case filters.OperatorLessThanEqual:
		return rr.lessThan(ctx, readFn, true)
	case filters.OperatorLike:
		return rr.like(ctx, readFn)
	default:
		// The previous message read "operator %v supported", which stated the
		// opposite of the actual condition.
		return fmt.Errorf("operator %v not supported", rr.operator)
	}
}
// equal handles the single-row case: no cursor is needed, the row stored
// under rr.value is fetched directly from the bucket and passed to readFn
// exactly once. The "continue" flag returned by readFn is deliberately
// ignored because there is nothing left to iterate over.
func (rr *RowReaderFrequency) equal(ctx context.Context, readFn ReadFnFrequency) error {
	if ctxErr := ctx.Err(); ctxErr != nil {
		return ctxErr
	}

	var (
		pairs []lsmkv.MapPair
		err   error
	)
	// Shards older than version 2 additionally request legacy sorting.
	if rr.shardVersion >= 2 {
		pairs, err = rr.bucket.MapList(rr.value, lsmkv.MapListAcceptDuplicates())
	} else {
		pairs, err = rr.bucket.MapList(rr.value, lsmkv.MapListAcceptDuplicates(),
			lsmkv.MapListLegacySortingRequired())
	}
	if err != nil {
		return err
	}

	// TODO: don't we need to check here if this is a doc id vs a object search?
	// Or is this not a problem because the latter removes duplicates anyway?
	_, err = readFn(rr.value, pairs)
	return err
}
// greaterThan walks the bucket from the position the cursor seeks to for
// rr.value through to the last row. A row whose key is exactly rr.value is
// only handed to readFn when allowEqual is set; otherwise it is skipped.
// Iteration ends early, without error, once readFn reports false.
func (rr *RowReaderFrequency) greaterThan(ctx context.Context, readFn ReadFnFrequency,
	allowEqual bool,
) error {
	cur := rr.newCursor()
	defer cur.Close()

	for key, pairs := cur.Seek(rr.value); key != nil; key, pairs = cur.Next() {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		if !allowEqual && bytes.Equal(key, rr.value) {
			// strict comparison: the boundary row itself is not a match
			continue
		}

		proceed, err := readFn(key, pairs)
		if err != nil {
			return err
		}
		if !proceed {
			return nil
		}
	}

	return nil
}
// lessThan walks the bucket from the very beginning and stops as soon as a
// key sorts after rr.value. A row whose key is exactly rr.value is only handed
// to readFn when allowEqual is set; otherwise the walk effectively ends one
// row earlier. Iteration also ends, without error, once readFn reports false.
func (rr *RowReaderFrequency) lessThan(ctx context.Context, readFn ReadFnFrequency,
	allowEqual bool,
) error {
	cur := rr.newCursor()
	defer cur.Close()

	for key, pairs := cur.First(); key != nil; key, pairs = cur.Next() {
		if bytes.Compare(key, rr.value) > 0 {
			// moved past the upper bound, nothing further can match
			return nil
		}

		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		if !allowEqual && bytes.Equal(key, rr.value) {
			// strict comparison: the boundary row itself is not a match
			continue
		}

		proceed, err := readFn(key, pairs)
		if err != nil {
			return err
		}
		if !proceed {
			return nil
		}
	}

	return nil
}
// notEqual is the inverse of equal: it walks every row of the bucket and
// hands all of them to readFn except the one whose key is exactly rr.value.
// Iteration ends early, without error, once readFn reports false.
func (rr *RowReaderFrequency) notEqual(ctx context.Context, readFn ReadFnFrequency) error {
	cur := rr.newCursor()
	defer cur.Close()

	for key, pairs := cur.First(); key != nil; key, pairs = cur.Next() {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		if bytes.Equal(key, rr.value) {
			// this is the one row that must be excluded
			continue
		}

		proceed, err := readFn(key, pairs)
		if err != nil {
			return err
		}
		if !proceed {
			return nil
		}
	}

	return nil
}
// like parses rr.value into a like-pattern and hands every row whose key
// matches the pattern's regular expression to readFn. When the parsed pattern
// is flagged as optimizable the walk starts at the pattern's minimum key
// instead of the first row and is aborted as soon as the keys no longer share
// that fixed prefix. Iteration also ends, without error, once readFn reports
// false. A failure to parse the pattern is returned wrapped.
func (rr *RowReaderFrequency) like(ctx context.Context, readFn ReadFnFrequency) error {
	pattern, err := parseLikeRegexp(rr.value)
	if err != nil {
		return errors.Wrapf(err, "parse like value")
	}

	// TODO: don't we need to check here if this is a doc id vs a object search?
	// Or is this not a problem because the latter removes duplicates anyway?
	cur := rr.newCursor(lsmkv.MapListAcceptDuplicates())
	defer cur.Close()

	var (
		key   []byte
		pairs []lsmkv.MapPair
	)
	if pattern.optimizable {
		key, pairs = cur.Seek(pattern.min)
	} else {
		key, pairs = cur.First()
	}

	for ; key != nil; key, pairs = cur.Next() {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		// An optimizable query, i.e. one that doesn't start with a wildcard,
		// lets us stop once we've moved past the point where the fixed
		// characters no longer match: either the key is too short to carry the
		// prefix, or its prefix already sorts after the fixed characters.
		if pattern.optimizable &&
			(len(key) < len(pattern.min) ||
				bytes.Compare(pattern.min, key[:len(pattern.min)]) < 0) {
			return nil
		}

		if !pattern.regexp.Match(key) {
			continue
		}

		proceed, err := readFn(key, pairs)
		if err != nil {
			return err
		}
		if !proceed {
			return nil
		}
	}

	return nil
}
// newCursor opens a map cursor on the bucket using the given options. For
// shard versions below 2 the legacy-sorting option is appended to opts first.
// The cursor is a key-only cursor when rr.keyOnly is set and a regular one
// otherwise.
func (rr *RowReaderFrequency) newCursor(opts ...lsmkv.MapListOption) *lsmkv.CursorMap {
	if rr.shardVersion < 2 {
		opts = append(opts, lsmkv.MapListLegacySortingRequired())
	}

	if !rr.keyOnly {
		return rr.bucket.MapCursor(opts...)
	}

	return rr.bucket.MapCursorKeyOnly(opts...)
}