Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package replica | |
import ( | |
"context" | |
"fmt" | |
"github.com/go-openapi/strfmt" | |
"github.com/stretchr/testify/mock" | |
"github.com/weaviate/weaviate/entities/additional" | |
"github.com/weaviate/weaviate/entities/search" | |
"github.com/weaviate/weaviate/entities/storobj" | |
"github.com/weaviate/weaviate/usecases/objects" | |
) | |
type fakeRClient struct { | |
mock.Mock | |
} | |
func (f *fakeRClient) FetchObject(ctx context.Context, host, index, shard string, | |
id strfmt.UUID, props search.SelectProperties, | |
additional additional.Properties, | |
) (objects.Replica, error) { | |
args := f.Called(ctx, host, index, shard, id, props, additional) | |
return args.Get(0).(objects.Replica), args.Error(1) | |
} | |
func (f *fakeRClient) FetchObjects(ctx context.Context, host, index, | |
shard string, ids []strfmt.UUID, | |
) ([]objects.Replica, error) { | |
args := f.Called(ctx, host, index, shard, ids) | |
return args.Get(0).([]objects.Replica), args.Error(1) | |
} | |
func (f *fakeRClient) OverwriteObjects(ctx context.Context, host, index, shard string, | |
xs []*objects.VObject, | |
) ([]RepairResponse, error) { | |
args := f.Called(ctx, host, index, shard, xs) | |
return args.Get(0).([]RepairResponse), args.Error(1) | |
} | |
func (f *fakeRClient) DigestObjects(ctx context.Context, host, index, shard string, | |
ids []strfmt.UUID, | |
) ([]RepairResponse, error) { | |
args := f.Called(ctx, host, index, shard, ids) | |
return args.Get(0).([]RepairResponse), args.Error(1) | |
} | |
type fakeClient struct { | |
mock.Mock | |
} | |
func (f *fakeClient) PutObject(ctx context.Context, host, index, shard, requestID string, | |
obj *storobj.Object, | |
) (SimpleResponse, error) { | |
args := f.Called(ctx, host, index, shard, requestID, obj) | |
return args.Get(0).(SimpleResponse), args.Error(1) | |
} | |
func (f *fakeClient) DeleteObject(ctx context.Context, host, index, shard, requestID string, | |
id strfmt.UUID, | |
) (SimpleResponse, error) { | |
args := f.Called(ctx, host, index, shard, requestID, id) | |
return args.Get(0).(SimpleResponse), args.Error(1) | |
} | |
func (f *fakeClient) MergeObject(ctx context.Context, host, index, shard, requestID string, | |
doc *objects.MergeDocument, | |
) (SimpleResponse, error) { | |
args := f.Called(ctx, host, index, shard, requestID, doc) | |
return args.Get(0).(SimpleResponse), args.Error(1) | |
} | |
func (f *fakeClient) PutObjects(ctx context.Context, host, index, shard, requestID string, | |
objs []*storobj.Object, | |
) (SimpleResponse, error) { | |
args := f.Called(ctx, host, index, shard, requestID, objs) | |
return args.Get(0).(SimpleResponse), args.Error(1) | |
} | |
func (f *fakeClient) DeleteObjects(ctx context.Context, host, index, shard, requestID string, | |
uuids []strfmt.UUID, dryRun bool, | |
) (SimpleResponse, error) { | |
args := f.Called(ctx, host, index, shard, requestID, uuids, dryRun) | |
return args.Get(0).(SimpleResponse), args.Error(1) | |
} | |
func (f *fakeClient) AddReferences(ctx context.Context, host, index, shard, requestID string, | |
refs []objects.BatchReference, | |
) (SimpleResponse, error) { | |
args := f.Called(ctx, host, index, shard, requestID, refs) | |
return args.Get(0).(SimpleResponse), args.Error(1) | |
} | |
func (f *fakeClient) Commit(ctx context.Context, host, index, shard, requestID string, resp interface{}) error { | |
args := f.Called(ctx, host, index, shard, requestID, resp) | |
return args.Error(0) | |
} | |
func (f *fakeClient) Abort(ctx context.Context, host, index, shard, requestID string) (SimpleResponse, error) { | |
args := f.Called(ctx, host, index, shard, requestID) | |
return args.Get(0).(SimpleResponse), args.Error(1) | |
} | |
// Replica finder | |
type fakeShardingState struct { | |
thisNode string | |
ShardToReplicas map[string][]string | |
nodeResolver *fakeNodeResolver | |
} | |
func newFakeShardingState(thisNode string, shardToReplicas map[string][]string, resolver *fakeNodeResolver) *fakeShardingState { | |
return &fakeShardingState{ | |
thisNode: thisNode, | |
ShardToReplicas: shardToReplicas, | |
nodeResolver: resolver, | |
} | |
} | |
func (f *fakeShardingState) NodeName() string { | |
return f.thisNode | |
} | |
func (f *fakeShardingState) ResolveParentNodes(_ string, shard string) (map[string]string, error) { | |
replicas, ok := f.ShardToReplicas[shard] | |
if !ok { | |
return nil, fmt.Errorf("sharding state not found") | |
} | |
m := make(map[string]string) | |
for _, name := range replicas { | |
addr, _ := f.nodeResolver.NodeHostname(name) | |
m[name] = addr | |
} | |
return m, nil | |
} | |
// node resolver | |
type fakeNodeResolver struct { | |
hosts map[string]string | |
} | |
func (r *fakeNodeResolver) NodeHostname(nodeName string) (string, bool) { | |
return r.hosts[nodeName], true | |
} | |
func newFakeNodeResolver(nodes []string) *fakeNodeResolver { | |
hosts := make(map[string]string) | |
for _, node := range nodes { | |
hosts[node] = node | |
} | |
return &fakeNodeResolver{hosts: hosts} | |
} | |