KevinStephenson
Adding in weaviate code
b110593
raw
history blame
5.72 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package objects
import (
"context"
"fmt"
"runtime"
"time"
"github.com/go-openapi/strfmt"
"github.com/google/uuid"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/errorcompounder"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/usecases/objects/validation"
"golang.org/x/sync/errgroup"
)
// AddObjects persists a batch of objects through the connected DB.
//
// The principal must be authorized for "create" on "batch/objects". The
// connector lock is held for the whole call, and batch metrics are recorded
// around the work, which is delegated to addObjects.
func (b *BatchManager) AddObjects(ctx context.Context, principal *models.Principal,
	objects []*models.Object, fields []*string, repl *additional.ReplicationProperties,
) (BatchObjects, error) {
	if authErr := b.authorizer.Authorize(principal, "create", "batch/objects"); authErr != nil {
		return nil, authErr
	}

	release, lockErr := b.locks.LockConnector()
	if lockErr != nil {
		return nil, NewErrInternal("could not acquire lock: %v", lockErr)
	}
	defer release()

	start := time.Now()
	b.metrics.BatchInc()
	// Defers run LIFO: BatchDec fires first, then the "total_uc_level"
	// timing, and the connector lock is released last. The start timestamp
	// is evaluated now, when the defer is registered.
	defer b.metrics.BatchOp("total_uc_level", start.UnixNano())
	defer b.metrics.BatchDec()

	return b.addObjects(ctx, principal, objects, fields, repl)
}
// addObjects validates, preprocesses and persists a batch of objects.
//
// An empty batch is rejected with an invalid-user-input error. Every object
// is then preprocessed concurrently (auto-schema, ID assignment, validation,
// vectorization); per-object failures travel inside the returned
// BatchObjects rather than aborting the batch. Finally the batch is written
// through the vector repo. Preprocessing and persistence durations are
// reported to the batch metrics separately.
func (b *BatchManager) addObjects(ctx context.Context, principal *models.Principal,
	objects []*models.Object, fields []*string, repl *additional.ReplicationProperties,
) (BatchObjects, error) {
	beforePreProcessing := time.Now()
	if err := b.validateObjectForm(objects); err != nil {
		return nil, NewErrInvalidUserInput("invalid param 'objects': %v", err)
	}

	batchObjects := b.validateObjectsConcurrently(ctx, principal, objects, fields, repl)
	b.metrics.BatchOp("total_preprocessing", beforePreProcessing.UnixNano())

	beforePersistence := time.Now()
	defer b.metrics.BatchOp("total_persistence_level", beforePersistence.UnixNano())

	res, err := b.vectorRepo.BatchPutObjects(ctx, batchObjects, repl)
	if err != nil {
		// Use %v, not %#v: %#v prints the Go-syntax form of the error value
		// (type names, pointer addresses) instead of its message.
		return nil, NewErrInternal("batch objects: %v", err)
	}
	return res, nil
}
// validateObjectForm performs the batch-level shape check: a batch must
// contain at least one object. Per-object validation happens later.
func (b *BatchManager) validateObjectForm(classes []*models.Object) error {
	if len(classes) > 0 {
		return nil
	}
	return fmt.Errorf("cannot be empty, need at least one object for batching")
}
// validateObjectsConcurrently runs validateObject for every input object on
// a bounded pool of goroutines. It returns the results ordered by each
// object's position in the input slice.
//
// Concurrency is capped at twice GOMAXPROCS. Unbounded fan-out previously
// caused a production outage, see
// https://github.com/weaviate/weaviate/issues/3179.
func (b *BatchManager) validateObjectsConcurrently(ctx context.Context, principal *models.Principal,
	objects []*models.Object, fields []*string, repl *additional.ReplicationProperties,
) BatchObjects {
	keep := determineResponseFields(fields)

	// Buffered to exactly one slot per object, so no worker ever blocks on
	// send and the channel is full once all workers have finished.
	results := make(chan BatchObject, len(objects))

	// The errgroup is used only for its concurrency limit. validateObject
	// reports failures through the channel, so the workers never return an
	// error.
	group := &errgroup.Group{}
	group.SetLimit(2 * runtime.GOMAXPROCS(0))

	for idx := range objects {
		idx, obj := idx, objects[idx] // per-iteration copies for the closure
		group.Go(func() error {
			b.validateObject(ctx, principal, obj, idx, &results, keep, repl)
			return nil
		})
	}

	_ = group.Wait() // always nil: every worker returns nil
	close(results)

	return objectsChanToSlice(results)
}
// validateObject prepares a single batch entry and sends exactly one
// BatchObject for it on *resultsC. The send happens whether or not errors
// occurred.
//
// Steps, in order:
//  1. run auto-schema on the incoming object;
//  2. pick its ID: generate one when empty, otherwise check it parses as a
//     UUID;
//  3. copy the vector and tenant, plus class/properties when they are listed
//     in fieldsToKeep; nil properties become an empty map;
//  4. stamp creation/last-update times, if requested, with one shared
//     millisecond timestamp;
//  5. look up the class, validate the object, and update its vector only
//     when validation passed.
//
// All errors from these steps are compounded into the result's Err field.
// originalIndex is carried through so the caller can restore input order.
func (b *BatchManager) validateObject(ctx context.Context, principal *models.Principal,
	concept *models.Object, originalIndex int, resultsC *chan BatchObject,
	fieldsToKeep map[string]struct{}, repl *additional.ReplicationProperties,
) {
	errs := &errorcompounder.ErrorCompounder{}
	keeps := func(field string) bool {
		_, found := fieldsToKeep[field]
		return found
	}

	// Auto-schema sees the incoming object before any fields are copied from it.
	errs.Add(b.autoSchemaManager.autoSchema(ctx, principal, concept, true))

	var id strfmt.UUID
	switch {
	case concept.ID == "":
		// No ID supplied: generate a fresh UUID for the new object.
		generated, genErr := generateUUID()
		errs.Add(genErr)
		id = generated
	default:
		if _, parseErr := uuid.Parse(concept.ID.String()); parseErr != nil {
			errs.Add(parseErr)
		}
		id = concept.ID
	}

	object := &models.Object{
		ID:     id,
		Vector: concept.Vector,
		Tenant: concept.Tenant,
	}
	if keeps("class") {
		object.Class = concept.Class
	}
	if keeps("properties") {
		object.Properties = concept.Properties
	}
	if object.Properties == nil {
		object.Properties = map[string]interface{}{}
	}

	now := unixNow()
	if keeps("creationTimeUnix") {
		object.CreationTimeUnix = now
	}
	if keeps("lastUpdateTimeUnix") {
		object.LastUpdateTimeUnix = now
	}

	class, classErr := b.schemaManager.GetClass(ctx, principal, object.Class)
	errs.Add(classErr)
	if class == nil {
		errs.Add(fmt.Errorf("class '%s' not present in schema", object.Class))
	} else {
		validErr := validation.New(b.vectorRepo.Exists, b.config, repl).
			Object(ctx, class, object, nil)
		errs.Add(validErr)
		if validErr == nil {
			// Vectorize only objects that passed validation.
			errs.Add(b.modulesProvider.UpdateVector(ctx, object, class, nil, b.findObject, b.logger))
		}
	}

	*resultsC <- BatchObject{
		OriginalIndex: originalIndex,
		UUID:          id,
		Object:        object,
		Vector:        object.Vector,
		Err:           errs.ToError(),
	}
}
// objectsChanToSlice drains c into a slice, placing each BatchObject at its
// OriginalIndex.
//
// The slice is sized from len(c) before draining, so c is expected to be
// closed and to hold exactly one entry per index in [0, len(c)). Otherwise
// the loop blocks (unclosed channel) or indexes out of range.
func objectsChanToSlice(c chan BatchObject) BatchObjects {
	ordered := make([]BatchObject, len(c))
	for {
		obj, ok := <-c
		if !ok {
			break
		}
		ordered[obj.OriginalIndex] = obj
	}
	return ordered
}
// unixNow returns the current wall-clock time in milliseconds since the
// Unix epoch.
func unixNow() int64 {
	return time.Now().UnixMilli()
}