SemanticSearchPOC / usecases /replica /replicator_stream.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
3.82 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package replica
import (
"fmt"
"github.com/go-openapi/strfmt"
"github.com/weaviate/weaviate/usecases/objects"
)
// replicatorStream represents an incoming stream of responses to
// replication requests sent to replicas.
//
// It is an empty struct: it carries no state and only serves as the
// receiver for the methods declared on it.
type replicatorStream struct{}
// readErrors consumes replica responses from ch and turns them into one
// error slot per batch item.
//
// Every response without an error counts towards the requested consistency
// level; as soon as level such responses have been seen, the function returns
// batchSize nil errors without reading the rest of ch. Responses carrying an
// error are collected, and the first such error whose response holds no
// per-item errors is remembered as the fallback.
//
// If ch is closed before the level is reached and no fallback was recorded,
// the fallback becomes errReplicas wrapped with a "broadcast" prefix. The
// collected failed responses are then merged via flattenErrors.
func (r replicatorStream) readErrors(batchSize int,
	level int,
	ch <-chan _Result[SimpleResponse],
) []error {
	var fallback error
	failed := make([]SimpleResponse, 0, level)

	for res := range ch {
		if res.Err == nil {
			level--
			if level == 0 { // consistency level reached
				return make([]error, batchSize)
			}
			continue
		}

		failed = append(failed, res.Value)
		// Only a response without per-item errors supplies the fallback.
		if fallback == nil && len(res.Value.Errors) == 0 {
			fallback = res.Err
		}
	}

	// The channel was exhausted before enough replicas acknowledged.
	if fallback == nil && level > 0 {
		fallback = fmt.Errorf("broadcast: %w", errReplicas)
	}
	return r.flattenErrors(batchSize, failed, fallback)
}
// readDeletions consumes replica responses to a batch deletion from ch and
// turns them into one result per batch item.
//
// Every response without an error is kept and counts towards the requested
// consistency level; as soon as level such responses have been seen, they are
// merged via flattenDeletions (with a nil default error) and returned without
// reading the rest of ch. Responses carrying an error are collected
// separately, and the first such error whose response holds an empty batch is
// remembered as the fallback.
//
// If ch is closed before the level is reached and no fallback was recorded,
// the fallback becomes errReplicas wrapped with a "broadcast" prefix. The
// failed responses followed by the successful ones are then merged via
// flattenDeletions.
func (r replicatorStream) readDeletions(batchSize int,
	level int,
	ch <-chan _Result[DeleteBatchResponse],
) []objects.BatchSimpleObject {
	var (
		fallback  error
		succeeded = make([]DeleteBatchResponse, 0, level)
		failed    = make([]DeleteBatchResponse, 0, level)
	)

	for res := range ch {
		if res.Err == nil {
			succeeded = append(succeeded, res.Value)
			level--
			if level == 0 { // consistency level reached
				return r.flattenDeletions(batchSize, succeeded, nil)
			}
			continue
		}

		failed = append(failed, res.Value)
		// Only a response without batch entries supplies the fallback.
		if fallback == nil && len(res.Value.Batch) == 0 {
			fallback = res.Err
		}
	}

	// The channel was exhausted before enough replicas acknowledged.
	if fallback == nil && level > 0 {
		fallback = fmt.Errorf("broadcast: %w", errReplicas)
	}
	// Failed responses come first, so their per-item errors take precedence.
	return r.flattenDeletions(batchSize, append(failed, succeeded...), fallback)
}
// flattenErrors merges the per-item errors of several responses into a single
// slice of length batchSize.
//
// Responses whose Errors slice does not have exactly batchSize entries are
// skipped. For every position, the first non-empty error encountered (in the
// order of rs) is cloned into the result. When rs is empty or at least one
// response was skipped, every position that is still nil is set to
// defaultErr.
func (replicatorStream) flattenErrors(batchSize int,
	rs []SimpleResponse,
	defaultErr error,
) []error {
	merged := make([]error, batchSize)
	skipped := 0

	for _, resp := range rs {
		if len(resp.Errors) != batchSize {
			skipped++
			continue
		}
		for i, e := range resp.Errors {
			if e.Empty() || merged[i] != nil {
				continue
			}
			merged[i] = e.Clone()
		}
	}

	// No usable response at all, or some response could not be mapped onto
	// the batch: fill the remaining gaps with the default error.
	if len(rs) == 0 || skipped > 0 {
		for i := range merged {
			if merged[i] == nil {
				merged[i] = defaultErr
			}
		}
	}
	return merged
}
// flattenDeletions merges the per-item deletion results of several responses
// into a single slice of length batchSize.
//
// Responses whose Batch does not have exactly batchSize entries are skipped.
// For every position, the first non-empty error and the first non-empty UUID
// encountered (in the order of rs) are kept. When rs is empty or at least one
// response was skipped, every position without an error is given defaultErr.
func (replicatorStream) flattenDeletions(batchSize int,
	rs []DeleteBatchResponse,
	defaultErr error,
) []objects.BatchSimpleObject {
	merged := make([]objects.BatchSimpleObject, batchSize)
	skipped := 0

	for _, resp := range rs {
		if len(resp.Batch) != batchSize {
			skipped++
			continue
		}
		for i, item := range resp.Batch {
			slot := &merged[i]
			if !item.Error.Empty() && slot.Err == nil {
				slot.Err = item.Error.Clone()
			}
			if slot.UUID == "" && item.UUID != "" {
				slot.UUID = strfmt.UUID(item.UUID)
			}
		}
	}

	// No usable response at all, or some response could not be mapped onto
	// the batch: fill the remaining gaps with the default error.
	if len(rs) == 0 || skipped > 0 {
		for i := range merged {
			if merged[i].Err == nil {
				merged[i].Err = defaultErr
			}
		}
	}
	return merged
}
// firstError returns the first non-nil error in es, or nil when es is empty
// or contains only nil entries.
func firstError(es []error) error {
	for i := range es {
		if es[i] != nil {
			return es[i]
		}
	}
	return nil
}
// firstBatchError returns the Err of the first element in xs whose Err is
// non-nil, or nil when no element carries an error.
func firstBatchError(xs []objects.BatchSimpleObject) error {
	for i := range xs {
		if err := xs[i].Err; err != nil {
			return err
		}
	}
	return nil
}