SemanticSearchPOC / usecases /cluster /transactions_test.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
14.7 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package cluster
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestSuccessfulOutgoingWriteTransaction begins a write transaction on a
// manager whose broadcaster returns no errors and expects both the begin and
// the subsequent commit to succeed.
func TestSuccessfulOutgoingWriteTransaction(t *testing.T) {
	var (
		bg      = context.Background()
		txType  = TransactionType("my-type")
		data    = "my-payload"
		manager = newTestTxManager()
	)

	opened, err := manager.BeginTransaction(bg, txType, data, 0)
	require.Nil(t, err)

	require.Nil(t, manager.CommitWriteTransaction(bg, opened))
}
// TestTryingToOpenTwoTransactions verifies that a second BeginTransaction on
// the same manager is rejected with a "concurrent transaction" error while the
// first one is still open, and that the first one can be committed afterwards.
func TestTryingToOpenTwoTransactions(t *testing.T) {
	var (
		bg      = context.Background()
		txType  = TransactionType("my-type")
		data    = "my-payload"
		manager = newTestTxManager()
	)

	first, err := manager.BeginTransaction(bg, txType, data, 0)
	require.Nil(t, err)

	second, err := manager.BeginTransaction(bg, txType, data, 0)
	assert.Nil(t, second)
	require.NotNil(t, err)
	assert.Equal(t, "concurrent transaction", err.Error())

	// the rejected attempt must not have affected the first transaction
	err = manager.CommitWriteTransaction(bg, first)
	assert.Nil(t, err, "original transaction can still be committed")
}
func TestTryingToCommitInvalidTransaction(t *testing.T) {
payload := "my-payload"
trType := TransactionType("my-type")
ctx := context.Background()
man := newTestTxManager()
tx1, err := man.BeginTransaction(ctx, trType, payload, 0)
require.Nil(t, err)
invalidTx := &Transaction{ID: "invalid"}
err = man.CommitWriteTransaction(ctx, invalidTx)
require.NotNil(t, err)
assert.Equal(t, "invalid transaction", err.Error())
err = man.CommitWriteTransaction(ctx, tx1)
assert.Nil(t, err, "original transaction can still be committed")
}
// TestTryingToCommitTransactionPastTTL opens a transaction with a one
// microsecond TTL, waits, and then expects the commit to fail with an error
// mentioning "transaction TTL". Afterwards a fresh transaction must be
// openable on the same manager.
func TestTryingToCommitTransactionPastTTL(t *testing.T) {
	var (
		bg      = context.Background()
		txType  = TransactionType("my-type")
		data    = "my-payload"
		manager = newTestTxManager()
	)

	opened, err := manager.BeginTransaction(bg, txType, data, time.Microsecond)
	require.Nil(t, err)

	// the commit below only carries the ID of the opened transaction
	stale := &Transaction{ID: opened.ID}

	// give the cancel handler some time to run
	time.Sleep(50 * time.Millisecond)

	err = manager.CommitWriteTransaction(bg, stale)
	require.NotNil(t, err)
	assert.Contains(t, err.Error(), "transaction TTL")

	// make sure it is possible to open future transactions
	_, err = manager.BeginTransaction(context.Background(), txType, data, 0)
	require.Nil(t, err)
}
// TestTryingToCommitIncomingTransactionPastTTL registers an incoming
// transaction whose deadline lies one microsecond in the future, waits, and
// then expects the incoming commit to fail with an error mentioning
// "transaction TTL". Afterwards a fresh outgoing transaction must be openable
// on the same manager.
func TestTryingToCommitIncomingTransactionPastTTL(t *testing.T) {
	payload := "my-payload"
	trType := TransactionType("my-type")
	ctx := context.Background()
	man := newTestTxManager()

	dl := time.Now().Add(1 * time.Microsecond)
	tx := &Transaction{
		ID:       "123456",
		Type:     trType,
		Payload:  payload,
		Deadline: dl,
	}

	// Check the begin result instead of discarding it: if the setup step
	// fails, the test should stop here rather than report a misleading
	// failure on the commit assertions below.
	_, err := man.IncomingBeginTransaction(context.Background(), tx)
	require.Nil(t, err)

	// give the cancel handler some time to run
	time.Sleep(50 * time.Millisecond)

	err = man.IncomingCommitTransaction(ctx, tx)
	require.NotNil(t, err)
	assert.Contains(t, err.Error(), "transaction TTL")

	// make sure it is possible to open future transactions
	_, err = man.BeginTransaction(context.Background(), trType, payload, 0)
	require.Nil(t, err)
}
// TestLettingATransactionExpire opens a transaction with a one microsecond
// TTL and never commits it in time. After waiting, a new transaction must be
// openable, and a late commit of the old one must fail with an error
// mentioning "transaction TTL".
func TestLettingATransactionExpire(t *testing.T) {
	var (
		txType  = TransactionType("my-type")
		data    = "my-payload"
		manager = newTestTxManager()
	)

	expiring, err := manager.BeginTransaction(context.Background(), txType,
		data, time.Microsecond)
	require.Nil(t, err)

	// give the cancel handler some time to run
	time.Sleep(50 * time.Millisecond)

	// try to open a new one
	_, err = manager.BeginTransaction(context.Background(), txType, data, 0)
	require.Nil(t, err)

	// since the old one expired, we now expect a TTL error instead of a
	// concurrent tx error when trying to refer to the old one
	err = manager.CommitWriteTransaction(context.Background(), expiring)
	require.NotNil(t, err)
	assert.Contains(t, err.Error(), "transaction TTL")
}
func TestRemoteDoesntAllowOpeningTransaction(t *testing.T) {
payload := "my-payload"
trType := TransactionType("my-type")
ctx := context.Background()
broadcaster := &fakeBroadcaster{
openErr: ErrConcurrentTransaction,
}
man := newTestTxManagerWithRemote(broadcaster)
tx1, err := man.BeginTransaction(ctx, trType, payload, 0)
require.Nil(t, tx1)
require.NotNil(t, err)
assert.Contains(t, err.Error(), "open transaction")
assert.Len(t, broadcaster.abortCalledId, 36, "a valid uuid was aborted")
}
func TestRemoteDoesntAllowOpeningTransactionAbortFails(t *testing.T) {
payload := "my-payload"
trType := TransactionType("my-type")
ctx := context.Background()
broadcaster := &fakeBroadcaster{
openErr: ErrConcurrentTransaction,
abortErr: fmt.Errorf("cannot abort"),
}
man, hook := newTestTxManagerWithRemoteLoggerHook(broadcaster)
tx1, err := man.BeginTransaction(ctx, trType, payload, 0)
require.Nil(t, tx1)
require.NotNil(t, err)
assert.Contains(t, err.Error(), "open transaction")
assert.Len(t, broadcaster.abortCalledId, 36, "a valid uuid was aborted")
require.Len(t, hook.Entries, 1)
assert.Equal(t, "broadcast tx abort failed", hook.Entries[0].Message)
}
// fakeBroadcaster is a test double for the broadcaster passed to NewTxManager.
// Its Broadcast* methods return the preconfigured errors below instead of
// contacting any other node.
type fakeBroadcaster struct {
	// errors returned by BroadcastTransaction, BroadcastCommitTransaction and
	// BroadcastAbortTransaction respectively; nil means the call succeeds
	openErr, commitErr, abortErr error

	// ID of the transaction most recently passed to BroadcastAbortTransaction
	abortCalledId string
}
// BroadcastTransaction ignores its arguments and returns the configured
// openErr.
func (f *fakeBroadcaster) BroadcastTransaction(_ context.Context, _ *Transaction) error {
	return f.openErr
}
// BroadcastAbortTransaction records the ID of the aborted transaction so that
// tests can inspect it, then returns the configured abortErr.
func (f *fakeBroadcaster) BroadcastAbortTransaction(_ context.Context, tx *Transaction) error {
	f.abortCalledId = tx.ID
	return f.abortErr
}
// BroadcastCommitTransaction ignores its arguments and returns the configured
// commitErr.
func (f *fakeBroadcaster) BroadcastCommitTransaction(_ context.Context, _ *Transaction) error {
	return f.commitErr
}
// TestSuccessfulDistributedWriteTransaction wires a "local" manager to a
// "remote" manager through wrapTxManagerAsBroadcaster and checks that the
// payload of a transaction committed locally reaches the commit callback
// registered on the remote.
func TestSuccessfulDistributedWriteTransaction(t *testing.T) {
	bg := context.Background()

	// committed captures the payload seen by the remote's commit callback
	var committed interface{}

	remoteMgr := newTestTxManager()
	remoteMgr.SetCommitFn(func(_ context.Context, incoming *Transaction) error {
		committed = incoming.Payload
		return nil
	})

	localMgr := NewTxManager(
		&wrapTxManagerAsBroadcaster{remoteMgr},
		&fakeTxPersistence{},
		remoteMgr.logger,
	)
	localMgr.StartAcceptIncoming()

	opened, err := localMgr.BeginTransaction(bg, TransactionType("my-type"),
		"my-payload", 0)
	require.Nil(t, err)

	err = localMgr.CommitWriteTransaction(bg, opened)
	require.Nil(t, err)

	assert.Equal(t, "my-payload", committed)
}
func TestConcurrentDistributedTransaction(t *testing.T) {
ctx := context.Background()
var remoteState interface{}
remote := newTestTxManager()
remote.SetCommitFn(func(ctx context.Context, tx *Transaction) error {
remoteState = tx.Payload
return nil
})
local := NewTxManager(&wrapTxManagerAsBroadcaster{remote},
&fakeTxPersistence{}, remote.logger)
payload := "my-payload"
trType := TransactionType("my-type")
// open a transaction on the remote to simulate a concurrent transaction.
// Since it uses the fakeBroadcaster it does not tell anyone about it, this
// way we can be sure that the reason for failure is actually a concurrent
// transaction on the remote side, not on the local side. Compare this to a
// situation where broadcasting was bi-directional: Then this transaction
// would have been opened successfully and already be replicated to the
// "local" tx manager. So the next call on "local" would also fail, but for
// the wrong reason: It would fail because another transaction is already in
// place. We, however want to simulate a situation where due to network
// delays, etc. both sides try to open a transaction more or less in
// parallel.
_, err := remote.BeginTransaction(ctx, trType, "wrong payload", 0)
require.Nil(t, err)
tx, err := local.BeginTransaction(ctx, trType, payload, 0)
require.Nil(t, tx)
require.NotNil(t, err)
assert.Contains(t, err.Error(), "concurrent transaction")
assert.Equal(t, nil, remoteState, "remote state should not have been updated")
}
// This test simulates three nodes trying to open a tx at basically the same
// time with the simulated network being so slow that other nodes will try to
// open their own transactions before they receive the incoming tx. This is a
// situation where everyone thinks they were the first to open the tx and there
// is no clear winner. All attempts must fail!
func TestConcurrentOpenAttemptsOnSlowNetwork(t *testing.T) {
ctx := context.Background()
broadcaster := &slowMultiBroadcaster{delay: 100 * time.Millisecond}
node1 := newTestTxManagerWithRemote(broadcaster)
node2 := newTestTxManagerWithRemote(broadcaster)
node3 := newTestTxManagerWithRemote(broadcaster)
broadcaster.nodes = []*TxManager{node1, node2, node3}
trType := TransactionType("my-type")
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
_, err := node1.BeginTransaction(ctx, trType, "payload-from-node-1", 0)
assert.NotNil(t, err, "open tx 1 must fail")
}()
wg.Add(1)
go func() {
defer wg.Done()
_, err := node2.BeginTransaction(ctx, trType, "payload-from-node-2", 0)
assert.NotNil(t, err, "open tx 2 must fail")
}()
wg.Add(1)
go func() {
defer wg.Done()
_, err := node3.BeginTransaction(ctx, trType, "payload-from-node-3", 0)
assert.NotNil(t, err, "open tx 3 must fail")
}()
wg.Wait()
}
// wrapTxManagerAsBroadcaster adapts a TxManager so it can be passed to
// NewTxManager as the broadcaster of another manager: each Broadcast* call is
// forwarded to the matching Incoming* method of the wrapped manager.
type wrapTxManagerAsBroadcaster struct {
	// txManager is the manager that receives the forwarded calls
	txManager *TxManager
}
// BroadcastTransaction forwards the transaction to the wrapped manager's
// IncomingBeginTransaction, dropping the first result and returning only the
// error.
func (w *wrapTxManagerAsBroadcaster) BroadcastTransaction(ctx context.Context, tx *Transaction) error {
	_, beginErr := w.txManager.IncomingBeginTransaction(ctx, tx)
	return beginErr
}
// BroadcastAbortTransaction forwards the abort to the wrapped manager's
// IncomingAbortTransaction and always reports success.
func (w *wrapTxManagerAsBroadcaster) BroadcastAbortTransaction(ctx context.Context, tx *Transaction) error {
	w.txManager.IncomingAbortTransaction(ctx, tx)

	return nil
}
// BroadcastCommitTransaction forwards the commit to the wrapped manager's
// IncomingCommitTransaction and returns its error unchanged.
func (w *wrapTxManagerAsBroadcaster) BroadcastCommitTransaction(ctx context.Context, tx *Transaction) error {
	commitErr := w.txManager.IncomingCommitTransaction(ctx, tx)
	return commitErr
}
// slowMultiBroadcaster is a test broadcaster that sleeps for a fixed delay and
// then forwards each call to every manager in nodes, in slice order.
type slowMultiBroadcaster struct {
	// delay is how long each Broadcast* method sleeps before forwarding
	delay time.Duration
	// nodes are the managers that receive the forwarded Incoming* calls
	nodes []*TxManager
}
// BroadcastTransaction sleeps for the configured delay and then calls
// IncomingBeginTransaction on each node in order. It stops at, and returns,
// the first error; the remaining nodes are not contacted in that case.
func (b *slowMultiBroadcaster) BroadcastTransaction(ctx context.Context, tx *Transaction) error {
	time.Sleep(b.delay)

	for i := range b.nodes {
		_, err := b.nodes[i].IncomingBeginTransaction(ctx, tx)
		if err != nil {
			return err
		}
	}

	return nil
}
// BroadcastAbortTransaction sleeps for the configured delay, then calls
// IncomingAbortTransaction on every node and always reports success.
func (b *slowMultiBroadcaster) BroadcastAbortTransaction(ctx context.Context, tx *Transaction) error {
	time.Sleep(b.delay)

	for i := range b.nodes {
		b.nodes[i].IncomingAbortTransaction(ctx, tx)
	}

	return nil
}
// BroadcastCommitTransaction sleeps for the configured delay and then calls
// IncomingCommitTransaction on each node in order. It stops at, and returns,
// the first error; the remaining nodes are not contacted in that case.
func (b *slowMultiBroadcaster) BroadcastCommitTransaction(ctx context.Context, tx *Transaction) error {
	time.Sleep(b.delay)

	for i := range b.nodes {
		err := b.nodes[i].IncomingCommitTransaction(ctx, tx)
		if err != nil {
			return err
		}
	}

	return nil
}
// TestSuccessfulDistributedReadTransaction wires a "local" manager to a
// "remote" one whose response callback writes a payload onto the incoming
// transaction. After beginning and closing a read transaction locally, the
// transaction handed back to the caller must carry that payload.
func TestSuccessfulDistributedReadTransaction(t *testing.T) {
	bg := context.Background()
	remotePayload := "my-payload"

	remoteMgr := newTestTxManager()
	remoteMgr.SetResponseFn(func(_ context.Context, incoming *Transaction) ([]byte, error) {
		incoming.Payload = remotePayload
		return nil, nil
	})

	localMgr := NewTxManager(
		&wrapTxManagerAsBroadcaster{remoteMgr},
		&fakeTxPersistence{},
		remoteMgr.logger,
	)
	// TODO local.SetConsensusFn

	opened, err := localMgr.BeginTransaction(bg, TransactionType("my-read-tx"),
		nil, 0)
	require.Nil(t, err)

	localMgr.CloseReadTransaction(bg, opened)

	assert.Equal(t, "my-payload", opened.Payload)
}
// TestSuccessfulDistributedTransactionSetAllowUnready repeats the distributed
// read scenario with SetAllowUnready applied to both managers. It checks that
// each manager's allowUnready field holds the configured types and that the
// payload set by the remote's response callback ends up on the transaction.
func TestSuccessfulDistributedTransactionSetAllowUnready(t *testing.T) {
	bg := context.Background()
	remotePayload := "my-payload"
	unreadyTypes := []TransactionType{"type0", "type1"}

	remoteMgr := newTestTxManagerAllowUnready(unreadyTypes)
	remoteMgr.SetResponseFn(func(_ context.Context, incoming *Transaction) ([]byte, error) {
		incoming.Payload = remotePayload
		return nil, nil
	})

	localMgr := NewTxManager(
		&wrapTxManagerAsBroadcaster{remoteMgr},
		&fakeTxPersistence{},
		remoteMgr.logger,
	)
	localMgr.SetAllowUnready(unreadyTypes)

	opened, err := localMgr.BeginTransaction(bg, TransactionType("my-read-tx"),
		nil, 0)
	require.Nil(t, err)

	localMgr.CloseReadTransaction(bg, opened)

	assert.ElementsMatch(t, unreadyTypes, remoteMgr.allowUnready)
	assert.ElementsMatch(t, unreadyTypes, localMgr.allowUnready)
	assert.Equal(t, "my-payload", opened.Payload)
}
func TestTxWithDeadline(t *testing.T) {
t.Run("expired", func(t *testing.T) {
payload := "my-payload"
trType := TransactionType("my-type")
ctx := context.Background()
man := newTestTxManager()
tx, err := man.BeginTransaction(ctx, trType, payload, 1*time.Nanosecond)
require.Nil(t, err)
ctx, cancel := context.WithDeadline(context.Background(), tx.Deadline)
defer cancel()
assert.NotNil(t, ctx.Err())
})
t.Run("still valid", func(t *testing.T) {
payload := "my-payload"
trType := TransactionType("my-type")
ctx := context.Background()
man := newTestTxManager()
tx, err := man.BeginTransaction(ctx, trType, payload, 10*time.Second)
require.Nil(t, err)
ctx, cancel := context.WithDeadline(context.Background(), tx.Deadline)
defer cancel()
assert.Nil(t, ctx.Err())
})
}
// newTestTxManager returns a TxManager for tests that uses a zero-value
// fakeBroadcaster (all broadcasts succeed), a fakeTxPersistence and a null
// logger, and that has StartAcceptIncoming already called on it.
func newTestTxManager() *TxManager {
	// the remote-aware constructor performs the same setup steps
	return newTestTxManagerWithRemote(&fakeBroadcaster{})
}
// newTestTxManagerWithRemote returns a TxManager for tests that broadcasts
// through the given remote, uses a fakeTxPersistence and a null logger, and
// has StartAcceptIncoming already called on it.
func newTestTxManagerWithRemote(remote Remote) *TxManager {
	// same construction as the hook-returning variant; the hook is not needed
	manager, _ := newTestTxManagerWithRemoteLoggerHook(remote)
	return manager
}
// newTestTxManagerWithRemoteLoggerHook builds a TxManager like
// newTestTxManagerWithRemote, but additionally returns the hook of its null
// logger so that tests can inspect the recorded log entries.
func newTestTxManagerWithRemoteLoggerHook(remote Remote) (*TxManager, *test.Hook) {
	nullLogger, logHook := test.NewNullLogger()

	manager := NewTxManager(remote, &fakeTxPersistence{}, nullLogger)
	manager.StartAcceptIncoming()

	return manager, logHook
}
// newTestTxManagerAllowUnready builds a TxManager with a zero-value
// fakeBroadcaster, a fakeTxPersistence and a null logger. It calls
// SetAllowUnready with the given types before StartAcceptIncoming.
func newTestTxManagerAllowUnready(types []TransactionType) *TxManager {
	nullLogger, _ := test.NewNullLogger()

	manager := NewTxManager(&fakeBroadcaster{}, &fakeTxPersistence{}, nullLogger)
	manager.SetAllowUnready(types)
	manager.StartAcceptIncoming()

	return manager
}
// fakeTxPersistence is a no-op persistence stub: each of its methods in this
// file returns nil without storing, deleting or iterating anything. That is
// sufficient here because these tests do not involve crashes.
type fakeTxPersistence struct{}
// StoreTx ignores its arguments and reports success without storing anything.
func (f *fakeTxPersistence) StoreTx(_ context.Context, _ *Transaction) error {
	return nil
}
// DeleteTx ignores its arguments and reports success without deleting
// anything.
func (f *fakeTxPersistence) DeleteTx(_ context.Context, _ string) error {
	return nil
}
// IterateAll reports success without ever invoking the callback.
func (f *fakeTxPersistence) IterateAll(_ context.Context, _ func(tx *Transaction)) error {
	return nil
}