Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package replica | |
import ( | |
"sort" | |
"github.com/go-openapi/strfmt" | |
"github.com/weaviate/weaviate/entities/storobj" | |
"github.com/weaviate/weaviate/usecases/objects" | |
) | |
// indexedBatch holds an indexed list of objects | |
type indexedBatch struct { | |
Data []*storobj.Object | |
// Index is z-index used to maintain object's order | |
Index []int | |
} | |
// createBatch creates indexedBatch from xs | |
func createBatch(xs []*storobj.Object) indexedBatch { | |
var bi indexedBatch | |
bi.Data = xs | |
bi.Index = make([]int, len(xs)) | |
for i := 0; i < len(xs); i++ { | |
bi.Index[i] = i | |
} | |
return bi | |
} | |
// cluster data object by shard | |
func cluster(bi indexedBatch) []shardPart { | |
index := bi.Index | |
data := bi.Data | |
sort.Slice(index, func(i, j int) bool { | |
return data[index[i]].BelongsToShard < data[index[j]].BelongsToShard | |
}) | |
clusters := make([]shardPart, 0, 16) | |
// partition | |
cur := data[index[0]] | |
j := 0 | |
for i := 1; i < len(index); i++ { | |
if data[index[i]].BelongsToShard == cur.BelongsToShard { | |
continue | |
} | |
clusters = append(clusters, shardPart{ | |
Shard: cur.BelongsToShard, | |
Node: cur.BelongsToNode, Data: data, | |
Index: index[j:i], | |
}) | |
j = i | |
cur = data[index[j]] | |
} | |
clusters = append(clusters, shardPart{ | |
Shard: cur.BelongsToShard, | |
Node: cur.BelongsToNode, Data: data, | |
Index: index[j:], | |
}) | |
return clusters | |
} | |
// shardPart represents a data partition belonging to a physical shard | |
type shardPart struct { | |
Shard string // one-to-one mapping between Shard and Node | |
Node string | |
Data []*storobj.Object | |
Index []int // index for data | |
} | |
func (b *shardPart) ObjectIDs() []strfmt.UUID { | |
xs := make([]strfmt.UUID, len(b.Index)) | |
for i, idx := range b.Index { | |
xs[i] = b.Data[idx].ID() | |
} | |
return xs | |
} | |
func (b *shardPart) Extract() ([]objects.Replica, []strfmt.UUID) { | |
xs := make([]objects.Replica, len(b.Index)) | |
ys := make([]strfmt.UUID, len(b.Index)) | |
for i, idx := range b.Index { | |
p := b.Data[idx] | |
xs[i] = objects.Replica{ID: p.ID(), Deleted: false, Object: p} | |
ys[i] = p.ID() | |
} | |
return xs, ys | |
} | |