Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
// _ _ | |
// | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2022 SeMI Technologies B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
package clients | |
import ( | |
"context" | |
"fmt" | |
"io" | |
"net/http" | |
"net/http/httptest" | |
"strings" | |
"testing" | |
"time" | |
"github.com/stretchr/testify/assert" | |
"github.com/weaviate/weaviate/adapters/handlers/rest/clusterapi" | |
) | |
func TestRemoteIndexIncreaseRF(t *testing.T) { | |
t.Parallel() | |
ctx := context.Background() | |
path := "/replicas/indices/C1/replication-factor:increase" | |
fs := newFakeRemoteIndexServer(t, http.MethodPut, path) | |
ts := fs.server(t) | |
defer ts.Close() | |
client := newRemoteIndex(ts.Client()) | |
t.Run("ConnectionError", func(t *testing.T) { | |
err := client.IncreaseReplicationFactor(ctx, "", "C1", nil) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "connect") | |
}) | |
n := 0 | |
fs.doAfter = func(w http.ResponseWriter, r *http.Request) { | |
if n == 0 { | |
w.WriteHeader(http.StatusInternalServerError) | |
} else if n == 1 { | |
w.WriteHeader(http.StatusTooManyRequests) | |
} else { | |
w.WriteHeader(http.StatusNoContent) | |
} | |
n++ | |
} | |
t.Run("Success", func(t *testing.T) { | |
err := client.IncreaseReplicationFactor(ctx, fs.host, "C1", nil) | |
assert.Nil(t, err) | |
}) | |
} | |
func TestRemoteIndexReInitShardIn(t *testing.T) { | |
t.Parallel() | |
ctx := context.Background() | |
path := "/indices/C1/shards/S1:reinit" | |
fs := newFakeRemoteIndexServer(t, http.MethodPut, path) | |
ts := fs.server(t) | |
defer ts.Close() | |
client := newRemoteIndex(ts.Client()) | |
t.Run("ConnectionError", func(t *testing.T) { | |
err := client.ReInitShard(ctx, "", "C1", "S1") | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "connect") | |
}) | |
n := 0 | |
fs.doAfter = func(w http.ResponseWriter, r *http.Request) { | |
if n == 0 { | |
w.WriteHeader(http.StatusInternalServerError) | |
} else if n == 1 { | |
w.WriteHeader(http.StatusTooManyRequests) | |
} else { | |
w.WriteHeader(http.StatusNoContent) | |
} | |
n++ | |
} | |
t.Run("Success", func(t *testing.T) { | |
err := client.ReInitShard(ctx, fs.host, "C1", "S1") | |
assert.Nil(t, err) | |
}) | |
} | |
func TestRemoteIndexCreateShard(t *testing.T) { | |
t.Parallel() | |
ctx := context.Background() | |
path := "/indices/C1/shards/S1" | |
fs := newFakeRemoteIndexServer(t, http.MethodPost, path) | |
ts := fs.server(t) | |
defer ts.Close() | |
client := newRemoteIndex(ts.Client()) | |
t.Run("ConnectionError", func(t *testing.T) { | |
err := client.CreateShard(ctx, "", "C1", "S1") | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "connect") | |
}) | |
n := 0 | |
fs.doAfter = func(w http.ResponseWriter, r *http.Request) { | |
if n == 0 { | |
w.WriteHeader(http.StatusInternalServerError) | |
} else if n == 1 { | |
w.WriteHeader(http.StatusTooManyRequests) | |
} else { | |
w.WriteHeader(http.StatusCreated) | |
} | |
n++ | |
} | |
t.Run("Success", func(t *testing.T) { | |
err := client.CreateShard(ctx, fs.host, "C1", "S1") | |
assert.Nil(t, err) | |
}) | |
} | |
func TestRemoteIndexUpdateShardStatus(t *testing.T) { | |
t.Parallel() | |
ctx := context.Background() | |
path := "/indices/C1/shards/S1/status" | |
fs := newFakeRemoteIndexServer(t, http.MethodPost, path) | |
ts := fs.server(t) | |
defer ts.Close() | |
client := newRemoteIndex(ts.Client()) | |
t.Run("ConnectionError", func(t *testing.T) { | |
err := client.UpdateShardStatus(ctx, "", "C1", "S1", "NewStatus") | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "connect") | |
}) | |
n := 0 | |
fs.doAfter = func(w http.ResponseWriter, r *http.Request) { | |
if n == 0 { | |
w.WriteHeader(http.StatusInternalServerError) | |
} else if n == 1 { | |
w.WriteHeader(http.StatusTooManyRequests) | |
} | |
n++ | |
} | |
t.Run("Success", func(t *testing.T) { | |
err := client.UpdateShardStatus(ctx, fs.host, "C1", "S1", "NewStatus") | |
assert.Nil(t, err) | |
}) | |
} | |
func TestRemoteIndexShardStatus(t *testing.T) { | |
t.Parallel() | |
var ( | |
ctx = context.Background() | |
path = "/indices/C1/shards/S1/status" | |
fs = newFakeRemoteIndexServer(t, http.MethodGet, path) | |
Status = "READONLY" | |
) | |
ts := fs.server(t) | |
defer ts.Close() | |
client := newRemoteIndex(ts.Client()) | |
t.Run("ConnectionError", func(t *testing.T) { | |
_, err := client.GetShardStatus(ctx, "", "C1", "S1") | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "connect") | |
}) | |
n := 0 | |
fs.doAfter = func(w http.ResponseWriter, r *http.Request) { | |
if n == 0 { | |
w.WriteHeader(http.StatusInternalServerError) | |
} else if n == 1 { | |
w.WriteHeader(http.StatusTooManyRequests) | |
} else if n == 2 { | |
w.Header().Set("content-type", "any") | |
} else if n == 3 { | |
clusterapi.IndicesPayloads.GetShardStatusResults.SetContentTypeHeader(w) | |
} else { | |
clusterapi.IndicesPayloads.GetShardStatusResults.SetContentTypeHeader(w) | |
bytes, _ := clusterapi.IndicesPayloads.GetShardStatusResults.Marshal(Status) | |
w.Write(bytes) | |
} | |
n++ | |
} | |
t.Run("ContentType", func(t *testing.T) { | |
_, err := client.GetShardStatus(ctx, fs.host, "C1", "S1") | |
assert.NotNil(t, err) | |
}) | |
t.Run("Status", func(t *testing.T) { | |
_, err := client.GetShardStatus(ctx, fs.host, "C1", "S1") | |
assert.NotNil(t, err) | |
}) | |
t.Run("Success", func(t *testing.T) { | |
st, err := client.GetShardStatus(ctx, fs.host, "C1", "S1") | |
assert.Nil(t, err) | |
assert.Equal(t, "READONLY", st) | |
}) | |
} | |
func TestRemoteIndexPutFile(t *testing.T) { | |
t.Parallel() | |
var ( | |
ctx = context.Background() | |
path = "/indices/C1/shards/S1/files/file1" | |
fs = newFakeRemoteIndexServer(t, http.MethodPost, path) | |
) | |
ts := fs.server(t) | |
defer ts.Close() | |
client := newRemoteIndex(ts.Client()) | |
rsc := struct { | |
*strings.Reader | |
io.Closer | |
}{ | |
strings.NewReader("hello, world"), | |
io.NopCloser(nil), | |
} | |
t.Run("ConnectionError", func(t *testing.T) { | |
err := client.PutFile(ctx, "", "C1", "S1", "file1", rsc) | |
assert.NotNil(t, err) | |
assert.Contains(t, err.Error(), "connect") | |
}) | |
n := 0 | |
fs.doAfter = func(w http.ResponseWriter, r *http.Request) { | |
if n == 0 { | |
w.WriteHeader(http.StatusInternalServerError) | |
} else if n == 1 { | |
w.WriteHeader(http.StatusTooManyRequests) | |
} else { | |
w.WriteHeader(http.StatusNoContent) | |
} | |
n++ | |
} | |
t.Run("Success", func(t *testing.T) { | |
err := client.PutFile(ctx, fs.host, "C1", "S1", "file1", rsc) | |
assert.Nil(t, err) | |
}) | |
} | |
func newRemoteIndex(httpClient *http.Client) *RemoteIndex { | |
ri := NewRemoteIndex(httpClient) | |
ri.minBackOff = time.Millisecond * 1 | |
ri.maxBackOff = time.Millisecond * 10 | |
ri.timeoutUnit = time.Millisecond * 20 | |
return ri | |
} | |
type fakeRemoteIndexServer struct { | |
method string | |
path string | |
host string | |
doBefore func(w http.ResponseWriter, r *http.Request) error | |
doAfter func(w http.ResponseWriter, r *http.Request) | |
} | |
func newFakeRemoteIndexServer(t *testing.T, method, path string) *fakeRemoteIndexServer { | |
f := &fakeRemoteIndexServer{ | |
method: method, | |
path: path, | |
} | |
f.doBefore = func(w http.ResponseWriter, r *http.Request) error { | |
if r.Method != f.method { | |
w.WriteHeader(http.StatusBadRequest) | |
return fmt.Errorf("method want %s got %s", method, r.Method) | |
} | |
if f.path != r.URL.Path { | |
w.WriteHeader(http.StatusBadRequest) | |
return fmt.Errorf("path want %s got %s", path, r.URL.Path) | |
} | |
return nil | |
} | |
return f | |
} | |
func (f *fakeRemoteIndexServer) server(t *testing.T) *httptest.Server { | |
if f.doBefore == nil { | |
f.doBefore = func(w http.ResponseWriter, r *http.Request) error { | |
if r.Method != f.method { | |
w.WriteHeader(http.StatusBadRequest) | |
return fmt.Errorf("method want %s got %s", f.method, r.Method) | |
} | |
if f.path != r.URL.Path { | |
w.WriteHeader(http.StatusBadRequest) | |
return fmt.Errorf("path want %s got %s", f.path, r.URL.Path) | |
} | |
return nil | |
} | |
} | |
handler := func(w http.ResponseWriter, r *http.Request) { | |
if err := f.doBefore(w, r); err != nil { | |
t.Error(err) | |
return | |
} | |
if f.doAfter != nil { | |
f.doAfter(w, r) | |
} | |
} | |
serv := httptest.NewServer(http.HandlerFunc(handler)) | |
f.host = serv.URL[7:] | |
return serv | |
} | |