KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.77 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package replica
import (
"context"
"errors"
"fmt"
"github.com/go-openapi/strfmt"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/search"
"github.com/weaviate/weaviate/entities/storobj"
"github.com/weaviate/weaviate/usecases/objects"
"golang.org/x/sync/errgroup"
)
// msgCLevel is the message prefix used when the requested consistency level
// cannot be achieved.
var msgCLevel = "cannot achieve consistency level"

// Sentinel errors of the read path.
var (
	// errReplicas is returned when pulling from the replicas fails.
	errReplicas = errors.New("cannot reach enough replicas")
	// errRepair signals a read repair error.
	errRepair = errors.New("read repair error")
	// errRead signals a read error.
	errRead = errors.New("read error")
)
// senderReply is a container for the data received from a replica.
type senderReply[T any] struct {
	sender     string // hostname of the sender
	Version    int64  // sender's current version of the object
	Data       T      // the data sent by the sender
	UpdateTime int64  // sender's current update time
	DigestRead bool   // true when the reply stems from a digest read, false for a full read
}

// findOneReply is the reply exchanged while reading a single object.
type findOneReply senderReply[objects.Replica]

// existReply is the reply exchanged while checking the existence of an
// object: the digest received from a replica together with the replica's
// host name.
type existReply struct {
	Sender string
	RepairResponse
}
// Finder finds replicated objects.
//
// It embeds finderStream, so the stream's fields and methods are promoted
// onto Finder and can be used directly on a *Finder value.
type Finder struct {
resolver *resolver // host names of replicas; used to translate a node name into a host name
finderStream // stream of objects
}
// NewFinder constructs a new Finder instance for the class className.
//
// resolver is stored as is, client is wrapped into a finderClient and l
// becomes the logger of the embedded finder stream.
func NewFinder(className string,
	resolver *resolver,
	client rClient,
	l logrus.FieldLogger,
) *Finder {
	stream := finderStream{
		repairer: repairer{
			class:  className,
			client: finderClient{client},
		},
		log: l,
	}
	return &Finder{
		resolver:     resolver,
		finderStream: stream,
	}
}
// GetOne gets the object identified by id from shard such that the given
// consistency level l is satisfied.
//
// Replicas are asked either for the full object or for a digest of it; the
// replies are then merged by readOne. When the replicas cannot be pulled, the
// cause is logged and an error wrapping errReplicas is returned. An error
// reported by readOne is returned prefixed with the consistency level.
func (f *Finder) GetOne(ctx context.Context,
	l ConsistencyLevel, shard string,
	id strfmt.UUID,
	props search.SelectProperties,
	adds additional.Properties,
) (*storobj.Object, error) {
	coord := newReadCoordinator[findOneReply](f, shard)
	read := func(ctx context.Context, host string, fullRead bool) (findOneReply, error) {
		if !fullRead {
			digests, err := f.client.DigestReads(ctx, host, f.class, shard, []strfmt.UUID{id})
			// a single id was requested: anything but exactly one digest
			// leaves the zero value in place
			var digest RepairResponse
			if len(digests) == 1 {
				digest = digests[0]
			}
			return findOneReply{
				sender:     host,
				Version:    digest.Version,
				Data:       objects.Replica{ID: id, Deleted: digest.Deleted},
				UpdateTime: digest.UpdateTime,
				DigestRead: true,
			}, err
		}

		replica, err := f.client.FullRead(ctx, host, f.class, shard, id, props, adds)
		return findOneReply{
			sender:     host,
			Version:    0,
			Data:       replica,
			UpdateTime: replica.UpdateTime(),
			DigestRead: false,
		}, err
	}

	replyCh, state, err := coord.Pull(ctx, l, read, "")
	if err != nil {
		f.log.WithField("op", "pull.one").Error(err)
		return nil, fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas)
	}

	res := <-f.readOne(ctx, shard, id, replyCh, state)
	err = res.Err
	if err != nil {
		err = fmt.Errorf("%s %q: %w", msgCLevel, l, err)
	}
	return res.Value, err
}
// ShardDesc describes a shard by a shard name and a node name.
// NOTE(review): presumably Node is the node holding the shard — confirm
// against the callers, none of which are visible here.
type ShardDesc struct {
Name string
Node string
}
// CheckConsistency checks the consistency of objects which may belong to
// different physical shards.
//
// For each x in xs the fields BelongsToNode and BelongsToShard must be set
// and non-empty; a nil element or a missing field yields an error. An empty
// xs is a no-op.
//
// With consistency level One every object is marked as consistent without
// any further check. For any other level the objects are partitioned and
// every part is checked concurrently; a failing part is logged and the error
// reported by the group is returned.
func (f *Finder) CheckConsistency(ctx context.Context,
	l ConsistencyLevel, xs []*storobj.Object,
) (retErr error) {
	if len(xs) == 0 {
		return nil
	}

	// every object has to carry the names of its node and shard
	for idx, obj := range xs {
		switch {
		case obj == nil:
			return fmt.Errorf("contains nil at object at index %d", idx)
		case obj.BelongsToNode == "" || obj.BelongsToShard == "":
			return fmt.Errorf("missing node or shard at index %d", idx)
		}
	}

	if l == One { // already consistent
		for _, obj := range xs {
			obj.IsConsistent = true
		}
		return nil
	}

	// one goroutine per part
	group, groupCtx := errgroup.WithContext(ctx)
	for _, part := range cluster(createBatch(xs)) {
		part := part // per-iteration copy captured by the closure
		group.Go(func() error {
			if _, err := f.checkShardConsistency(groupCtx, l, part); err != nil {
				f.log.WithField("op", "check_shard_consistency").
					WithField("shard", part.Shard).Error(err)
				return err
			}
			return nil
		})
	}
	return group.Wait()
}
// Exists checks whether the object identified by id exists in shard such
// that the given consistency level l is satisfied.
//
// Every contacted replica is asked for a digest of the object only; the
// digests are then merged by readExistence. When the replicas cannot be
// pulled, the cause is logged and an error wrapping errReplicas is returned.
// An error reported by readExistence is returned prefixed with the
// consistency level.
func (f *Finder) Exists(ctx context.Context,
	l ConsistencyLevel,
	shard string,
	id strfmt.UUID,
) (bool, error) {
	coord := newReadCoordinator[existReply](f, shard)
	digestOf := func(ctx context.Context, host string, _ bool) (existReply, error) {
		digests, err := f.client.DigestReads(ctx, host, f.class, shard, []strfmt.UUID{id})
		// a single id was requested: anything but exactly one digest leaves
		// the zero value in place
		var digest RepairResponse
		if len(digests) == 1 {
			digest = digests[0]
		}
		return existReply{Sender: host, RepairResponse: digest}, err
	}

	replyCh, state, err := coord.Pull(ctx, l, digestOf, "")
	if err != nil {
		f.log.WithField("op", "pull.exist").Error(err)
		return false, fmt.Errorf("%s %q: %w", msgCLevel, l, errReplicas)
	}

	res := <-f.readExistence(ctx, shard, id, replyCh, state)
	err = res.Err
	if err != nil {
		err = fmt.Errorf("%s %q: %w", msgCLevel, l, err)
	}
	return res.Value, err
}
// NodeObject gets an object from one specific node.
// It is used mainly for debugging purposes.
//
// nodeName is resolved into a host name first; an unknown node or an empty
// host name yields an error. The object is then fetched from that host with
// a full read.
func (f *Finder) NodeObject(ctx context.Context,
	nodeName,
	shard string,
	id strfmt.UUID,
	props search.SelectProperties, adds additional.Properties,
) (*storobj.Object, error) {
	hostname, found := f.resolver.NodeHostname(nodeName)
	if resolved := found && hostname != ""; !resolved {
		return nil, fmt.Errorf("cannot resolve node name: %s", nodeName)
	}

	replica, err := f.client.FullRead(ctx, hostname, f.class, shard, id, props, adds)
	return replica.Object, err
}
// checkShardConsistency checks consistency for a set of objects belonging to a shard.
// It returns the most recent objects or an error.
//
// The content of batch serves as the reply of the full read, so only digests
// are requested from replicas. The replies are merged by readBatchPart.
//
// When the replicas cannot be pulled, the underlying cause is logged, as the
// other read operations of Finder do, and an error wrapping errReplicas is
// returned.
func (f *Finder) checkShardConsistency(ctx context.Context,
	l ConsistencyLevel,
	batch shardPart,
) ([]*storobj.Object, error) {
	var (
		c         = newReadCoordinator[batchReply](f, batch.Shard)
		shard     = batch.Shard
		data, ids = batch.Extract() // extract from current content
	)
	op := func(ctx context.Context, host string, fullRead bool) (batchReply, error) {
		if fullRead { // we already have the content
			return batchReply{Sender: host, IsDigest: false, FullData: data}, nil
		}
		xs, err := f.client.DigestReads(ctx, host, f.class, shard, ids)
		return batchReply{Sender: host, IsDigest: true, DigestData: xs}, err
	}

	replyCh, state, err := c.Pull(ctx, l, op, batch.Node)
	if err != nil {
		// the returned sentinel hides the cause, so record it here
		f.log.WithField("op", "pull.shard").
			WithField("shard", shard).Error(err)
		return nil, fmt.Errorf("pull shard: %w", errReplicas)
	}

	result := <-f.readBatchPart(ctx, batch, ids, replyCh, state)
	return result.Value, result.Err
}