Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package sharding | |
import ( | |
"context" | |
"fmt" | |
"io" | |
"math/rand" | |
"github.com/go-openapi/strfmt" | |
"github.com/pkg/errors" | |
"github.com/weaviate/weaviate/entities/additional" | |
"github.com/weaviate/weaviate/entities/aggregation" | |
"github.com/weaviate/weaviate/entities/filters" | |
"github.com/weaviate/weaviate/entities/search" | |
"github.com/weaviate/weaviate/entities/searchparams" | |
"github.com/weaviate/weaviate/entities/storobj" | |
"github.com/weaviate/weaviate/usecases/objects" | |
) | |
type RemoteIndex struct { | |
class string | |
stateGetter shardingStateGetter | |
client RemoteIndexClient | |
nodeResolver nodeResolver | |
} | |
type shardingStateGetter interface { | |
// ShardOwner returns id of owner node | |
ShardOwner(class, shard string) (string, error) | |
ShardReplicas(class, shard string) ([]string, error) | |
} | |
func NewRemoteIndex(className string, | |
stateGetter shardingStateGetter, nodeResolver nodeResolver, | |
client RemoteIndexClient, | |
) *RemoteIndex { | |
return &RemoteIndex{ | |
class: className, | |
stateGetter: stateGetter, | |
client: client, | |
nodeResolver: nodeResolver, | |
} | |
} | |
type nodeResolver interface { | |
NodeHostname(nodeName string) (string, bool) | |
} | |
type RemoteIndexClient interface { | |
PutObject(ctx context.Context, hostName, indexName, shardName string, | |
obj *storobj.Object) error | |
BatchPutObjects(ctx context.Context, hostName, indexName, shardName string, | |
objs []*storobj.Object, repl *additional.ReplicationProperties) []error | |
BatchAddReferences(ctx context.Context, hostName, indexName, shardName string, | |
refs objects.BatchReferences) []error | |
GetObject(ctx context.Context, hostname, indexName, shardName string, | |
id strfmt.UUID, props search.SelectProperties, | |
additional additional.Properties) (*storobj.Object, error) | |
Exists(ctx context.Context, hostname, indexName, shardName string, | |
id strfmt.UUID) (bool, error) | |
DeleteObject(ctx context.Context, hostname, indexName, shardName string, | |
id strfmt.UUID) error | |
MergeObject(ctx context.Context, hostname, indexName, shardName string, | |
mergeDoc objects.MergeDocument) error | |
MultiGetObjects(ctx context.Context, hostname, indexName, shardName string, | |
ids []strfmt.UUID) ([]*storobj.Object, error) | |
SearchShard(ctx context.Context, hostname, indexName, shardName string, | |
searchVector []float32, limit int, filters *filters.LocalFilter, | |
keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, | |
cursor *filters.Cursor, groupBy *searchparams.GroupBy, | |
additional additional.Properties, | |
) ([]*storobj.Object, []float32, error) | |
Aggregate(ctx context.Context, hostname, indexName, shardName string, | |
params aggregation.Params) (*aggregation.Result, error) | |
FindUUIDs(ctx context.Context, hostName, indexName, shardName string, | |
filters *filters.LocalFilter) ([]strfmt.UUID, error) | |
DeleteObjectBatch(ctx context.Context, hostName, indexName, shardName string, | |
uuids []strfmt.UUID, dryRun bool) objects.BatchSimpleObjects | |
GetShardQueueSize(ctx context.Context, hostName, indexName, shardName string) (int64, error) | |
GetShardStatus(ctx context.Context, hostName, indexName, shardName string) (string, error) | |
UpdateShardStatus(ctx context.Context, hostName, indexName, shardName, | |
targetStatus string) error | |
PutFile(ctx context.Context, hostName, indexName, shardName, fileName string, | |
payload io.ReadSeekCloser) error | |
} | |
func (ri *RemoteIndex) PutObject(ctx context.Context, shardName string, | |
obj *storobj.Object, | |
) error { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return errors.Errorf("resolve node name %q to host", owner) | |
} | |
return ri.client.PutObject(ctx, host, ri.class, shardName, obj) | |
} | |
// helper for single errors that affect the entire batch, assign the error to | |
// every single item in the batch | |
func duplicateErr(in error, count int) []error { | |
out := make([]error, count) | |
for i := range out { | |
out[i] = in | |
} | |
return out | |
} | |
func (ri *RemoteIndex) BatchPutObjects(ctx context.Context, shardName string, | |
objs []*storobj.Object, | |
) []error { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return duplicateErr(fmt.Errorf("class %s has no physical shard %q: %w", | |
ri.class, shardName, err), len(objs)) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return duplicateErr(fmt.Errorf("resolve node name %q to host", | |
owner), len(objs)) | |
} | |
return ri.client.BatchPutObjects(ctx, host, ri.class, shardName, objs, nil) | |
} | |
func (ri *RemoteIndex) BatchAddReferences(ctx context.Context, shardName string, | |
refs objects.BatchReferences, | |
) []error { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return duplicateErr(fmt.Errorf("class %s has no physical shard %q: %w", | |
ri.class, shardName, err), len(refs)) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return duplicateErr(fmt.Errorf("resolve node name %q to host", | |
owner), len(refs)) | |
} | |
return ri.client.BatchAddReferences(ctx, host, ri.class, shardName, refs) | |
} | |
func (ri *RemoteIndex) Exists(ctx context.Context, shardName string, | |
id strfmt.UUID, | |
) (bool, error) { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return false, fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return false, errors.Errorf("resolve node name %q to host", owner) | |
} | |
return ri.client.Exists(ctx, host, ri.class, shardName, id) | |
} | |
func (ri *RemoteIndex) DeleteObject(ctx context.Context, shardName string, | |
id strfmt.UUID, | |
) error { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return errors.Errorf("resolve node name %q to host", owner) | |
} | |
return ri.client.DeleteObject(ctx, host, ri.class, shardName, id) | |
} | |
func (ri *RemoteIndex) MergeObject(ctx context.Context, shardName string, | |
mergeDoc objects.MergeDocument, | |
) error { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return errors.Errorf("resolve node name %q to host", owner) | |
} | |
return ri.client.MergeObject(ctx, host, ri.class, shardName, mergeDoc) | |
} | |
func (ri *RemoteIndex) GetObject(ctx context.Context, shardName string, | |
id strfmt.UUID, props search.SelectProperties, | |
additional additional.Properties, | |
) (*storobj.Object, error) { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return nil, fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return nil, errors.Errorf("resolve node name %q to host", owner) | |
} | |
return ri.client.GetObject(ctx, host, ri.class, shardName, id, props, additional) | |
} | |
func (ri *RemoteIndex) MultiGetObjects(ctx context.Context, shardName string, | |
ids []strfmt.UUID, | |
) ([]*storobj.Object, error) { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return nil, fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return nil, errors.Errorf("resolve node name %q to host", owner) | |
} | |
return ri.client.MultiGetObjects(ctx, host, ri.class, shardName, ids) | |
} | |
func (ri *RemoteIndex) SearchShard(ctx context.Context, shard string, | |
queryVec []float32, | |
limit int, | |
filters *filters.LocalFilter, | |
keywordRanking *searchparams.KeywordRanking, | |
sort []filters.Sort, | |
cursor *filters.Cursor, | |
groupBy *searchparams.GroupBy, | |
adds additional.Properties, | |
replEnabled bool, | |
) ([]*storobj.Object, []float32, string, error) { | |
type pair struct { | |
first []*storobj.Object | |
second []float32 | |
} | |
f := func(node, host string) (interface{}, error) { | |
objs, scores, err := ri.client.SearchShard(ctx, host, ri.class, shard, | |
queryVec, limit, filters, keywordRanking, sort, cursor, groupBy, adds) | |
if err != nil { | |
return nil, err | |
} | |
return pair{objs, scores}, err | |
} | |
rr, node, err := ri.queryReplicas(ctx, shard, f) | |
if err != nil { | |
return nil, nil, node, err | |
} | |
r := rr.(pair) | |
return r.first, r.second, node, err | |
} | |
func (ri *RemoteIndex) Aggregate( | |
ctx context.Context, | |
shard string, | |
params aggregation.Params, | |
) (*aggregation.Result, error) { | |
f := func(_, host string) (interface{}, error) { | |
r, err := ri.client.Aggregate(ctx, host, ri.class, shard, params) | |
if err != nil { | |
return nil, err | |
} | |
return r, nil | |
} | |
rr, _, err := ri.queryReplicas(ctx, shard, f) | |
if err != nil { | |
return nil, err | |
} | |
return rr.(*aggregation.Result), err | |
} | |
func (ri *RemoteIndex) FindUUIDs(ctx context.Context, shardName string, | |
filters *filters.LocalFilter, | |
) ([]strfmt.UUID, error) { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return nil, fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return nil, errors.Errorf("resolve node name %q to host", owner) | |
} | |
return ri.client.FindUUIDs(ctx, host, ri.class, shardName, filters) | |
} | |
func (ri *RemoteIndex) DeleteObjectBatch(ctx context.Context, shardName string, | |
uuids []strfmt.UUID, dryRun bool, | |
) objects.BatchSimpleObjects { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
err := fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
return objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: err}} | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
err := fmt.Errorf("resolve node name %q to host", owner) | |
return objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: err}} | |
} | |
return ri.client.DeleteObjectBatch(ctx, host, ri.class, shardName, uuids, dryRun) | |
} | |
func (ri *RemoteIndex) GetShardQueueSize(ctx context.Context, shardName string) (int64, error) { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return 0, fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return 0, errors.Errorf("resolve node name %q to host", owner) | |
} | |
return ri.client.GetShardQueueSize(ctx, host, ri.class, shardName) | |
} | |
func (ri *RemoteIndex) GetShardStatus(ctx context.Context, shardName string) (string, error) { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return "", fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return "", errors.Errorf("resolve node name %q to host", owner) | |
} | |
return ri.client.GetShardStatus(ctx, host, ri.class, shardName) | |
} | |
func (ri *RemoteIndex) UpdateShardStatus(ctx context.Context, shardName, targetStatus string) error { | |
owner, err := ri.stateGetter.ShardOwner(ri.class, shardName) | |
if err != nil { | |
return fmt.Errorf("class %s has no physical shard %q: %w", ri.class, shardName, err) | |
} | |
host, ok := ri.nodeResolver.NodeHostname(owner) | |
if !ok { | |
return errors.Errorf("resolve node name %q to host", owner) | |
} | |
return ri.client.UpdateShardStatus(ctx, host, ri.class, shardName, targetStatus) | |
} | |
func (ri *RemoteIndex) queryReplicas( | |
ctx context.Context, | |
shard string, | |
do func(nodeName, host string) (interface{}, error), | |
) (resp interface{}, node string, err error) { | |
replicas, err := ri.stateGetter.ShardReplicas(ri.class, shard) | |
if err != nil || len(replicas) == 0 { | |
return nil, | |
"", | |
fmt.Errorf("class %q has no physical shard %q: %w", ri.class, shard, err) | |
} | |
queryOne := func(replica string) (interface{}, error) { | |
host, ok := ri.nodeResolver.NodeHostname(replica) | |
if !ok || host == "" { | |
return nil, errors.Errorf("resolve node name %q to host", replica) | |
} | |
return do(replica, host) | |
} | |
queryUntil := func(replicas []string) (resp interface{}, node string, err error) { | |
for _, node = range replicas { | |
if errC := ctx.Err(); errC != nil { | |
return nil, node, errC | |
} | |
if resp, err = queryOne(node); err == nil { | |
return resp, node, nil | |
} | |
} | |
return | |
} | |
first := rand.Intn(len(replicas)) | |
if resp, node, err = queryUntil(replicas[first:]); err != nil && first != 0 { | |
return queryUntil(replicas[:first]) | |
} | |
return | |
} | |