KevinStephenson
Adding in weaviate code
b110593
raw
history blame
26.1 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package clients
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/handlers/rest/clusterapi"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/aggregation"
"github.com/weaviate/weaviate/entities/filters"
"github.com/weaviate/weaviate/entities/search"
"github.com/weaviate/weaviate/entities/searchparams"
"github.com/weaviate/weaviate/entities/storobj"
"github.com/weaviate/weaviate/usecases/objects"
"github.com/weaviate/weaviate/usecases/scaler"
)
type RemoteIndex struct {
retryClient
}
func NewRemoteIndex(httpClient *http.Client) *RemoteIndex {
return &RemoteIndex{retryClient: retryClient{
client: httpClient,
retryer: newRetryer(),
}}
}
func (c *RemoteIndex) PutObject(ctx context.Context, hostName, indexName,
shardName string, obj *storobj.Object,
) error {
path := fmt.Sprintf("/indices/%s/shards/%s/objects", indexName, shardName)
method := http.MethodPost
url := url.URL{Scheme: "http", Host: hostName, Path: path}
marshalled, err := clusterapi.IndicesPayloads.SingleObject.Marshal(obj)
if err != nil {
return errors.Wrap(err, "marshal payload")
}
req, err := http.NewRequestWithContext(ctx, method, url.String(),
bytes.NewReader(marshalled))
if err != nil {
return errors.Wrap(err, "open http request")
}
clusterapi.IndicesPayloads.SingleObject.SetContentTypeHeaderReq(req)
res, err := c.client.Do(req)
if err != nil {
return errors.Wrap(err, "send http request")
}
defer res.Body.Close()
if res.StatusCode != http.StatusNoContent {
body, _ := io.ReadAll(res.Body)
return errors.Errorf("unexpected status code %d (%s)", res.StatusCode,
body)
}
return nil
}
func duplicateErr(in error, count int) []error {
out := make([]error, count)
for i := range out {
out[i] = in
}
return out
}
func (c *RemoteIndex) BatchPutObjects(ctx context.Context, hostName, indexName,
shardName string, objs []*storobj.Object, _ *additional.ReplicationProperties,
) []error {
path := fmt.Sprintf("/indices/%s/shards/%s/objects", indexName, shardName)
method := http.MethodPost
url := url.URL{Scheme: "http", Host: hostName, Path: path}
marshalled, err := clusterapi.IndicesPayloads.ObjectList.Marshal(objs)
if err != nil {
return duplicateErr(errors.Wrap(err, "marshal payload"), len(objs))
}
req, err := http.NewRequestWithContext(ctx, method, url.String(),
bytes.NewReader(marshalled))
if err != nil {
return duplicateErr(errors.Wrap(err, "open http request"), len(objs))
}
clusterapi.IndicesPayloads.ObjectList.SetContentTypeHeaderReq(req)
res, err := c.client.Do(req)
if err != nil {
return duplicateErr(errors.Wrap(err, "send http request"), len(objs))
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
body, _ := io.ReadAll(res.Body)
return duplicateErr(errors.Errorf("unexpected status code %d (%s)",
res.StatusCode, body), len(objs))
}
if ct, ok := clusterapi.IndicesPayloads.ErrorList.
CheckContentTypeHeader(res); !ok {
return duplicateErr(errors.Errorf("unexpected content type: %s",
ct), len(objs))
}
resBytes, err := io.ReadAll(res.Body)
if err != nil {
return duplicateErr(errors.Wrap(err, "ready body"), len(objs))
}
return clusterapi.IndicesPayloads.ErrorList.Unmarshal(resBytes)
}
func (c *RemoteIndex) BatchAddReferences(ctx context.Context, hostName, indexName,
shardName string, refs objects.BatchReferences,
) []error {
path := fmt.Sprintf("/indices/%s/shards/%s/references", indexName, shardName)
method := http.MethodPost
url := url.URL{Scheme: "http", Host: hostName, Path: path}
marshalled, err := clusterapi.IndicesPayloads.ReferenceList.Marshal(refs)
if err != nil {
return duplicateErr(errors.Wrap(err, "marshal payload"), len(refs))
}
req, err := http.NewRequestWithContext(ctx, method, url.String(),
bytes.NewReader(marshalled))
if err != nil {
return duplicateErr(errors.Wrap(err, "open http request"), len(refs))
}
clusterapi.IndicesPayloads.ReferenceList.SetContentTypeHeaderReq(req)
res, err := c.client.Do(req)
if err != nil {
return duplicateErr(errors.Wrap(err, "send http request"), len(refs))
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
body, _ := io.ReadAll(res.Body)
return duplicateErr(errors.Errorf("unexpected status code %d (%s)",
res.StatusCode, body), len(refs))
}
if ct, ok := clusterapi.IndicesPayloads.ErrorList.
CheckContentTypeHeader(res); !ok {
return duplicateErr(errors.Errorf("unexpected content type: %s",
ct), len(refs))
}
resBytes, err := io.ReadAll(res.Body)
if err != nil {
return duplicateErr(errors.Wrap(err, "ready body"), len(refs))
}
return clusterapi.IndicesPayloads.ErrorList.Unmarshal(resBytes)
}
func (c *RemoteIndex) GetObject(ctx context.Context, hostName, indexName,
shardName string, id strfmt.UUID, selectProps search.SelectProperties,
additional additional.Properties,
) (*storobj.Object, error) {
selectPropsBytes, err := json.Marshal(selectProps)
if err != nil {
return nil, errors.Wrap(err, "marshal selectProps props")
}
additionalBytes, err := json.Marshal(additional)
if err != nil {
return nil, errors.Wrap(err, "marshal additional props")
}
selectPropsEncoded := base64.StdEncoding.EncodeToString(selectPropsBytes)
additionalEncoded := base64.StdEncoding.EncodeToString(additionalBytes)
path := fmt.Sprintf("/indices/%s/shards/%s/objects/%s", indexName, shardName, id)
method := http.MethodGet
url := url.URL{Scheme: "http", Host: hostName, Path: path}
q := url.Query()
q.Set("additional", additionalEncoded)
q.Set("selectProperties", selectPropsEncoded)
url.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, method, url.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "open http request")
}
res, err := c.client.Do(req)
if err != nil {
return nil, errors.Wrap(err, "send http request")
}
defer res.Body.Close()
if res.StatusCode == http.StatusNotFound {
// this is a legitimate case - the requested ID doesn't exist, don't try
// to unmarshal anything
return nil, nil
}
if res.StatusCode != http.StatusOK {
body, _ := io.ReadAll(res.Body)
return nil, errors.Errorf("unexpected status code %d (%s)", res.StatusCode,
body)
}
ct, ok := clusterapi.IndicesPayloads.SingleObject.CheckContentTypeHeader(res)
if !ok {
return nil, errors.Errorf("unknown content type %s", ct)
}
objBytes, err := io.ReadAll(res.Body)
if err != nil {
return nil, errors.Wrap(err, "read body")
}
obj, err := clusterapi.IndicesPayloads.SingleObject.Unmarshal(objBytes)
if err != nil {
return nil, errors.Wrap(err, "unmarshal body")
}
return obj, nil
}
func (c *RemoteIndex) Exists(ctx context.Context, hostName, indexName,
shardName string, id strfmt.UUID,
) (bool, error) {
path := fmt.Sprintf("/indices/%s/shards/%s/objects/%s", indexName, shardName, id)
method := http.MethodGet
url := url.URL{Scheme: "http", Host: hostName, Path: path}
q := url.Query()
q.Set("check_exists", "true")
url.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, method, url.String(), nil)
if err != nil {
return false, errors.Wrap(err, "open http request")
}
res, err := c.client.Do(req)
if err != nil {
return false, errors.Wrap(err, "send http request")
}
defer res.Body.Close()
if res.StatusCode == http.StatusNotFound {
// this is a legitimate case - the requested ID doesn't exist, don't try
// to unmarshal anything
return false, nil
}
if res.StatusCode != http.StatusNoContent {
body, _ := io.ReadAll(res.Body)
return false, errors.Errorf("unexpected status code %d (%s)", res.StatusCode,
body)
}
return true, nil
}
func (c *RemoteIndex) DeleteObject(ctx context.Context, hostName, indexName,
shardName string, id strfmt.UUID,
) error {
path := fmt.Sprintf("/indices/%s/shards/%s/objects/%s", indexName, shardName, id)
method := http.MethodDelete
url := url.URL{Scheme: "http", Host: hostName, Path: path}
req, err := http.NewRequestWithContext(ctx, method, url.String(), nil)
if err != nil {
return errors.Wrap(err, "open http request")
}
res, err := c.client.Do(req)
if err != nil {
return errors.Wrap(err, "send http request")
}
defer res.Body.Close()
if res.StatusCode == http.StatusNotFound {
// this is a legitimate case - the requested ID doesn't exist, don't try
// to unmarshal anything, we can assume it was already deleted
return nil
}
if res.StatusCode != http.StatusNoContent {
body, _ := io.ReadAll(res.Body)
return errors.Errorf("unexpected status code %d (%s)", res.StatusCode,
body)
}
return nil
}
func (c *RemoteIndex) MergeObject(ctx context.Context, hostName, indexName,
shardName string, mergeDoc objects.MergeDocument,
) error {
path := fmt.Sprintf("/indices/%s/shards/%s/objects/%s", indexName, shardName,
mergeDoc.ID)
method := http.MethodPatch
url := url.URL{Scheme: "http", Host: hostName, Path: path}
marshalled, err := clusterapi.IndicesPayloads.MergeDoc.Marshal(mergeDoc)
if err != nil {
return errors.Wrap(err, "marshal payload")
}
req, err := http.NewRequestWithContext(ctx, method, url.String(),
bytes.NewReader(marshalled))
if err != nil {
return errors.Wrap(err, "open http request")
}
clusterapi.IndicesPayloads.MergeDoc.SetContentTypeHeaderReq(req)
res, err := c.client.Do(req)
if err != nil {
return errors.Wrap(err, "send http request")
}
defer res.Body.Close()
if res.StatusCode != http.StatusNoContent {
body, _ := io.ReadAll(res.Body)
return errors.Errorf("unexpected status code %d (%s)", res.StatusCode,
body)
}
return nil
}
func (c *RemoteIndex) MultiGetObjects(ctx context.Context, hostName, indexName,
shardName string, ids []strfmt.UUID,
) ([]*storobj.Object, error) {
idsBytes, err := json.Marshal(ids)
if err != nil {
return nil, errors.Wrap(err, "marshal selectProps props")
}
idsEncoded := base64.StdEncoding.EncodeToString(idsBytes)
path := fmt.Sprintf("/indices/%s/shards/%s/objects", indexName, shardName)
method := http.MethodGet
url := url.URL{Scheme: "http", Host: hostName, Path: path}
q := url.Query()
q.Set("ids", idsEncoded)
url.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, method, url.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "open http request")
}
res, err := c.client.Do(req)
if err != nil {
return nil, errors.Wrap(err, "send http request")
}
defer res.Body.Close()
if res.StatusCode == http.StatusNotFound {
// this is a legitimate case - the requested ID doesn't exist, don't try
// to unmarshal anything
return nil, nil
}
if res.StatusCode != http.StatusOK {
body, _ := io.ReadAll(res.Body)
return nil, errors.Errorf("unexpected status code %d (%s)", res.StatusCode,
body)
}
ct, ok := clusterapi.IndicesPayloads.ObjectList.CheckContentTypeHeader(res)
if !ok {
return nil, errors.Errorf("unexpected content type: %s", ct)
}
bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
return nil, errors.Wrap(err, "read response body")
}
objs, err := clusterapi.IndicesPayloads.ObjectList.Unmarshal(bodyBytes)
if err != nil {
return nil, errors.Wrap(err, "unmarshal objects")
}
return objs, nil
}
func (c *RemoteIndex) SearchShard(ctx context.Context, host, index, shard string,
vector []float32, limit int,
filters *filters.LocalFilter,
keywordRanking *searchparams.KeywordRanking,
sort []filters.Sort,
cursor *filters.Cursor,
groupBy *searchparams.GroupBy,
additional additional.Properties,
) ([]*storobj.Object, []float32, error) {
// new request
body, err := clusterapi.IndicesPayloads.SearchParams.
Marshal(vector, limit, filters, keywordRanking, sort, cursor, groupBy, additional)
if err != nil {
return nil, nil, fmt.Errorf("marshal request payload: %w", err)
}
url := url.URL{
Scheme: "http",
Host: host,
Path: fmt.Sprintf("/indices/%s/shards/%s/objects/_search", index, shard),
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), bytes.NewReader(body))
if err != nil {
return nil, nil, fmt.Errorf("create http request: %w", err)
}
clusterapi.IndicesPayloads.SearchParams.SetContentTypeHeaderReq(req)
// send request
resp := &searchShardResp{}
err = c.doWithCustomMarshaller(c.timeoutUnit*20, req, body, resp.decode)
return resp.Objects, resp.Distributions, err
}
type searchShardResp struct {
Objects []*storobj.Object
Distributions []float32
}
func (r *searchShardResp) decode(data []byte) (err error) {
r.Objects, r.Distributions, err = clusterapi.IndicesPayloads.SearchResults.Unmarshal(data)
return
}
type aggregateResp struct {
Result *aggregation.Result
}
func (r *aggregateResp) decode(data []byte) (err error) {
r.Result, err = clusterapi.IndicesPayloads.AggregationResult.Unmarshal(data)
return
}
func (c *RemoteIndex) Aggregate(ctx context.Context, hostName, index,
shard string, params aggregation.Params,
) (*aggregation.Result, error) {
// create new request
body, err := clusterapi.IndicesPayloads.AggregationParams.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshal request payload: %w", err)
}
url := &url.URL{
Scheme: "http",
Host: hostName,
Path: fmt.Sprintf("/indices/%s/shards/%s/objects/_aggregations", index, shard),
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("create http request: %w", err)
}
clusterapi.IndicesPayloads.AggregationParams.SetContentTypeHeaderReq(req)
// send request
resp := &aggregateResp{}
err = c.doWithCustomMarshaller(c.timeoutUnit*20, req, body, resp.decode)
return resp.Result, err
}
func (c *RemoteIndex) FindUUIDs(ctx context.Context, hostName, indexName,
shardName string, filters *filters.LocalFilter,
) ([]strfmt.UUID, error) {
paramsBytes, err := clusterapi.IndicesPayloads.FindUUIDsParams.Marshal(filters)
if err != nil {
return nil, errors.Wrap(err, "marshal request payload")
}
path := fmt.Sprintf("/indices/%s/shards/%s/objects/_find", indexName, shardName)
method := http.MethodPost
url := url.URL{Scheme: "http", Host: hostName, Path: path}
req, err := http.NewRequestWithContext(ctx, method, url.String(),
bytes.NewReader(paramsBytes))
if err != nil {
return nil, errors.Wrap(err, "open http request")
}
clusterapi.IndicesPayloads.FindUUIDsParams.SetContentTypeHeaderReq(req)
res, err := c.client.Do(req)
if err != nil {
return nil, errors.Wrap(err, "send http request")
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
body, _ := io.ReadAll(res.Body)
return nil, errors.Errorf("unexpected status code %d (%s)", res.StatusCode,
body)
}
resBytes, err := io.ReadAll(res.Body)
if err != nil {
return nil, errors.Wrap(err, "read body")
}
ct, ok := clusterapi.IndicesPayloads.FindUUIDsResults.CheckContentTypeHeader(res)
if !ok {
return nil, errors.Errorf("unexpected content type: %s", ct)
}
uuids, err := clusterapi.IndicesPayloads.FindUUIDsResults.Unmarshal(resBytes)
if err != nil {
return nil, errors.Wrap(err, "unmarshal body")
}
return uuids, nil
}
func (c *RemoteIndex) DeleteObjectBatch(ctx context.Context, hostName, indexName, shardName string,
uuids []strfmt.UUID, dryRun bool,
) objects.BatchSimpleObjects {
path := fmt.Sprintf("/indices/%s/shards/%s/objects", indexName, shardName)
method := http.MethodDelete
url := url.URL{Scheme: "http", Host: hostName, Path: path}
marshalled, err := clusterapi.IndicesPayloads.BatchDeleteParams.Marshal(uuids, dryRun)
if err != nil {
err := errors.Wrap(err, "marshal payload")
return objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: err}}
}
req, err := http.NewRequestWithContext(ctx, method, url.String(),
bytes.NewReader(marshalled))
if err != nil {
err := errors.Wrap(err, "open http request")
return objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: err}}
}
clusterapi.IndicesPayloads.BatchDeleteParams.SetContentTypeHeaderReq(req)
res, err := c.client.Do(req)
if err != nil {
err := errors.Wrap(err, "send http request")
return objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: err}}
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
body, _ := io.ReadAll(res.Body)
err := errors.Errorf("unexpected status code %d (%s)", res.StatusCode, body)
return objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: err}}
}
if ct, ok := clusterapi.IndicesPayloads.BatchDeleteResults.
CheckContentTypeHeader(res); !ok {
err := errors.Errorf("unexpected content type: %s", ct)
return objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: err}}
}
resBytes, err := io.ReadAll(res.Body)
if err != nil {
err := errors.Wrap(err, "ready body")
return objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: err}}
}
batchDeleteResults, err := clusterapi.IndicesPayloads.BatchDeleteResults.Unmarshal(resBytes)
if err != nil {
err := errors.Wrap(err, "unmarshal body")
return objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: err}}
}
return batchDeleteResults
}
func (c *RemoteIndex) GetShardQueueSize(ctx context.Context,
hostName, indexName, shardName string,
) (int64, error) {
path := fmt.Sprintf("/indices/%s/shards/%s/queuesize", indexName, shardName)
method := http.MethodGet
url := url.URL{Scheme: "http", Host: hostName, Path: path}
req, err := http.NewRequestWithContext(ctx, method, url.String(), nil)
if err != nil {
return 0, errors.Wrap(err, "open http request")
}
var size int64
clusterapi.IndicesPayloads.GetShardQueueSizeParams.SetContentTypeHeaderReq(req)
try := func(ctx context.Context) (bool, error) {
res, err := c.client.Do(req)
if err != nil {
return ctx.Err() == nil, fmt.Errorf("connect: %w", err)
}
defer res.Body.Close()
if code := res.StatusCode; code != http.StatusOK {
body, _ := io.ReadAll(res.Body)
return shouldRetry(code), fmt.Errorf("status code: %v body: (%s)", code, body)
}
resBytes, err := io.ReadAll(res.Body)
if err != nil {
return false, errors.Wrap(err, "read body")
}
ct, ok := clusterapi.IndicesPayloads.GetShardQueueSizeResults.CheckContentTypeHeader(res)
if !ok {
return false, errors.Errorf("unexpected content type: %s", ct)
}
size, err = clusterapi.IndicesPayloads.GetShardQueueSizeResults.Unmarshal(resBytes)
if err != nil {
return false, errors.Wrap(err, "unmarshal body")
}
return false, nil
}
return size, c.retry(ctx, 9, try)
}
func (c *RemoteIndex) GetShardStatus(ctx context.Context,
hostName, indexName, shardName string,
) (string, error) {
path := fmt.Sprintf("/indices/%s/shards/%s/status", indexName, shardName)
method := http.MethodGet
url := url.URL{Scheme: "http", Host: hostName, Path: path}
req, err := http.NewRequestWithContext(ctx, method, url.String(), nil)
if err != nil {
return "", errors.Wrap(err, "open http request")
}
var status string
clusterapi.IndicesPayloads.GetShardStatusParams.SetContentTypeHeaderReq(req)
try := func(ctx context.Context) (bool, error) {
res, err := c.client.Do(req)
if err != nil {
return ctx.Err() == nil, fmt.Errorf("connect: %w", err)
}
defer res.Body.Close()
if code := res.StatusCode; code != http.StatusOK {
body, _ := io.ReadAll(res.Body)
return shouldRetry(code), fmt.Errorf("status code: %v body: (%s)", code, body)
}
resBytes, err := io.ReadAll(res.Body)
if err != nil {
return false, errors.Wrap(err, "read body")
}
ct, ok := clusterapi.IndicesPayloads.GetShardStatusResults.CheckContentTypeHeader(res)
if !ok {
return false, errors.Errorf("unexpected content type: %s", ct)
}
status, err = clusterapi.IndicesPayloads.GetShardStatusResults.Unmarshal(resBytes)
if err != nil {
return false, errors.Wrap(err, "unmarshal body")
}
return false, nil
}
return status, c.retry(ctx, 9, try)
}
func (c *RemoteIndex) UpdateShardStatus(ctx context.Context, hostName, indexName, shardName,
targetStatus string,
) error {
paramsBytes, err := clusterapi.IndicesPayloads.UpdateShardStatusParams.Marshal(targetStatus)
if err != nil {
return errors.Wrap(err, "marshal request payload")
}
path := fmt.Sprintf("/indices/%s/shards/%s/status", indexName, shardName)
method := http.MethodPost
url := url.URL{Scheme: "http", Host: hostName, Path: path}
try := func(ctx context.Context) (bool, error) {
req, err := http.NewRequestWithContext(ctx, method, url.String(),
bytes.NewReader(paramsBytes))
if err != nil {
return false, fmt.Errorf("create http request: %w", err)
}
clusterapi.IndicesPayloads.UpdateShardStatusParams.SetContentTypeHeaderReq(req)
res, err := c.client.Do(req)
if err != nil {
return ctx.Err() == nil, fmt.Errorf("connect: %w", err)
}
defer res.Body.Close()
if code := res.StatusCode; code != http.StatusOK {
body, _ := io.ReadAll(res.Body)
return shouldRetry(code), fmt.Errorf("status code: %v body: (%s)", code, body)
}
return false, nil
}
return c.retry(ctx, 9, try)
}
func (c *RemoteIndex) PutFile(ctx context.Context, hostName, indexName,
shardName, fileName string, payload io.ReadSeekCloser,
) error {
defer payload.Close()
path := fmt.Sprintf("/indices/%s/shards/%s/files/%s",
indexName, shardName, fileName)
method := http.MethodPost
url := url.URL{Scheme: "http", Host: hostName, Path: path}
try := func(ctx context.Context) (bool, error) {
req, err := http.NewRequestWithContext(ctx, method, url.String(), payload)
if err != nil {
return false, fmt.Errorf("create http request: %w", err)
}
clusterapi.IndicesPayloads.ShardFiles.SetContentTypeHeaderReq(req)
res, err := c.client.Do(req)
if err != nil {
return ctx.Err() == nil, fmt.Errorf("connect: %w", err)
}
defer res.Body.Close()
if code := res.StatusCode; code != http.StatusNoContent {
shouldRetry := shouldRetry(code)
if shouldRetry {
_, err := payload.Seek(0, 0)
shouldRetry = (err == nil)
}
body, _ := io.ReadAll(res.Body)
return shouldRetry, fmt.Errorf("status code: %v body: (%s)", code, body)
}
return false, nil
}
return c.retry(ctx, 12, try)
}
func (c *RemoteIndex) CreateShard(ctx context.Context,
hostName, indexName, shardName string,
) error {
path := fmt.Sprintf("/indices/%s/shards/%s", indexName, shardName)
method := http.MethodPost
url := url.URL{Scheme: "http", Host: hostName, Path: path}
req, err := http.NewRequestWithContext(ctx, method, url.String(), nil)
if err != nil {
return fmt.Errorf("create http request: %w", err)
}
try := func(ctx context.Context) (bool, error) {
res, err := c.client.Do(req)
if err != nil {
return ctx.Err() == nil, fmt.Errorf("connect: %w", err)
}
defer res.Body.Close()
if code := res.StatusCode; code != http.StatusCreated {
body, _ := io.ReadAll(res.Body)
return shouldRetry(code), fmt.Errorf("status code: %v body: (%s)", code, body)
}
return false, nil
}
return c.retry(ctx, 9, try)
}
func (c *RemoteIndex) ReInitShard(ctx context.Context,
hostName, indexName, shardName string,
) error {
path := fmt.Sprintf("/indices/%s/shards/%s:reinit", indexName, shardName)
method := http.MethodPut
url := url.URL{Scheme: "http", Host: hostName, Path: path}
req, err := http.NewRequestWithContext(ctx, method, url.String(), nil)
if err != nil {
return fmt.Errorf("create http request: %w", err)
}
try := func(ctx context.Context) (bool, error) {
res, err := c.client.Do(req)
if err != nil {
return ctx.Err() == nil, fmt.Errorf("connect: %w", err)
}
defer res.Body.Close()
if code := res.StatusCode; code != http.StatusNoContent {
body, _ := io.ReadAll(res.Body)
return shouldRetry(code), fmt.Errorf("status code: %v body: (%s)", code, body)
}
return false, nil
}
return c.retry(ctx, 9, try)
}
func (c *RemoteIndex) IncreaseReplicationFactor(ctx context.Context,
hostName, indexName string, dist scaler.ShardDist,
) error {
path := fmt.Sprintf("/replicas/indices/%s/replication-factor:increase", indexName)
method := http.MethodPut
url := url.URL{Scheme: "http", Host: hostName, Path: path}
body, err := clusterapi.IndicesPayloads.IncreaseReplicationFactor.Marshall(dist)
if err != nil {
return err
}
try := func(ctx context.Context) (bool, error) {
req, err := http.NewRequestWithContext(ctx, method, url.String(), bytes.NewReader(body))
if err != nil {
return false, fmt.Errorf("create http request: %w", err)
}
res, err := c.client.Do(req)
if err != nil {
return ctx.Err() == nil, fmt.Errorf("connect: %w", err)
}
defer res.Body.Close()
if code := res.StatusCode; code != http.StatusNoContent {
body, _ := io.ReadAll(res.Body)
return shouldRetry(code), fmt.Errorf("status code: %v body: (%s)", code, body)
}
return false, nil
}
return c.retry(ctx, 34, try)
}