//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package sharding
import (
"fmt"
"math"
"math/rand"
"sort"
"github.com/spaolacci/murmur3"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/usecases/cluster"
)
const shardNameLength = 12
type State struct {
	IndexID string `json:"indexID"` // for monitoring and reporting purposes; does not influence the shard calculations
Config Config `json:"config"`
Physical map[string]Physical `json:"physical"`
Virtual []Virtual `json:"virtual"`
PartitioningEnabled bool `json:"partitioningEnabled"`
// different for each node, not to be serialized
	localNodeName string // TODO: localNodeName is static; it would be better to store it just once
}
// MigrateFromOldFormat checks if the old (pre-v1.17) format was used and
// migrates it into the new format for backward-compatibility with all classes
// created before v1.17
func (s *State) MigrateFromOldFormat() {
for shardName, shard := range s.Physical {
if shard.LegacyBelongsToNodeForBackwardCompat != "" && len(shard.BelongsToNodes) == 0 {
shard.BelongsToNodes = []string{
shard.LegacyBelongsToNodeForBackwardCompat,
}
shard.LegacyBelongsToNodeForBackwardCompat = ""
}
s.Physical[shardName] = shard
}
}
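// exampleMigrateFromOldFormat is a hedged sketch (not part of the original
// file) illustrating the migration above: a shard stored in the pre-v1.17
// single-node format ends up with a one-element BelongsToNodes list and a
// cleared legacy field.
func exampleMigrateFromOldFormat() {
	s := &State{Physical: map[string]Physical{
		"abc123": {Name: "abc123", LegacyBelongsToNodeForBackwardCompat: "node1"},
	}}
	s.MigrateFromOldFormat()
	fmt.Println(s.Physical["abc123"].BelongsToNodes) // prints [node1]
}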
type Virtual struct {
Name string `json:"name"`
Upper uint64 `json:"upper"`
OwnsPercentage float64 `json:"ownsPercentage"`
AssignedToPhysical string `json:"assignedToPhysical"`
}
type Physical struct {
Name string `json:"name"`
OwnsVirtual []string `json:"ownsVirtual,omitempty"`
OwnsPercentage float64 `json:"ownsPercentage"`
LegacyBelongsToNodeForBackwardCompat string `json:"belongsToNode,omitempty"`
BelongsToNodes []string `json:"belongsToNodes,omitempty"`
Status string `json:"status,omitempty"`
}
// BelongsToNode exists for backward compatibility with the time before
// replication was added. It always returns the first node in the list.
func (p Physical) BelongsToNode() string {
return p.BelongsToNodes[0]
}
// AdjustReplicas shrinks or extends the replica set (p.BelongsToNodes)
func (p *Physical) AdjustReplicas(count int, nodes nodes) error {
if count < 0 {
return fmt.Errorf("negative replication factor: %d", count)
}
// let's be defensive here and make sure available replicas are unique.
available := make(map[string]bool)
for _, n := range p.BelongsToNodes {
available[n] = true
}
	// b == a should always be true, except in case of a bug
	if b, a := len(p.BelongsToNodes), len(available); b > a {
p.BelongsToNodes = p.BelongsToNodes[:a]
i := 0
for n := range available {
p.BelongsToNodes[i] = n
i++
}
}
	if count <= len(p.BelongsToNodes) { // fewer or equally many replicas wanted: truncate
p.BelongsToNodes = p.BelongsToNodes[:count]
return nil
}
names := nodes.Candidates()
if count > len(names) {
return fmt.Errorf("not enough replicas: found %d want %d", len(names), count)
}
// make sure included nodes are unique
for _, n := range names {
if !available[n] {
p.BelongsToNodes = append(p.BelongsToNodes, n)
available[n] = true
}
if len(available) == count {
break
}
}
return nil
}
func (p *Physical) ActivityStatus() string {
return schema.ActivityStatus(p.Status)
}
type nodes interface {
Candidates() []string
LocalName() string
}
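// fakeNodes is a hypothetical in-memory implementation of the nodes
// interface above; a sketch (not part of the original file) used only by
// the example functions in this file.
type fakeNodes struct{ names []string }

func (f fakeNodes) Candidates() []string { return f.names }
func (f fakeNodes) LocalName() string    { return f.names[0] }

// exampleAdjustReplicas sketches how AdjustReplicas grows a single-replica
// shard to three replicas drawn from the candidate list; shrinking instead
// truncates BelongsToNodes.
func exampleAdjustReplicas() error {
	p := Physical{Name: "abc123", BelongsToNodes: []string{"node1"}}
	return p.AdjustReplicas(3, fakeNodes{names: []string{"node1", "node2", "node3"}})
}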
func InitState(id string, config Config, nodes nodes, replFactor int64, partitioningEnabled bool) (*State, error) {
out := &State{
Config: config,
IndexID: id,
localNodeName: nodes.LocalName(),
PartitioningEnabled: partitioningEnabled,
}
if partitioningEnabled {
out.Physical = make(map[string]Physical, 128)
return out, nil
}
names := nodes.Candidates()
if f, n := replFactor, len(names); f > int64(n) {
return nil, fmt.Errorf("not enough replicas: found %d want %d", n, f)
}
if err := out.initPhysical(names, replFactor); err != nil {
return nil, err
}
out.initVirtual()
out.distributeVirtualAmongPhysical()
return out, nil
}
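// exampleInitState is a sketch (not part of the original file) of
// initializing the sharding state for a 3-node cluster with 3 physical
// shards and replication factor 2; the Config values are illustrative
// assumptions, not recommended settings.
func exampleInitState() (*State, error) {
	cfg := Config{
		DesiredCount:        3,
		VirtualPerPhysical:  128,
		DesiredVirtualCount: 3 * 128,
	}
	return InitState("my-index", cfg, fakeNodes{names: []string{"node1", "node2", "node3"}}, 2, false)
}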
// Shard returns the shard name if it exists, and an empty string otherwise
func (s *State) Shard(partitionKey, objectID string) string {
if s.PartitioningEnabled {
if _, ok := s.Physical[partitionKey]; ok {
return partitionKey // will change in the future
}
return ""
}
return s.PhysicalShard([]byte(objectID))
}
func (s *State) PhysicalShard(in []byte) string {
if len(s.Physical) == 0 {
panic("no physical shards present")
}
if len(s.Virtual) == 0 {
panic("no virtual shards present")
}
h := murmur3.New64()
h.Write(in)
token := h.Sum64()
virtual := s.virtualByToken(token)
return virtual.AssignedToPhysical
}
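// exampleRouteObject sketches the consistent-hashing lookup above (not part
// of the original file): the object ID is murmur3-hashed to a uint64 token,
// which is then matched against the sorted virtual ring. The object UUID is
// hypothetical.
func exampleRouteObject(s *State) string {
	return s.PhysicalShard([]byte("f81bfe55-1a2b-4c3d-8e4f-5a6b7c8d9e0f"))
}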
// CountPhysicalShards returns the number of physical shards
func (s *State) CountPhysicalShards() int {
return len(s.Physical)
}
func (s *State) AllPhysicalShards() []string {
var names []string
for _, physical := range s.Physical {
names = append(names, physical.Name)
}
sort.Slice(names, func(a, b int) bool {
return names[a] < names[b]
})
return names
}
func (s *State) AllLocalPhysicalShards() []string {
var names []string
for _, physical := range s.Physical {
if s.IsLocalShard(physical.Name) {
names = append(names, physical.Name)
}
}
sort.Slice(names, func(a, b int) bool {
return names[a] < names[b]
})
return names
}
func (s *State) SetLocalName(name string) {
s.localNodeName = name
}
func (s *State) IsLocalShard(name string) bool {
for _, node := range s.Physical[name].BelongsToNodes {
if node == s.localNodeName {
return true
}
}
return false
}
// initPhysical assigns shards to nodes according to the following rules:
//
// - The starting point of the ring is random
// - Shard N+1's first node is the right neighbor of shard N's first node
// - If a shard has multiple nodes (replication) they are always the right
// neighbors of the first node of that shard
//
// Example with 3 nodes, 2 shards, replicationFactor=2:
//
// Shard 1: Node1, Node2
// Shard 2: Node2, Node3
//
// Example with 3 nodes, 3 shards, replicationFactor=3:
//
// Shard 1: Node1, Node2, Node3
// Shard 2: Node2, Node3, Node1
// Shard 3: Node3, Node1, Node2
//
// Example with 12 nodes, 3 shards, replicationFactor=5:
//
// Shard 1: Node7, Node8, Node9, Node10, Node11
// Shard 2: Node8, Node9, Node10, Node11, Node12
// Shard 3: Node9, Node10, Node11, Node12, Node1
func (s *State) initPhysical(names []string, replFactor int64) error {
it, err := cluster.NewNodeIterator(names, cluster.StartAfter)
if err != nil {
return err
}
it.SetStartNode(names[len(names)-1])
s.Physical = map[string]Physical{}
for i := 0; i < s.Config.DesiredCount; i++ {
name := generateShardName()
shard := Physical{Name: name}
node := it.Next()
shard.BelongsToNodes = []string{node}
if replFactor > 1 {
			// create a second node iterator that starts right after the node
			// we just assigned; this lets us pick the next n right neighbors
			// without advancing the root iterator, which determines the next
			// shard's starting node
replicationIter, err := cluster.NewNodeIterator(names, cluster.StartAfter)
if err != nil {
return fmt.Errorf("assign replication nodes: %w", err)
}
replicationIter.SetStartNode(node)
// the first node is already assigned, we only need to assign the
// additional nodes
for i := replFactor; i > 1; i-- {
shard.BelongsToNodes = append(shard.BelongsToNodes, replicationIter.Next())
}
}
s.Physical[name] = shard
}
return nil
}
// GetPartitions computes the owner nodes for the specified shards based on
// the available nodes and replFactor. It does not change the internal state.
func (s *State) GetPartitions(nodes nodes, shards []string, replFactor int64) (map[string][]string, error) {
names := nodes.Candidates()
if len(names) == 0 {
return nil, fmt.Errorf("list of node candidates is empty")
}
if f, n := replFactor, len(names); f > int64(n) {
return nil, fmt.Errorf("not enough replicas: found %d want %d", n, f)
}
it, err := cluster.NewNodeIterator(names, cluster.StartAfter)
if err != nil {
return nil, err
}
it.SetStartNode(names[len(names)-1])
partitions := make(map[string][]string, len(shards))
for _, name := range shards {
if _, alreadyExists := s.Physical[name]; alreadyExists {
continue
}
owners := make([]string, 1, replFactor)
node := it.Next()
owners[0] = node
if replFactor > 1 {
			// create a second node iterator that starts right after the node
			// we just assigned; this lets us pick the next n right neighbors
			// without advancing the root iterator, which determines the next
			// shard's starting node
replicationIter, err := cluster.NewNodeIterator(names, cluster.StartAfter)
if err != nil {
return nil, fmt.Errorf("assign replication nodes: %w", err)
}
replicationIter.SetStartNode(node)
// the first node is already assigned, we only need to assign the
// additional nodes
for i := replFactor; i > 1; i-- {
owners = append(owners, replicationIter.Next())
}
}
partitions[name] = owners
}
return partitions, nil
}
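// exampleGetPartitions sketches previewing the owner nodes of two new
// partitions with replication factor 2 without mutating s (not part of the
// original file; the tenant names are hypothetical).
func exampleGetPartitions(s *State) (map[string][]string, error) {
	return s.GetPartitions(fakeNodes{names: []string{"node1", "node2", "node3"}}, []string{"tenantA", "tenantB"}, 2)
}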
// AddPartition adds a new partition to the physical shards
func (s *State) AddPartition(name string, nodes []string, status string) Physical {
p := Physical{
Name: name,
BelongsToNodes: nodes,
OwnsPercentage: 1.0,
Status: status,
}
s.Physical[name] = p
return p
}
// DeletePartition removes a partition from the physical shards
func (s *State) DeletePartition(name string) {
delete(s.Physical, name)
}
// ApplyNodeMapping replaces node names with their new values from
// nodeMapping in s. If a shard's LegacyBelongsToNodeForBackwardCompat field
// is non-empty, it is also replaced if present in nodeMapping.
func (s *State) ApplyNodeMapping(nodeMapping map[string]string) {
if len(nodeMapping) == 0 {
return
}
for k, v := range s.Physical {
if v.LegacyBelongsToNodeForBackwardCompat != "" {
if newNodeName, ok := nodeMapping[v.LegacyBelongsToNodeForBackwardCompat]; ok {
v.LegacyBelongsToNodeForBackwardCompat = newNodeName
}
}
for i, nodeName := range v.BelongsToNodes {
if newNodeName, ok := nodeMapping[nodeName]; ok {
v.BelongsToNodes[i] = newNodeName
}
}
s.Physical[k] = v
}
}
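// exampleApplyNodeMapping sketches renaming a node across all shards, e.g.
// after a cluster migration (not part of the original file; the node names
// are hypothetical).
func exampleApplyNodeMapping(s *State) {
	s.ApplyNodeMapping(map[string]string{"node1": "node1-new"})
}

// initVirtual creates the virtual shards and computes the share of the
// token space each one owns.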
func (s *State) initVirtual() {
count := s.Config.DesiredVirtualCount
s.Virtual = make([]Virtual, count)
for i := range s.Virtual {
name := generateShardName()
h := murmur3.New64()
h.Write([]byte(name))
s.Virtual[i] = Virtual{Name: name, Upper: h.Sum64()}
}
sort.Slice(s.Virtual, func(a, b int) bool {
return s.Virtual[a].Upper < s.Virtual[b].Upper
})
for i := range s.Virtual {
var tokenCount uint64
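		// the first virtual shard also owns the wrap-around range from the
		// last shard's upper bound to the top of the uint64 token space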
if i == 0 {
tokenCount = s.Virtual[0].Upper + (math.MaxUint64 - s.Virtual[len(s.Virtual)-1].Upper)
} else {
tokenCount = s.Virtual[i].Upper - s.Virtual[i-1].Upper
}
s.Virtual[i].OwnsPercentage = float64(tokenCount) / float64(math.MaxUint64)
}
}
// this is a primitive distribution that only works for initialization. Once
// we want to support dynamic sharding, we will need to come up with
// something better than this.
func (s *State) distributeVirtualAmongPhysical() {
ids := make([]string, len(s.Virtual))
for i, v := range s.Virtual {
ids[i] = v.Name
}
rand.Shuffle(len(s.Virtual), func(a, b int) {
ids[a], ids[b] = ids[b], ids[a]
})
physicalIDs := make([]string, 0, len(s.Physical))
for name := range s.Physical {
physicalIDs = append(physicalIDs, name)
}
for i, vid := range ids {
pickedPhysical := physicalIDs[i%len(physicalIDs)]
virtual := s.virtualByName(vid)
virtual.AssignedToPhysical = pickedPhysical
physical := s.Physical[pickedPhysical]
physical.OwnsVirtual = append(physical.OwnsVirtual, vid)
physical.OwnsPercentage += virtual.OwnsPercentage
s.Physical[pickedPhysical] = physical
}
}
// virtualByName uses linear search; it should only be used during shard init
// and update operations, not in regular operations
func (s *State) virtualByName(name string) *Virtual {
for i := range s.Virtual {
if s.Virtual[i].Name == name {
return &s.Virtual[i]
}
}
return nil
}
func (s *State) virtualByToken(token uint64) *Virtual {
for i := range s.Virtual {
if token > s.Virtual[i].Upper {
continue
}
return &s.Virtual[i]
}
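	// the token is greater than the largest upper bound: wrap around the
	// ring to the first virtual shard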
return &s.Virtual[0]
}
const shardNameChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
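// generateShardName returns a random 12-character alphanumeric name. It uses
// math/rand, so names are unique only with high probability, not by
// construction.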
func generateShardName() string {
b := make([]byte, shardNameLength)
for i := range b {
b[i] = shardNameChars[rand.Intn(len(shardNameChars))]
}
return string(b)
}
func (s State) DeepCopy() State {
var virtualCopy []Virtual
physicalCopy := make(map[string]Physical, len(s.Physical))
for name, shard := range s.Physical {
physicalCopy[name] = shard.DeepCopy()
}
if len(s.Virtual) > 0 {
virtualCopy = make([]Virtual, len(s.Virtual))
}
for i, virtual := range s.Virtual {
virtualCopy[i] = virtual.DeepCopy()
}
return State{
localNodeName: s.localNodeName,
IndexID: s.IndexID,
Config: s.Config.DeepCopy(),
Physical: physicalCopy,
Virtual: virtualCopy,
PartitioningEnabled: s.PartitioningEnabled,
}
}
func (c Config) DeepCopy() Config {
return Config{
VirtualPerPhysical: c.VirtualPerPhysical,
DesiredCount: c.DesiredCount,
ActualCount: c.ActualCount,
DesiredVirtualCount: c.DesiredVirtualCount,
ActualVirtualCount: c.ActualVirtualCount,
Key: c.Key,
Strategy: c.Strategy,
Function: c.Function,
}
}
func (p Physical) DeepCopy() Physical {
var ownsVirtualCopy []string
if len(p.OwnsVirtual) > 0 {
ownsVirtualCopy = make([]string, len(p.OwnsVirtual))
copy(ownsVirtualCopy, p.OwnsVirtual)
}
belongsCopy := make([]string, len(p.BelongsToNodes))
copy(belongsCopy, p.BelongsToNodes)
return Physical{
Name: p.Name,
OwnsVirtual: ownsVirtualCopy,
OwnsPercentage: p.OwnsPercentage,
BelongsToNodes: belongsCopy,
Status: p.Status,
}
}
func (v Virtual) DeepCopy() Virtual {
return Virtual{
Name: v.Name,
Upper: v.Upper,
OwnsPercentage: v.OwnsPercentage,
AssignedToPhysical: v.AssignedToPhysical,
}
}