Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package replica | |
import ( | |
"context" | |
"testing" | |
"github.com/go-openapi/strfmt" | |
"github.com/stretchr/testify/assert" | |
"github.com/weaviate/weaviate/entities/additional" | |
"github.com/weaviate/weaviate/entities/models" | |
"github.com/weaviate/weaviate/entities/search" | |
"github.com/weaviate/weaviate/entities/storobj" | |
"github.com/weaviate/weaviate/usecases/objects" | |
) | |
func object(id strfmt.UUID, lastTime int64) *storobj.Object { | |
return &storobj.Object{ | |
Object: models.Object{ | |
ID: id, | |
LastUpdateTimeUnix: lastTime, | |
}, | |
} | |
} | |
func replica(id strfmt.UUID, lastTime int64, deleted bool) objects.Replica { | |
x := objects.Replica{ | |
Deleted: deleted, | |
Object: &storobj.Object{ | |
Object: models.Object{ | |
ID: id, | |
LastUpdateTimeUnix: lastTime, | |
}, | |
}, | |
} | |
if !x.Deleted { | |
x.ID = id | |
} | |
return x | |
} | |
func TestFinderReplicaNotFound(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", "S", []string{}) | |
ctx = context.Background() | |
finder = f.newFinder("A") | |
) | |
_, err := finder.GetOne(ctx, "ONE", "S", "id", nil, additional.Properties{}) | |
assert.ErrorIs(t, err, errReplicas) | |
f.assertLogErrorContains(t, errNoReplicaFound.Error()) | |
_, err = finder.Exists(ctx, "ONE", "S", "id") | |
assert.ErrorIs(t, err, errReplicas) | |
f.assertLogErrorContains(t, errNoReplicaFound.Error()) | |
finder.CheckConsistency(ctx, All, []*storobj.Object{objectEx("1", 1, "S", "N")}) | |
f.assertLogErrorContains(t, errReplicas.Error()) | |
} | |
func TestFinderNodeObject(t *testing.T) { | |
var ( | |
id = strfmt.UUID("123") | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B", "C"} | |
ctx = context.Background() | |
r = objects.Replica{ID: id, Object: object(id, 3)} | |
adds = additional.Properties{} | |
proj = search.SelectProperties{} | |
) | |
t.Run("Unresolved", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
finder := f.newFinder("A") | |
_, err := finder.NodeObject(ctx, "N", "S", "id", nil, additional.Properties{}) | |
assert.Contains(t, err.Error(), "N") | |
}) | |
t.Run("Success", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
finder := f.newFinder("A") | |
for _, n := range nodes { | |
f.RClient.On("FetchObject", anyVal, n, cls, shard, id, proj, adds).Return(r, nil) | |
} | |
got, err := finder.NodeObject(ctx, nodes[0], shard, id, proj, adds) | |
assert.Nil(t, err) | |
assert.Equal(t, r.Object, got) | |
}) | |
} | |
func TestFinderGetOneWithConsistencyLevelALL(t *testing.T) { | |
var ( | |
id = strfmt.UUID("123") | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B", "C"} | |
ctx = context.Background() | |
adds = additional.Properties{} | |
proj = search.SelectProperties{} | |
nilObject *storobj.Object | |
emptyItem = objects.Replica{} | |
) | |
t.Run("AllButOne", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
item = objects.Replica{ID: id, Object: object(id, 3)} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 3}} | |
) | |
f.RClient.On("FetchObject", anyVal, nodes[0], cls, shard, id, proj, adds).Return(item, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, errAny) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, nil) | |
got, err := finder.GetOne(ctx, All, shard, id, proj, adds) | |
assert.ErrorIs(t, err, errRead) | |
f.assertLogErrorContains(t, errAny.Error()) | |
assert.Equal(t, nilObject, got) | |
}) | |
t.Run("Success", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
item = objects.Replica{ID: id, Object: object(id, 3)} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 3}} | |
) | |
f.RClient.On("FetchObject", anyVal, nodes[0], cls, shard, id, proj, adds).Return(item, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, nil) | |
got, err := finder.GetOne(ctx, All, shard, id, proj, adds) | |
assert.Nil(t, err) | |
assert.Equal(t, item.Object, got) | |
}) | |
t.Run("NotFound", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
// obj = object(id, 3) | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 0}} | |
) | |
f.RClient.On("FetchObject", anyVal, nodes[0], cls, shard, id, proj, adds).Return(emptyItem, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, nil) | |
got, err := finder.GetOne(ctx, All, shard, id, proj, adds) | |
assert.Nil(t, err) | |
assert.Equal(t, nilObject, got) | |
}) | |
} | |
func TestFinderGetOneWithConsistencyLevelQuorum(t *testing.T) { | |
var ( | |
id = strfmt.UUID("123") | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B", "C"} | |
ctx = context.Background() | |
adds = additional.Properties{} | |
proj = search.SelectProperties{} | |
nilObject *storobj.Object | |
emptyItem = objects.Replica{} | |
) | |
t.Run("AllButOne", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
item = objects.Replica{ID: id, Object: object(id, 3)} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 3}} | |
) | |
f.RClient.On("FetchObject", anyVal, nodes[0], cls, shard, id, proj, adds).Return(item, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, errAny) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, errAny) | |
got, err := finder.GetOne(ctx, Quorum, shard, id, proj, adds) | |
assert.ErrorIs(t, err, errRead) | |
f.assertLogErrorContains(t, errAny.Error()) | |
assert.Equal(t, nilObject, got) | |
}) | |
t.Run("Success", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
item = objects.Replica{ID: id, Object: object(id, 3)} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 3}} | |
) | |
f.RClient.On("FetchObject", anyVal, nodes[0], cls, shard, id, proj, adds).Return(item, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, errAny) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, nil) | |
got, err := finder.GetOne(ctx, Quorum, shard, id, proj, adds) | |
assert.Nil(t, err) | |
assert.Equal(t, item.Object, got) | |
}) | |
t.Run("NotFound", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
// obj = object(id, 3) | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 0}} | |
) | |
f.RClient.On("FetchObject", anyVal, nodes[0], cls, shard, id, proj, adds).Return(emptyItem, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, errAny) | |
got, err := finder.GetOne(ctx, Quorum, shard, id, proj, adds) | |
assert.Nil(t, err) | |
assert.Equal(t, nilObject, got) | |
}) | |
} | |
func TestFinderGetOneWithConsistencyLevelOne(t *testing.T) { | |
var ( | |
id = strfmt.UUID("123") | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B", "C"} | |
ctx = context.Background() | |
adds = additional.Properties{} | |
proj = search.SelectProperties{} | |
nilObject *storobj.Object | |
emptyItem = objects.Replica{} | |
) | |
t.Run("None", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
// obj = objects.Replica{ID: id, Object: object(id, 3) | |
) | |
for _, n := range nodes { | |
f.RClient.On("FetchObject", anyVal, n, cls, shard, id, proj, adds).Return(emptyItem, errAny) | |
} | |
got, err := finder.GetOne(ctx, One, shard, id, proj, adds) | |
assert.ErrorIs(t, err, errRead) | |
f.assertLogErrorContains(t, errAny.Error()) | |
assert.Equal(t, nilObject, got) | |
}) | |
t.Run("Success", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder(nodes[2]) | |
item = objects.Replica{ID: id, Object: object(id, 3)} | |
) | |
f.RClient.On("FetchObject", anyVal, nodes[2], cls, shard, id, proj, adds).Return(item, nil) | |
got, err := finder.GetOne(ctx, One, shard, id, proj, adds) | |
assert.Nil(t, err) | |
assert.Equal(t, item.Object, got) | |
}) | |
t.Run("NotFound", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
) | |
f.RClient.On("FetchObject", anyVal, nodes[0], cls, shard, id, proj, adds).Return(emptyItem, nil) | |
got, err := finder.GetOne(ctx, One, shard, id, proj, adds) | |
assert.Nil(t, err) | |
assert.Equal(t, nilObject, got) | |
}) | |
} | |
func TestFinderExistsWithConsistencyLevelALL(t *testing.T) { | |
var ( | |
id = strfmt.UUID("123") | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B", "C"} | |
ctx = context.Background() | |
nilReply = []RepairResponse(nil) | |
) | |
t.Run("None", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 3}} | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(nilReply, errAny) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, nil) | |
got, err := finder.Exists(ctx, All, shard, id) | |
assert.ErrorIs(t, err, errRead) | |
f.assertLogErrorContains(t, errAny.Error()) | |
assert.Equal(t, false, got) | |
}) | |
t.Run("Success", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 3}} | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, nil) | |
got, err := finder.Exists(ctx, All, shard, id) | |
assert.Nil(t, err) | |
assert.Equal(t, true, got) | |
}) | |
t.Run("NotFound", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 0, Deleted: true}} | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, nil) | |
got, err := finder.Exists(ctx, All, shard, id) | |
assert.Nil(t, err) | |
assert.Equal(t, false, got) | |
}) | |
} | |
func TestFinderExistsWithConsistencyLevelQuorum(t *testing.T) { | |
var ( | |
id = strfmt.UUID("123") | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B", "C"} | |
ctx = context.Background() | |
nilReply = []RepairResponse(nil) | |
) | |
t.Run("AllButOne", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 3}} | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(nilReply, errAny) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, errAny) | |
got, err := finder.Exists(ctx, Quorum, shard, id) | |
assert.ErrorIs(t, err, errRead) | |
f.assertLogErrorContains(t, errAny.Error()) | |
assert.Equal(t, false, got) | |
}) | |
t.Run("Success", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 3}} | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, errAny) | |
got, err := finder.Exists(ctx, Quorum, shard, id) | |
assert.Nil(t, err) | |
assert.Equal(t, true, got) | |
}) | |
t.Run("NotFound", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 0, Deleted: true}} | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, digestIDs).Return(digestR, errAny) | |
got, err := finder.Exists(ctx, Quorum, shard, id) | |
assert.Nil(t, err) | |
assert.Equal(t, false, got) | |
}) | |
} | |
func TestFinderExistsWithConsistencyLevelOne(t *testing.T) { | |
var ( | |
id = strfmt.UUID("123") | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B"} | |
ctx = context.Background() | |
) | |
t.Run("Success", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 3}} | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shard, digestIDs).Return(digestR, errAny) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, digestIDs).Return(digestR, nil) | |
got, err := finder.Exists(ctx, One, shard, id) | |
assert.Nil(t, err) | |
assert.Equal(t, true, got) | |
}) | |
t.Run("NotFound", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
digestIDs = []strfmt.UUID{id} | |
digestR = []RepairResponse{{ID: id.String(), UpdateTime: 0, Deleted: true}} | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shard, digestIDs).Return(digestR, nil) | |
got, err := finder.Exists(ctx, One, shard, id) | |
assert.Nil(t, err) | |
assert.Equal(t, false, got) | |
}) | |
} | |
func TestFinderCheckConsistencyALL(t *testing.T) { | |
var ( | |
ids = []strfmt.UUID{"0", "1", "2", "3", "4", "5"} | |
cls = "C1" | |
shards = []string{"S1", "S2", "S3"} | |
nodes = []string{"A", "B", "C"} | |
ctx = context.Background() | |
) | |
t.Run("ExceptOne", func(t *testing.T) { | |
var ( | |
shard = shards[0] | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
xs, digestR = genInputs("A", shard, 1, ids) | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, ids).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, ids).Return(digestR, errAny) | |
err := finder.CheckConsistency(ctx, All, xs) | |
want := setObjectsConsistency(xs, false) | |
assert.ErrorIs(t, err, errRead) | |
assert.ElementsMatch(t, want, xs) | |
f.assertLogErrorContains(t, errRead.Error()) | |
}) | |
t.Run("OneShard", func(t *testing.T) { | |
var ( | |
shard = shards[0] | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
xs, digestR = genInputs("A", shard, 2, ids) | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, ids).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, ids).Return(digestR, nil) | |
want := setObjectsConsistency(xs, true) | |
err := finder.CheckConsistency(ctx, All, xs) | |
assert.Nil(t, err) | |
assert.ElementsMatch(t, want, xs) | |
}) | |
t.Run("TwoShards", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shards[0], nodes) | |
finder = f.newFinder("A") | |
idSet1 = ids[:3] | |
idSet2 = ids[3:6] | |
xs1, digestR1 = genInputs("A", shards[0], 1, idSet1) | |
xs2, digestR2 = genInputs("B", shards[1], 2, idSet2) | |
) | |
xs := make([]*storobj.Object, 0, len(xs1)+len(xs2)) | |
for i := 0; i < 3; i++ { | |
xs = append(xs, xs1[i]) | |
xs = append(xs, xs2[i]) | |
} | |
// first shard | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shards[0], idSet1).Return(digestR1, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shards[0], idSet1).Return(digestR1, nil) | |
// second shard | |
f.AddShard(shards[1], nodes) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shards[1], idSet2).Return(digestR2, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shards[1], idSet2).Return(digestR2, nil) | |
want := setObjectsConsistency(xs, true) | |
err := finder.CheckConsistency(ctx, All, xs) | |
assert.Nil(t, err) | |
assert.ElementsMatch(t, want, xs) | |
}) | |
t.Run("ThreeShard", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shards[0], nodes) | |
finder = f.newFinder("A") | |
ids1 = ids[:2] | |
ids2 = ids[2:4] | |
ids3 = ids[4:] | |
xs1, digestR1 = genInputs("A", shards[0], 1, ids1) | |
xs2, digestR2 = genInputs("B", shards[1], 2, ids2) | |
xs3, digestR3 = genInputs("C", shards[2], 3, ids3) | |
) | |
xs := make([]*storobj.Object, 0, len(xs1)+len(xs2)) | |
for i := 0; i < 2; i++ { | |
xs = append(xs, xs1[i]) | |
xs = append(xs, xs2[i]) | |
xs = append(xs, xs3[i]) | |
} | |
// first shard | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shards[0], ids1).Return(digestR1, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shards[0], ids1).Return(digestR1, nil) | |
// second shard | |
f.AddShard(shards[1], nodes) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shards[1], ids2).Return(digestR2, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shards[1], ids2).Return(digestR2, nil) | |
// third shard | |
f.AddShard(shards[2], nodes) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shards[2], ids3).Return(digestR3, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shards[2], ids3).Return(digestR3, nil) | |
want := setObjectsConsistency(xs, true) | |
err := finder.CheckConsistency(ctx, All, xs) | |
assert.Nil(t, err) | |
assert.ElementsMatch(t, want, xs) | |
}) | |
t.Run("TwoShardSingleNode", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shards[0], nodes) | |
finder = f.newFinder("A") | |
ids1 = ids[:2] | |
ids2 = ids[2:4] | |
ids3 = ids[4:] | |
xs1, digestR1 = genInputs("A", shards[0], 1, ids1) | |
xs2, digestR2 = genInputs("B", shards[1], 1, ids2) | |
xs3, digestR3 = genInputs("A", shards[2], 2, ids3) | |
) | |
xs := make([]*storobj.Object, 0, len(xs1)+len(xs2)) | |
for i := 0; i < 2; i++ { | |
xs = append(xs, xs1[i]) | |
xs = append(xs, xs2[i]) | |
xs = append(xs, xs3[i]) | |
} | |
// first shard | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shards[0], ids1).Return(digestR1, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shards[0], ids1).Return(digestR1, nil) | |
// second shard | |
f.AddShard(shards[1], nodes) | |
f.RClient.On("DigestObjects", anyVal, nodes[0], cls, shards[1], ids2).Return(digestR2, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shards[1], ids2).Return(digestR2, nil) | |
// third shard | |
f.AddShard(shards[2], nodes) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shards[2], ids3).Return(digestR3, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shards[2], ids3).Return(digestR3, nil) | |
want := setObjectsConsistency(xs, true) | |
err := finder.CheckConsistency(ctx, All, xs) | |
assert.Nil(t, err) | |
assert.ElementsMatch(t, want, xs) | |
}) | |
} | |
func TestFinderCheckConsistencyQuorum(t *testing.T) { | |
var ( | |
ids = []strfmt.UUID{"10", "20", "30"} | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B", "C"} | |
ctx = context.Background() | |
) | |
t.Run("MalformedInputs", func(t *testing.T) { | |
var ( | |
ids = []strfmt.UUID{"10", "20", "30"} | |
shard = "SH1" | |
nodes = []string{"A", "B", "C"} | |
ctx = context.Background() | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
xs1 = []*storobj.Object{ | |
objectEx(ids[0], 4, shard, "A"), | |
nil, | |
objectEx(ids[2], 6, shard, "A"), | |
} | |
// BelongToShard and BelongToNode are empty | |
xs2 = []*storobj.Object{ | |
objectEx(ids[0], 4, shard, "A"), | |
{Object: models.Object{ID: ids[1]}}, | |
objectEx(ids[2], 6, shard, "A"), | |
} | |
) | |
assert.Nil(t, finder.CheckConsistency(ctx, Quorum, nil)) | |
err := finder.CheckConsistency(ctx, Quorum, xs1) | |
assert.NotNil(t, err) | |
err = finder.CheckConsistency(ctx, Quorum, xs2) | |
assert.NotNil(t, err) | |
}) | |
t.Run("None", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
xs = []*storobj.Object{ | |
objectEx(ids[0], 1, shard, "A"), | |
objectEx(ids[1], 2, shard, "A"), | |
objectEx(ids[2], 3, shard, "A"), | |
} | |
digestR = []RepairResponse{ | |
{ID: ids[0].String(), UpdateTime: 1}, | |
{ID: ids[1].String(), UpdateTime: 2}, | |
{ID: ids[2].String(), UpdateTime: 3}, | |
} | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, ids).Return(digestR, errAny) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, ids).Return(digestR, errAny) | |
err := finder.CheckConsistency(ctx, All, xs) | |
want := setObjectsConsistency(xs, false) | |
assert.ErrorIs(t, err, errRead) | |
assert.ElementsMatch(t, want, xs) | |
f.assertLogErrorContains(t, errRead.Error()) | |
}) | |
t.Run("Success", func(t *testing.T) { | |
var ( | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
xs = []*storobj.Object{ | |
objectEx(ids[0], 1, shard, "A"), | |
objectEx(ids[1], 2, shard, "A"), | |
objectEx(ids[2], 3, shard, "A"), | |
} | |
digestR = []RepairResponse{ | |
{ID: ids[0].String(), UpdateTime: 1}, | |
{ID: ids[1].String(), UpdateTime: 2}, | |
{ID: ids[2].String(), UpdateTime: 3}, | |
} | |
want = setObjectsConsistency(xs, true) | |
) | |
f.RClient.On("DigestObjects", anyVal, nodes[1], cls, shard, ids).Return(digestR, nil) | |
f.RClient.On("DigestObjects", anyVal, nodes[2], cls, shard, ids).Return(digestR, errAny) | |
err := finder.CheckConsistency(ctx, Quorum, xs) | |
assert.Nil(t, err) | |
assert.ElementsMatch(t, want, xs) | |
}) | |
} | |
func TestFinderCheckConsistencyOne(t *testing.T) { | |
var ( | |
ids = []strfmt.UUID{"10", "20", "30"} | |
shard = "SH1" | |
nodes = []string{"A", "B", "C"} | |
ctx = context.Background() | |
f = newFakeFactory("C1", shard, nodes) | |
finder = f.newFinder("A") | |
xs = []*storobj.Object{ | |
objectEx(ids[0], 4, shard, "A"), | |
objectEx(ids[1], 5, shard, "A"), | |
objectEx(ids[2], 6, shard, "A"), | |
} | |
want = setObjectsConsistency(xs, true) | |
) | |
err := finder.CheckConsistency(ctx, One, xs) | |
assert.Nil(t, err) | |
assert.Equal(t, want, xs) | |
} | |