KevinStephenson
Adding in weaviate code
b110593
raw
history blame
9.5 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package replica
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/go-openapi/strfmt"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/entities/storobj"
"github.com/weaviate/weaviate/usecases/objects"
)
// opID identifies the kind of replication operation. It is encoded as an int
// and embedded in every request ID built by requestID, so a request's kind is
// visible wherever the ID is observed.
type opID int

// Replication operation codes.
//
// Single-object operations take the values 1, 2 and 3. Batch operations take
// 100, 101 and 102 (iota + 97 with iota starting at 3 for opPutObjects).
// Every code is declared with the opID type. Without the explicit type on
// opPutObjects, it and the two codes following it would be untyped integer
// constants rather than opIDs.
const (
	opPutObject opID = iota + 1
	opMergeObject
	opDeleteObject

	opPutObjects opID = iota + 97
	opAddReferences
	opDeleteObjects
)
// shardingState is the part of the cluster's sharding state that the
// replicator relies on. NodeName reports the local node's name.
// ResolveParentNodes resolves the nodes that hold shardName of class
// (presumably a node-name to host mapping; confirm against the implementation).
type shardingState interface {
	NodeName() string
	ResolveParentNodes(class, shardName string) (map[string]string, error)
}

// nodeResolver looks up the hostname of a node by name. The boolean result
// presumably reports whether the node is known (comma-ok style; confirm).
type nodeResolver interface {
	NodeHostname(nodeName string) (string, bool)
}

// _Result pairs a value of type T with an error. The leading underscore keeps
// the type unexported while preserving the "Result" spelling.
type _Result[T any] struct {
	Value T
	Err   error
}
// Replicator performs replicated writes for a single class. Its methods
// (PutObject, MergeObject, DeleteObject, PutObjects, DeleteObjects and
// AddReferences) each push a request to a shard's replicas through a
// coordinator at a caller-supplied ConsistencyLevel, then collect the replies.
//
// Replicator contains an atomic.Uint64, which must not be copied after first
// use. Handle it through the *Replicator returned by NewReplicator.
type Replicator struct {
	// class is the class name sent with every replica request.
	class string

	// stateGetter is the sharding state. requestID reads the local node name
	// from it on every call.
	stateGetter shardingState

	// client sends the per-replica requests and the Commit calls.
	client Client

	// resolver is built by NewReplicator from stateGetter and the node
	// resolver, and is shared with the embedded Finder.
	// NOTE(review): presumably also used by the coordinator, which receives
	// the Replicator itself; confirm.
	resolver *resolver

	// log receives failures from the push and reply phases of every write.
	log logrus.FieldLogger

	// requestCounter is incremented atomically by requestID. Its new value
	// becomes the last component of each request ID.
	requestCounter atomic.Uint64

	// stream reads replica replies off a coordinator reply channel
	// (readErrors, readDeletions). NewReplicator leaves it at its zero value,
	// so replicatorStream is assumed to be usable when zero; confirm.
	stream replicatorStream

	// Finder is created by NewFinder in NewReplicator. Its methods are
	// promoted onto Replicator.
	*Finder
}
// NewReplicator returns a Replicator for the class className.
//
// stateGetter and nodeResolver are combined into one resolver. The Replicator
// and its embedded Finder share that resolver, along with client and the
// logger l. The resolver's NodeName is captured from stateGetter at
// construction time.
func NewReplicator(className string,
	stateGetter shardingState,
	nodeResolver nodeResolver,
	client Client,
	l logrus.FieldLogger,
) *Replicator {
	shared := &resolver{
		Class:        className,
		NodeName:     stateGetter.NodeName(),
		Schema:       stateGetter,
		nodeResolver: nodeResolver,
	}

	rep := &Replicator{
		class:       className,
		stateGetter: stateGetter,
		client:      client,
		resolver:    shared,
		log:         l,
	}
	rep.Finder = NewFinder(className, shared, client, l)
	return rep
}
// PutObject replicates obj to the replicas of shard at consistency level l.
//
// The coordinator's Push receives two callbacks:
//   - a per-replica request that calls client.PutObject;
//   - the commit callback from simpleCommit.
//
// If Push fails, the underlying cause is logged and the caller receives only a
// consistency-level error wrapping errReplicas. Otherwise the single reply
// error read from the stream is logged (tagged with the object's UUID) when
// non-nil, and returned.
func (r *Replicator) PutObject(ctx context.Context,
	shard string,
	obj *storobj.Object,
	l ConsistencyLevel,
) error {
	coord := newCoordinator[SimpleResponse](r, shard, r.requestID(opPutObject), r.log)

	prepare := func(ctx context.Context, host, requestID string) error {
		resp, err := r.client.PutObject(ctx, host, r.class, shard, requestID, obj)
		if err != nil {
			return fmt.Errorf("%q: %w", host, err)
		}
		if respErr := resp.FirstError(); respErr != nil {
			return fmt.Errorf("%q: %w", host, respErr)
		}
		return nil
	}

	replyCh, level, err := coord.Push(ctx, l, prepare, r.simpleCommit(shard))
	if err != nil {
		r.log.WithField("op", "push.one").WithField("class", r.class).
			WithField("shard", shard).Error(err)
		return fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas)
	}

	if err = r.stream.readErrors(1, level, replyCh)[0]; err != nil {
		r.log.WithField("op", "put").WithField("class", r.class).
			WithField("shard", shard).WithField("uuid", obj.ID()).Error(err)
		return err
	}
	return nil
}
// MergeObject replicates the merge document doc to the replicas of shard at
// consistency level l.
//
// Each replica is sent client.MergeObject, and the commit step uses
// simpleCommit. If the coordinator's Push fails, the cause is logged and the
// caller receives only a consistency-level error wrapping errReplicas.
// Otherwise the single reply error read from the stream is logged (tagged with
// doc.ID) when non-nil, and returned.
func (r *Replicator) MergeObject(ctx context.Context,
	shard string,
	doc *objects.MergeDocument,
	l ConsistencyLevel,
) error {
	coord := newCoordinator[SimpleResponse](r, shard, r.requestID(opMergeObject), r.log)
	op := func(ctx context.Context, host, requestID string) error {
		resp, err := r.client.MergeObject(ctx, host, r.class, shard, requestID, doc)
		if err == nil {
			err = resp.FirstError()
		}
		if err != nil {
			return fmt.Errorf("%q: %w", host, err)
		}
		return nil
	}
	replyCh, level, err := coord.Push(ctx, l, op, r.simpleCommit(shard))
	if err != nil {
		r.log.WithField("op", "push.merge").WithField("class", r.class).
			WithField("shard", shard).Error(err)
		// The detailed cause is only logged; callers see that the requested
		// consistency level could not be met.
		return fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas)
	}
	err = r.stream.readErrors(1, level, replyCh)[0]
	if err != nil {
		// op=merge attributes reply failures to this operation rather than
		// to a plain put.
		r.log.WithField("op", "merge").WithField("class", r.class).
			WithField("shard", shard).WithField("uuid", doc.ID).Error(err)
	}
	return err
}
// DeleteObject replicates the deletion of the object id from shard at
// consistency level l.
//
// Each replica is sent client.DeleteObject, and the commit step uses
// simpleCommit. If the coordinator's Push fails, the cause is logged and the
// caller receives only a consistency-level error wrapping errReplicas.
// Otherwise the single reply error read from the stream is logged (tagged with
// id) when non-nil, and returned.
func (r *Replicator) DeleteObject(ctx context.Context,
	shard string,
	id strfmt.UUID,
	l ConsistencyLevel,
) error {
	coord := newCoordinator[SimpleResponse](r, shard, r.requestID(opDeleteObject), r.log)
	op := func(ctx context.Context, host, requestID string) error {
		resp, err := r.client.DeleteObject(ctx, host, r.class, shard, requestID, id)
		if err == nil {
			err = resp.FirstError()
		}
		if err != nil {
			return fmt.Errorf("%q: %w", host, err)
		}
		return nil
	}
	replyCh, level, err := coord.Push(ctx, l, op, r.simpleCommit(shard))
	if err != nil {
		r.log.WithField("op", "push.delete").WithField("class", r.class).
			WithField("shard", shard).Error(err)
		// The detailed cause is only logged; callers see that the requested
		// consistency level could not be met.
		return fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas)
	}
	err = r.stream.readErrors(1, level, replyCh)[0]
	if err != nil {
		// op=delete attributes reply failures to this operation rather than
		// to a put.
		r.log.WithField("op", "delete").WithField("class", r.class).
			WithField("shard", shard).WithField("uuid", id).Error(err)
	}
	return err
}
// PutObjects replicates the batch objs to the replicas of shard at consistency
// level l and returns a slice of per-object errors.
//
// Each replica is sent client.PutObjects, and the commit step uses
// simpleCommit.
//
// If the coordinator's Push fails, the cause is logged and the caller receives
// a slice of len(objs) entries. Every entry holds the same consistency-level
// error wrapping errReplicas.
//
// Otherwise the result comes from r.stream.readErrors, which is asked for
// len(objs) entries. The whole slice is logged if any entry is non-nil.
func (r *Replicator) PutObjects(ctx context.Context,
	shard string,
	objs []*storobj.Object,
	l ConsistencyLevel,
) []error {
	coord := newCoordinator[SimpleResponse](r, shard, r.requestID(opPutObjects), r.log)

	sendBatch := func(ctx context.Context, host, requestID string) error {
		resp, err := r.client.PutObjects(ctx, host, r.class, shard, requestID, objs)
		if err != nil {
			return fmt.Errorf("%q: %w", host, err)
		}
		if respErr := resp.FirstError(); respErr != nil {
			return fmt.Errorf("%q: %w", host, respErr)
		}
		return nil
	}

	replyCh, level, pushErr := coord.Push(ctx, l, sendBatch, r.simpleCommit(shard))
	if pushErr != nil {
		r.log.WithField("op", "push.many").WithField("class", r.class).
			WithField("shard", shard).Error(pushErr)
		levelErr := fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas)
		failed := make([]error, len(objs))
		for i := range failed {
			failed[i] = levelErr
		}
		return failed
	}

	errs := r.stream.readErrors(len(objs), level, replyCh)
	if firstError(errs) != nil {
		r.log.WithField("op", "put.many").WithField("class", r.class).
			WithField("shard", shard).Error(errs)
	}
	return errs
}
// DeleteObjects replicates the deletion of uuids from shard at consistency
// level l and returns the per-object results.
//
// dryRun is forwarded unchanged to every replica's DeleteObjects request.
// Replicas answer with a DeleteBatchResponse rather than a SimpleResponse, so
// the commit callback is built here instead of using simpleCommit.
//
// If the coordinator's Push fails, the cause is logged and the caller receives
// len(uuids) results. Each result carries the same consistency-level error
// wrapping errReplicas.
//
// Otherwise the results come from r.stream.readDeletions, which is asked for
// len(uuids) entries. The results are logged if any of them failed.
func (r *Replicator) DeleteObjects(ctx context.Context,
	shard string,
	uuids []strfmt.UUID,
	dryRun bool,
	l ConsistencyLevel,
) []objects.BatchSimpleObject {
	coord := newCoordinator[DeleteBatchResponse](r, shard, r.requestID(opDeleteObjects), r.log)
	op := func(ctx context.Context, host, requestID string) error {
		resp, err := r.client.DeleteObjects(
			ctx, host, r.class, shard, requestID, uuids, dryRun)
		if err == nil {
			err = resp.FirstError()
		}
		if err != nil {
			return fmt.Errorf("%q: %w", host, err)
		}
		return nil
	}
	commit := func(ctx context.Context, host, requestID string) (DeleteBatchResponse, error) {
		resp := DeleteBatchResponse{}
		err := r.client.Commit(ctx, host, r.class, shard, requestID, &resp)
		if err == nil {
			err = resp.FirstError()
		}
		if err != nil {
			err = fmt.Errorf("%q: %w", host, err)
		}
		return resp, err
	}
	replyCh, level, err := coord.Push(ctx, l, op, commit)
	if err != nil {
		r.log.WithField("op", "push.deletes").WithField("class", r.class).
			WithField("shard", shard).Error(err)
		err = fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas)
		errs := make([]objects.BatchSimpleObject, len(uuids))
		for i := range errs {
			errs[i].Err = err
		}
		return errs
	}
	rs := r.stream.readDeletions(len(uuids), level, replyCh)
	if err := firstBatchError(rs); err != nil {
		// op=delete.many attributes batch-deletion failures to this operation,
		// mirroring the put.many label used for batch puts.
		r.log.WithField("op", "delete.many").WithField("class", r.class).
			WithField("shard", shard).Error(rs)
	}
	return rs
}
// AddReferences replicates the batch of references refs to the replicas of
// shard at consistency level l and returns a slice of per-reference errors.
//
// Each replica is sent client.AddReferences, and the commit step uses
// simpleCommit.
//
// If the coordinator's Push fails, the cause is logged and the caller receives
// len(refs) entries. Every entry holds the same consistency-level error
// wrapping errReplicas.
//
// Otherwise the result comes from r.stream.readErrors, which is asked for
// len(refs) entries. The whole slice is logged if any entry is non-nil.
func (r *Replicator) AddReferences(ctx context.Context,
	shard string,
	refs []objects.BatchReference,
	l ConsistencyLevel,
) []error {
	coord := newCoordinator[SimpleResponse](r, shard, r.requestID(opAddReferences), r.log)

	send := func(ctx context.Context, host, requestID string) error {
		resp, err := r.client.AddReferences(ctx, host, r.class, shard, requestID, refs)
		if err == nil {
			if err = resp.FirstError(); err == nil {
				return nil
			}
		}
		return fmt.Errorf("%q: %w", host, err)
	}

	replyCh, level, pushErr := coord.Push(ctx, l, send, r.simpleCommit(shard))
	if pushErr != nil {
		r.log.WithField("op", "push.refs").WithField("class", r.class).
			WithField("shard", shard).Error(pushErr)
		levelErr := fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas)
		failed := make([]error, 0, len(refs))
		for range refs {
			failed = append(failed, levelErr)
		}
		return failed
	}

	errs := r.stream.readErrors(len(refs), level, replyCh)
	if firstError(errs) != nil {
		r.log.WithField("op", "put.refs").WithField("class", r.class).
			WithField("shard", shard).Error(errs)
	}
	return errs
}
// simpleCommit returns the commit callback that the coordinator uses for
// writes whose replicas answer with a SimpleResponse.
//
// The callback asks the replica at host to commit requestID for shard. It
// returns the decoded response together with either the transport error or
// the first error carried in the response, prefixed with host.
func (r *Replicator) simpleCommit(shard string) commitOp[SimpleResponse] {
	return func(ctx context.Context, host, requestID string) (SimpleResponse, error) {
		resp := SimpleResponse{}
		if err := r.client.Commit(ctx, host, r.class, shard, requestID, &resp); err != nil {
			return resp, fmt.Errorf("%s: %w", host, err)
		}
		if err := resp.FirstError(); err != nil {
			return resp, fmt.Errorf("%s: %w", host, err)
		}
		return resp, nil
	}
}
// requestID builds the identifier of one replication request, formatted as
// NodeName-OpCode-TimeStamp-Counter:
//   - NodeName: the local node name reported by stateGetter;
//   - OpCode: op in lowercase hex, zero-padded to at least two digits;
//   - TimeStamp: the current Unix time in milliseconds, in hex;
//   - Counter: this Replicator's request counter after incrementing, in hex.
//
// The coordinator uses the ID to identify a transaction. The prefix makes the
// request's origin node and operation kind observable across the cluster.
func (r *Replicator) requestID(op opID) string {
	node := r.stateGetter.NodeName()
	millis := time.Now().UnixMilli()
	seq := r.requestCounter.Add(1)
	return fmt.Sprintf("%s-%.2x-%x-%x", node, op, millis, seq)
}