//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//

package sharding

import (
	"fmt"
	"math"
	"math/rand"
	"sort"

	"github.com/spaolacci/murmur3"
	"github.com/weaviate/weaviate/entities/schema"
	"github.com/weaviate/weaviate/usecases/cluster"
)

const shardNameLength = 12

type State struct {
	IndexID             string              `json:"indexID"` // for monitoring, reporting purposes. Does not influence the shard-calculations
	Config              Config              `json:"config"`
	Physical            map[string]Physical `json:"physical"`
	Virtual             []Virtual           `json:"virtual"`
	PartitioningEnabled bool                `json:"partitioningEnabled"`

	// different for each node, not to be serialized
	localNodeName string // TODO: localNodeName is static, it is better to store it just once
}

// MigrateFromOldFormat checks if the old (pre-v1.17) format was used and
// migrates it into the new format for backward-compatibility with all classes
// created before v1.17
func (s *State) MigrateFromOldFormat() {
	for shardName, shard := range s.Physical {
		if shard.LegacyBelongsToNodeForBackwardCompat != "" && len(shard.BelongsToNodes) == 0 {
			shard.BelongsToNodes = []string{
				shard.LegacyBelongsToNodeForBackwardCompat,
			}
			shard.LegacyBelongsToNodeForBackwardCompat = ""
		}

		s.Physical[shardName] = shard
	}
}

type Virtual struct {
	Name               string  `json:"name"`
	Upper              uint64  `json:"upper"`
	OwnsPercentage     float64 `json:"ownsPercentage"`
	AssignedToPhysical string  `json:"assignedToPhysical"`
}

type Physical struct {
	Name                                 string   `json:"name"`
	OwnsVirtual                          []string `json:"ownsVirtual,omitempty"`
	OwnsPercentage                       float64  `json:"ownsPercentage"`
	LegacyBelongsToNodeForBackwardCompat string   `json:"belongsToNode,omitempty"`
	BelongsToNodes                       []string `json:"belongsToNodes,omitempty"`
	Status                               string   `json:"status,omitempty"`
}

// BelongsToNode for backward-compatibility when there was no replication. It
// always returns the first node of the list
func (p Physical) BelongsToNode() string {
	return p.BelongsToNodes[0]
}

// AdjustReplicas shrinks or extends the replica set (p.BelongsToNodes)
func (p *Physical) AdjustReplicas(count int, nodes nodes) error {
	if count < 0 {
		return fmt.Errorf("negative replication factor: %d", count)
	}

	// let's be defensive here and make sure available replicas are unique.
	available := make(map[string]bool)
	for _, n := range p.BelongsToNodes {
		available[n] = true
	}

	// a == b should always be true except in case of a bug
	if b, a := len(p.BelongsToNodes), len(available); b > a {
		p.BelongsToNodes = p.BelongsToNodes[:a]
		i := 0
		for n := range available {
			p.BelongsToNodes[i] = n
			i++
		}
	}

	if count < len(p.BelongsToNodes) { // fewer replicas wanted
		p.BelongsToNodes = p.BelongsToNodes[:count]
		return nil
	}

	names := nodes.Candidates()
	if count > len(names) {
		return fmt.Errorf("not enough replicas: found %d want %d", len(names), count)
	}

	// make sure included nodes are unique; stop as soon as the desired number
	// of replicas is reached so we never grow past count
	for _, n := range names {
		if len(available) == count {
			break
		}
		if !available[n] {
			p.BelongsToNodes = append(p.BelongsToNodes, n)
			available[n] = true
		}
	}

	return nil
}
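
// A minimal usage sketch of AdjustReplicas. The staticNodes helper below is
// hypothetical and only for illustration; it is not part of this package:
//
//	type staticNodes struct{ names []string }
//
//	func (n staticNodes) Candidates() []string { return n.names }
//	func (n staticNodes) LocalName() string    { return n.names[0] }
//
//	p := Physical{Name: "abc123def456", BelongsToNodes: []string{"node1"}}
//	if err := p.AdjustReplicas(3, staticNodes{names: []string{"node1", "node2", "node3"}}); err != nil {
//		// handle error (e.g. not enough candidate nodes)
//	}
//	// p.BelongsToNodes now contains node1, node2 and node3;
//	// AdjustReplicas(1, ...) would instead truncate the set back to one node
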
func (p *Physical) ActivityStatus() string {
	return schema.ActivityStatus(p.Status)
}

type nodes interface {
	Candidates() []string
	LocalName() string
}

func InitState(id string, config Config, nodes nodes, replFactor int64, partitioningEnabled bool) (*State, error) {
	out := &State{
		Config:              config,
		IndexID:             id,
		localNodeName:       nodes.LocalName(),
		PartitioningEnabled: partitioningEnabled,
	}

	if partitioningEnabled {
		out.Physical = make(map[string]Physical, 128)
		return out, nil
	}

	names := nodes.Candidates()
	if f, n := replFactor, len(names); f > int64(n) {
		return nil, fmt.Errorf("not enough replicas: found %d want %d", n, f)
	}

	if err := out.initPhysical(names, replFactor); err != nil {
		return nil, err
	}
	out.initVirtual()
	out.distributeVirtualAmongPhysical()

	return out, nil
}

// Shard returns the shard name if it exists and an empty string otherwise
func (s *State) Shard(partitionKey, objectID string) string {
	if s.PartitioningEnabled {
		if _, ok := s.Physical[partitionKey]; ok {
			return partitionKey // will change in the future
		}
		return ""
	}
	return s.PhysicalShard([]byte(objectID))
}

func (s *State) PhysicalShard(in []byte) string {
	if len(s.Physical) == 0 {
		panic("no physical shards present")
	}

	if len(s.Virtual) == 0 {
		panic("no virtual shards present")
	}

	h := murmur3.New64()
	h.Write(in)
	token := h.Sum64()

	virtual := s.virtualByToken(token)

	return virtual.AssignedToPhysical
}

// CountPhysicalShards returns the count of physical shards
func (s *State) CountPhysicalShards() int {
	return len(s.Physical)
}

func (s *State) AllPhysicalShards() []string {
	var names []string
	for _, physical := range s.Physical {
		names = append(names, physical.Name)
	}

	sort.Slice(names, func(a, b int) bool {
		return names[a] < names[b]
	})

	return names
}

func (s *State) AllLocalPhysicalShards() []string {
	var names []string
	for _, physical := range s.Physical {
		if s.IsLocalShard(physical.Name) {
			names = append(names, physical.Name)
		}
	}

	sort.Slice(names, func(a, b int) bool {
		return names[a] < names[b]
	})

	return names
}

func (s *State) SetLocalName(name string) {
	s.localNodeName = name
}

func (s *State) IsLocalShard(name string) bool {
	for _, node := range s.Physical[name].BelongsToNodes {
		if node == s.localNodeName {
			return true
		}
	}

	return false
}
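
// A minimal routing sketch, reusing the hypothetical staticNodes helper from
// the AdjustReplicas example above; the Config values and the object ID are
// illustrative:
//
//	state, err := InitState("my-index", Config{
//		DesiredCount:        3,
//		DesiredVirtualCount: 3 * 128,
//	}, staticNodes{names: []string{"node1", "node2", "node3"}}, 1, false)
//	if err != nil {
//		// handle error
//	}
//
//	// without partitioning, the object ID is hashed onto the virtual ring and
//	// resolved to the physical shard that owns the matching virtual shard
//	shard := state.Shard("", "91f5c6a8-85df-4f48-a7a8-1a57a9f8e3b0")
//	isLocal := state.IsLocalShard(shard)
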
// initPhysical assigns shards to nodes according to the following rules:
//
// - The starting point of the ring is random
// - Shard N+1's first node is the right neighbor of shard N's first node
// - If a shard has multiple nodes (replication) they are always the right
//   neighbors of the first node of that shard
//
// Example with 3 nodes, 2 shards, replicationFactor=2:
//
// Shard 1: Node1, Node2
// Shard 2: Node2, Node3
//
// Example with 3 nodes, 3 shards, replicationFactor=3:
//
// Shard 1: Node1, Node2, Node3
// Shard 2: Node2, Node3, Node1
// Shard 3: Node3, Node1, Node2
//
// Example with 12 nodes, 3 shards, replicationFactor=5:
//
// Shard 1: Node7, Node8, Node9, Node10, Node11
// Shard 2: Node8, Node9, Node10, Node11, Node12
// Shard 3: Node9, Node10, Node11, Node12, Node1
func (s *State) initPhysical(names []string, replFactor int64) error {
	it, err := cluster.NewNodeIterator(names, cluster.StartAfter)
	if err != nil {
		return err
	}
	it.SetStartNode(names[len(names)-1])

	s.Physical = map[string]Physical{}

	for i := 0; i < s.Config.DesiredCount; i++ {
		name := generateShardName()
		shard := Physical{Name: name}
		node := it.Next()
		shard.BelongsToNodes = []string{node}

		if replFactor > 1 {
			// create a second node iterator and start after the already assigned
			// one, this way we can identify our next n right neighbors without
			// affecting the root iterator which will determine the next shard
			replicationIter, err := cluster.NewNodeIterator(names, cluster.StartAfter)
			if err != nil {
				return fmt.Errorf("assign replication nodes: %w", err)
			}

			replicationIter.SetStartNode(node)

			// the first node is already assigned, we only need to assign the
			// additional nodes
			for i := replFactor; i > 1; i-- {
				shard.BelongsToNodes = append(shard.BelongsToNodes, replicationIter.Next())
			}
		}

		s.Physical[name] = shard
	}

	return nil
}

// GetPartitions returns the node owners for the specified shards based on the
// available nodes and replFactor. It doesn't change the internal state.
func (s *State) GetPartitions(nodes nodes, shards []string, replFactor int64) (map[string][]string, error) {
	names := nodes.Candidates()
	if len(names) == 0 {
		return nil, fmt.Errorf("list of node candidates is empty")
	}
	if f, n := replFactor, len(names); f > int64(n) {
		return nil, fmt.Errorf("not enough replicas: found %d want %d", n, f)
	}

	it, err := cluster.NewNodeIterator(names, cluster.StartAfter)
	if err != nil {
		return nil, err
	}
	it.SetStartNode(names[len(names)-1])

	partitions := make(map[string][]string, len(shards))
	for _, name := range shards {
		if _, alreadyExists := s.Physical[name]; alreadyExists {
			continue
		}
		owners := make([]string, 1, replFactor)
		node := it.Next()
		owners[0] = node

		if replFactor > 1 {
			// create a second node iterator and start after the already assigned
			// one, this way we can identify our next n right neighbors without
			// affecting the root iterator which will determine the next shard
			replicationIter, err := cluster.NewNodeIterator(names, cluster.StartAfter)
			if err != nil {
				return nil, fmt.Errorf("assign replication nodes: %w", err)
			}

			replicationIter.SetStartNode(node)

			// the first node is already assigned, we only need to assign the
			// additional nodes
			for i := replFactor; i > 1; i-- {
				owners = append(owners, replicationIter.Next())
			}
		}
		partitions[name] = owners
	}

	return partitions, nil
}

// AddPartition adds a partition to the physical shards
func (s *State) AddPartition(name string, nodes []string, status string) Physical {
	p := Physical{
		Name:           name,
		BelongsToNodes: nodes,
		OwnsPercentage: 1.0,
		Status:         status,
	}
	s.Physical[name] = p
	return p
}

// DeletePartition removes a partition from the physical shards
func (s *State) DeletePartition(name string) {
	delete(s.Physical, name)
}
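
// A minimal partitioning sketch, again reusing the hypothetical staticNodes
// helper; the tenant names and status value are illustrative:
//
//	state, _ := InitState("my-index", Config{}, staticNodes{names: []string{"node1", "node2"}}, 1, true)
//
//	// compute owners for new partitions without mutating the state ...
//	owners, err := state.GetPartitions(staticNodes{names: []string{"node1", "node2"}}, []string{"tenantA", "tenantB"}, 1)
//	if err != nil {
//		// handle error
//	}
//
//	// ... then register them explicitly
//	for name, owner := range owners {
//		state.AddPartition(name, owner, "HOT")
//	}
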
// ApplyNodeMapping replaces node names with their new value from nodeMapping in s.
// If s.LegacyBelongsToNodeForBackwardCompat is non-empty, it will also perform node name replacement if present in nodeMapping.
func (s *State) ApplyNodeMapping(nodeMapping map[string]string) {
	if len(nodeMapping) == 0 {
		return
	}

	for k, v := range s.Physical {
		if v.LegacyBelongsToNodeForBackwardCompat != "" {
			if newNodeName, ok := nodeMapping[v.LegacyBelongsToNodeForBackwardCompat]; ok {
				v.LegacyBelongsToNodeForBackwardCompat = newNodeName
			}
		}

		for i, nodeName := range v.BelongsToNodes {
			if newNodeName, ok := nodeMapping[nodeName]; ok {
				v.BelongsToNodes[i] = newNodeName
			}
		}

		s.Physical[k] = v
	}
}

func (s *State) initVirtual() {
	count := s.Config.DesiredVirtualCount
	s.Virtual = make([]Virtual, count)

	for i := range s.Virtual {
		name := generateShardName()
		h := murmur3.New64()
		h.Write([]byte(name))
		s.Virtual[i] = Virtual{Name: name, Upper: h.Sum64()}
	}

	sort.Slice(s.Virtual, func(a, b int) bool {
		return s.Virtual[a].Upper < s.Virtual[b].Upper
	})

	for i := range s.Virtual {
		var tokenCount uint64
		if i == 0 {
			tokenCount = s.Virtual[0].Upper + (math.MaxUint64 - s.Virtual[len(s.Virtual)-1].Upper)
		} else {
			tokenCount = s.Virtual[i].Upper - s.Virtual[i-1].Upper
		}
		s.Virtual[i].OwnsPercentage = float64(tokenCount) / float64(math.MaxUint64)
	}
}

// this is a primitive distribution that only works for initializing. Once we
// want to support dynamic sharding, we need to come up with something better
// than this
func (s *State) distributeVirtualAmongPhysical() {
	ids := make([]string, len(s.Virtual))
	for i, v := range s.Virtual {
		ids[i] = v.Name
	}

	rand.Shuffle(len(s.Virtual), func(a, b int) {
		ids[a], ids[b] = ids[b], ids[a]
	})

	physicalIDs := make([]string, 0, len(s.Physical))
	for name := range s.Physical {
		physicalIDs = append(physicalIDs, name)
	}

	for i, vid := range ids {
		pickedPhysical := physicalIDs[i%len(physicalIDs)]

		virtual := s.virtualByName(vid)
		virtual.AssignedToPhysical = pickedPhysical
		physical := s.Physical[pickedPhysical]
		physical.OwnsVirtual = append(physical.OwnsVirtual, vid)
		physical.OwnsPercentage += virtual.OwnsPercentage
		s.Physical[pickedPhysical] = physical
	}
}

// uses linear search, but should only be used during shard init and update
// operations, not in regular request handling
func (s *State) virtualByName(name string) *Virtual {
	for i := range s.Virtual {
		if s.Virtual[i].Name == name {
			return &s.Virtual[i]
		}
	}

	return nil
}

func (s *State) virtualByToken(token uint64) *Virtual {
	for i := range s.Virtual {
		if token > s.Virtual[i].Upper {
			continue
		}
		return &s.Virtual[i]
	}

	return &s.Virtual[0]
}
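
// Worked example of the ring logic above (numbers are illustrative, not from
// the source): with three virtual shards whose Upper bounds sort to
// [100, 200, 300], a token of 150 resolves to the shard with Upper=200, a
// token of 250 to Upper=300, and a token of 350 wraps around to the first
// shard (Upper=100). Correspondingly, initVirtual computes each shard's
// OwnsPercentage as its slice of the 64-bit token space, e.g. (200-100)/2^64
// for the middle shard, with the first shard owning the wrap-around range.
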
const shardNameChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// generateShardName returns a random alphanumeric name of shardNameLength characters
func generateShardName() string {
	b := make([]byte, shardNameLength)
	for i := range b {
		b[i] = shardNameChars[rand.Intn(len(shardNameChars))]
	}

	return string(b)
}

// DeepCopy returns an independent copy of the state, so the copy can be
// mutated without affecting the original
func (s State) DeepCopy() State {
	var virtualCopy []Virtual

	physicalCopy := make(map[string]Physical, len(s.Physical))
	for name, shard := range s.Physical {
		physicalCopy[name] = shard.DeepCopy()
	}

	if len(s.Virtual) > 0 {
		virtualCopy = make([]Virtual, len(s.Virtual))
	}
	for i, virtual := range s.Virtual {
		virtualCopy[i] = virtual.DeepCopy()
	}

	return State{
		localNodeName:       s.localNodeName,
		IndexID:             s.IndexID,
		Config:              s.Config.DeepCopy(),
		Physical:            physicalCopy,
		Virtual:             virtualCopy,
		PartitioningEnabled: s.PartitioningEnabled,
	}
}

func (c Config) DeepCopy() Config {
	return Config{
		VirtualPerPhysical:  c.VirtualPerPhysical,
		DesiredCount:        c.DesiredCount,
		ActualCount:         c.ActualCount,
		DesiredVirtualCount: c.DesiredVirtualCount,
		ActualVirtualCount:  c.ActualVirtualCount,
		Key:                 c.Key,
		Strategy:            c.Strategy,
		Function:            c.Function,
	}
}

func (p Physical) DeepCopy() Physical {
	var ownsVirtualCopy []string
	if len(p.OwnsVirtual) > 0 {
		ownsVirtualCopy = make([]string, len(p.OwnsVirtual))
		copy(ownsVirtualCopy, p.OwnsVirtual)
	}

	belongsCopy := make([]string, len(p.BelongsToNodes))
	copy(belongsCopy, p.BelongsToNodes)

	return Physical{
		Name:           p.Name,
		OwnsVirtual:    ownsVirtualCopy,
		OwnsPercentage: p.OwnsPercentage,
		BelongsToNodes: belongsCopy,
		Status:         p.Status,
	}
}

func (v Virtual) DeepCopy() Virtual {
	return Virtual{
		Name:               v.Name,
		Upper:              v.Upper,
		OwnsPercentage:     v.OwnsPercentage,
		AssignedToPhysical: v.AssignedToPhysical,
	}
}
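
// A minimal copy-then-mutate sketch (a hypothetical update flow, not part of
// this package): callers that share a State can work on a DeepCopy so the
// original remains untouched:
//
//	next := current.DeepCopy()
//	next.AddPartition("tenantC", []string{"node2"}, "HOT")
//	// current.Physical is unchanged; next can now be persisted or broadcast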