Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//
package clients | |
import ( | |
"bytes" | |
"context" | |
"encoding/base64" | |
"encoding/json" | |
"fmt" | |
"io" | |
"math/rand" | |
"net/http" | |
"net/url" | |
"time" | |
"github.com/go-openapi/strfmt" | |
"github.com/weaviate/weaviate/adapters/handlers/rest/clusterapi" | |
"github.com/weaviate/weaviate/entities/additional" | |
"github.com/weaviate/weaviate/entities/search" | |
"github.com/weaviate/weaviate/entities/storobj" | |
"github.com/weaviate/weaviate/usecases/objects" | |
"github.com/weaviate/weaviate/usecases/replica" | |
) | |
// ReplicationClient is to coordinate operations among replicas | |
type replicationClient retryClient | |
func NewReplicationClient(httpClient *http.Client) replica.Client { | |
return &replicationClient{ | |
client: httpClient, | |
retryer: newRetryer(), | |
} | |
} | |
// FetchObject fetches one object it exits | |
func (c *replicationClient) FetchObject(ctx context.Context, host, index, | |
shard string, id strfmt.UUID, selectProps search.SelectProperties, | |
additional additional.Properties, | |
) (objects.Replica, error) { | |
resp := objects.Replica{} | |
req, err := newHttpReplicaRequest(ctx, http.MethodGet, host, index, shard, "", id.String(), nil) | |
if err != nil { | |
return resp, fmt.Errorf("create http request: %w", err) | |
} | |
err = c.doCustomUnmarshal(c.timeoutUnit*20, req, nil, resp.UnmarshalBinary) | |
return resp, err | |
} | |
func (c *replicationClient) DigestObjects(ctx context.Context, | |
host, index, shard string, ids []strfmt.UUID, | |
) (result []replica.RepairResponse, err error) { | |
var resp []replica.RepairResponse | |
body, err := json.Marshal(ids) | |
if err != nil { | |
return nil, fmt.Errorf("marshal digest objects input: %w", err) | |
} | |
req, err := newHttpReplicaRequest( | |
ctx, http.MethodGet, host, index, shard, | |
"", "_digest", bytes.NewReader(body)) | |
if err != nil { | |
return resp, fmt.Errorf("create http request: %w", err) | |
} | |
err = c.do(c.timeoutUnit*20, req, body, &resp) | |
return resp, err | |
} | |
func (c *replicationClient) OverwriteObjects(ctx context.Context, | |
host, index, shard string, vobjects []*objects.VObject, | |
) ([]replica.RepairResponse, error) { | |
var resp []replica.RepairResponse | |
body, err := clusterapi.IndicesPayloads.VersionedObjectList.Marshal(vobjects) | |
if err != nil { | |
return nil, fmt.Errorf("encode request: %w", err) | |
} | |
req, err := newHttpReplicaRequest( | |
ctx, http.MethodPut, host, index, shard, | |
"", "_overwrite", bytes.NewReader(body)) | |
if err != nil { | |
return resp, fmt.Errorf("create http request: %w", err) | |
} | |
err = c.do(c.timeoutUnit*90, req, body, &resp) | |
return resp, err | |
} | |
func (c *replicationClient) FetchObjects(ctx context.Context, host, | |
index, shard string, ids []strfmt.UUID, | |
) ([]objects.Replica, error) { | |
resp := make(objects.Replicas, len(ids)) | |
idsBytes, err := json.Marshal(ids) | |
if err != nil { | |
return nil, fmt.Errorf("marshal ids: %w", err) | |
} | |
idsEncoded := base64.StdEncoding.EncodeToString(idsBytes) | |
req, err := newHttpReplicaRequest(ctx, http.MethodGet, host, index, shard, "", "", nil) | |
if err != nil { | |
return nil, fmt.Errorf("create http request: %w", err) | |
} | |
req.URL.RawQuery = url.Values{"ids": []string{idsEncoded}}.Encode() | |
err = c.doCustomUnmarshal(c.timeoutUnit*90, req, nil, resp.UnmarshalBinary) | |
return resp, err | |
} | |
func (c *replicationClient) PutObject(ctx context.Context, host, index, | |
shard, requestID string, obj *storobj.Object, | |
) (replica.SimpleResponse, error) { | |
var resp replica.SimpleResponse | |
body, err := clusterapi.IndicesPayloads.SingleObject.Marshal(obj) | |
if err != nil { | |
return resp, fmt.Errorf("encode request: %w", err) | |
} | |
req, err := newHttpReplicaRequest(ctx, http.MethodPost, host, index, shard, requestID, "", nil) | |
if err != nil { | |
return resp, fmt.Errorf("create http request: %w", err) | |
} | |
clusterapi.IndicesPayloads.SingleObject.SetContentTypeHeaderReq(req) | |
err = c.do(c.timeoutUnit*90, req, body, &resp) | |
return resp, err | |
} | |
func (c *replicationClient) DeleteObject(ctx context.Context, host, index, | |
shard, requestID string, uuid strfmt.UUID, | |
) (replica.SimpleResponse, error) { | |
var resp replica.SimpleResponse | |
req, err := newHttpReplicaRequest(ctx, http.MethodDelete, host, index, shard, requestID, uuid.String(), nil) | |
if err != nil { | |
return resp, fmt.Errorf("create http request: %w", err) | |
} | |
err = c.do(c.timeoutUnit*90, req, nil, &resp) | |
return resp, err | |
} | |
func (c *replicationClient) PutObjects(ctx context.Context, host, index, | |
shard, requestID string, objects []*storobj.Object, | |
) (replica.SimpleResponse, error) { | |
var resp replica.SimpleResponse | |
body, err := clusterapi.IndicesPayloads.ObjectList.Marshal(objects) | |
if err != nil { | |
return resp, fmt.Errorf("encode request: %w", err) | |
} | |
req, err := newHttpReplicaRequest(ctx, http.MethodPost, host, index, shard, requestID, "", nil) | |
if err != nil { | |
return resp, fmt.Errorf("create http request: %w", err) | |
} | |
clusterapi.IndicesPayloads.ObjectList.SetContentTypeHeaderReq(req) | |
err = c.do(c.timeoutUnit*90, req, body, &resp) | |
return resp, err | |
} | |
func (c *replicationClient) MergeObject(ctx context.Context, host, index, shard, requestID string, | |
doc *objects.MergeDocument, | |
) (replica.SimpleResponse, error) { | |
var resp replica.SimpleResponse | |
body, err := clusterapi.IndicesPayloads.MergeDoc.Marshal(*doc) | |
if err != nil { | |
return resp, fmt.Errorf("encode request: %w", err) | |
} | |
req, err := newHttpReplicaRequest(ctx, http.MethodPatch, host, index, shard, | |
requestID, doc.ID.String(), nil) | |
if err != nil { | |
return resp, fmt.Errorf("create http request: %w", err) | |
} | |
clusterapi.IndicesPayloads.MergeDoc.SetContentTypeHeaderReq(req) | |
err = c.do(c.timeoutUnit*90, req, body, &resp) | |
return resp, err | |
} | |
func (c *replicationClient) AddReferences(ctx context.Context, host, index, | |
shard, requestID string, refs []objects.BatchReference, | |
) (replica.SimpleResponse, error) { | |
var resp replica.SimpleResponse | |
body, err := clusterapi.IndicesPayloads.ReferenceList.Marshal(refs) | |
if err != nil { | |
return resp, fmt.Errorf("encode request: %w", err) | |
} | |
req, err := newHttpReplicaRequest(ctx, http.MethodPost, host, index, shard, | |
requestID, "references", nil) | |
if err != nil { | |
return resp, fmt.Errorf("create http request: %w", err) | |
} | |
clusterapi.IndicesPayloads.ReferenceList.SetContentTypeHeaderReq(req) | |
err = c.do(c.timeoutUnit*90, req, body, &resp) | |
return resp, err | |
} | |
func (c *replicationClient) DeleteObjects(ctx context.Context, host, index, shard, requestID string, | |
uuids []strfmt.UUID, dryRun bool, | |
) (resp replica.SimpleResponse, err error) { | |
body, err := clusterapi.IndicesPayloads.BatchDeleteParams.Marshal(uuids, dryRun) | |
if err != nil { | |
return resp, fmt.Errorf("encode request: %w", err) | |
} | |
req, err := newHttpReplicaRequest(ctx, http.MethodDelete, host, index, shard, requestID, "", nil) | |
if err != nil { | |
return resp, fmt.Errorf("create http request: %w", err) | |
} | |
clusterapi.IndicesPayloads.BatchDeleteParams.SetContentTypeHeaderReq(req) | |
err = c.do(c.timeoutUnit*90, req, body, &resp) | |
return resp, err | |
} | |
// Commit asks a host to commit and stores the response in the value pointed to by resp | |
func (c *replicationClient) Commit(ctx context.Context, host, index, shard string, requestID string, resp interface{}) error { | |
req, err := newHttpReplicaCMD(host, "commit", index, shard, requestID, nil) | |
if err != nil { | |
return fmt.Errorf("create http request: %w", err) | |
} | |
return c.do(c.timeoutUnit*90, req, nil, resp) | |
} | |
func (c *replicationClient) Abort(ctx context.Context, host, index, shard, requestID string) ( | |
resp replica.SimpleResponse, err error, | |
) { | |
req, err := newHttpReplicaCMD(host, "abort", index, shard, requestID, nil) | |
if err != nil { | |
return resp, fmt.Errorf("create http request: %w", err) | |
} | |
err = c.do(c.timeoutUnit*5, req, nil, &resp) | |
return resp, err | |
} | |
func newHttpReplicaRequest(ctx context.Context, method, host, index, shard, requestId, suffix string, body io.Reader) (*http.Request, error) { | |
path := fmt.Sprintf("/replicas/indices/%s/shards/%s/objects", index, shard) | |
if suffix != "" { | |
path = fmt.Sprintf("%s/%s", path, suffix) | |
} | |
u := url.URL{ | |
Scheme: "http", | |
Host: host, | |
Path: path, | |
} | |
if requestId != "" { | |
u.RawQuery = url.Values{replica.RequestKey: []string{requestId}}.Encode() | |
} | |
return http.NewRequestWithContext(ctx, method, u.String(), body) | |
} | |
func newHttpReplicaCMD(host, cmd, index, shard, requestId string, body io.Reader) (*http.Request, error) { | |
path := fmt.Sprintf("/replicas/indices/%s/shards/%s:%s", index, shard, cmd) | |
q := url.Values{replica.RequestKey: []string{requestId}}.Encode() | |
url := url.URL{Scheme: "http", Host: host, Path: path, RawQuery: q} | |
return http.NewRequest(http.MethodPost, url.String(), body) | |
} | |
func (c *replicationClient) do(timeout time.Duration, req *http.Request, body []byte, resp interface{}) (err error) { | |
ctx, cancel := context.WithTimeout(req.Context(), timeout) | |
defer cancel() | |
try := func(ctx context.Context) (bool, error) { | |
if body != nil { | |
req.Body = io.NopCloser(bytes.NewReader(body)) | |
} | |
res, err := c.client.Do(req) | |
if err != nil { | |
return ctx.Err() == nil, fmt.Errorf("connect: %w", err) | |
} | |
defer res.Body.Close() | |
if code := res.StatusCode; code != http.StatusOK { | |
b, _ := io.ReadAll(res.Body) | |
return shouldRetry(code), fmt.Errorf("status code: %v, error: %s", code, b) | |
} | |
if err := json.NewDecoder(res.Body).Decode(resp); err != nil { | |
return false, fmt.Errorf("decode response: %w", err) | |
} | |
return false, nil | |
} | |
return c.retry(ctx, 9, try) | |
} | |
func (c *replicationClient) doCustomUnmarshal(timeout time.Duration, | |
req *http.Request, body []byte, decode func([]byte) error, | |
) (err error) { | |
return (*retryClient)(c).doWithCustomMarshaller(timeout, req, body, decode) | |
} | |
// backOff returns a jittered delay between d and 3d: it doubles d and scales
// the result by a random factor 0.5 + rand.Float64() (nominally in [0.5, 1.5)).
// It applies no upper cap; any truncation of the growing delay is up to the
// caller.
func backOff(d time.Duration) time.Duration {
	jitter := 0.5 + rand.Float64()
	doubled := float64(d.Nanoseconds() * 2)
	return time.Duration(doubled * jitter)
}
// shouldRetry reports whether a response with the given HTTP status code is
// treated as transient and worth retrying. Only 500 Internal Server Error,
// 429 Too Many Requests and 503 Service Unavailable qualify; every other code
// is final.
func shouldRetry(code int) bool {
	switch code {
	case http.StatusInternalServerError,
		http.StatusTooManyRequests,
		http.StatusServiceUnavailable:
		return true
	default:
		return false
	}
}