KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.96 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package objects
import (
"context"
"fmt"
"github.com/go-openapi/strfmt"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/moduletools"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/entities/schema/crossref"
"github.com/weaviate/weaviate/entities/search"
"github.com/weaviate/weaviate/usecases/config"
)
type MergeDocument struct {
Class string `json:"class"`
ID strfmt.UUID `json:"id"`
PrimitiveSchema map[string]interface{} `json:"primitiveSchema"`
References BatchReferences `json:"references"`
Vector []float32 `json:"vector"`
UpdateTime int64 `json:"updateTime"`
AdditionalProperties models.AdditionalProperties `json:"additionalProperties"`
PropertiesToDelete []string `json:"propertiesToDelete"`
}
func (m *Manager) MergeObject(ctx context.Context, principal *models.Principal,
updates *models.Object, repl *additional.ReplicationProperties,
) *Error {
if err := m.validateInputs(updates); err != nil {
return &Error{"bad request", StatusBadRequest, err}
}
cls, id := updates.Class, updates.ID
path := fmt.Sprintf("objects/%s/%s", cls, id)
if err := m.authorizer.Authorize(principal, "update", path); err != nil {
return &Error{path, StatusForbidden, err}
}
m.metrics.MergeObjectInc()
defer m.metrics.MergeObjectDec()
obj, err := m.vectorRepo.Object(ctx, cls, id, nil, additional.Properties{}, repl, updates.Tenant)
if err != nil {
switch err.(type) {
case ErrMultiTenancy:
return &Error{"repo.object", StatusUnprocessableEntity, err}
default:
return &Error{"repo.object", StatusInternalServerError, err}
}
}
if obj == nil {
return &Error{"not found", StatusNotFound, err}
}
err = m.autoSchemaManager.autoSchema(ctx, principal, updates, false)
if err != nil {
return &Error{"bad request", StatusBadRequest, NewErrInvalidUserInput("invalid object: %v", err)}
}
var propertiesToDelete []string
if updates.Properties != nil {
for key, val := range updates.Properties.(map[string]interface{}) {
if val == nil {
propertiesToDelete = append(propertiesToDelete, schema.LowercaseFirstLetter(key))
}
}
}
if err := m.validateObjectAndNormalizeNames(
ctx, principal, repl, updates, obj.Object()); err != nil {
return &Error{"bad request", StatusBadRequest, err}
}
if updates.Properties == nil {
updates.Properties = map[string]interface{}{}
}
return m.patchObject(ctx, principal, obj, updates, repl, propertiesToDelete, updates.Tenant)
}
// patchObject patches an existing object obj with updates
func (m *Manager) patchObject(ctx context.Context, principal *models.Principal,
obj *search.Result, updates *models.Object, repl *additional.ReplicationProperties,
propertiesToDelete []string, tenant string,
) *Error {
cls, id := updates.Class, updates.ID
primitive, refs := m.splitPrimitiveAndRefs(updates.Properties.(map[string]interface{}), cls, id)
objWithVec, err := m.mergeObjectSchemaAndVectorize(ctx, cls, obj.Schema,
primitive, principal, obj.Vector, updates.Vector)
if err != nil {
return &Error{"merge and vectorize", StatusInternalServerError, err}
}
mergeDoc := MergeDocument{
Class: cls,
ID: id,
PrimitiveSchema: primitive,
References: refs,
Vector: objWithVec.Vector,
UpdateTime: m.timeSource.Now(),
PropertiesToDelete: propertiesToDelete,
}
if objWithVec.Additional != nil {
mergeDoc.AdditionalProperties = objWithVec.Additional
}
if err := m.vectorRepo.Merge(ctx, mergeDoc, repl, tenant); err != nil {
return &Error{"repo.merge", StatusInternalServerError, err}
}
return nil
}
func (m *Manager) validateInputs(updates *models.Object) error {
if updates == nil {
return fmt.Errorf("empty updates")
}
if updates.Class == "" {
return fmt.Errorf("empty class")
}
if updates.ID == "" {
return fmt.Errorf("empty uuid")
}
return nil
}
func (m *Manager) mergeObjectSchemaAndVectorize(ctx context.Context, className string,
old interface{}, new map[string]interface{},
principal *models.Principal, oldVec, newVec []float32,
) (*models.Object, error) {
var merged map[string]interface{}
var vector []float32
var objDiff *moduletools.ObjectDiff
if old == nil {
merged = new
vector = newVec
} else {
oldMap, ok := old.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("expected previous schema to be map, but got %#v", old)
}
objDiff = moduletools.NewObjectDiff(oldVec)
for key, value := range new {
objDiff.WithProp(key, oldMap[key], value)
oldMap[key] = value
}
merged = oldMap
if newVec != nil {
vector = newVec
} else {
vectorizerName, err := m.modulesProvider.VectorizerName(className)
if err != nil {
return nil, fmt.Errorf("find vectorizer name: %w", err)
}
if vectorizerName == config.VectorizerModuleNone {
vector = oldVec
}
}
}
// Note: vector could be a nil vector in case a vectorizer is configured,
// then the vectorizer will set it
obj := &models.Object{Class: className, Properties: merged, Vector: vector}
class, err := m.schemaManager.GetClass(ctx, principal, className)
if err != nil {
return nil, err
}
if err := m.modulesProvider.UpdateVector(ctx, obj, class, objDiff, m.findObject, m.logger); err != nil {
return nil, err
}
return obj, nil
}
func (m *Manager) splitPrimitiveAndRefs(in map[string]interface{}, sourceClass string,
sourceID strfmt.UUID,
) (map[string]interface{}, BatchReferences) {
primitive := map[string]interface{}{}
var outRefs BatchReferences
for prop, value := range in {
refs, ok := value.(models.MultipleRef)
if !ok {
// this must be a primitive filed
primitive[prop] = value
continue
}
for _, ref := range refs {
target, _ := crossref.Parse(ref.Beacon.String())
// safe to ignore error as validation has already been passed
source := &crossref.RefSource{
Local: true,
PeerName: "localhost",
Property: schema.PropertyName(prop),
Class: schema.ClassName(sourceClass),
TargetID: sourceID,
}
outRefs = append(outRefs, BatchReference{From: source, To: target})
}
}
return primitive, outRefs
}