SemanticSearchPOC / adapters /clients /cluster_schema_test.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
10.9 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package clients
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/usecases/cluster"
)
func TestOpenTransactionNoReturnPayload(t *testing.T) {
// The No-Return-Payload is the situation that existed prior to v1.17 where
// the only option for transactions was to broadcast updates. By keeping this
// test around, we can make sure that we are not breaking backward
// compatibility.
handler := func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
require.Nil(t, err)
var pl txPayload
err = json.Unmarshal(body, &pl)
require.Nil(t, err)
assert.Equal(t, "mamma-mia-paylodia-belissima", pl.Payload.(string))
w.WriteHeader(http.StatusCreated)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
u, _ := url.Parse(server.URL)
c := NewClusterSchema(&http.Client{})
tx := &cluster.Transaction{
ID: "12345",
Type: "the best",
Payload: "mamma-mia-paylodia-belissima",
}
err := c.OpenTransaction(context.Background(), u.Host, tx)
assert.Nil(t, err)
}
func TestOpenTransactionWithReturnPayload(t *testing.T) {
// Newly added as part of v1.17 where read-transactions were introduced which
// are used to sync up cluster schema state when nodes join
handler := func(w http.ResponseWriter, r *http.Request) {
resTx := txPayload{
Type: "my-tx",
ID: "987",
Payload: "gracie-mille-per-payload",
}
resBody, err := json.Marshal(resTx)
require.Nil(t, err)
w.WriteHeader(http.StatusCreated)
w.Write(resBody)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
u, _ := url.Parse(server.URL)
c := NewClusterSchema(&http.Client{})
txIn := &cluster.Transaction{
ID: "987",
Type: "my-tx",
}
err := c.OpenTransaction(context.Background(), u.Host, txIn)
assert.Nil(t, err)
expectedTxOut := &cluster.Transaction{
ID: "987",
Type: "my-tx",
Payload: json.RawMessage("\"gracie-mille-per-payload\""),
}
assert.Equal(t, expectedTxOut, txIn)
}
func TestOpenTransactionWithTTL(t *testing.T) {
deadline, err := time.Parse(time.RFC3339Nano, "2040-01-02T15:04:05.00Z")
require.Nil(t, err)
handler := func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
body, err := io.ReadAll(r.Body)
require.Nil(t, err)
var pl txPayload
err = json.Unmarshal(body, &pl)
require.Nil(t, err)
parsedDL := time.UnixMilli(pl.DeadlineMilli)
assert.Equal(t, deadline.UnixNano(), parsedDL.UnixNano())
w.WriteHeader(http.StatusCreated)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
u, _ := url.Parse(server.URL)
c := NewClusterSchema(&http.Client{})
tx := &cluster.Transaction{
ID: "12345",
Type: "the best",
Payload: "mamma-mia-paylodia-belissima",
Deadline: deadline,
}
err = c.OpenTransaction(context.Background(), u.Host, tx)
assert.Nil(t, err)
}
func TestOpenTransactionUnhappyPaths(t *testing.T) {
type test struct {
name string
handler http.HandlerFunc
expectedErr error
expectedErrContains string
ctx context.Context
shutdownPrematurely bool
}
expiredCtx, cancel := context.WithCancel(context.Background())
cancel()
tests := []test{
{
name: "concurrent transaction",
handler: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.WriteHeader(http.StatusConflict)
},
expectedErr: cluster.ErrConcurrentTransaction,
},
{
name: "arbitrary 500",
handler: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("nope!"))
},
expectedErrContains: "nope!",
},
{
name: "invalid json",
handler: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.WriteHeader(http.StatusCreated)
w.Write([]byte("<<<!@#*)!@#****@!''"))
},
expectedErrContains: "error unmarshalling",
},
{
name: "expired ctx",
ctx: expiredCtx,
handler: func(w http.ResponseWriter, r *http.Request) {
},
expectedErrContains: "context",
},
{
name: "remote server shut down",
shutdownPrematurely: true,
handler: func(w http.ResponseWriter, r *http.Request) {
},
expectedErrContains: "refused",
},
{
name: "tx id mismatch",
handler: func(w http.ResponseWriter, r *http.Request) {
resTx := txPayload{
Type: "wrong-tx-id",
ID: "987",
Payload: "gracie-mille-per-payload",
}
resBody, err := json.Marshal(resTx)
require.Nil(t, err)
w.WriteHeader(http.StatusCreated)
w.Write(resBody)
},
expectedErrContains: "mismatch between outgoing and incoming tx ids",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(test.handler))
if test.shutdownPrematurely {
server.Close()
} else {
defer server.Close()
}
u, _ := url.Parse(server.URL)
c := NewClusterSchema(&http.Client{})
tx := &cluster.Transaction{
ID: "12345",
Type: "the best",
Payload: "mamma-mia-paylodia-belissima",
}
if test.ctx == nil {
test.ctx = context.Background()
}
err := c.OpenTransaction(test.ctx, u.Host, tx)
assert.NotNil(t, err)
if test.expectedErr != nil {
assert.Equal(t, test.expectedErr, err)
}
if test.expectedErrContains != "" {
assert.Contains(t, err.Error(), test.expectedErrContains)
}
})
}
}
func TestAbortTransaction(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.WriteHeader(http.StatusNoContent)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
u, _ := url.Parse(server.URL)
c := NewClusterSchema(&http.Client{})
tx := &cluster.Transaction{
ID: "am-i-going-to-be-cancelled",
Type: "the worst",
Payload: "",
}
err := c.AbortTransaction(context.Background(), u.Host, tx)
assert.Nil(t, err)
}
func TestAbortTransactionUnhappyPaths(t *testing.T) {
type test struct {
name string
handler http.HandlerFunc
expectedErr error
expectedErrContains string
ctx context.Context
shutdownPrematurely bool
}
expiredCtx, cancel := context.WithCancel(context.Background())
cancel()
tests := []test{
{
name: "arbitrary 500",
handler: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("nope!"))
},
expectedErrContains: "nope!",
},
{
name: "expired ctx",
ctx: expiredCtx,
handler: func(w http.ResponseWriter, r *http.Request) {
},
expectedErrContains: "context",
},
{
name: "remote server shut down",
shutdownPrematurely: true,
handler: func(w http.ResponseWriter, r *http.Request) {
},
expectedErrContains: "refused",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(test.handler))
if test.shutdownPrematurely {
server.Close()
} else {
defer server.Close()
}
u, _ := url.Parse(server.URL)
c := NewClusterSchema(&http.Client{})
tx := &cluster.Transaction{
ID: "12345",
Type: "the best",
Payload: "mamma-mia-paylodia-belissima",
}
if test.ctx == nil {
test.ctx = context.Background()
}
err := c.AbortTransaction(test.ctx, u.Host, tx)
assert.NotNil(t, err)
if test.expectedErr != nil {
assert.Equal(t, test.expectedErr, err)
}
if test.expectedErrContains != "" {
assert.Contains(t, err.Error(), test.expectedErrContains)
}
})
}
}
func TestCommitTransaction(t *testing.T) {
handler := func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.WriteHeader(http.StatusNoContent)
}
server := httptest.NewServer(http.HandlerFunc(handler))
defer server.Close()
u, _ := url.Parse(server.URL)
c := NewClusterSchema(&http.Client{})
tx := &cluster.Transaction{
ID: "am-i-going-to-be-cancelled",
Type: "the worst",
Payload: "",
}
err := c.CommitTransaction(context.Background(), u.Host, tx)
assert.Nil(t, err)
}
func TestCommitTransactionUnhappyPaths(t *testing.T) {
type test struct {
name string
handler http.HandlerFunc
expectedErr error
expectedErrContains string
ctx context.Context
shutdownPrematurely bool
}
expiredCtx, cancel := context.WithCancel(context.Background())
cancel()
tests := []test{
{
name: "arbitrary 500",
handler: func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("nope!"))
},
expectedErrContains: "nope!",
},
{
name: "expired ctx",
ctx: expiredCtx,
handler: func(w http.ResponseWriter, r *http.Request) {
},
expectedErrContains: "context",
},
{
name: "remote server shut down",
shutdownPrematurely: true,
handler: func(w http.ResponseWriter, r *http.Request) {
},
expectedErrContains: "refused",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(test.handler))
if test.shutdownPrematurely {
server.Close()
} else {
defer server.Close()
}
u, _ := url.Parse(server.URL)
c := NewClusterSchema(&http.Client{})
tx := &cluster.Transaction{
ID: "12345",
Type: "the best",
Payload: "mamma-mia-paylodia-belissima",
}
if test.ctx == nil {
test.ctx = context.Background()
}
err := c.CommitTransaction(test.ctx, u.Host, tx)
assert.NotNil(t, err)
if test.expectedErr != nil {
assert.Equal(t, test.expectedErr, err)
}
if test.expectedErrContains != "" {
assert.Contains(t, err.Error(), test.expectedErrContains)
}
})
}
}