Spaces:
Sleeping
Sleeping
| // _ _ | |
| // __ _____ __ ___ ___ __ _| |_ ___ | |
| // \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
| // \ V V / __/ (_| |\ V /| | (_| | || __/ | |
| // \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
| // | |
| // Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
| // | |
| // CONTACT: [email protected] | |
| // | |
| package replica | |
| import ( | |
| "context" | |
| "fmt" | |
| "sync/atomic" | |
| "time" | |
| "github.com/go-openapi/strfmt" | |
| "github.com/sirupsen/logrus" | |
| "github.com/weaviate/weaviate/entities/storobj" | |
| "github.com/weaviate/weaviate/usecases/objects" | |
| ) | |
| // opID operation encode as and int | |
| type opID int | |
| const ( | |
| opPutObject opID = iota + 1 | |
| opMergeObject | |
| opDeleteObject | |
| opPutObjects = iota + 97 | |
| opAddReferences | |
| opDeleteObjects | |
| ) | |
| type ( | |
| shardingState interface { | |
| NodeName() string | |
| ResolveParentNodes(class, shardName string) (map[string]string, error) | |
| } | |
| nodeResolver interface { | |
| NodeHostname(nodeName string) (string, bool) | |
| } | |
| // _Result represents a valid value or an error ( _ prevent make it public). | |
| _Result[T any] struct { | |
| Value T | |
| Err error | |
| } | |
| ) | |
| type Replicator struct { | |
| class string | |
| stateGetter shardingState | |
| client Client | |
| resolver *resolver | |
| log logrus.FieldLogger | |
| requestCounter atomic.Uint64 | |
| stream replicatorStream | |
| *Finder | |
| } | |
| func NewReplicator(className string, | |
| stateGetter shardingState, | |
| nodeResolver nodeResolver, | |
| client Client, | |
| l logrus.FieldLogger, | |
| ) *Replicator { | |
| resolver := &resolver{ | |
| Schema: stateGetter, | |
| nodeResolver: nodeResolver, | |
| Class: className, | |
| NodeName: stateGetter.NodeName(), | |
| } | |
| return &Replicator{ | |
| class: className, | |
| stateGetter: stateGetter, | |
| client: client, | |
| resolver: resolver, | |
| log: l, | |
| Finder: NewFinder(className, resolver, client, l), | |
| } | |
| } | |
| func (r *Replicator) PutObject(ctx context.Context, | |
| shard string, | |
| obj *storobj.Object, | |
| l ConsistencyLevel, | |
| ) error { | |
| coord := newCoordinator[SimpleResponse](r, shard, r.requestID(opPutObject), r.log) | |
| isReady := func(ctx context.Context, host, requestID string) error { | |
| resp, err := r.client.PutObject(ctx, host, r.class, shard, requestID, obj) | |
| if err == nil { | |
| err = resp.FirstError() | |
| } | |
| if err != nil { | |
| return fmt.Errorf("%q: %w", host, err) | |
| } | |
| return nil | |
| } | |
| replyCh, level, err := coord.Push(ctx, l, isReady, r.simpleCommit(shard)) | |
| if err != nil { | |
| r.log.WithField("op", "push.one").WithField("class", r.class). | |
| WithField("shard", shard).Error(err) | |
| return fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas) | |
| } | |
| err = r.stream.readErrors(1, level, replyCh)[0] | |
| if err != nil { | |
| r.log.WithField("op", "put").WithField("class", r.class). | |
| WithField("shard", shard).WithField("uuid", obj.ID()).Error(err) | |
| } | |
| return err | |
| } | |
| func (r *Replicator) MergeObject(ctx context.Context, | |
| shard string, | |
| doc *objects.MergeDocument, | |
| l ConsistencyLevel, | |
| ) error { | |
| coord := newCoordinator[SimpleResponse](r, shard, r.requestID(opMergeObject), r.log) | |
| op := func(ctx context.Context, host, requestID string) error { | |
| resp, err := r.client.MergeObject(ctx, host, r.class, shard, requestID, doc) | |
| if err == nil { | |
| err = resp.FirstError() | |
| } | |
| if err != nil { | |
| return fmt.Errorf("%q: %w", host, err) | |
| } | |
| return nil | |
| } | |
| replyCh, level, err := coord.Push(ctx, l, op, r.simpleCommit(shard)) | |
| if err != nil { | |
| r.log.WithField("op", "push.merge").WithField("class", r.class). | |
| WithField("shard", shard).Error(err) | |
| return fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas) | |
| } | |
| err = r.stream.readErrors(1, level, replyCh)[0] | |
| if err != nil { | |
| r.log.WithField("op", "put").WithField("class", r.class). | |
| WithField("shard", shard).WithField("uuid", doc.ID).Error(err) | |
| } | |
| return err | |
| } | |
| func (r *Replicator) DeleteObject(ctx context.Context, | |
| shard string, | |
| id strfmt.UUID, | |
| l ConsistencyLevel, | |
| ) error { | |
| coord := newCoordinator[SimpleResponse](r, shard, r.requestID(opDeleteObject), r.log) | |
| op := func(ctx context.Context, host, requestID string) error { | |
| resp, err := r.client.DeleteObject(ctx, host, r.class, shard, requestID, id) | |
| if err == nil { | |
| err = resp.FirstError() | |
| } | |
| if err != nil { | |
| return fmt.Errorf("%q: %w", host, err) | |
| } | |
| return nil | |
| } | |
| replyCh, level, err := coord.Push(ctx, l, op, r.simpleCommit(shard)) | |
| if err != nil { | |
| r.log.WithField("op", "push.delete").WithField("class", r.class). | |
| WithField("shard", shard).Error(err) | |
| return fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas) | |
| } | |
| err = r.stream.readErrors(1, level, replyCh)[0] | |
| if err != nil { | |
| r.log.WithField("op", "put").WithField("class", r.class). | |
| WithField("shard", shard).WithField("uuid", id).Error(err) | |
| } | |
| return err | |
| } | |
| func (r *Replicator) PutObjects(ctx context.Context, | |
| shard string, | |
| objs []*storobj.Object, | |
| l ConsistencyLevel, | |
| ) []error { | |
| coord := newCoordinator[SimpleResponse](r, shard, r.requestID(opPutObjects), r.log) | |
| op := func(ctx context.Context, host, requestID string) error { | |
| resp, err := r.client.PutObjects(ctx, host, r.class, shard, requestID, objs) | |
| if err == nil { | |
| err = resp.FirstError() | |
| } | |
| if err != nil { | |
| return fmt.Errorf("%q: %w", host, err) | |
| } | |
| return nil | |
| } | |
| replyCh, level, err := coord.Push(ctx, l, op, r.simpleCommit(shard)) | |
| if err != nil { | |
| r.log.WithField("op", "push.many").WithField("class", r.class). | |
| WithField("shard", shard).Error(err) | |
| err = fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas) | |
| errs := make([]error, len(objs)) | |
| for i := 0; i < len(objs); i++ { | |
| errs[i] = err | |
| } | |
| return errs | |
| } | |
| errs := r.stream.readErrors(len(objs), level, replyCh) | |
| if err := firstError(errs); err != nil { | |
| r.log.WithField("op", "put.many").WithField("class", r.class). | |
| WithField("shard", shard).Error(errs) | |
| } | |
| return errs | |
| } | |
| func (r *Replicator) DeleteObjects(ctx context.Context, | |
| shard string, | |
| uuids []strfmt.UUID, | |
| dryRun bool, | |
| l ConsistencyLevel, | |
| ) []objects.BatchSimpleObject { | |
| coord := newCoordinator[DeleteBatchResponse](r, shard, r.requestID(opDeleteObjects), r.log) | |
| op := func(ctx context.Context, host, requestID string) error { | |
| resp, err := r.client.DeleteObjects( | |
| ctx, host, r.class, shard, requestID, uuids, dryRun) | |
| if err == nil { | |
| err = resp.FirstError() | |
| } | |
| if err != nil { | |
| return fmt.Errorf("%q: %w", host, err) | |
| } | |
| return nil | |
| } | |
| commit := func(ctx context.Context, host, requestID string) (DeleteBatchResponse, error) { | |
| resp := DeleteBatchResponse{} | |
| err := r.client.Commit(ctx, host, r.class, shard, requestID, &resp) | |
| if err == nil { | |
| err = resp.FirstError() | |
| } | |
| if err != nil { | |
| err = fmt.Errorf("%q: %w", host, err) | |
| } | |
| return resp, err | |
| } | |
| replyCh, level, err := coord.Push(ctx, l, op, commit) | |
| if err != nil { | |
| r.log.WithField("op", "push.deletes").WithField("class", r.class). | |
| WithField("shard", shard).Error(err) | |
| err = fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas) | |
| errs := make([]objects.BatchSimpleObject, len(uuids)) | |
| for i := 0; i < len(uuids); i++ { | |
| errs[i].Err = err | |
| } | |
| return errs | |
| } | |
| rs := r.stream.readDeletions(len(uuids), level, replyCh) | |
| if err := firstBatchError(rs); err != nil { | |
| r.log.WithField("op", "put.many").WithField("class", r.class). | |
| WithField("shard", shard).Error(rs) | |
| } | |
| return rs | |
| } | |
| func (r *Replicator) AddReferences(ctx context.Context, | |
| shard string, | |
| refs []objects.BatchReference, | |
| l ConsistencyLevel, | |
| ) []error { | |
| coord := newCoordinator[SimpleResponse](r, shard, r.requestID(opAddReferences), r.log) | |
| op := func(ctx context.Context, host, requestID string) error { | |
| resp, err := r.client.AddReferences(ctx, host, r.class, shard, requestID, refs) | |
| if err == nil { | |
| err = resp.FirstError() | |
| } | |
| if err != nil { | |
| return fmt.Errorf("%q: %w", host, err) | |
| } | |
| return nil | |
| } | |
| replyCh, level, err := coord.Push(ctx, l, op, r.simpleCommit(shard)) | |
| if err != nil { | |
| r.log.WithField("op", "push.refs").WithField("class", r.class). | |
| WithField("shard", shard).Error(err) | |
| err = fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas) | |
| errs := make([]error, len(refs)) | |
| for i := 0; i < len(refs); i++ { | |
| errs[i] = err | |
| } | |
| return errs | |
| } | |
| errs := r.stream.readErrors(len(refs), level, replyCh) | |
| if err := firstError(errs); err != nil { | |
| r.log.WithField("op", "put.refs").WithField("class", r.class). | |
| WithField("shard", shard).Error(errs) | |
| } | |
| return errs | |
| } | |
| // simpleCommit generate commit function for the coordinator | |
| func (r *Replicator) simpleCommit(shard string) commitOp[SimpleResponse] { | |
| return func(ctx context.Context, host, requestID string) (SimpleResponse, error) { | |
| resp := SimpleResponse{} | |
| err := r.client.Commit(ctx, host, r.class, shard, requestID, &resp) | |
| if err == nil { | |
| err = resp.FirstError() | |
| } | |
| if err != nil { | |
| err = fmt.Errorf("%s: %w", host, err) | |
| } | |
| return resp, err | |
| } | |
| } | |
| // requestID returns ID as [CoordinatorName-OpCode-TimeStamp-Counter]. | |
| // The coordinator uses it to uniquely identify a transaction. | |
| // ID makes the request observable in the cluster by specifying its origin | |
| // and the kind of replication request. | |
| func (r *Replicator) requestID(op opID) string { | |
| return fmt.Sprintf("%s-%.2x-%x-%x", | |
| r.stateGetter.NodeName(), | |
| op, | |
| time.Now().UnixMilli(), | |
| r.requestCounter.Add(1)) | |
| } | |