Spaces:
Sleeping
Sleeping
| // _ _ | |
| // __ _____ __ ___ ___ __ _| |_ ___ | |
| // \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
| // \ V V / __/ (_| |\ V /| | (_| | || __/ | |
| // \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
| // | |
| // Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
| // | |
| // CONTACT: [email protected] | |
| // | |
| package replica | |
| import ( | |
| "context" | |
| "fmt" | |
| "github.com/go-openapi/strfmt" | |
| "github.com/weaviate/weaviate/entities/additional" | |
| "github.com/weaviate/weaviate/entities/search" | |
| "github.com/weaviate/weaviate/entities/storobj" | |
| "github.com/weaviate/weaviate/usecases/objects" | |
| ) | |
| const ( | |
| // RequestKey is used to marshalling request IDs | |
| RequestKey = "request_id" | |
| ) | |
| // Client is used to read and write objects on replicas | |
| type Client interface { | |
| rClient | |
| wClient | |
| } | |
| // StatusCode is communicate the cause of failure during replication | |
| type StatusCode int | |
| const ( | |
| StatusOK = 0 | |
| StatusClassNotFound = iota + 200 | |
| StatusShardNotFound | |
| StatusNotFound | |
| StatusAlreadyExisted | |
| StatusNotReady | |
| StatusConflict = iota + 300 | |
| StatusPreconditionFailed | |
| StatusReadOnly | |
| ) | |
| // Error reports error happening during replication | |
| type Error struct { | |
| Code StatusCode `json:"code"` | |
| Msg string `json:"msg,omitempty"` | |
| Err error `json:"-"` | |
| } | |
| // Empty checks whether e is an empty error which equivalent to e == nil | |
| func (e *Error) Empty() bool { | |
| return e.Code == StatusOK && e.Msg == "" && e.Err == nil | |
| } | |
| // NewError create new replication error | |
| func NewError(code StatusCode, msg string) *Error { | |
| return &Error{code, msg, nil} | |
| } | |
| func (e *Error) Clone() *Error { | |
| return &Error{Code: e.Code, Msg: e.Msg, Err: e.Err} | |
| } | |
| // Unwrap underlying error | |
| func (e *Error) Unwrap() error { return e.Err } | |
| func (e *Error) Error() string { | |
| return fmt.Sprintf("%s %q: %v", statusText(e.Code), e.Msg, e.Err) | |
| } | |
| func (e *Error) IsStatusCode(sc StatusCode) bool { | |
| return e.Code == sc | |
| } | |
| // statusText returns a text for the status code. It returns the empty | |
| // string if the code is unknown. | |
| func statusText(code StatusCode) string { | |
| switch code { | |
| case StatusOK: | |
| return "ok" | |
| case StatusNotFound: | |
| return "not found" | |
| case StatusClassNotFound: | |
| return "class not found" | |
| case StatusShardNotFound: | |
| return "shard not found" | |
| case StatusConflict: | |
| return "conflict" | |
| case StatusPreconditionFailed: | |
| return "precondition failed" | |
| case StatusAlreadyExisted: | |
| return "already existed" | |
| case StatusNotReady: | |
| return "local index not ready" | |
| case StatusReadOnly: | |
| return "read only" | |
| default: | |
| return "" | |
| } | |
| } | |
| func (e *Error) Timeout() bool { | |
| t, ok := e.Err.(interface { | |
| Timeout() bool | |
| }) | |
| return ok && t.Timeout() | |
| } | |
| type SimpleResponse struct { | |
| Errors []Error `json:"errors,omitempty"` | |
| } | |
| func (r *SimpleResponse) FirstError() error { | |
| for i, err := range r.Errors { | |
| if !err.Empty() { | |
| return &r.Errors[i] | |
| } | |
| } | |
| return nil | |
| } | |
| // DeleteBatchResponse represents the response returned by DeleteObjects | |
| type DeleteBatchResponse struct { | |
| Batch []UUID2Error `json:"batch,omitempty"` | |
| } | |
| type UUID2Error struct { | |
| UUID string `json:"uuid,omitempty"` | |
| Error Error `json:"error,omitempty"` | |
| } | |
| // FirstError returns the first found error | |
| func (r *DeleteBatchResponse) FirstError() error { | |
| for i, ue := range r.Batch { | |
| if !ue.Error.Empty() { | |
| return &r.Batch[i].Error | |
| } | |
| } | |
| return nil | |
| } | |
| type RepairResponse struct { | |
| ID string // object id | |
| Version int64 // sender's current version of the object | |
| UpdateTime int64 // sender's current update time | |
| Err string | |
| Deleted bool | |
| } | |
| func fromReplicas(xs []objects.Replica) []*storobj.Object { | |
| rs := make([]*storobj.Object, len(xs)) | |
| for i := range xs { | |
| rs[i] = xs[i].Object | |
| } | |
| return rs | |
| } | |
| // wClient is the client used to write to replicas | |
| type wClient interface { | |
| PutObject(ctx context.Context, host, index, shard, requestID string, | |
| obj *storobj.Object) (SimpleResponse, error) | |
| DeleteObject(ctx context.Context, host, index, shard, requestID string, | |
| id strfmt.UUID) (SimpleResponse, error) | |
| PutObjects(ctx context.Context, host, index, shard, requestID string, | |
| objs []*storobj.Object) (SimpleResponse, error) | |
| MergeObject(ctx context.Context, host, index, shard, requestID string, | |
| mergeDoc *objects.MergeDocument) (SimpleResponse, error) | |
| DeleteObjects(ctx context.Context, host, index, shard, requestID string, | |
| uuids []strfmt.UUID, dryRun bool) (SimpleResponse, error) | |
| AddReferences(ctx context.Context, host, index, shard, requestID string, | |
| refs []objects.BatchReference) (SimpleResponse, error) | |
| Commit(ctx context.Context, host, index, shard, requestID string, resp interface{}) error | |
| Abort(ctx context.Context, host, index, shard, requestID string) (SimpleResponse, error) | |
| } | |
| // rClient is the client used to read from remote replicas | |
| type rClient interface { | |
| // FetchObject fetches one object | |
| FetchObject(_ context.Context, host, index, shard string, | |
| id strfmt.UUID, props search.SelectProperties, | |
| additional additional.Properties) (objects.Replica, error) | |
| // FetchObjects fetches objects specified in ids list. | |
| FetchObjects(_ context.Context, host, index, shard string, | |
| ids []strfmt.UUID) ([]objects.Replica, error) | |
| // OverwriteObjects conditionally updates existing objects. | |
| OverwriteObjects(_ context.Context, host, index, shard string, | |
| _ []*objects.VObject) ([]RepairResponse, error) | |
| // DigestObjects finds a list of objects and returns a compact representation | |
| // of a list of the objects. This is used by the replicator to optimize the | |
| // number of bytes transferred over the network when fetching a replicated | |
| // object | |
| DigestObjects(ctx context.Context, host, index, shard string, | |
| ids []strfmt.UUID) ([]RepairResponse, error) | |
| } | |
| // finderClient extends RClient with consistency checks | |
| type finderClient struct { | |
| cl rClient | |
| } | |
| // FullRead reads full object | |
| func (fc finderClient) FullRead(ctx context.Context, | |
| host, index, shard string, | |
| id strfmt.UUID, | |
| props search.SelectProperties, | |
| additional additional.Properties, | |
| ) (objects.Replica, error) { | |
| return fc.cl.FetchObject(ctx, host, index, shard, id, props, additional) | |
| } | |
| // DigestReads reads digests of all specified objects | |
| func (fc finderClient) DigestReads(ctx context.Context, | |
| host, index, shard string, | |
| ids []strfmt.UUID, | |
| ) ([]RepairResponse, error) { | |
| n := len(ids) | |
| rs, err := fc.cl.DigestObjects(ctx, host, index, shard, ids) | |
| if err == nil && len(rs) != n { | |
| err = fmt.Errorf("malformed digest read response: length expected %d got %d", n, len(rs)) | |
| } | |
| return rs, err | |
| } | |
| // FullReads read full objects | |
| func (fc finderClient) FullReads(ctx context.Context, | |
| host, index, shard string, | |
| ids []strfmt.UUID, | |
| ) ([]objects.Replica, error) { | |
| n := len(ids) | |
| rs, err := fc.cl.FetchObjects(ctx, host, index, shard, ids) | |
| if m := len(rs); err == nil && n != m { | |
| err = fmt.Errorf("malformed full read response: length expected %d got %d", n, m) | |
| } | |
| return rs, err | |
| } | |
| // Overwrite specified object with most recent contents | |
| func (fc finderClient) Overwrite(ctx context.Context, | |
| host, index, shard string, | |
| xs []*objects.VObject, | |
| ) ([]RepairResponse, error) { | |
| return fc.cl.OverwriteObjects(ctx, host, index, shard, xs) | |
| } | |