KevinStephenson
Adding in weaviate code
b110593
raw
history blame
3.42 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package docid
import (
"encoding/binary"
"github.com/weaviate/weaviate/entities/storobj"
"github.com/weaviate/weaviate/entities/models"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/repos/db/helpers"
"github.com/weaviate/weaviate/adapters/repos/db/lsmkv"
)
// ObjectScanFn is called once per object, if false or an error is returned,
// the scanning will stop
type ObjectScanFn func(prop *models.PropertySchema, docID uint64) (bool, error)
// ScanObjectsLSM calls the provided scanFn on each object for the
// specified pointer. If a pointer does not resolve to an object-id, the item
// will be skipped. The number of times scanFn is called can therefore be
// smaller than the input length of pointers.
func ScanObjectsLSM(store *lsmkv.Store, pointers []uint64, scan ObjectScanFn, properties []string) error {
return newObjectScannerLSM(store, pointers, scan, properties).Do()
}
type objectScannerLSM struct {
store *lsmkv.Store
pointers []uint64
scanFn ObjectScanFn
objectsBucket *lsmkv.Bucket
properties []string
}
func newObjectScannerLSM(store *lsmkv.Store, pointers []uint64,
scan ObjectScanFn, properties []string,
) *objectScannerLSM {
return &objectScannerLSM{
store: store,
pointers: pointers,
scanFn: scan,
properties: properties,
}
}
func (os *objectScannerLSM) Do() error {
if err := os.init(); err != nil {
return errors.Wrap(err, "init object scanner")
}
if err := os.scan(); err != nil {
return errors.Wrap(err, "scan")
}
return nil
}
func (os *objectScannerLSM) init() error {
bucket := os.store.Bucket(helpers.ObjectsBucketLSM)
if bucket == nil {
return errors.Errorf("objects bucket not found")
}
os.objectsBucket = bucket
return nil
}
func (os *objectScannerLSM) scan() error {
// each object is scanned one after the other, so we can reuse the same memory allocations for all objects
docIDBytes := make([]byte, 8)
// Preallocate strings needed for json unmarshalling
propStrings := make([][]string, len(os.properties))
for i := range os.properties {
propStrings[i] = []string{os.properties[i]}
}
// The typed properties are needed for extraction from json
var properties models.PropertySchema
propertiesTyped := map[string]interface{}{}
for _, prop := range os.properties {
propertiesTyped[prop] = nil
}
for _, id := range os.pointers {
binary.LittleEndian.PutUint64(docIDBytes, id)
res, err := os.objectsBucket.GetBySecondary(0, docIDBytes)
if err != nil {
return err
}
if res == nil {
continue
}
if len(os.properties) > 0 {
err = storobj.UnmarshalPropertiesFromObject(res, &propertiesTyped, os.properties, propStrings)
if err != nil {
return errors.Wrapf(err, "unmarshal data object")
}
properties = propertiesTyped
}
continueScan, err := os.scanFn(&properties, id)
if err != nil {
return errors.Wrapf(err, "scan")
}
if !continueScan {
break
}
}
return nil
}