Spaces:
Running
Running
File size: 6,790 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package db
import (
"context"
"fmt"
"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/entities/storobj"
"github.com/weaviate/weaviate/usecases/objects"
)
// batchQueue collects the objects of a single class awaiting batch
// insertion, together with each object's position in the caller's original
// batch so per-object errors can be reported at the correct index.
type batchQueue struct {
// objects are the storage-layer representations queued for insertion
objects []*storobj.Object
// originalIndex[i] is the position of objects[i] in the incoming batch
originalIndex []int
}
// BatchPutObjects imports a batch of objects, partitioned by class, into
// their respective indices. Items that already carry a non-nil Err are
// skipped. Per-item failures are reported by setting Err on the matching
// entry of objs rather than failing the whole batch; the returned slice is
// the (mutated) input slice. A non-nil error is returned only for
// batch-wide failures (e.g. the memory check).
func (db *DB) BatchPutObjects(ctx context.Context, objs objects.BatchObjects,
	repl *additional.ReplicationProperties,
) (objects.BatchObjects, error) {
	objectByClass := make(map[string]batchQueue)
	indexByClass := make(map[string]*Index)

	// refuse the whole batch up front if ingesting it would likely exceed
	// the configured memory budget
	if err := db.memMonitor.CheckAlloc(estimateBatchMemory(objs)); err != nil {
		return nil, fmt.Errorf("cannot process batch: %w", err)
	}

	// group objects by class so each index receives a single sub-batch
	for _, item := range objs {
		if item.Err != nil {
			// item has a validation error or another reason to ignore
			continue
		}
		queue := objectByClass[item.Object.Class]
		queue.objects = append(queue.objects, storobj.FromObject(item.Object, item.Vector))
		queue.originalIndex = append(queue.originalIndex, item.OriginalIndex)
		objectByClass[item.Object.Class] = queue
	}

	// wrapped by func to acquire and safely release indexLock only for duration of loop
	func() {
		db.indexLock.RLock()
		defer db.indexLock.RUnlock()

		for class, queue := range objectByClass {
			index, ok := db.indices[indexID(schema.ClassName(class))]
			if !ok {
				msg := fmt.Sprintf("could not find index for class %v. It might have been deleted in the meantime", class)
				db.logger.Warn(msg)
				for _, origIdx := range queue.originalIndex {
					if origIdx >= len(objs) {
						db.logger.Errorf(
							"batch add queue index out of bounds. len(objs) == %d, queue.originalIndex == %d",
							len(objs), origIdx)
						break
					}
					// use errors.New rather than fmt.Errorf(msg): msg is an
					// already-formatted message, not a format string, and may
					// contain '%' (e.g. in a class name), which fmt.Errorf
					// would misinterpret as verbs
					objs[origIdx].Err = errors.New(msg)
				}
				continue
			}
			// hold each index's dropIndex read-lock until its sub-batch has
			// been written, so the index cannot be dropped underneath us
			index.dropIndex.RLock()
			indexByClass[class] = index
		}
	}()

	// safely release remaining locks (in case of panic)
	defer func() {
		for _, index := range indexByClass {
			if index != nil {
				index.dropIndex.RUnlock()
			}
		}
	}()

	for class, index := range indexByClass {
		queue := objectByClass[class]
		errs := index.putObjectBatch(ctx, queue.objects, repl)
		// remove index from map to skip releasing its lock in defer
		indexByClass[class] = nil
		index.dropIndex.RUnlock()
		// map per-sub-batch errors back to the caller's original positions
		for i, err := range errs {
			if err != nil {
				objs[queue.originalIndex[i]].Err = err
			}
		}
	}

	return objs, nil
}
// AddBatchReferences adds a batch of cross-references, partitioned by the
// class of the source object, to their respective indices. Items that
// already carry a non-nil Err are skipped. Per-item failures are reported
// by setting Err on the matching entry of references; the returned slice
// is the (mutated) input slice.
func (db *DB) AddBatchReferences(ctx context.Context, references objects.BatchReferences,
repl *additional.ReplicationProperties,
) (objects.BatchReferences, error) {
// group references by source class so each index receives one sub-batch
refByClass := make(map[schema.ClassName]objects.BatchReferences)
indexByClass := make(map[schema.ClassName]*Index)
for _, item := range references {
if item.Err != nil {
// item has a validation error or another reason to ignore
continue
}
refByClass[item.From.Class] = append(refByClass[item.From.Class], item)
}
// wrapped by func to acquire and safely release indexLock only for duration of loop
func() {
db.indexLock.RLock()
defer db.indexLock.RUnlock()
for class, queue := range refByClass {
index, ok := db.indices[indexID(class)]
if !ok {
// index is gone (e.g. class deleted concurrently): mark every item
// of this class as failed and continue with the remaining classes
for _, item := range queue {
references[item.OriginalIndex].Err = fmt.Errorf("could not find index for class %v. It might have been deleted in the meantime", class)
}
continue
}
// hold the index's dropIndex read-lock until its sub-batch has been
// written, so the index cannot be dropped underneath us
index.dropIndex.RLock()
indexByClass[class] = index
}
}()
// safely release remaining locks (in case of panic)
defer func() {
for _, index := range indexByClass {
if index != nil {
index.dropIndex.RUnlock()
}
}
}()
for class, index := range indexByClass {
queue := refByClass[class]
errs := index.AddReferencesBatch(ctx, queue, repl)
// remove index from map to skip releasing its lock in defer
indexByClass[class] = nil
index.dropIndex.RUnlock()
// map per-sub-batch errors back to the caller's original positions
for i, err := range errs {
if err != nil {
references[queue[i].OriginalIndex].Err = err
}
}
}
return references, nil
}
// BatchDeleteObjects deletes all objects of the given class (and tenant)
// that match params.Filters, capped at the configured QueryMaximumResults.
// The returned result's Matches is the total number of matching objects,
// which may exceed the number actually deleted once the cap is applied.
// With params.DryRun set, affected objects are reported but not deleted.
func (db *DB) BatchDeleteObjects(ctx context.Context, params objects.BatchDeleteParams,
repl *additional.ReplicationProperties, tenant string,
) (objects.BatchDeleteResult, error) {
// get index for a given class
className := params.ClassName
idx := db.GetIndex(className)
if idx == nil {
return objects.BatchDeleteResult{}, errors.Errorf("cannot find index for class %v", className)
}
// find all DocIDs in all shards that match the filter
shardDocIDs, err := idx.findUUIDs(ctx, params.Filters, tenant)
if err != nil {
return objects.BatchDeleteResult{}, errors.Wrapf(err, "cannot find objects")
}
// prepare to be deleted list of DocIDs from all shards
toDelete := map[string][]strfmt.UUID{}
limit := db.config.QueryMaximumResults
matches := int64(0)
for shardName, docIDs := range shardDocIDs {
docIDsLength := int64(len(docIDs))
if matches <= limit {
if matches+docIDsLength <= limit {
// whole shard fits under the remaining budget
toDelete[shardName] = docIDs
} else {
// take only as many IDs as still fit under the limit.
// NOTE(review): when matches == limit this inserts an empty slice
// for the shard — presumably harmless, but confirm that
// batchDeleteObjects tolerates empty per-shard ID lists
toDelete[shardName] = docIDs[:limit-matches]
}
}
// matches counts every hit, including those beyond the limit
matches += docIDsLength
}
// delete the DocIDs in given shards
deletedObjects, err := idx.batchDeleteObjects(ctx, toDelete, params.DryRun, repl)
if err != nil {
return objects.BatchDeleteResult{}, errors.Wrapf(err, "cannot delete objects")
}
result := objects.BatchDeleteResult{
Matches: matches,
Limit: db.config.QueryMaximumResults,
DryRun: params.DryRun,
Objects: deletedObjects,
}
return result, nil
}
// estimateBatchMemory returns a rough estimate, in bytes, of the heap
// footprint required to ingest the given batch.
//
// The model is deliberately simplistic: each item is assumed to cost the
// full size of its float32 vector (4 bytes per dimension) plus a flat
// 30-byte overhead. The true cost depends on index settings (e.g. HNSW)
// and possibly on future index types, but the memory monitor is fuzzy and
// asynchronous anyway — the goal is to prevent OOM crashes, not to fail
// precisely on the last available byte.
func estimateBatchMemory(objs objects.BatchObjects) int64 {
	var total int64
	for i := range objs {
		// 4 bytes per float32 dimension + 30 bytes fixed per-vector overhead
		total += int64(len(objs[i].Vector)*4 + 30)
	}
	return total
}
|