KevinStephenson
Adding in weaviate code
b110593
raw
history blame
7.85 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package db
import (
"context"
"fmt"
"time"
"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/repos/db/refcache"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/multi"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/entities/schema/crossref"
"github.com/weaviate/weaviate/entities/search"
"github.com/weaviate/weaviate/entities/storobj"
"github.com/weaviate/weaviate/usecases/objects"
)
func (db *DB) PutObject(ctx context.Context, obj *models.Object,
vector []float32, repl *additional.ReplicationProperties,
) error {
object := storobj.FromObject(obj, vector)
idx := db.GetIndex(object.Class())
if idx == nil {
return fmt.Errorf("import into non-existing index for %s", object.Class())
}
if err := idx.putObject(ctx, object, repl); err != nil {
return fmt.Errorf("import into index %s: %w", idx.ID(), err)
}
return nil
}
// DeleteObject from of a specific class giving its ID
func (db *DB) DeleteObject(ctx context.Context, class string, id strfmt.UUID,
repl *additional.ReplicationProperties, tenant string,
) error {
idx := db.GetIndex(schema.ClassName(class))
if idx == nil {
return fmt.Errorf("delete from non-existing index for %s", class)
}
err := idx.deleteObject(ctx, id, repl, tenant)
if err != nil {
return fmt.Errorf("delete from index %q: %w", idx.ID(), err)
}
return nil
}
func (db *DB) MultiGet(ctx context.Context, query []multi.Identifier,
additional additional.Properties, tenant string,
) ([]search.Result, error) {
byIndex := map[string][]multi.Identifier{}
db.indexLock.RLock()
defer db.indexLock.RUnlock()
for i, q := range query {
// store original position to make assembly easier later
q.OriginalPosition = i
for _, index := range db.indices {
if index.Config.ClassName != schema.ClassName(q.ClassName) {
continue
}
queue := byIndex[index.ID()]
queue = append(queue, q)
byIndex[index.ID()] = queue
}
}
out := make(search.Results, len(query))
for indexID, queries := range byIndex {
indexRes, err := db.indices[indexID].multiObjectByID(ctx, queries, tenant)
if err != nil {
return nil, errors.Wrapf(err, "index %q", indexID)
}
for i, obj := range indexRes {
if obj == nil {
continue
}
res := obj.SearchResult(additional, tenant)
out[queries[i].OriginalPosition] = *res
}
}
return out, nil
}
// ObjectByID checks every index of the particular kind for the ID
//
// @warning: this function is deprecated by Object()
func (db *DB) ObjectByID(ctx context.Context, id strfmt.UUID,
props search.SelectProperties, additional additional.Properties,
tenant string,
) (*search.Result, error) {
results, err := db.ObjectsByID(ctx, id, props, additional, tenant)
if err != nil {
return nil, err
}
if len(results) == 0 {
return nil, nil
}
return &results[0], nil
}
// ObjectsByID checks every index of the particular kind for the ID
// this method is only used for Explore queries where we don't have
// a class context
func (db *DB) ObjectsByID(ctx context.Context, id strfmt.UUID,
props search.SelectProperties, additional additional.Properties,
tenant string,
) (search.Results, error) {
var result []*storobj.Object
// TODO: Search in parallel, rather than sequentially or this will be
// painfully slow on large schemas
db.indexLock.RLock()
for _, index := range db.indices {
res, err := index.objectByID(ctx, id, props, additional, nil, tenant)
if err != nil {
db.indexLock.RUnlock()
switch err.(type) {
case objects.ErrMultiTenancy:
return nil, objects.NewErrMultiTenancy(fmt.Errorf("search index %s: %w", index.ID(), err))
default:
return nil, errors.Wrapf(err, "search index %s", index.ID())
}
}
if res != nil {
result = append(result, res)
}
}
db.indexLock.RUnlock()
if result == nil {
return nil, nil
}
return db.ResolveReferences(ctx,
storobj.SearchResults(result, additional, tenant), props, nil, additional, tenant)
}
// Object gets object with id from index of specified class.
func (db *DB) Object(ctx context.Context, class string, id strfmt.UUID,
props search.SelectProperties, addl additional.Properties,
repl *additional.ReplicationProperties, tenant string,
) (*search.Result, error) {
idx := db.GetIndex(schema.ClassName(class))
if idx == nil {
return nil, nil
}
obj, err := idx.objectByID(ctx, id, props, addl, repl, tenant)
if err != nil {
switch err.(type) {
case objects.ErrMultiTenancy:
return nil, objects.NewErrMultiTenancy(fmt.Errorf("search index %s: %w", idx.ID(), err))
default:
return nil, errors.Wrapf(err, "search index %s", idx.ID())
}
}
var r *search.Result
if obj != nil {
r = obj.SearchResult(addl, tenant)
}
if r == nil {
return nil, nil
}
return db.enrichRefsForSingle(ctx, r, props, addl, tenant)
}
func (db *DB) enrichRefsForSingle(ctx context.Context, obj *search.Result,
props search.SelectProperties, additional additional.Properties, tenant string,
) (*search.Result, error) {
res, err := refcache.NewResolver(refcache.NewCacher(db, db.logger, tenant)).
Do(ctx, []search.Result{*obj}, props, additional)
if err != nil {
return nil, errors.Wrap(err, "resolve cross-refs")
}
return &res[0], nil
}
func (db *DB) Exists(ctx context.Context, class string, id strfmt.UUID,
repl *additional.ReplicationProperties, tenant string,
) (bool, error) {
if class == "" {
return db.anyExists(ctx, id, repl)
}
index := db.GetIndex(schema.ClassName(class))
if index == nil {
return false, nil
}
return index.exists(ctx, id, repl, tenant)
}
func (db *DB) anyExists(ctx context.Context, id strfmt.UUID,
repl *additional.ReplicationProperties,
) (bool, error) {
// TODO: Search in parallel, rather than sequentially or this will be
// painfully slow on large schemas
db.indexLock.RLock()
defer db.indexLock.RUnlock()
for _, index := range db.indices {
ok, err := index.exists(ctx, id, repl, "")
if err != nil {
switch err.(type) {
case objects.ErrMultiTenancy:
return false, objects.NewErrMultiTenancy(fmt.Errorf("search index %s: %w", index.ID(), err))
default:
return false, errors.Wrapf(err, "search index %s", index.ID())
}
}
if ok {
return true, nil
}
}
return false, nil
}
func (db *DB) AddReference(ctx context.Context, source *crossref.RefSource, target *crossref.Ref,
repl *additional.ReplicationProperties, tenant string,
) error {
return db.Merge(ctx, objects.MergeDocument{
Class: source.Class.String(),
ID: source.TargetID,
UpdateTime: time.Now().UnixMilli(),
References: objects.BatchReferences{
objects.BatchReference{
From: source,
To: target,
},
},
}, repl, tenant)
}
func (db *DB) Merge(ctx context.Context, merge objects.MergeDocument,
repl *additional.ReplicationProperties, tenant string,
) error {
idx := db.GetIndex(schema.ClassName(merge.Class))
if idx == nil {
return fmt.Errorf("merge from non-existing index for %s", merge.Class)
}
err := idx.mergeObject(ctx, merge, repl, tenant)
if err != nil {
return errors.Wrapf(err, "merge into index %s", idx.ID())
}
return nil
}