//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//

package db

import (
	"context"
	"fmt"
	"os"
	"path"
	"runtime"
	"runtime/debug"
	golangSort "sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-openapi/strfmt"
	"github.com/google/uuid"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"github.com/weaviate/weaviate/adapters/repos/db/aggregator"
	"github.com/weaviate/weaviate/adapters/repos/db/helpers"
	"github.com/weaviate/weaviate/adapters/repos/db/indexcheckpoint"
	"github.com/weaviate/weaviate/adapters/repos/db/inverted"
	"github.com/weaviate/weaviate/adapters/repos/db/inverted/stopwords"
	"github.com/weaviate/weaviate/adapters/repos/db/sorter"
	"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw"
	"github.com/weaviate/weaviate/entities/additional"
	"github.com/weaviate/weaviate/entities/aggregation"
	"github.com/weaviate/weaviate/entities/autocut"
	"github.com/weaviate/weaviate/entities/filters"
	"github.com/weaviate/weaviate/entities/models"
	"github.com/weaviate/weaviate/entities/multi"
	"github.com/weaviate/weaviate/entities/schema"
	"github.com/weaviate/weaviate/entities/search"
	"github.com/weaviate/weaviate/entities/searchparams"
	"github.com/weaviate/weaviate/entities/storobj"
	"github.com/weaviate/weaviate/usecases/config"
	"github.com/weaviate/weaviate/usecases/monitoring"
	"github.com/weaviate/weaviate/usecases/objects"
	"github.com/weaviate/weaviate/usecases/replica"
	schemaUC "github.com/weaviate/weaviate/usecases/schema"
	"github.com/weaviate/weaviate/usecases/sharding"
	"golang.org/x/sync/errgroup"
)

var (
	errTenantNotFound  = errors.New("tenant not found")
	errTenantNotActive = errors.New("tenant not active")

	// Use runtime.GOMAXPROCS instead of runtime.NumCPU because NumCPU returns
	// the physical CPU cores. However, in a containerization context, that might
	// not be what we want. The physical node could have 128 cores, but we could
	// be cgroup-limited to 2 cores. In that case, we want 2 to be our limit, not
	// 128. It isn't guaranteed that MAXPROCS reflects the cgroup limit, but at
	// least there is a chance that it was set correctly. If not, it defaults to
	// NumCPU anyway, so we're not any worse off.
	_NUMCPU = runtime.GOMAXPROCS(0)

	errShardNotFound = errors.New("shard not found")
)
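
// Illustrative sketch (not part of the original file): one common way to make
// GOMAXPROCS follow a container's cgroup CPU quota, which the comment above
// relies on, is the go.uber.org/automaxprocs package imported for its side
// effect in the program's main package. Whether Weaviate does this elsewhere
// is an assumption; the sketch only shows the general technique.
//
//	package main
//
//	import (
//		_ "go.uber.org/automaxprocs" // sets GOMAXPROCS to the cgroup CPU limit at init time
//	)
//
//	func main() {
//		// runtime.GOMAXPROCS(0) now reports the cgroup-aware value.
//	}
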
// shardMap is a sync.Map specialized in storing shards.
type shardMap sync.Map

// Range calls f sequentially for each key and value present in the map.
// If f returns an error, Range stops the iteration.
func (m *shardMap) Range(f func(name string, shard ShardLike) error) (err error) {
	(*sync.Map)(m).Range(func(key, value any) bool {
		err = f(key.(string), value.(ShardLike))
		return err == nil
	})
	return err
}

// RangeConcurrently calls f for each key and value present in the map with at
// most _NUMCPU executors running in parallel. As opposed to [Range], it does
// not guarantee an exit on the first error.
func (m *shardMap) RangeConcurrently(f func(name string, shard ShardLike) error) (err error) {
	eg := errgroup.Group{}
	eg.SetLimit(_NUMCPU)
	(*sync.Map)(m).Range(func(key, value any) bool {
		name, shard := key.(string), value.(ShardLike)
		eg.Go(func() error {
			return f(name, shard)
		})
		return true
	})
	return eg.Wait()
}
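
// Illustrative usage sketch (not part of the original file): collecting the
// names of all shards with RangeConcurrently. The callbacks run from multiple
// goroutines, so shared state needs its own synchronization; "idx" stands for
// a hypothetical *Index.
//
//	var (
//		mu    sync.Mutex
//		names []string
//	)
//	_ = idx.shards.RangeConcurrently(func(name string, _ ShardLike) error {
//		mu.Lock()
//		defer mu.Unlock()
//		names = append(names, name)
//		return nil
//	})
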

// Load returns the shard or nil if no shard is present.
func (m *shardMap) Load(name string) ShardLike {
	v, ok := (*sync.Map)(m).Load(name)
	if !ok {
		return nil
	}
	return v.(ShardLike)
}

// Store sets a shard given its name and value.
func (m *shardMap) Store(name string, shard ShardLike) {
	(*sync.Map)(m).Store(name, shard)
}

// Swap swaps the shard for a key and returns the previous value, if any.
// The loaded result reports whether the key was present.
func (m *shardMap) Swap(name string, shard ShardLike) (previous ShardLike, loaded bool) {
	v, ok := (*sync.Map)(m).Swap(name, shard)
	if v == nil || !ok {
		return nil, ok
	}
	return v.(ShardLike), ok
}

// CompareAndSwap swaps the old and new values for key if the value stored in the map is equal to old.
func (m *shardMap) CompareAndSwap(name string, old, new ShardLike) bool {
	return (*sync.Map)(m).CompareAndSwap(name, old, new)
}

// LoadAndDelete deletes the value for a key, returning the previous value, if any.
// The loaded result reports whether the key was present.
func (m *shardMap) LoadAndDelete(name string) (ShardLike, bool) {
	v, ok := (*sync.Map)(m).LoadAndDelete(name)
	if v == nil || !ok {
		return nil, ok
	}
	return v.(ShardLike), ok
}

// Index is the logical unit which contains all the data for one particular
// class. An index can be further broken up into self-contained units, called
// Shards, to allow for easy distribution across Nodes.
type Index struct {
	classSearcher inverted.ClassSearcher // to allow for nested by-references searches
	shards        shardMap
	Config        IndexConfig

	vectorIndexUserConfig     schema.VectorIndexConfig
	vectorIndexUserConfigLock sync.Mutex

	getSchema  schemaUC.SchemaGetter
	logger     logrus.FieldLogger
	remote     *sharding.RemoteIndex
	stopwords  *stopwords.Detector
	replicator *replica.Replicator

	invertedIndexConfig     schema.InvertedIndexConfig
	invertedIndexConfigLock sync.Mutex

	// This lock should be used together with the db indexLock.
	//
	// The db indexLock protects the map that contains all indices against
	// changes and should be held while iterating over it. This lock protects
	// this specific index from being deleted while it is in use: call RLock to
	// signal that the index is in use, so that many goroutines can use the
	// index in parallel. The delete routine acquires the write lock (Lock).
	//
	// Usage (see the sketch after this struct):
	//   Lock the whole db using db.indexLock
	//   pick the indices you want and RLock them
	//   unlock db.indexLock
	//   use the indices
	//   RUnlock all picked indices
	dropIndex sync.RWMutex

	metrics             *Metrics
	centralJobQueue     chan job
	indexCheckpoints    *indexcheckpoint.Checkpoints
	partitioningEnabled bool

	cycleCallbacks *indexCycleCallbacks

	backupMutex backupMutex
	lastBackup  atomic.Pointer[BackupState]

	// canceled when either Shutdown or Drop is called
	closingCtx    context.Context
	closingCancel context.CancelFunc
}
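
// Illustrative sketch (not part of the original file) of the locking protocol
// described on dropIndex above. "db" stands for the surrounding *DB with its
// index map and indexLock; the exact field names on DB are assumptions here,
// the order of operations is the point.
//
//	db.indexLock.Lock()
//	idx := db.indices["article"]
//	if idx != nil {
//		idx.dropIndex.RLock() // signal the index is in use; a drop will wait
//	}
//	db.indexLock.Unlock()
//
//	if idx != nil {
//		defer idx.dropIndex.RUnlock()
//		// ... use idx while it is guaranteed not to be dropped ...
//	}
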

func (i *Index) GetShards() []ShardLike {
	var out []ShardLike
	i.shards.Range(func(_ string, shard ShardLike) error {
		out = append(out, shard)
		return nil
	})
	return out
}

func (i *Index) ID() string {
	return indexID(i.Config.ClassName)
}

func (i *Index) path() string {
	return path.Join(i.Config.RootPath, i.ID())
}

type nodeResolver interface {
	NodeHostname(nodeName string) (string, bool)
}

// NewIndex creates an index with the specified number of shards, using only
// the shards that are local to this node.
func NewIndex(ctx context.Context, cfg IndexConfig, | |
shardState *sharding.State, invertedIndexConfig schema.InvertedIndexConfig, | |
vectorIndexUserConfig schema.VectorIndexConfig, sg schemaUC.SchemaGetter, | |
cs inverted.ClassSearcher, logger logrus.FieldLogger, | |
nodeResolver nodeResolver, remoteClient sharding.RemoteIndexClient, | |
replicaClient replica.Client, | |
promMetrics *monitoring.PrometheusMetrics, class *models.Class, jobQueueCh chan job, | |
indexCheckpoints *indexcheckpoint.Checkpoints, | |
) (*Index, error) { | |
sd, err := stopwords.NewDetectorFromConfig(invertedIndexConfig.Stopwords) | |
if err != nil { | |
return nil, errors.Wrap(err, "failed to create new index") | |
} | |
repl := replica.NewReplicator(cfg.ClassName.String(), | |
sg, nodeResolver, replicaClient, logger) | |
if cfg.QueryNestedRefLimit == 0 { | |
cfg.QueryNestedRefLimit = config.DefaultQueryNestedCrossReferenceLimit | |
} | |
index := &Index{ | |
Config: cfg, | |
getSchema: sg, | |
logger: logger, | |
classSearcher: cs, | |
vectorIndexUserConfig: vectorIndexUserConfig, | |
invertedIndexConfig: invertedIndexConfig, | |
stopwords: sd, | |
replicator: repl, | |
remote: sharding.NewRemoteIndex(cfg.ClassName.String(), sg, | |
nodeResolver, remoteClient), | |
metrics: NewMetrics(logger, promMetrics, cfg.ClassName.String(), "n/a"), | |
centralJobQueue: jobQueueCh, | |
partitioningEnabled: shardState.PartitioningEnabled, | |
backupMutex: backupMutex{log: logger, retryDuration: mutexRetryDuration, notifyDuration: mutexNotifyDuration}, | |
indexCheckpoints: indexCheckpoints, | |
} | |
index.closingCtx, index.closingCancel = context.WithCancel(context.Background()) | |
index.initCycleCallbacks() | |
if err := index.checkSingleShardMigration(shardState); err != nil { | |
return nil, errors.Wrap(err, "migrating sharding state from previous version") | |
} | |
eg := errgroup.Group{} | |
eg.SetLimit(_NUMCPU) | |
if err := os.MkdirAll(index.path(), os.ModePerm); err != nil { | |
return nil, fmt.Errorf("init index %q: %w", index.ID(), err) | |
} | |
if err := index.initAndStoreShards(ctx, shardState, class, promMetrics); err != nil { | |
return nil, err | |
} | |
index.cycleCallbacks.compactionCycle.Start() | |
index.cycleCallbacks.flushCycle.Start() | |
return index, nil | |
} | |
func (i *Index) initAndStoreShards(ctx context.Context, shardState *sharding.State, class *models.Class, | |
promMetrics *monitoring.PrometheusMetrics, | |
) error { | |
if i.Config.DisableLazyLoadShards { | |
eg := errgroup.Group{} | |
eg.SetLimit(_NUMCPU) | |
for _, shardName := range shardState.AllLocalPhysicalShards() { | |
physical := shardState.Physical[shardName] | |
if physical.ActivityStatus() != models.TenantActivityStatusHOT { | |
// do not instantiate inactive shard | |
continue | |
} | |
shardName := shardName // prevent loop variable capture | |
eg.Go(func() error { | |
shard, err := NewShard(ctx, promMetrics, shardName, i, class, i.centralJobQueue, i.indexCheckpoints, nil) | |
if err != nil { | |
return fmt.Errorf("init shard %s of index %s: %w", shardName, i.ID(), err) | |
} | |
i.shards.Store(shardName, shard) | |
return nil | |
}) | |
} | |
return eg.Wait() | |
} | |
for _, shardName := range shardState.AllLocalPhysicalShards() { | |
physical := shardState.Physical[shardName] | |
if physical.ActivityStatus() != models.TenantActivityStatusHOT { | |
// do not instantiate inactive shard | |
continue | |
} | |
shard := NewLazyLoadShard(ctx, promMetrics, shardName, i, class, i.centralJobQueue, i.indexCheckpoints) | |
i.shards.Store(shardName, shard) | |
} | |
go func() { | |
ticker := time.NewTicker(time.Second) | |
defer ticker.Stop() | |
i.ForEachShard(func(name string, shard ShardLike) error { | |
// prioritize closingCtx over the ticker:
// check closing again in case the ticker case was selected
// while both cases were ready
select { | |
case <-i.closingCtx.Done(): | |
// break loop by returning error | |
return i.closingCtx.Err() | |
case <-ticker.C: | |
select { | |
case <-i.closingCtx.Done(): | |
// break loop by returning error | |
return i.closingCtx.Err() | |
default: | |
shard.(*LazyLoadShard).Load(context.Background()) | |
return nil | |
} | |
} | |
}) | |
}() | |
return nil | |
} | |
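
// Illustrative sketch (not part of the original file) of the select pattern
// used in the loader goroutine above: a ticker-paced loop that always gives
// the cancellation context priority, re-checking ctx.Done() after the ticker
// fires because Go's select picks randomly among ready cases.
//
//	func paced(ctx context.Context, work func()) {
//		ticker := time.NewTicker(time.Second)
//		defer ticker.Stop()
//		for {
//			select {
//			case <-ctx.Done():
//				return
//			case <-ticker.C:
//				select {
//				case <-ctx.Done():
//					return // ctx won even though the ticker was also ready
//				default:
//					work()
//				}
//			}
//		}
//	}
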
func (i *Index) initAndStoreShard(ctx context.Context, shardName string, class *models.Class, | |
promMetrics *monitoring.PrometheusMetrics, | |
) error { | |
shard, err := i.initShard(ctx, shardName, class, promMetrics) | |
if err != nil { | |
return err | |
} | |
i.shards.Store(shardName, shard) | |
return nil | |
} | |
func (i *Index) initShard(ctx context.Context, shardName string, class *models.Class, | |
promMetrics *monitoring.PrometheusMetrics, | |
) (ShardLike, error) { | |
if i.Config.DisableLazyLoadShards { | |
shard, err := NewShard(ctx, promMetrics, shardName, i, class, i.centralJobQueue, i.indexCheckpoints, nil) | |
if err != nil { | |
return nil, fmt.Errorf("init shard %s of index %s: %w", shardName, i.ID(), err) | |
} | |
return shard, nil | |
} | |
shard := NewLazyLoadShard(ctx, promMetrics, shardName, i, class, i.centralJobQueue, i.indexCheckpoints) | |
return shard, nil | |
} | |
// Iterate over all objects in the index, applying the callback function to each one. Adding or removing objects during iteration is not supported. | |
func (i *Index) IterateObjects(ctx context.Context, cb func(index *Index, shard ShardLike, object *storobj.Object) error) (err error) { | |
return i.ForEachShard(func(_ string, shard ShardLike) error { | |
wrapper := func(object *storobj.Object) error { | |
return cb(i, shard, object) | |
} | |
bucket := shard.Store().Bucket(helpers.ObjectsBucketLSM) | |
return bucket.IterateObjects(ctx, wrapper) | |
}) | |
} | |
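
// Illustrative usage sketch (not part of the original file): counting all
// objects in an index with IterateObjects. "idx" is a hypothetical *Index;
// the callback signature matches the method above.
//
//	var count int
//	err := idx.IterateObjects(ctx, func(_ *Index, _ ShardLike, _ *storobj.Object) error {
//		count++
//		return nil
//	})
//	if err != nil {
//		// handle iteration error
//	}
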
func (i *Index) ForEachShard(f func(name string, shard ShardLike) error) error { | |
return i.shards.Range(f) | |
} | |
func (i *Index) ForEachShardConcurrently(f func(name string, shard ShardLike) error) error { | |
return i.shards.RangeConcurrently(f) | |
} | |
// Iterate over all shards in the index, applying the callback function to each one. Adding or removing shards during iteration is not supported.
func (i *Index) IterateShards(ctx context.Context, cb func(index *Index, shard ShardLike) error) (err error) { | |
return i.ForEachShard(func(key string, shard ShardLike) error { | |
return cb(i, shard) | |
}) | |
} | |
func (i *Index) addProperty(ctx context.Context, prop *models.Property) error { | |
eg := &errgroup.Group{} | |
eg.SetLimit(_NUMCPU) | |
i.ForEachShard(func(key string, shard ShardLike) error { | |
shard.createPropertyIndex(ctx, prop, eg) | |
return nil | |
}) | |
if err := eg.Wait(); err != nil { | |
return errors.Wrapf(err, "extend idx '%s' with property '%s", i.ID(), prop.Name) | |
} | |
return nil | |
} | |
func (i *Index) addUUIDProperty(ctx context.Context) error { | |
return i.ForEachShard(func(name string, shard ShardLike) error { | |
err := shard.addIDProperty(ctx) | |
if err != nil { | |
return errors.Wrapf(err, "add id property to shard %q", name) | |
} | |
return nil | |
}) | |
} | |
func (i *Index) addDimensionsProperty(ctx context.Context) error { | |
return i.ForEachShard(func(name string, shard ShardLike) error { | |
if err := shard.addDimensionsProperty(ctx); err != nil { | |
return errors.Wrapf(err, "add dimensions property to shard %q", name) | |
} | |
return nil | |
}) | |
} | |
func (i *Index) addTimestampProperties(ctx context.Context) error { | |
return i.ForEachShard(func(name string, shard ShardLike) error { | |
if err := shard.addTimestampProperties(ctx); err != nil { | |
return errors.Wrapf(err, "add timestamp properties to shard %q", name) | |
} | |
return nil | |
}) | |
} | |
func (i *Index) updateVectorIndexConfig(ctx context.Context, | |
updated schema.VectorIndexConfig, | |
) error { | |
// an update is not specific to one shard, but rather applies to all of them
err := i.ForEachShard(func(name string, shard ShardLike) error { | |
// At the moment, we don't do anything in an update that could fail, but | |
// technically this should be part of some sort of a two-phase commit or | |
// have another way to rollback if we have updates that could potentially | |
// fail in the future. For now that's not a realistic risk. | |
if err := shard.UpdateVectorIndexConfig(ctx, updated); err != nil { | |
return errors.Wrapf(err, "shard %s", name) | |
} | |
return nil | |
}) | |
if err != nil { | |
return err | |
} | |
i.vectorIndexUserConfigLock.Lock() | |
defer i.vectorIndexUserConfigLock.Unlock() | |
i.vectorIndexUserConfig = updated | |
return nil | |
} | |
func (i *Index) getInvertedIndexConfig() schema.InvertedIndexConfig { | |
i.invertedIndexConfigLock.Lock() | |
defer i.invertedIndexConfigLock.Unlock() | |
return i.invertedIndexConfig | |
} | |
func (i *Index) updateInvertedIndexConfig(ctx context.Context, | |
updated schema.InvertedIndexConfig, | |
) error { | |
i.invertedIndexConfigLock.Lock() | |
defer i.invertedIndexConfigLock.Unlock() | |
i.invertedIndexConfig = updated | |
return nil | |
} | |
type IndexConfig struct { | |
RootPath string | |
ClassName schema.ClassName | |
QueryMaximumResults int64 | |
QueryNestedRefLimit int64 | |
ResourceUsage config.ResourceUsage | |
MemtablesFlushIdleAfter int | |
MemtablesInitialSizeMB int | |
MemtablesMaxSizeMB int | |
MemtablesMinActiveSeconds int | |
MemtablesMaxActiveSeconds int | |
ReplicationFactor int64 | |
AvoidMMap bool | |
DisableLazyLoadShards bool | |
TrackVectorDimensions bool | |
} | |
func indexID(class schema.ClassName) string { | |
return strings.ToLower(string(class)) | |
} | |
func (i *Index) determineObjectShard(id strfmt.UUID, tenant string) (string, error) { | |
className := i.Config.ClassName.String() | |
if tenant != "" { | |
if shard, status := i.getSchema.TenantShard(className, tenant); shard != "" { | |
if status == models.TenantActivityStatusHOT { | |
return shard, nil | |
} | |
return "", objects.NewErrMultiTenancy(fmt.Errorf("%w: '%s'", errTenantNotActive, tenant)) | |
} | |
return "", objects.NewErrMultiTenancy(fmt.Errorf("%w: %q", errTenantNotFound, tenant)) | |
} | |
uuid, err := uuid.Parse(id.String()) | |
if err != nil { | |
return "", fmt.Errorf("parse uuid: %q", id.String()) | |
} | |
uuidBytes, err := uuid.MarshalBinary() // cannot error | |
if err != nil { | |
return "", fmt.Errorf("marshal uuid: %q", id.String()) | |
} | |
return i.getSchema.ShardFromUUID(className, uuidBytes), nil | |
} | |
func (i *Index) putObject(ctx context.Context, object *storobj.Object, | |
replProps *additional.ReplicationProperties, | |
) error { | |
if err := i.validateMultiTenancy(object.Object.Tenant); err != nil { | |
return err | |
} | |
if i.Config.ClassName != object.Class() { | |
return fmt.Errorf("cannot import object of class %s into index of class %s", | |
object.Class(), i.Config.ClassName) | |
} | |
shardName, err := i.determineObjectShard(object.ID(), object.Object.Tenant) | |
if err != nil { | |
return objects.NewErrInvalidUserInput("determine shard: %v", err) | |
} | |
if i.replicationEnabled() { | |
if replProps == nil { | |
replProps = defaultConsistency() | |
} | |
cl := replica.ConsistencyLevel(replProps.ConsistencyLevel) | |
if err := i.replicator.PutObject(ctx, shardName, object, cl); err != nil { | |
return fmt.Errorf("replicate insertion: shard=%q: %w", shardName, err) | |
} | |
return nil | |
} | |
// no replication, remote shard | |
if i.localShard(shardName) == nil { | |
if err := i.remote.PutObject(ctx, shardName, object); err != nil { | |
return fmt.Errorf("put remote object: shard=%q: %w", shardName, err) | |
} | |
return nil | |
} | |
// no replication, local shard | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
err = errShardNotFound | |
if shard := i.localShard(shardName); shard != nil { // does shard still exist | |
err = shard.PutObject(ctx, object) | |
} | |
if err != nil { | |
return fmt.Errorf("put local object: shard=%q: %w", shardName, err) | |
} | |
return nil | |
} | |
func (i *Index) IncomingPutObject(ctx context.Context, shardName string, | |
object *storobj.Object, | |
) error { | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
localShard := i.localShard(shardName) | |
if localShard == nil { | |
return errShardNotFound | |
} | |
// This is a bit hacky. The problem is that storobj.Parse() currently
// misses date fields, as it has no way of knowing that a date-formatted
// string was actually a date type. However, adding this functionality to
// Parse() would break a lot of code, because it is currently
// schema-independent. To find out if a field is a date or date[], we need to
// involve the schema, which is why we do it here. This was discovered as
// part of https://github.com/weaviate/weaviate/issues/1775
if err := i.parseDateFieldsInProps(object.Object.Properties); err != nil { | |
return err | |
} | |
if err := localShard.PutObject(ctx, object); err != nil { | |
return err | |
} | |
return nil | |
} | |
func (i *Index) replicationEnabled() bool { | |
return i.Config.ReplicationFactor > 1 | |
} | |
// parseDateFieldsInProps checks the schema for the current class for which | |
// fields are date fields, then - if they are set - parses them accordingly. | |
// Works for both date and date[]. | |
func (i *Index) parseDateFieldsInProps(props interface{}) error { | |
if props == nil { | |
return nil | |
} | |
propMap, ok := props.(map[string]interface{}) | |
if !ok { | |
// don't know what to do with this | |
return nil | |
} | |
schemaModel := i.getSchema.GetSchemaSkipAuth().Objects | |
c, err := schema.GetClassByName(schemaModel, i.Config.ClassName.String()) | |
if err != nil { | |
return err | |
} | |
for _, prop := range c.Properties { | |
if prop.DataType[0] == string(schema.DataTypeDate) { | |
raw, ok := propMap[prop.Name] | |
if !ok { | |
// prop is not set, nothing to do | |
continue | |
} | |
parsed, err := parseAsStringToTime(raw) | |
if err != nil { | |
return errors.Wrapf(err, "time prop %q", prop.Name) | |
} | |
propMap[prop.Name] = parsed | |
} | |
if prop.DataType[0] == string(schema.DataTypeDateArray) { | |
raw, ok := propMap[prop.Name] | |
if !ok { | |
// prop is not set, nothing to do | |
continue | |
} | |
asSlice, ok := raw.([]string) | |
if !ok { | |
return errors.Errorf("parse as time array, expected []interface{} got %T", | |
raw) | |
} | |
parsedSlice := make([]interface{}, len(asSlice)) | |
for j := range asSlice { | |
parsed, err := parseAsStringToTime(interface{}(asSlice[j])) | |
if err != nil { | |
return errors.Wrapf(err, "time array prop %q at pos %d", prop.Name, j) | |
} | |
parsedSlice[j] = parsed | |
} | |
propMap[prop.Name] = parsedSlice | |
} | |
} | |
return nil | |
} | |
func parseAsStringToTime(in interface{}) (time.Time, error) { | |
var parsed time.Time | |
var err error | |
asString, ok := in.(string) | |
if !ok { | |
return parsed, errors.Errorf("parse as time, expected string got %T", in) | |
} | |
parsed, err = time.Parse(time.RFC3339, asString) | |
if err != nil { | |
return parsed, err | |
} | |
return parsed, nil | |
} | |
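
// Illustrative sketch (not part of the original file): what
// parseDateFieldsInProps effectively does for a single date property. A
// date-typed property arrives as an RFC3339 string and is replaced in the
// props map by the parsed time.Time; "release" is a hypothetical property
// name.
//
//	props := map[string]interface{}{"release": "2009-11-10T23:00:00Z"}
//	t, err := time.Parse(time.RFC3339, props["release"].(string))
//	if err != nil {
//		// not a valid RFC3339 timestamp
//	}
//	props["release"] = t // downstream code now sees a time.Time, not a string
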
// the returned []error slice reports the error for each object, with
// positions matching the inputs
func (i *Index) putObjectBatch(ctx context.Context, objects []*storobj.Object, | |
replProps *additional.ReplicationProperties, | |
) []error { | |
type objsAndPos struct { | |
objects []*storobj.Object | |
pos []int | |
} | |
out := make([]error, len(objects)) | |
if i.replicationEnabled() && replProps == nil { | |
replProps = defaultConsistency() | |
} | |
byShard := map[string]objsAndPos{} | |
for pos, obj := range objects { | |
if err := i.validateMultiTenancy(obj.Object.Tenant); err != nil { | |
out[pos] = err | |
continue | |
} | |
shardName, err := i.determineObjectShard(obj.ID(), obj.Object.Tenant) | |
if err != nil { | |
out[pos] = err | |
continue | |
} | |
group := byShard[shardName] | |
group.objects = append(group.objects, obj) | |
group.pos = append(group.pos, pos) | |
byShard[shardName] = group | |
} | |
wg := &sync.WaitGroup{} | |
for shardName, group := range byShard { | |
wg.Add(1) | |
go func(shardName string, group objsAndPos) { | |
defer wg.Done() | |
defer func() { | |
err := recover() | |
if err != nil { | |
for _, pos := range group.pos {
out[pos] = fmt.Errorf("an unexpected error occurred: %s", err) | |
} | |
fmt.Fprintf(os.Stderr, "panic: %s\n", err) | |
debug.PrintStack() | |
} | |
}() | |
var errs []error | |
if replProps != nil { | |
errs = i.replicator.PutObjects(ctx, shardName, group.objects, | |
replica.ConsistencyLevel(replProps.ConsistencyLevel)) | |
} else if i.localShard(shardName) == nil { | |
errs = i.remote.BatchPutObjects(ctx, shardName, group.objects) | |
} else { | |
i.backupMutex.RLockGuard(func() error { | |
if shard := i.localShard(shardName); shard != nil { | |
errs = shard.PutObjectBatch(ctx, group.objects) | |
} else { | |
errs = duplicateErr(errShardNotFound, len(group.objects)) | |
} | |
return nil | |
}) | |
} | |
for i, err := range errs { | |
desiredPos := group.pos[i] | |
out[desiredPos] = err | |
} | |
}(shardName, group) | |
} | |
wg.Wait() | |
return out | |
} | |
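
// Illustrative usage sketch (not part of the original file): the slice
// returned by putObjectBatch is position-aligned with its input, so callers
// can report failures per object. "idx" and "objs" are hypothetical.
//
//	errs := idx.putObjectBatch(ctx, objs, nil)
//	for pos, err := range errs {
//		if err != nil {
//			fmt.Printf("object %d (%s): %v\n", pos, objs[pos].ID(), err)
//		}
//	}
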
func duplicateErr(in error, count int) []error { | |
out := make([]error, count) | |
for i := range out { | |
out[i] = in | |
} | |
return out | |
} | |
func (i *Index) IncomingBatchPutObjects(ctx context.Context, shardName string, | |
objects []*storobj.Object, | |
) []error { | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
localShard := i.localShard(shardName) | |
if localShard == nil { | |
return duplicateErr(errShardNotFound, len(objects)) | |
} | |
// This is a bit hacky. The problem is that storobj.Parse() currently
// misses date fields, as it has no way of knowing that a date-formatted
// string was actually a date type. However, adding this functionality to
// Parse() would break a lot of code, because it is currently
// schema-independent. To find out if a field is a date or date[], we need to
// involve the schema, which is why we do it here. This was discovered as
// part of https://github.com/weaviate/weaviate/issues/1775
for j := range objects { | |
if err := i.parseDateFieldsInProps(objects[j].Object.Properties); err != nil { | |
return duplicateErr(err, len(objects)) | |
} | |
} | |
return localShard.PutObjectBatch(ctx, objects) | |
} | |
// the returned []error slice reports the error for each reference, with
// positions matching the inputs
func (i *Index) AddReferencesBatch(ctx context.Context, refs objects.BatchReferences, | |
replProps *additional.ReplicationProperties, | |
) []error { | |
type refsAndPos struct { | |
refs objects.BatchReferences | |
pos []int | |
} | |
if i.replicationEnabled() && replProps == nil { | |
replProps = defaultConsistency() | |
} | |
byShard := map[string]refsAndPos{} | |
out := make([]error, len(refs)) | |
for pos, ref := range refs { | |
if err := i.validateMultiTenancy(ref.Tenant); err != nil { | |
out[pos] = err | |
continue | |
} | |
shardName, err := i.determineObjectShard(ref.From.TargetID, ref.Tenant) | |
if err != nil { | |
out[pos] = err | |
continue | |
} | |
group := byShard[shardName] | |
group.refs = append(group.refs, ref) | |
group.pos = append(group.pos, pos) | |
byShard[shardName] = group | |
} | |
for shardName, group := range byShard { | |
var errs []error | |
if i.replicationEnabled() { | |
errs = i.replicator.AddReferences(ctx, shardName, group.refs, | |
replica.ConsistencyLevel(replProps.ConsistencyLevel)) | |
} else if i.localShard(shardName) == nil { | |
errs = i.remote.BatchAddReferences(ctx, shardName, group.refs) | |
} else { | |
i.backupMutex.RLockGuard(func() error { | |
if shard := i.localShard(shardName); shard != nil { | |
errs = shard.AddReferencesBatch(ctx, group.refs) | |
} else { | |
errs = duplicateErr(errShardNotFound, len(group.refs)) | |
} | |
return nil | |
}) | |
} | |
for i, err := range errs { | |
desiredPos := group.pos[i] | |
out[desiredPos] = err | |
} | |
} | |
return out | |
} | |
func (i *Index) IncomingBatchAddReferences(ctx context.Context, shardName string, | |
refs objects.BatchReferences, | |
) []error { | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
localShard := i.localShard(shardName) | |
if localShard == nil { | |
return duplicateErr(errShardNotFound, len(refs)) | |
} | |
return localShard.AddReferencesBatch(ctx, refs) | |
} | |
func (i *Index) objectByID(ctx context.Context, id strfmt.UUID, | |
props search.SelectProperties, addl additional.Properties, | |
replProps *additional.ReplicationProperties, tenant string, | |
) (*storobj.Object, error) { | |
if err := i.validateMultiTenancy(tenant); err != nil { | |
return nil, err | |
} | |
shardName, err := i.determineObjectShard(id, tenant) | |
if err != nil { | |
switch err.(type) { | |
case objects.ErrMultiTenancy: | |
return nil, objects.NewErrMultiTenancy(fmt.Errorf("determine shard: %w", err)) | |
default: | |
return nil, objects.NewErrInvalidUserInput("determine shard: %v", err) | |
} | |
} | |
var obj *storobj.Object | |
if i.replicationEnabled() { | |
if replProps == nil { | |
replProps = defaultConsistency() | |
} | |
if replProps.NodeName != "" { | |
obj, err = i.replicator.NodeObject(ctx, replProps.NodeName, shardName, id, props, addl) | |
} else { | |
obj, err = i.replicator.GetOne(ctx, | |
replica.ConsistencyLevel(replProps.ConsistencyLevel), shardName, id, props, addl) | |
} | |
return obj, err | |
} | |
if shard := i.localShard(shardName); shard != nil { | |
obj, err = shard.ObjectByID(ctx, id, props, addl) | |
if err != nil { | |
return obj, fmt.Errorf("get local object: shard=%s: %w", shardName, err) | |
} | |
} else { | |
obj, err = i.remote.GetObject(ctx, shardName, id, props, addl) | |
if err != nil { | |
return obj, fmt.Errorf("get remote object: shard=%s: %w", shardName, err) | |
} | |
} | |
return obj, nil | |
} | |
func (i *Index) IncomingGetObject(ctx context.Context, shardName string, | |
id strfmt.UUID, props search.SelectProperties, | |
additional additional.Properties, | |
) (*storobj.Object, error) { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return nil, errShardNotFound | |
} | |
return shard.ObjectByID(ctx, id, props, additional) | |
} | |
func (i *Index) IncomingMultiGetObjects(ctx context.Context, shardName string, | |
ids []strfmt.UUID, | |
) ([]*storobj.Object, error) { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return nil, errShardNotFound | |
} | |
return shard.MultiObjectByID(ctx, wrapIDsInMulti(ids)) | |
} | |
func (i *Index) multiObjectByID(ctx context.Context, | |
query []multi.Identifier, tenant string, | |
) ([]*storobj.Object, error) { | |
if err := i.validateMultiTenancy(tenant); err != nil { | |
return nil, err | |
} | |
type idsAndPos struct { | |
ids []multi.Identifier | |
pos []int | |
} | |
byShard := map[string]idsAndPos{} | |
for pos, id := range query { | |
shardName, err := i.determineObjectShard(strfmt.UUID(id.ID), tenant) | |
if err != nil { | |
return nil, objects.NewErrInvalidUserInput("determine shard: %v", err) | |
} | |
group := byShard[shardName] | |
group.ids = append(group.ids, id) | |
group.pos = append(group.pos, pos) | |
byShard[shardName] = group | |
} | |
out := make([]*storobj.Object, len(query)) | |
for shardName, group := range byShard { | |
var objects []*storobj.Object | |
var err error | |
if shard := i.localShard(shardName); shard != nil { | |
objects, err = shard.MultiObjectByID(ctx, group.ids) | |
if err != nil { | |
return nil, errors.Wrapf(err, "shard %s", shard.ID()) | |
} | |
} else { | |
objects, err = i.remote.MultiGetObjects(ctx, shardName, extractIDsFromMulti(group.ids)) | |
if err != nil { | |
return nil, errors.Wrapf(err, "remote shard %s", shardName) | |
} | |
} | |
for i, obj := range objects { | |
desiredPos := group.pos[i] | |
out[desiredPos] = obj | |
} | |
} | |
return out, nil | |
} | |
func extractIDsFromMulti(in []multi.Identifier) []strfmt.UUID { | |
out := make([]strfmt.UUID, len(in)) | |
for i, id := range in { | |
out[i] = strfmt.UUID(id.ID) | |
} | |
return out | |
} | |
func wrapIDsInMulti(in []strfmt.UUID) []multi.Identifier { | |
out := make([]multi.Identifier, len(in)) | |
for i, id := range in { | |
out[i] = multi.Identifier{ID: string(id)} | |
} | |
return out | |
} | |
func (i *Index) exists(ctx context.Context, id strfmt.UUID, | |
replProps *additional.ReplicationProperties, tenant string, | |
) (bool, error) { | |
if err := i.validateMultiTenancy(tenant); err != nil { | |
return false, err | |
} | |
shardName, err := i.determineObjectShard(id, tenant) | |
if err != nil { | |
switch err.(type) { | |
case objects.ErrMultiTenancy: | |
return false, objects.NewErrMultiTenancy(fmt.Errorf("determine shard: %w", err)) | |
default: | |
return false, objects.NewErrInvalidUserInput("determine shard: %v", err) | |
} | |
} | |
var exists bool | |
if i.replicationEnabled() { | |
if replProps == nil { | |
replProps = defaultConsistency() | |
} | |
cl := replica.ConsistencyLevel(replProps.ConsistencyLevel) | |
return i.replicator.Exists(ctx, cl, shardName, id) | |
} | |
if shard := i.localShard(shardName); shard != nil { | |
exists, err = shard.Exists(ctx, id) | |
if err != nil { | |
err = fmt.Errorf("exists locally: shard=%q: %w", shardName, err) | |
} | |
} else { | |
exists, err = i.remote.Exists(ctx, shardName, id) | |
if err != nil { | |
err = fmt.Errorf("exists remotely: shard=%q: %w", shardName, err) | |
} | |
} | |
return exists, err | |
} | |
func (i *Index) IncomingExists(ctx context.Context, shardName string, | |
id strfmt.UUID, | |
) (bool, error) { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return false, errShardNotFound | |
} | |
return shard.Exists(ctx, id) | |
} | |
func (i *Index) objectSearch(ctx context.Context, limit int, filters *filters.LocalFilter, | |
keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, | |
addlProps additional.Properties, replProps *additional.ReplicationProperties, tenant string, autoCut int, | |
) ([]*storobj.Object, []float32, error) { | |
if err := i.validateMultiTenancy(tenant); err != nil { | |
return nil, nil, err | |
} | |
shardNames, err := i.targetShardNames(tenant) | |
if err != nil || len(shardNames) == 0 { | |
return nil, nil, err | |
} | |
// If the request is a BM25F with no properties selected, use all possible properties | |
if keywordRanking != nil && keywordRanking.Type == "bm25" && len(keywordRanking.Properties) == 0 { | |
cl, err := schema.GetClassByName( | |
i.getSchema.GetSchemaSkipAuth().Objects, | |
i.Config.ClassName.String()) | |
if err != nil { | |
return nil, nil, err | |
} | |
propHash := cl.Properties
// include every property that has a searchable index
for _, v := range propHash { | |
if inverted.PropertyHasSearchableIndex(i.getSchema.GetSchemaSkipAuth().Objects, | |
i.Config.ClassName.String(), v.Name) { | |
keywordRanking.Properties = append(keywordRanking.Properties, v.Name) | |
} | |
} | |
// WEAVIATE-471 - error if we can't find a property to search | |
if len(keywordRanking.Properties) == 0 { | |
return nil, []float32{}, errors.New( | |
"No properties provided, and no indexed properties found in class") | |
} | |
} | |
outObjects, outScores, err := i.objectSearchByShard(ctx, limit, | |
filters, keywordRanking, sort, cursor, addlProps, shardNames) | |
if err != nil { | |
return nil, nil, err | |
} | |
if len(outObjects) == len(outScores) { | |
if keywordRanking != nil && keywordRanking.Type == "bm25" { | |
for ii := range outObjects { | |
oo := outObjects[ii] | |
os := outScores[ii] | |
if oo.AdditionalProperties() == nil { | |
oo.Object.Additional = make(map[string]interface{}) | |
} | |
oo.Object.Additional["score"] = os | |
// Collect all keys starting with "BM25F" and add them to the Additional | |
if keywordRanking.AdditionalExplanations { | |
explainScore := "" | |
for k, v := range oo.Object.Additional { | |
if strings.HasPrefix(k, "BM25F") { | |
explainScore = fmt.Sprintf("%v, %v:%v", explainScore, k, v) | |
delete(oo.Object.Additional, k) | |
} | |
} | |
oo.Object.Additional["explainScore"] = explainScore | |
} | |
} | |
} | |
} | |
if len(sort) > 0 { | |
if len(shardNames) > 1 { | |
var err error | |
outObjects, outScores, err = i.sort(outObjects, outScores, sort, limit) | |
if err != nil { | |
return nil, nil, errors.Wrap(err, "sort") | |
} | |
} | |
} else if keywordRanking != nil { | |
outObjects, outScores = i.sortKeywordRanking(outObjects, outScores) | |
} else if len(shardNames) > 1 && !addlProps.ReferenceQuery { | |
// sort only when there are multiple shards (a single shard is already sorted)
// and not for nested reference queries (sorting is applied on the root query)
outObjects, outScores = i.sortByID(outObjects, outScores) | |
} | |
if autoCut > 0 { | |
cutOff := autocut.Autocut(outScores, autoCut) | |
outObjects = outObjects[:cutOff] | |
outScores = outScores[:cutOff] | |
} | |
// If this search was caused by a reference-property search, we should not
// limit the number of results. For example, if the query contains a where
// filter whose operator is `And`, and one of the operands contains a path to
// a reference prop, limiting the search caused by such a ref prop can make
// the `And` return no results where results would be expected. We won't know
// that unless we search and return all referenced object properties.
if !addlProps.ReferenceQuery && len(outObjects) > limit { | |
if len(outObjects) == len(outScores) { | |
outScores = outScores[:limit] | |
} | |
outObjects = outObjects[:limit] | |
} | |
if i.replicationEnabled() { | |
if replProps == nil { | |
replProps = defaultConsistency(replica.One) | |
} | |
l := replica.ConsistencyLevel(replProps.ConsistencyLevel) | |
err = i.replicator.CheckConsistency(ctx, l, outObjects) | |
if err != nil { | |
i.logger.WithField("action", "object_search"). | |
Errorf("failed to check consistency of search results: %v", err) | |
} | |
} | |
return outObjects, outScores, nil | |
} | |
func (i *Index) objectSearchByShard(ctx context.Context, limit int, filters *filters.LocalFilter, | |
keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, | |
addlProps additional.Properties, shards []string, | |
) ([]*storobj.Object, []float32, error) { | |
resultObjects, resultScores := objectSearchPreallocate(limit, shards) | |
eg := errgroup.Group{} | |
eg.SetLimit(_NUMCPU * 2) | |
shardResultLock := sync.Mutex{} | |
for _, shardName := range shards { | |
shardName := shardName | |
eg.Go(func() error { | |
var ( | |
objs []*storobj.Object | |
scores []float32 | |
nodeName string | |
err error | |
) | |
if shard := i.localShard(shardName); shard != nil { | |
nodeName = i.getSchema.NodeName() | |
objs, scores, err = shard.ObjectSearch(ctx, limit, filters, keywordRanking, sort, cursor, addlProps) | |
if err != nil { | |
return fmt.Errorf( | |
"local shard object search %s: %w", shard.ID(), err) | |
} | |
} else { | |
objs, scores, nodeName, err = i.remote.SearchShard( | |
ctx, shardName, nil, limit, filters, keywordRanking, | |
sort, cursor, nil, addlProps, i.replicationEnabled()) | |
if err != nil { | |
return fmt.Errorf( | |
"remote shard object search %s: %w", shardName, err) | |
} | |
} | |
if i.replicationEnabled() { | |
storobj.AddOwnership(objs, nodeName, shardName) | |
} | |
shardResultLock.Lock() | |
resultObjects = append(resultObjects, objs...) | |
resultScores = append(resultScores, scores...) | |
shardResultLock.Unlock() | |
return nil | |
}) | |
} | |
if err := eg.Wait(); err != nil { | |
return nil, nil, err | |
} | |
if len(resultObjects) == len(resultScores) { | |
// Force a stable sort order by UUID | |
type resultSortable struct { | |
object *storobj.Object | |
score float32 | |
} | |
objs := resultObjects | |
scores := resultScores | |
var results []resultSortable = make([]resultSortable, len(objs)) | |
for i := range objs { | |
results[i] = resultSortable{ | |
object: objs[i], | |
score: scores[i], | |
} | |
} | |
golangSort.Slice(results, func(i, j int) bool { | |
if results[i].score == results[j].score { | |
return results[i].object.Object.ID > results[j].object.Object.ID | |
} | |
return results[i].score > results[j].score | |
}) | |
var finalObjs []*storobj.Object = make([]*storobj.Object, len(results)) | |
var finalScores []float32 = make([]float32, len(results)) | |
for i, result := range results { | |
finalObjs[i] = result.object | |
finalScores[i] = result.score | |
} | |
return finalObjs, finalScores, nil | |
} | |
return resultObjects, resultScores, nil | |
} | |
func (i *Index) sortByID(objects []*storobj.Object, scores []float32, | |
) ([]*storobj.Object, []float32) { | |
return newIDSorter().sort(objects, scores) | |
} | |
func (i *Index) sortKeywordRanking(objects []*storobj.Object, | |
scores []float32, | |
) ([]*storobj.Object, []float32) { | |
return newScoresSorter().sort(objects, scores) | |
} | |
func (i *Index) sort(objects []*storobj.Object, scores []float32, | |
sort []filters.Sort, limit int, | |
) ([]*storobj.Object, []float32, error) { | |
return sorter.NewObjectsSorter(i.getSchema.GetSchemaSkipAuth()). | |
Sort(objects, scores, limit, sort) | |
} | |
func (i *Index) mergeGroups(objects []*storobj.Object, dists []float32, | |
groupBy *searchparams.GroupBy, limit, shardCount int, | |
) ([]*storobj.Object, []float32, error) { | |
return newGroupMerger(objects, dists, groupBy).Do() | |
} | |
func (i *Index) singleLocalShardObjectVectorSearch(ctx context.Context, searchVector []float32, | |
dist float32, limit int, filters *filters.LocalFilter, | |
sort []filters.Sort, groupBy *searchparams.GroupBy, additional additional.Properties, | |
shardName string, | |
) ([]*storobj.Object, []float32, error) { | |
shard := i.localShard(shardName) | |
res, resDists, err := shard.ObjectVectorSearch( | |
ctx, searchVector, dist, limit, filters, sort, groupBy, additional) | |
if err != nil { | |
return nil, nil, errors.Wrapf(err, "shard %s", shard.ID()) | |
} | |
return res, resDists, nil | |
} | |
// to be called after validating multi-tenancy | |
func (i *Index) targetShardNames(tenant string) ([]string, error) { | |
className := i.Config.ClassName.String() | |
if !i.partitioningEnabled { | |
shardingState := i.getSchema.CopyShardingState(className) | |
return shardingState.AllPhysicalShards(), nil | |
} | |
if tenant != "" { | |
if shard, status := i.getSchema.TenantShard(className, tenant); shard != "" { | |
if status == models.TenantActivityStatusHOT { | |
return []string{shard}, nil | |
} | |
return nil, objects.NewErrMultiTenancy(fmt.Errorf("%w: '%s'", errTenantNotActive, tenant)) | |
} | |
} | |
return nil, objects.NewErrMultiTenancy(fmt.Errorf("%w: %q", errTenantNotFound, tenant)) | |
} | |
func (i *Index) objectVectorSearch(ctx context.Context, searchVector []float32, | |
dist float32, limit int, filters *filters.LocalFilter, sort []filters.Sort, | |
groupBy *searchparams.GroupBy, additional additional.Properties, | |
replProps *additional.ReplicationProperties, tenant string, | |
) ([]*storobj.Object, []float32, error) { | |
if err := i.validateMultiTenancy(tenant); err != nil { | |
return nil, nil, err | |
} | |
shardNames, err := i.targetShardNames(tenant) | |
if err != nil || len(shardNames) == 0 { | |
return nil, nil, err | |
} | |
if len(shardNames) == 1 { | |
if i.localShard(shardNames[0]) != nil { | |
return i.singleLocalShardObjectVectorSearch(ctx, searchVector, dist, limit, filters, | |
sort, groupBy, additional, shardNames[0]) | |
} | |
} | |
// a limit of -1 is used to signal a search by distance. if that is | |
// the case we have to adjust how we calculate the output capacity | |
var shardCap int | |
if limit < 0 { | |
shardCap = len(shardNames) * hnsw.DefaultSearchByDistInitialLimit | |
} else { | |
shardCap = len(shardNames) * limit | |
} | |
eg := &errgroup.Group{} | |
eg.SetLimit(_NUMCPU * 2) | |
m := &sync.Mutex{} | |
out := make([]*storobj.Object, 0, shardCap) | |
dists := make([]float32, 0, shardCap) | |
for _, shardName := range shardNames { | |
shardName := shardName | |
eg.Go(func() error { | |
var ( | |
res []*storobj.Object | |
resDists []float32 | |
nodeName string | |
err error | |
) | |
if shard := i.localShard(shardName); shard != nil { | |
nodeName = i.getSchema.NodeName() | |
res, resDists, err = shard.ObjectVectorSearch( | |
ctx, searchVector, dist, limit, filters, sort, groupBy, additional) | |
if err != nil { | |
return errors.Wrapf(err, "shard %s", shard.ID()) | |
} | |
} else { | |
res, resDists, nodeName, err = i.remote.SearchShard(ctx, | |
shardName, searchVector, limit, filters, | |
nil, sort, nil, groupBy, additional, i.replicationEnabled()) | |
if err != nil { | |
return errors.Wrapf(err, "remote shard %s", shardName) | |
} | |
} | |
if i.replicationEnabled() { | |
storobj.AddOwnership(res, nodeName, shardName) | |
} | |
m.Lock() | |
out = append(out, res...) | |
dists = append(dists, resDists...) | |
m.Unlock() | |
return nil | |
}) | |
} | |
if err := eg.Wait(); err != nil { | |
return nil, nil, err | |
} | |
if len(shardNames) == 1 { | |
return out, dists, nil | |
} | |
if len(shardNames) > 1 && groupBy != nil { | |
return i.mergeGroups(out, dists, groupBy, limit, len(shardNames)) | |
} | |
if len(shardNames) > 1 && len(sort) > 0 { | |
return i.sort(out, dists, sort, limit) | |
} | |
out, dists = newDistancesSorter().sort(out, dists) | |
if limit > 0 && len(out) > limit { | |
out = out[:limit] | |
dists = dists[:limit] | |
} | |
if i.replicationEnabled() { | |
if replProps == nil { | |
replProps = defaultConsistency(replica.One) | |
} | |
l := replica.ConsistencyLevel(replProps.ConsistencyLevel) | |
err = i.replicator.CheckConsistency(ctx, l, out) | |
if err != nil { | |
i.logger.WithField("action", "object_vector_search"). | |
Errorf("failed to check consistency of search results: %v", err) | |
} | |
} | |
return out, dists, nil | |
} | |
func (i *Index) IncomingSearch(ctx context.Context, shardName string, | |
searchVector []float32, distance float32, limit int, filters *filters.LocalFilter, | |
keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, | |
cursor *filters.Cursor, groupBy *searchparams.GroupBy, | |
additional additional.Properties, | |
) ([]*storobj.Object, []float32, error) { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return nil, nil, errShardNotFound | |
} | |
if searchVector == nil { | |
res, scores, err := shard.ObjectSearch(ctx, limit, filters, keywordRanking, sort, cursor, additional) | |
if err != nil { | |
return nil, nil, err | |
} | |
return res, scores, nil | |
} | |
res, resDists, err := shard.ObjectVectorSearch( | |
ctx, searchVector, distance, limit, filters, sort, groupBy, additional) | |
if err != nil { | |
return nil, nil, errors.Wrapf(err, "shard %s", shard.ID()) | |
} | |
return res, resDists, nil | |
} | |
func (i *Index) deleteObject(ctx context.Context, id strfmt.UUID, | |
replProps *additional.ReplicationProperties, tenant string, | |
) error { | |
if err := i.validateMultiTenancy(tenant); err != nil { | |
return err | |
} | |
shardName, err := i.determineObjectShard(id, tenant) | |
if err != nil { | |
return objects.NewErrInvalidUserInput("determine shard: %v", err) | |
} | |
if i.replicationEnabled() { | |
if replProps == nil { | |
replProps = defaultConsistency() | |
} | |
cl := replica.ConsistencyLevel(replProps.ConsistencyLevel) | |
if err := i.replicator.DeleteObject(ctx, shardName, id, cl); err != nil { | |
return fmt.Errorf("replicate deletion: shard=%q %w", shardName, err) | |
} | |
return nil | |
} | |
// no replication, remote shard | |
if i.localShard(shardName) == nil { | |
if err := i.remote.DeleteObject(ctx, shardName, id); err != nil { | |
return fmt.Errorf("delete remote object: shard=%q: %w", shardName, err) | |
} | |
return nil | |
} | |
// no replication, local shard | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
err = errShardNotFound | |
if shard := i.localShard(shardName); shard != nil { | |
err = shard.DeleteObject(ctx, id) | |
} | |
if err != nil { | |
return fmt.Errorf("delete local object: shard=%q: %w", shardName, err) | |
} | |
return nil | |
} | |
func (i *Index) IncomingDeleteObject(ctx context.Context, shardName string, | |
id strfmt.UUID, | |
) error { | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return errShardNotFound | |
} | |
return shard.DeleteObject(ctx, id) | |
} | |
func (i *Index) localShard(name string) ShardLike { | |
return i.shards.Load(name) | |
} | |
func (i *Index) mergeObject(ctx context.Context, merge objects.MergeDocument, | |
replProps *additional.ReplicationProperties, tenant string, | |
) error { | |
if err := i.validateMultiTenancy(tenant); err != nil { | |
return err | |
} | |
shardName, err := i.determineObjectShard(merge.ID, tenant) | |
if err != nil { | |
return objects.NewErrInvalidUserInput("determine shard: %v", err) | |
} | |
if i.replicationEnabled() { | |
if replProps == nil { | |
replProps = defaultConsistency() | |
} | |
cl := replica.ConsistencyLevel(replProps.ConsistencyLevel) | |
if err := i.replicator.MergeObject(ctx, shardName, &merge, cl); err != nil { | |
return fmt.Errorf("replicate single update: %w", err) | |
} | |
return nil | |
} | |
// no replication, remote shard | |
if i.localShard(shardName) == nil { | |
if err := i.remote.MergeObject(ctx, shardName, merge); err != nil { | |
return fmt.Errorf("update remote object: shard=%q: %w", shardName, err) | |
} | |
return nil | |
} | |
// no replication, local shard | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
err = errShardNotFound | |
if shard := i.localShard(shardName); shard != nil { | |
err = shard.MergeObject(ctx, merge) | |
} | |
if err != nil { | |
return fmt.Errorf("update local object: shard=%q: %w", shardName, err) | |
} | |
return nil | |
} | |
func (i *Index) IncomingMergeObject(ctx context.Context, shardName string, | |
mergeDoc objects.MergeDocument, | |
) error { | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return errShardNotFound | |
} | |
return shard.MergeObject(ctx, mergeDoc) | |
} | |
func (i *Index) aggregate(ctx context.Context, | |
params aggregation.Params, | |
) (*aggregation.Result, error) { | |
if err := i.validateMultiTenancy(params.Tenant); err != nil { | |
return nil, err | |
} | |
shardNames, err := i.targetShardNames(params.Tenant) | |
if err != nil || len(shardNames) == 0 { | |
return nil, err | |
} | |
results := make([]*aggregation.Result, len(shardNames)) | |
for j, shardName := range shardNames { | |
var err error | |
var res *aggregation.Result | |
if shard := i.localShard(shardName); shard != nil { | |
res, err = shard.Aggregate(ctx, params) | |
} else { | |
res, err = i.remote.Aggregate(ctx, shardName, params) | |
} | |
if err != nil { | |
return nil, errors.Wrapf(err, "shard %s", shardName) | |
} | |
results[j] = res | |
} | |
return aggregator.NewShardCombiner().Do(results), nil | |
} | |
func (i *Index) IncomingAggregate(ctx context.Context, shardName string, | |
params aggregation.Params, | |
) (*aggregation.Result, error) { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return nil, errShardNotFound | |
} | |
return shard.Aggregate(ctx, params) | |
} | |
func (i *Index) drop() error { | |
i.closingCancel() | |
var eg errgroup.Group | |
eg.SetLimit(_NUMCPU * 2) | |
fields := logrus.Fields{"action": "drop_shard", "class": i.Config.ClassName} | |
dropShard := func(name string, shard ShardLike) error { | |
if shard == nil { | |
return nil | |
} | |
eg.Go(func() error { | |
if err := shard.drop(); err != nil { | |
logrus.WithFields(fields).WithField("id", shard.ID()).Error(err) | |
} | |
return nil | |
}) | |
return nil | |
} | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
i.shards.Range(dropShard) | |
if err := eg.Wait(); err != nil { | |
return err | |
} | |
// Dropping the shards only unregisters the shards callbacks, but we still | |
// need to stop the cycle managers that those shards used to register with. | |
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) | |
defer cancel() | |
if err := i.stopCycleManagers(ctx, "drop"); err != nil { | |
return err | |
} | |
return os.RemoveAll(i.path()) | |
} | |
// dropShards deletes shards in a transactional manner.
// To confirm the deletion, the caller must invoke the returned commit
// function with true; to roll the deletion back, with false.
func (i *Index) dropShards(names []string) (commit func(success bool), err error) { | |
shards := make(map[string]ShardLike, len(names)) | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
// mark deleted shards | |
for _, name := range names { | |
prev, ok := i.shards.Swap(name, nil) // mark | |
if !ok { // shard doesn't exist
i.shards.LoadAndDelete(name) // roll back the nil value created by Swap
continue | |
} | |
if prev != nil { | |
shards[name] = prev | |
} | |
} | |
rollback := func() { | |
for name, shard := range shards { | |
i.shards.CompareAndSwap(name, nil, shard) | |
} | |
} | |
var eg errgroup.Group | |
eg.SetLimit(_NUMCPU * 2) | |
commit = func(success bool) { | |
if !success { | |
rollback() | |
return | |
} | |
// detach shards | |
for name := range shards { | |
i.shards.LoadAndDelete(name) | |
} | |
// drop shards | |
for _, shard := range shards { | |
shard := shard | |
eg.Go(func() error { | |
if err := shard.drop(); err != nil { | |
i.logger.WithField("action", "drop_shard"). | |
WithField("shard", shard.ID()).Error(err) | |
} | |
return nil | |
}) | |
} | |
} | |
return commit, eg.Wait() | |
} | |
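
// Illustrative usage sketch (not part of the original file) of the two-phase
// protocol above: mark the shards, perform any external work that may still
// fail, then either confirm or roll back. "idx", the shard names, and
// removeShardsFromSchema are hypothetical.
//
//	commit, err := idx.dropShards([]string{"tenantA", "tenantB"})
//	if err != nil {
//		return // nothing usable was marked; handle the error
//	}
//	if err := removeShardsFromSchema(); err != nil { // assumed external step
//		commit(false) // roll back: the shards become visible again
//		return
//	}
//	commit(true) // detach and physically drop the shards
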
func (i *Index) Shutdown(ctx context.Context) error { | |
i.closingCancel() | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
// TODO allow every resource cleanup to run, before returning early with error | |
if err := i.ForEachShardConcurrently(func(name string, shard ShardLike) error { | |
if err := shard.Shutdown(ctx); err != nil { | |
return errors.Wrapf(err, "shutdown shard %q", name) | |
} | |
return nil | |
}); err != nil { | |
return err | |
} | |
if err := i.stopCycleManagers(ctx, "shutdown"); err != nil { | |
return err | |
} | |
return nil | |
} | |
func (i *Index) stopCycleManagers(ctx context.Context, usecase string) error { | |
if err := i.cycleCallbacks.compactionCycle.StopAndWait(ctx); err != nil { | |
return fmt.Errorf("%s: stop compaction cycle: %w", usecase, err) | |
} | |
if err := i.cycleCallbacks.flushCycle.StopAndWait(ctx); err != nil { | |
return fmt.Errorf("%s: stop flush cycle: %w", usecase, err) | |
} | |
if err := i.cycleCallbacks.vectorCommitLoggerCycle.StopAndWait(ctx); err != nil { | |
return fmt.Errorf("%s: stop vector commit logger cycle: %w", usecase, err) | |
} | |
if err := i.cycleCallbacks.vectorTombstoneCleanupCycle.StopAndWait(ctx); err != nil { | |
return fmt.Errorf("%s: stop vector tombstone cleanup cycle: %w", usecase, err) | |
} | |
if err := i.cycleCallbacks.geoPropsCommitLoggerCycle.StopAndWait(ctx); err != nil { | |
return fmt.Errorf("%s: stop geo props commit logger cycle: %w", usecase, err) | |
} | |
if err := i.cycleCallbacks.geoPropsTombstoneCleanupCycle.StopAndWait(ctx); err != nil { | |
return fmt.Errorf("%s: stop geo props tombstone cleanup cycle: %w", usecase, err) | |
} | |
return nil | |
} | |
func (i *Index) getShardsQueueSize(ctx context.Context, tenant string) (map[string]int64, error) { | |
shardsQueueSize := make(map[string]int64) | |
shardState := i.getSchema.CopyShardingState(i.Config.ClassName.String()) | |
shardNames := shardState.AllPhysicalShards() | |
for _, shardName := range shardNames { | |
if tenant != "" && shardName != tenant { | |
continue | |
} | |
var err error | |
var size int64 | |
if !shardState.IsLocalShard(shardName) { | |
size, err = i.remote.GetShardQueueSize(ctx, shardName) | |
} else { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
err = errors.Errorf("shard %s does not exist", shardName) | |
} else { | |
size = shard.Queue().Size() | |
} | |
} | |
if err != nil { | |
return nil, errors.Wrapf(err, "shard %s", shardName) | |
} | |
shardsQueueSize[shardName] = size | |
} | |
return shardsQueueSize, nil | |
} | |
func (i *Index) IncomingGetShardQueueSize(ctx context.Context, shardName string) (int64, error) { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return 0, errShardNotFound | |
} | |
return shard.Queue().Size(), nil | |
} | |
func (i *Index) getShardsStatus(ctx context.Context, tenant string) (map[string]string, error) { | |
shardsStatus := make(map[string]string) | |
shardState := i.getSchema.CopyShardingState(i.Config.ClassName.String()) | |
shardNames := shardState.AllPhysicalShards() | |
for _, shardName := range shardNames { | |
if tenant != "" && shardName != tenant { | |
continue | |
} | |
var err error | |
var status string | |
if !shardState.IsLocalShard(shardName) { | |
status, err = i.remote.GetShardStatus(ctx, shardName) | |
} else { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
err = errors.Errorf("shard %s does not exist", shardName) | |
} else { | |
status = shard.GetStatus().String() | |
} | |
} | |
if err != nil { | |
return nil, errors.Wrapf(err, "shard %s", shardName) | |
} | |
shardsStatus[shardName] = status | |
} | |
return shardsStatus, nil | |
} | |
func (i *Index) IncomingGetShardStatus(ctx context.Context, shardName string) (string, error) { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return "", errShardNotFound | |
} | |
return shard.GetStatus().String(), nil | |
} | |
func (i *Index) updateShardStatus(ctx context.Context, shardName, targetStatus string) error { | |
if shard := i.localShard(shardName); shard != nil { | |
return shard.UpdateStatus(targetStatus) | |
} | |
return i.remote.UpdateShardStatus(ctx, shardName, targetStatus) | |
} | |
func (i *Index) IncomingUpdateShardStatus(ctx context.Context, shardName, targetStatus string) error { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return errShardNotFound | |
} | |
return shard.UpdateStatus(targetStatus) | |
} | |
func (i *Index) notifyReady() { | |
i.ForEachShard(func(name string, shard ShardLike) error { | |
shard.NotifyReady() | |
return nil | |
}) | |
} | |
func (i *Index) findUUIDs(ctx context.Context, | |
filters *filters.LocalFilter, tenant string, | |
) (map[string][]strfmt.UUID, error) { | |
before := time.Now() | |
defer i.metrics.BatchDelete(before, "filter_total") | |
if err := i.validateMultiTenancy(tenant); err != nil { | |
return nil, err | |
} | |
shardNames, err := i.targetShardNames(tenant) | |
if err != nil { | |
return nil, err | |
} | |
results := make(map[string][]strfmt.UUID) | |
for _, shardName := range shardNames { | |
var err error | |
var res []strfmt.UUID | |
if shard := i.localShard(shardName); shard != nil { | |
res, err = shard.FindUUIDs(ctx, filters) | |
} else { | |
res, err = i.remote.FindUUIDs(ctx, shardName, filters) | |
} | |
if err != nil { | |
return nil, fmt.Errorf("find matching doc ids in shard %q: %w", shardName, err) | |
} | |
results[shardName] = res | |
} | |
return results, nil | |
} | |
func (i *Index) IncomingFindUUIDs(ctx context.Context, shardName string, | |
filters *filters.LocalFilter, | |
) ([]strfmt.UUID, error) { | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return nil, errShardNotFound | |
} | |
return shard.FindUUIDs(ctx, filters) | |
} | |
func (i *Index) batchDeleteObjects(ctx context.Context, shardUUIDs map[string][]strfmt.UUID, | |
dryRun bool, replProps *additional.ReplicationProperties, | |
) (objects.BatchSimpleObjects, error) { | |
before := time.Now() | |
defer i.metrics.BatchDelete(before, "delete_from_shards_total") | |
type result struct { | |
objs objects.BatchSimpleObjects | |
} | |
if i.replicationEnabled() && replProps == nil { | |
replProps = defaultConsistency() | |
} | |
wg := &sync.WaitGroup{} | |
ch := make(chan result, len(shardUUIDs)) | |
for shardName, uuids := range shardUUIDs { | |
uuids := uuids | |
wg.Add(1) | |
go func(shardName string, uuids []strfmt.UUID) { | |
defer wg.Done() | |
var objs objects.BatchSimpleObjects | |
if i.replicationEnabled() { | |
objs = i.replicator.DeleteObjects(ctx, shardName, uuids, | |
dryRun, replica.ConsistencyLevel(replProps.ConsistencyLevel)) | |
} else if i.localShard(shardName) == nil { | |
objs = i.remote.DeleteObjectBatch(ctx, shardName, uuids, dryRun) | |
} else { | |
i.backupMutex.RLockGuard(func() error { | |
if shard := i.localShard(shardName); shard != nil { | |
objs = shard.DeleteObjectBatch(ctx, uuids, dryRun) | |
} else { | |
objs = objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: errShardNotFound}} | |
} | |
return nil | |
}) | |
} | |
ch <- result{objs} | |
}(shardName, uuids) | |
} | |
wg.Wait() | |
close(ch) | |
var out objects.BatchSimpleObjects | |
for res := range ch { | |
out = append(out, res.objs...) | |
} | |
return out, nil | |
} | |
func (i *Index) IncomingDeleteObjectBatch(ctx context.Context, shardName string, | |
uuids []strfmt.UUID, dryRun bool, | |
) objects.BatchSimpleObjects { | |
i.backupMutex.RLock() | |
defer i.backupMutex.RUnlock() | |
shard := i.localShard(shardName) | |
if shard == nil { | |
return objects.BatchSimpleObjects{ | |
objects.BatchSimpleObject{Err: errShardNotFound}, | |
} | |
} | |
return shard.DeleteObjectBatch(ctx, uuids, dryRun) | |
} | |
func defaultConsistency(l ...replica.ConsistencyLevel) *additional.ReplicationProperties { | |
rp := &additional.ReplicationProperties{} | |
if len(l) != 0 { | |
rp.ConsistencyLevel = string(l[0]) | |
} else { | |
rp.ConsistencyLevel = string(replica.Quorum) | |
} | |
return rp | |
} | |
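
// Illustrative usage sketch (not part of the original file):
// defaultConsistency falls back to Quorum unless an explicit level is passed.
//
//	rp := defaultConsistency()               // ConsistencyLevel == replica.Quorum
//	rpOne := defaultConsistency(replica.One) // ConsistencyLevel == replica.One
//	_ = replica.ConsistencyLevel(rp.ConsistencyLevel)
//	_ = rpOne
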
func objectSearchPreallocate(limit int, shards []string) ([]*storobj.Object, []float32) { | |
perShardLimit := config.DefaultQueryMaximumResults | |
if perShardLimit > int64(limit) { | |
perShardLimit = int64(limit) | |
} | |
capacity := perShardLimit * int64(len(shards)) | |
objects := make([]*storobj.Object, 0, capacity) | |
scores := make([]float32, 0, capacity) | |
return objects, scores | |
} | |
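
// Illustrative note (not part of the original file): the per-shard capacity is
// the smaller of the configured maximum and the requested limit. For example,
// with 3 shards and limit = 100 (and assuming DefaultQueryMaximumResults is
// larger than 100), the slices are preallocated with capacity 3 * 100 = 300.
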
func (i *Index) addNewShard(ctx context.Context, | |
class *models.Class, shardName string, | |
) error { | |
if shard := i.localShard(shardName); shard != nil { | |
return fmt.Errorf("shard %q exists already", shardName) | |
} | |
// TODO: metrics | |
return i.initAndStoreShard(ctx, shardName, class, i.metrics.baseMetrics) | |
} | |
func (i *Index) validateMultiTenancy(tenant string) error { | |
if i.partitioningEnabled && tenant == "" { | |
return objects.NewErrMultiTenancy( | |
fmt.Errorf("class %s has multi-tenancy enabled, but request was without tenant", i.Config.ClassName), | |
) | |
} else if !i.partitioningEnabled && tenant != "" { | |
return objects.NewErrMultiTenancy( | |
fmt.Errorf("class %s has multi-tenancy disabled, but request was with tenant", i.Config.ClassName), | |
) | |
} | |
return nil | |
} | |