Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package replica | |
import ( | |
"context" | |
"errors" | |
"testing" | |
"time" | |
"github.com/go-openapi/strfmt" | |
"github.com/sirupsen/logrus" | |
"github.com/sirupsen/logrus/hooks/test" | |
"github.com/stretchr/testify/assert" | |
"github.com/stretchr/testify/mock" | |
"github.com/weaviate/weaviate/entities/storobj" | |
"github.com/weaviate/weaviate/usecases/objects" | |
) | |
var ( | |
anyVal = mock.Anything | |
errAny = errors.New("any error") | |
) | |
func TestReplicatorReplicaNotFound(t *testing.T) { | |
ctx := context.Background() | |
t.Run("PutObject", func(t *testing.T) { | |
f := newFakeFactory("C1", "S", []string{}) | |
rep := f.newReplicator() | |
err := rep.PutObject(ctx, "S", nil, All) | |
assert.ErrorIs(t, err, errReplicas) | |
f.assertLogErrorContains(t, errNoReplicaFound.Error()) | |
}) | |
t.Run("MergeObject", func(t *testing.T) { | |
f := newFakeFactory("C1", "S", []string{}) | |
rep := f.newReplicator() | |
err := rep.MergeObject(ctx, "S", nil, All) | |
assert.ErrorIs(t, err, errReplicas) | |
f.assertLogErrorContains(t, errNoReplicaFound.Error()) | |
}) | |
t.Run("DeleteObject", func(t *testing.T) { | |
f := newFakeFactory("C1", "S", []string{}) | |
rep := f.newReplicator() | |
err := rep.DeleteObject(ctx, "S", "id", All) | |
assert.ErrorIs(t, err, errReplicas) | |
f.assertLogErrorContains(t, errNoReplicaFound.Error()) | |
}) | |
t.Run("PutObjects", func(t *testing.T) { | |
f := newFakeFactory("C1", "S", []string{}) | |
rep := f.newReplicator() | |
errs := rep.PutObjects(ctx, "S", []*storobj.Object{{}, {}}, All) | |
assert.Equal(t, 2, len(errs)) | |
for _, err := range errs { | |
assert.ErrorIs(t, err, errReplicas) | |
} | |
f.assertLogErrorContains(t, errNoReplicaFound.Error()) | |
}) | |
t.Run("DeleteObjects", func(t *testing.T) { | |
f := newFakeFactory("C1", "S", []string{}) | |
rep := f.newReplicator() | |
xs := rep.DeleteObjects(ctx, "S", []strfmt.UUID{strfmt.UUID("1"), strfmt.UUID("2"), strfmt.UUID("3")}, false, All) | |
assert.Equal(t, 3, len(xs)) | |
for _, x := range xs { | |
assert.ErrorIs(t, x.Err, errReplicas) | |
} | |
f.assertLogErrorContains(t, errNoReplicaFound.Error()) | |
}) | |
t.Run("AddReferences", func(t *testing.T) { | |
f := newFakeFactory("C1", "S", []string{}) | |
rep := f.newReplicator() | |
errs := rep.AddReferences(ctx, "S", []objects.BatchReference{{}, {}}, All) | |
assert.Equal(t, 2, len(errs)) | |
for _, err := range errs { | |
assert.ErrorIs(t, err, errReplicas) | |
} | |
f.assertLogErrorContains(t, errNoReplicaFound.Error()) | |
}) | |
} | |
func TestReplicatorPutObject(t *testing.T) { | |
var ( | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B"} | |
ctx = context.Background() | |
obj = &storobj.Object{} | |
) | |
t.Run("SuccessWithConsistencyLevelAll", func(t *testing.T) { | |
nodes := []string{"A", "B", "C"} | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
for _, n := range nodes { | |
f.WClient.On("PutObject", ctx, n, cls, shard, anyVal, obj).Return(resp, nil) | |
f.WClient.On("Commit", ctx, n, "C1", shard, anyVal, anyVal).Return(nil) | |
} | |
err := rep.PutObject(ctx, shard, obj, All) | |
assert.Nil(t, err) | |
}) | |
t.Run("SuccessWithConsistencyLevelOne", func(t *testing.T) { | |
nodes := []string{"A", "B", "C"} | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
f.WClient.On("PutObject", ctx, "A", cls, shard, anyVal, obj).Return(resp, errAny).After(time.Second * 10) | |
f.WClient.On("Abort", ctx, "A", cls, shard, anyVal).Return(resp, nil) | |
f.WClient.On("PutObject", ctx, "B", cls, shard, anyVal, obj).Return(resp, nil) | |
f.WClient.On("Commit", ctx, "B", cls, shard, anyVal, anyVal).Return(errAny) | |
f.WClient.On("PutObject", ctx, "C", cls, shard, anyVal, obj).Return(resp, nil) | |
f.WClient.On("Commit", ctx, "C", cls, shard, anyVal, anyVal).Return(nil) | |
err := rep.PutObject(ctx, shard, obj, One) | |
assert.Nil(t, err) | |
}) | |
t.Run("SuccessWithConsistencyLevelQuorum", func(t *testing.T) { | |
nodes := []string{"A", "B", "C"} | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
for _, n := range nodes[:2] { | |
f.WClient.On("PutObject", ctx, n, cls, shard, anyVal, obj).Return(resp, nil) | |
f.WClient.On("Commit", ctx, n, "C1", shard, anyVal, anyVal).Return(nil) | |
} | |
f.WClient.On("PutObject", ctx, "C", cls, shard, anyVal, obj).Return(resp, nil) | |
f.WClient.On("Commit", ctx, "C", cls, shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{Errors: []Error{{Msg: "e3"}}} | |
} | |
err := rep.PutObject(ctx, shard, obj, Quorum) | |
assert.Nil(t, err) | |
}) | |
t.Run("PhaseOneConnectionError", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
f.WClient.On("PutObject", ctx, nodes[0], cls, shard, anyVal, obj).Return(resp, nil) | |
f.WClient.On("PutObject", ctx, nodes[1], cls, shard, anyVal, obj).Return(resp, errAny) | |
f.WClient.On("Abort", ctx, nodes[0], "C1", shard, anyVal).Return(resp, nil) | |
f.WClient.On("Abort", ctx, nodes[1], "C1", shard, anyVal).Return(resp, nil) | |
err := rep.PutObject(ctx, shard, obj, All) | |
assert.ErrorIs(t, err, errReplicas) | |
}) | |
t.Run("PhaseOneUnsuccessfulResponse", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
f.WClient.On("PutObject", ctx, nodes[0], cls, shard, anyVal, obj).Return(resp, nil) | |
resp2 := SimpleResponse{[]Error{{Err: errAny}}} | |
f.WClient.On("PutObject", ctx, nodes[1], cls, shard, anyVal, obj).Return(resp2, nil) | |
f.WClient.On("Abort", ctx, nodes[0], "C1", shard, anyVal).Return(resp, nil) | |
f.WClient.On("Abort", ctx, nodes[1], "C1", shard, anyVal).Return(resp, nil) | |
err := rep.PutObject(ctx, shard, obj, All) | |
assert.ErrorIs(t, err, errReplicas) | |
}) | |
t.Run("Commit", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
for _, n := range nodes { | |
f.WClient.On("PutObject", ctx, n, cls, shard, anyVal, obj).Return(resp, nil) | |
} | |
f.WClient.On("Commit", ctx, nodes[0], "C1", shard, anyVal, anyVal).Return(nil) | |
f.WClient.On("Commit", ctx, nodes[1], "C1", shard, anyVal, anyVal).Return(errAny) | |
err := rep.PutObject(ctx, shard, obj, All) | |
assert.ErrorIs(t, err, errAny) | |
}) | |
} | |
func TestReplicatorMergeObject(t *testing.T) { | |
var ( | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B"} | |
ctx = context.Background() | |
merge = &objects.MergeDocument{} | |
) | |
t.Run("Success", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
for _, n := range nodes { | |
f.WClient.On("MergeObject", ctx, n, cls, shard, anyVal, merge).Return(resp, nil) | |
f.WClient.On("Commit", ctx, n, cls, shard, anyVal, anyVal).Return(nil) | |
} | |
err := rep.MergeObject(ctx, shard, merge, All) | |
assert.Nil(t, err) | |
}) | |
t.Run("PhaseOneConnectionError", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
f.WClient.On("MergeObject", ctx, nodes[0], cls, shard, anyVal, merge).Return(resp, nil) | |
f.WClient.On("MergeObject", ctx, nodes[1], cls, shard, anyVal, merge).Return(resp, errAny) | |
f.WClient.On("Abort", ctx, nodes[0], cls, shard, anyVal).Return(resp, nil) | |
f.WClient.On("Abort", ctx, nodes[1], cls, shard, anyVal).Return(resp, nil) | |
err := rep.MergeObject(ctx, shard, merge, All) | |
assert.ErrorIs(t, err, errReplicas) | |
}) | |
t.Run("PhaseOneUnsuccessfulResponse", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
f.WClient.On("MergeObject", ctx, nodes[0], cls, shard, anyVal, merge).Return(resp, nil) | |
resp2 := SimpleResponse{[]Error{{Err: errAny}}} | |
f.WClient.On("MergeObject", ctx, nodes[1], cls, shard, anyVal, merge).Return(resp2, nil) | |
f.WClient.On("Abort", ctx, nodes[0], cls, shard, anyVal).Return(resp, nil) | |
f.WClient.On("Abort", ctx, nodes[1], cls, shard, anyVal).Return(resp, nil) | |
err := rep.MergeObject(ctx, shard, merge, All) | |
assert.ErrorIs(t, err, errReplicas) | |
}) | |
t.Run("Commit", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
for _, n := range nodes { | |
f.WClient.On("MergeObject", ctx, n, cls, shard, anyVal, merge).Return(resp, nil) | |
} | |
f.WClient.On("Commit", ctx, nodes[0], cls, shard, anyVal, anyVal).Return(nil) | |
f.WClient.On("Commit", ctx, nodes[1], cls, shard, anyVal, anyVal).Return(errAny) | |
err := rep.MergeObject(ctx, shard, merge, All) | |
assert.ErrorIs(t, err, errAny) | |
}) | |
} | |
func TestReplicatorDeleteObject(t *testing.T) { | |
var ( | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B", "C"} | |
uuid = strfmt.UUID("1234") | |
ctx = context.Background() | |
) | |
t.Run("PhaseOneConnectionError", func(t *testing.T) { | |
factory := newFakeFactory("C1", shard, nodes) | |
client := factory.WClient | |
rep := factory.newReplicator() | |
resp := SimpleResponse{Errors: make([]Error, 1)} | |
for _, n := range nodes[:2] { | |
client.On("DeleteObject", ctx, n, cls, shard, anyVal, uuid).Return(resp, nil) | |
client.On("Commit", ctx, n, "C1", shard, anyVal, anyVal).Return(nil) | |
} | |
client.On("DeleteObject", ctx, "C", cls, shard, anyVal, uuid).Return(SimpleResponse{}, errAny) | |
for _, n := range nodes { | |
client.On("Abort", ctx, n, "C1", shard, anyVal).Return(resp, nil) | |
} | |
err := rep.DeleteObject(ctx, shard, uuid, All) | |
assert.NotNil(t, err) | |
assert.ErrorIs(t, err, errReplicas) | |
}) | |
t.Run("SuccessWithConsistencyLevelAll", func(t *testing.T) { | |
factory := newFakeFactory("C1", shard, nodes) | |
client := factory.WClient | |
rep := factory.newReplicator() | |
resp := SimpleResponse{Errors: make([]Error, 1)} | |
for _, n := range nodes { | |
client.On("DeleteObject", ctx, n, cls, shard, anyVal, uuid).Return(resp, nil) | |
client.On("Commit", ctx, n, "C1", shard, anyVal, anyVal).Return(nil) | |
} | |
assert.Nil(t, rep.DeleteObject(ctx, shard, uuid, All)) | |
assert.Nil(t, rep.DeleteObject(ctx, shard, uuid, Quorum)) | |
assert.Nil(t, rep.DeleteObject(ctx, shard, uuid, One)) | |
}) | |
t.Run("SuccessWithConsistencyQuorum", func(t *testing.T) { | |
factory := newFakeFactory("C1", shard, nodes) | |
client := factory.WClient | |
rep := factory.newReplicator() | |
resp := SimpleResponse{Errors: make([]Error, 1)} | |
for _, n := range nodes[:2] { | |
client.On("DeleteObject", ctx, n, cls, shard, anyVal, uuid).Return(resp, nil) | |
client.On("Commit", ctx, n, "C1", shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{ | |
Errors: []Error{{}}, | |
} | |
} | |
} | |
client.On("DeleteObject", ctx, "C", cls, shard, anyVal, uuid).Return(resp, nil) | |
client.On("Commit", ctx, "C", "C1", shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{ | |
Errors: []Error{{Msg: "e3"}}, | |
} | |
} | |
assert.NotNil(t, rep.DeleteObject(ctx, shard, uuid, All)) | |
assert.Nil(t, rep.DeleteObject(ctx, shard, uuid, Quorum)) | |
assert.Nil(t, rep.DeleteObject(ctx, shard, uuid, One)) | |
}) | |
t.Run("SuccessWithConsistencyQuorum", func(t *testing.T) { | |
factory := newFakeFactory("C1", shard, nodes) | |
client := factory.WClient | |
rep := factory.newReplicator() | |
resp := SimpleResponse{Errors: make([]Error, 1)} | |
for _, n := range nodes[:2] { | |
client.On("DeleteObject", ctx, n, cls, shard, anyVal, uuid).Return(resp, nil) | |
client.On("Commit", ctx, n, "C1", shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{ | |
Errors: []Error{{}}, | |
} | |
} | |
} | |
client.On("DeleteObject", ctx, "C", cls, shard, anyVal, uuid).Return(resp, nil) | |
client.On("Commit", ctx, "C", "C1", shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{ | |
Errors: []Error{{Msg: "e3"}}, | |
} | |
} | |
assert.NotNil(t, rep.DeleteObject(ctx, shard, uuid, All)) | |
assert.Nil(t, rep.DeleteObject(ctx, shard, uuid, Quorum)) | |
assert.Nil(t, rep.DeleteObject(ctx, shard, uuid, One)) | |
}) | |
} | |
func TestReplicatorDeleteObjects(t *testing.T) { | |
var ( | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B"} | |
ctx = context.Background() | |
) | |
t.Run("PhaseOneConnectionError", func(t *testing.T) { | |
factory := newFakeFactory("C1", shard, nodes) | |
client := factory.WClient | |
docIDs := []strfmt.UUID{strfmt.UUID("1"), strfmt.UUID("2")} | |
client.On("DeleteObjects", ctx, nodes[0], cls, shard, anyVal, docIDs, false).Return(SimpleResponse{}, nil) | |
client.On("DeleteObjects", ctx, nodes[1], cls, shard, anyVal, docIDs, false).Return(SimpleResponse{}, errAny) | |
for _, n := range nodes { | |
client.On("Abort", ctx, n, "C1", shard, anyVal).Return(SimpleResponse{}, nil) | |
} | |
result := factory.newReplicator().DeleteObjects(ctx, shard, docIDs, false, All) | |
assert.Equal(t, len(result), 2) | |
for _, r := range result { | |
assert.ErrorIs(t, r.Err, errReplicas) | |
} | |
}) | |
t.Run("PhaseTwoDecodingError", func(t *testing.T) { | |
factory := newFakeFactory("C1", shard, nodes) | |
client := factory.WClient | |
docIDs := []strfmt.UUID{strfmt.UUID("1"), strfmt.UUID("2")} | |
for _, n := range nodes { | |
client.On("DeleteObjects", ctx, n, cls, shard, anyVal, docIDs, false).Return(SimpleResponse{}, nil) | |
client.On("Commit", ctx, n, cls, shard, anyVal, anyVal).Return(errAny) | |
} | |
result := factory.newReplicator().DeleteObjects(ctx, shard, docIDs, false, All) | |
assert.Equal(t, len(result), 2) | |
}) | |
t.Run("PartialSuccess", func(t *testing.T) { | |
factory := newFakeFactory("C1", shard, nodes) | |
client := factory.WClient | |
rep := factory.newReplicator() | |
docIDs := []strfmt.UUID{strfmt.UUID("1"), strfmt.UUID("2")} | |
resp1 := SimpleResponse{} | |
for _, n := range nodes { | |
client.On("DeleteObjects", ctx, n, cls, shard, anyVal, docIDs, false).Return(resp1, nil) | |
client.On("Commit", ctx, n, cls, shard, anyVal, anyVal).Return(nil).RunFn = func(args mock.Arguments) { | |
resp := args[5].(*DeleteBatchResponse) | |
*resp = DeleteBatchResponse{ | |
Batch: []UUID2Error{{"1", Error{}}, {"2", Error{Msg: "e1"}}}, | |
} | |
} | |
} | |
result := rep.DeleteObjects(ctx, shard, docIDs, false, All) | |
assert.Equal(t, len(result), 2) | |
assert.Equal(t, objects.BatchSimpleObject{UUID: "1", Err: nil}, result[0]) | |
assert.Equal(t, objects.BatchSimpleObject{UUID: "2", Err: &Error{Msg: "e1"}}, result[1]) | |
}) | |
t.Run("SuccessWithConsistencyLevelAll", func(t *testing.T) { | |
factory := newFakeFactory("C1", shard, nodes) | |
client := factory.WClient | |
rep := factory.newReplicator() | |
docIDs := []strfmt.UUID{strfmt.UUID("1"), strfmt.UUID("2")} | |
resp1 := SimpleResponse{} | |
for _, n := range nodes { | |
client.On("DeleteObjects", ctx, n, cls, shard, anyVal, docIDs, false).Return(resp1, nil) | |
client.On("Commit", ctx, n, cls, shard, anyVal, anyVal).Return(nil).RunFn = func(args mock.Arguments) { | |
resp := args[5].(*DeleteBatchResponse) | |
*resp = DeleteBatchResponse{ | |
Batch: []UUID2Error{{UUID: "1"}, {UUID: "2"}}, | |
} | |
} | |
} | |
result := rep.DeleteObjects(ctx, shard, docIDs, false, All) | |
assert.Equal(t, len(result), 2) | |
assert.Equal(t, objects.BatchSimpleObject{UUID: "1", Err: nil}, result[0]) | |
assert.Equal(t, objects.BatchSimpleObject{UUID: "2", Err: nil}, result[1]) | |
}) | |
t.Run("SuccessWithConsistencyLevelOne", func(t *testing.T) { | |
factory := newFakeFactory("C1", shard, nodes) | |
client := factory.WClient | |
rep := factory.newReplicator() | |
docIDs := []strfmt.UUID{strfmt.UUID("1"), strfmt.UUID("2")} | |
resp1 := SimpleResponse{} | |
client.On("DeleteObjects", ctx, nodes[0], cls, shard, anyVal, docIDs, false).Return(resp1, nil) | |
client.On("DeleteObjects", ctx, nodes[1], cls, shard, anyVal, docIDs, false).Return(resp1, errAny) | |
client.On("Commit", ctx, nodes[0], cls, shard, anyVal, anyVal).Return(nil).RunFn = func(args mock.Arguments) { | |
resp := args[5].(*DeleteBatchResponse) | |
*resp = DeleteBatchResponse{ | |
Batch: []UUID2Error{{UUID: "1"}, {UUID: "2"}}, | |
} | |
} | |
result := rep.DeleteObjects(ctx, shard, docIDs, false, One) | |
assert.Equal(t, len(result), 2) | |
assert.Equal(t, []objects.BatchSimpleObject{{UUID: "1"}, {UUID: "2"}}, result) | |
}) | |
t.Run("SuccessWithConsistencyQuorum", func(t *testing.T) { | |
nodes = []string{"A", "B", "C"} | |
factory := newFakeFactory("C1", shard, nodes) | |
client := factory.WClient | |
rep := factory.newReplicator() | |
docIDs := []strfmt.UUID{strfmt.UUID("1"), strfmt.UUID("2")} | |
resp1 := SimpleResponse{} | |
for _, n := range nodes { | |
client.On("DeleteObjects", ctx, n, cls, shard, anyVal, docIDs, false).Return(resp1, nil) | |
} | |
for _, n := range nodes[:2] { | |
client.On("Commit", ctx, n, cls, shard, anyVal, anyVal).Return(nil).RunFn = func(args mock.Arguments) { | |
resp := args[5].(*DeleteBatchResponse) | |
*resp = DeleteBatchResponse{ | |
Batch: []UUID2Error{{UUID: "1"}, {UUID: "2"}}, | |
} | |
} | |
} | |
client.On("Commit", ctx, "C", cls, shard, anyVal, anyVal).Return(nil).RunFn = func(args mock.Arguments) { | |
resp := args[5].(*DeleteBatchResponse) | |
*resp = DeleteBatchResponse{ | |
Batch: []UUID2Error{{UUID: "1"}, {UUID: "2", Error: Error{Msg: "e2"}}}, | |
} | |
} | |
result := rep.DeleteObjects(ctx, shard, docIDs, false, Quorum) | |
assert.Equal(t, len(result), 2) | |
assert.Equal(t, []objects.BatchSimpleObject{{UUID: "1"}, {UUID: "2"}}, result) | |
}) | |
} | |
func TestReplicatorPutObjects(t *testing.T) { | |
var ( | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B"} | |
ctx = context.Background() | |
objs = []*storobj.Object{{}, {}, {}} | |
resp1 = SimpleResponse{[]Error{{}}} | |
) | |
t.Run("SuccessWithConsistencyLevelAll", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{Errors: make([]Error, 3)} | |
for _, n := range nodes { | |
f.WClient.On("PutObjects", ctx, n, cls, shard, anyVal, objs).Return(resp, nil) | |
f.WClient.On("Commit", ctx, n, cls, shard, anyVal, anyVal).Return(nil) | |
} | |
errs := rep.PutObjects(ctx, shard, objs, All) | |
assert.Equal(t, []error{nil, nil, nil}, errs) | |
}) | |
t.Run("SuccessWithConsistencyLevelOne", func(t *testing.T) { | |
nodes := []string{"A", "B", "C"} | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
for _, n := range nodes[:2] { | |
f.WClient.On("PutObjects", ctx, n, cls, shard, anyVal, objs).Return(resp1, nil) | |
f.WClient.On("Commit", ctx, n, cls, shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{Errors: []Error{{}, {}, {Msg: "e3"}}} | |
} | |
} | |
f.WClient.On("PutObjects", ctx, "C", cls, shard, anyVal, objs).Return(resp1, nil) | |
f.WClient.On("Commit", ctx, "C", cls, shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{Errors: make([]Error, 3)} | |
} | |
errs := rep.PutObjects(ctx, shard, objs, One) | |
assert.Equal(t, []error{nil, nil, nil}, errs) | |
}) | |
t.Run("SuccessWithConsistencyLevelQuorum", func(t *testing.T) { | |
nodes := []string{"A", "B", "C"} | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
for _, n := range nodes[:2] { | |
f.WClient.On("PutObjects", ctx, n, cls, shard, anyVal, objs).Return(resp1, nil) | |
f.WClient.On("Commit", ctx, n, cls, shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{Errors: []Error{{}}} | |
} | |
} | |
f.WClient.On("PutObjects", ctx, "C", cls, shard, anyVal, objs).Return(resp1, nil) | |
f.WClient.On("Commit", ctx, "C", cls, shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{Errors: []Error{{Msg: "e3"}}} | |
} | |
errs := rep.PutObjects(ctx, shard, objs, Quorum) | |
assert.Equal(t, []error{nil, nil, nil}, errs) | |
}) | |
t.Run("PhaseOneConnectionError", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
f.WClient.On("PutObjects", ctx, nodes[0], cls, shard, anyVal, objs).Return(resp1, nil) | |
f.WClient.On("PutObjects", ctx, nodes[1], cls, shard, anyVal, objs).Return(resp1, errAny) | |
f.WClient.On("Abort", ctx, nodes[0], "C1", shard, anyVal).Return(resp1, nil) | |
f.WClient.On("Abort", ctx, nodes[1], "C1", shard, anyVal).Return(resp1, nil) | |
errs := rep.PutObjects(ctx, shard, objs, All) | |
assert.Equal(t, 3, len(errs)) | |
assert.ErrorIs(t, errs[0], errReplicas) | |
}) | |
t.Run("PhaseOneUnsuccessfulResponse", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
f.WClient.On("PutObjects", ctx, nodes[0], cls, shard, anyVal, objs).Return(resp1, nil) | |
resp2 := SimpleResponse{[]Error{{Msg: "E1"}, {Msg: "E2"}}} | |
f.WClient.On("PutObjects", ctx, nodes[1], cls, shard, anyVal, objs).Return(resp2, nil) | |
f.WClient.On("Abort", ctx, nodes[0], "C1", shard, anyVal).Return(resp1, nil) | |
f.WClient.On("Abort", ctx, nodes[1], "C1", shard, anyVal).Return(resp1, nil) | |
errs := rep.PutObjects(ctx, shard, objs, All) | |
assert.Equal(t, 3, len(errs)) | |
for _, err := range errs { | |
assert.ErrorIs(t, err, errReplicas) | |
} | |
}) | |
t.Run("PhaseTwoDecodingError", func(t *testing.T) { | |
f := newFakeFactory(cls, shard, nodes) | |
rep := f.newReplicator() | |
for _, n := range nodes { | |
f.WClient.On("PutObjects", ctx, n, cls, shard, anyVal, objs).Return(resp1, nil) | |
} | |
f.WClient.On("Commit", ctx, nodes[0], cls, shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{Errors: make([]Error, 3)} | |
} | |
f.WClient.On("Commit", ctx, nodes[1], cls, shard, anyVal, anyVal).Return(errAny) | |
errs := rep.PutObjects(ctx, shard, objs, All) | |
assert.Equal(t, len(errs), 3) | |
assert.ErrorIs(t, errs[0], errAny) | |
assert.ErrorIs(t, errs[1], errAny) | |
assert.ErrorIs(t, errs[2], errAny) | |
}) | |
t.Run("PhaseTwoUnsuccessfulResponse", func(t *testing.T) { | |
f := newFakeFactory(cls, shard, nodes) | |
rep := f.newReplicator() | |
node2Errs := []Error{{Msg: "E1"}, {}, {Msg: "E3"}} | |
for _, n := range nodes { | |
f.WClient.On("PutObjects", ctx, n, cls, shard, anyVal, objs).Return(resp1, nil) | |
} | |
f.WClient.On("Commit", ctx, nodes[0], cls, shard, anyVal, anyVal).Return(nil).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{Errors: make([]Error, 3)} | |
} | |
f.WClient.On("Commit", ctx, nodes[1], cls, shard, anyVal, anyVal).Return(errAny).RunFn = func(a mock.Arguments) { | |
resp := a[5].(*SimpleResponse) | |
*resp = SimpleResponse{Errors: node2Errs} | |
} | |
errs := rep.PutObjects(ctx, shard, objs, All) | |
assert.Equal(t, len(errs), len(objs)) | |
wantError := []error{&node2Errs[0], nil, &node2Errs[2]} | |
assert.Equal(t, wantError, errs) | |
}) | |
} | |
func TestReplicatorAddReferences(t *testing.T) { | |
var ( | |
cls = "C1" | |
shard = "SH1" | |
nodes = []string{"A", "B"} | |
ctx = context.Background() | |
refs = []objects.BatchReference{{}, {}} | |
) | |
t.Run("Success", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
for _, n := range nodes { | |
f.WClient.On("AddReferences", ctx, n, cls, shard, anyVal, refs).Return(resp, nil) | |
f.WClient.On("Commit", ctx, n, cls, shard, anyVal, anyVal).Return(nil) | |
} | |
errs := rep.AddReferences(ctx, shard, refs, All) | |
assert.Equal(t, []error{nil, nil}, errs) | |
}) | |
t.Run("PhaseOneConnectionError", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
f.WClient.On("AddReferences", ctx, nodes[0], cls, shard, anyVal, refs).Return(resp, nil) | |
f.WClient.On("AddReferences", ctx, nodes[1], cls, shard, anyVal, refs).Return(resp, errAny) | |
f.WClient.On("Abort", ctx, nodes[0], "C1", shard, anyVal).Return(resp, nil) | |
f.WClient.On("Abort", ctx, nodes[1], "C1", shard, anyVal).Return(resp, nil) | |
errs := rep.AddReferences(ctx, shard, refs, All) | |
assert.Equal(t, 2, len(errs)) | |
assert.ErrorIs(t, errs[0], errReplicas) | |
}) | |
t.Run("PhaseOneUnsuccessfulResponse", func(t *testing.T) { | |
f := newFakeFactory("C1", shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
f.WClient.On("AddReferences", ctx, nodes[0], cls, shard, anyVal, refs).Return(resp, nil) | |
resp2 := SimpleResponse{[]Error{{Msg: "E1"}, {Msg: "E2"}}} | |
f.WClient.On("AddReferences", ctx, nodes[1], cls, shard, anyVal, refs).Return(resp2, nil) | |
f.WClient.On("Abort", ctx, nodes[0], "C1", shard, anyVal).Return(resp, nil) | |
f.WClient.On("Abort", ctx, nodes[1], "C1", shard, anyVal).Return(resp, nil) | |
errs := rep.AddReferences(ctx, shard, refs, All) | |
assert.Equal(t, 2, len(errs)) | |
for _, err := range errs { | |
assert.ErrorIs(t, err, errReplicas) | |
} | |
}) | |
t.Run("Commit", func(t *testing.T) { | |
f := newFakeFactory(cls, shard, nodes) | |
rep := f.newReplicator() | |
resp := SimpleResponse{} | |
for _, n := range nodes { | |
f.WClient.On("AddReferences", ctx, n, cls, shard, anyVal, refs).Return(resp, nil) | |
} | |
f.WClient.On("Commit", ctx, nodes[0], cls, shard, anyVal, anyVal).Return(nil) | |
f.WClient.On("Commit", ctx, nodes[1], cls, shard, anyVal, anyVal).Return(errAny) | |
errs := rep.AddReferences(ctx, shard, refs, All) | |
assert.Equal(t, len(errs), 2) | |
assert.ErrorIs(t, errs[0], errAny) | |
assert.ErrorIs(t, errs[1], errAny) | |
}) | |
} | |
type fakeFactory struct { | |
CLS string | |
Nodes []string | |
Shard2replicas map[string][]string | |
WClient *fakeClient | |
RClient *fakeRClient | |
log *logrus.Logger | |
hook *test.Hook | |
} | |
func newFakeFactory(class, shard string, nodes []string) *fakeFactory { | |
logger, hook := test.NewNullLogger() | |
return &fakeFactory{ | |
CLS: class, | |
Nodes: nodes, | |
Shard2replicas: map[string][]string{shard: nodes}, | |
WClient: &fakeClient{}, | |
RClient: &fakeRClient{}, | |
log: logger, | |
hook: hook, | |
} | |
} | |
func (f fakeFactory) AddShard(shard string, nodes []string) { | |
f.Shard2replicas[shard] = nodes | |
} | |
func (f fakeFactory) newReplicator() *Replicator { | |
nodeResolver := newFakeNodeResolver(f.Nodes) | |
shardingState := newFakeShardingState("A", f.Shard2replicas, nodeResolver) | |
return NewReplicator( | |
f.CLS, | |
shardingState, | |
nodeResolver, | |
struct { | |
rClient | |
wClient | |
}{f.RClient, f.WClient}, f.log) | |
} | |
func (f fakeFactory) newFinder(thisNode string) *Finder { | |
nodeResolver := newFakeNodeResolver(f.Nodes) | |
resolver := &resolver{ | |
Schema: newFakeShardingState(thisNode, f.Shard2replicas, nodeResolver), | |
nodeResolver: nodeResolver, | |
Class: f.CLS, | |
NodeName: thisNode, | |
} | |
return NewFinder(f.CLS, resolver, f.RClient, f.log) | |
} | |
func (f fakeFactory) assertLogContains(t *testing.T, key string, xs ...string) { | |
t.Helper() | |
// logging might happen after returning to the caller | |
// Therefore, we need to make sure that the goroutine | |
// running in the background is writing to the log | |
entry := f.hook.LastEntry() | |
for i := 0; entry == nil && i < 20; i++ { | |
<-time.After(time.Millisecond * 10) | |
entry = f.hook.LastEntry() | |
} | |
data := "" | |
if entry != nil { | |
data, _ = entry.Data[key].(string) | |
} else { | |
t.Errorf("log entry is empty") | |
return | |
} | |
for _, x := range xs { | |
assert.Contains(t, data, x) | |
} | |
} | |
func (f fakeFactory) assertLogErrorContains(t *testing.T, xs ...string) { | |
t.Helper() | |
// logging might happen after returning to the caller | |
// Therefore, we need to make sure that the goroutine | |
// running in the background is writing to the log | |
entry := f.hook.LastEntry() | |
for i := 0; entry == nil && i < 20; i++ { | |
<-time.After(time.Millisecond * 10) | |
entry = f.hook.LastEntry() | |
} | |
if entry == nil { | |
t.Errorf("log entry is empty") | |
return | |
} | |
for _, x := range xs { | |
assert.Contains(t, entry.Message, x) | |
} | |
} | |