Spaces:
Running
Running
File size: 4,413 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package sharding
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
)
// Default values for a sharding Config. They are applied by
// (*Config).setDefaults before any user-supplied overrides are parsed.
const (
	// DefaultVirtualPerPhysical is the default multiplier used to derive the
	// virtual count from the desired (physical) count.
	DefaultVirtualPerPhysical = 128

	// DefaultKey is the default sharding key. It is also the only key that
	// (*Config).validate currently accepts.
	DefaultKey = "_id"

	// DefaultStrategy is the default sharding strategy. It is also the only
	// strategy that (*Config).validate currently accepts.
	DefaultStrategy = "hash"

	// DefaultFunction is the default sharding function. It is also the only
	// function that (*Config).validate currently accepts.
	DefaultFunction = "murmur3"
)
// Config describes how a collection is sharded. It is produced by ParseConfig
// and serialized to JSON using the field tags below.
type Config struct {
	// VirtualPerPhysical is the multiplier used to derive DesiredVirtualCount
	// from DesiredCount.
	VirtualPerPhysical int `json:"virtualPerPhysical"`

	// DesiredCount is the requested count; it defaults to the node count
	// passed to ParseConfig.
	DesiredCount int `json:"desiredCount"`

	// ActualCount currently always mirrors DesiredCount.
	ActualCount int `json:"actualCount"`

	// DesiredVirtualCount is derived as DesiredCount * VirtualPerPhysical.
	DesiredVirtualCount int `json:"desiredVirtualCount"`

	// ActualVirtualCount currently always mirrors DesiredVirtualCount.
	ActualVirtualCount int `json:"actualVirtualCount"`

	// Key is the sharding key; only "_id" passes validation.
	Key string `json:"key"`

	// Strategy is the sharding strategy; only "hash" passes validation.
	Strategy string `json:"strategy"`

	// Function is the sharding function; only "murmur3" passes validation.
	Function string `json:"function"`
}
// setDefaults overwrites every field of c with its default value. The desired
// count is taken from nodeCount and the virtual count is derived from it using
// DefaultVirtualPerPhysical.
func (c *Config) setDefaults(nodeCount int) {
	virtualCount := nodeCount * DefaultVirtualPerPhysical

	// The "actual" counts mirror the "desired" ones: they will only differ
	// once there is an async component through replication or dynamic
	// scaling. For now they have to be the same.
	*c = Config{
		VirtualPerPhysical:  DefaultVirtualPerPhysical,
		DesiredCount:        nodeCount,
		ActualCount:         nodeCount,
		DesiredVirtualCount: virtualCount,
		ActualVirtualCount:  virtualCount,
		Key:                 DefaultKey,
		Strategy:            DefaultStrategy,
		Function:            DefaultFunction,
	}
}
// validate returns an error if Key, Strategy or Function holds anything other
// than the single value that is supported for it. The fields are checked in
// that order and only the first violation is reported.
func (c *Config) validate() error {
	switch {
	case c.Key != "_id":
		return errors.Errorf(
			"sharding only supported on key '_id' for now, got: %s", c.Key)
	case c.Strategy != "hash":
		return errors.Errorf(
			"sharding only supported with strategy 'hash' for now, got: %s", c.Strategy)
	case c.Function != "murmur3":
		return errors.Errorf(
			"sharding only supported with function 'murmur3' for now, got: %s", c.Function)
	}

	return nil
}
// ParseConfig builds a Config from an untyped, user-supplied value.
//
// It starts from the defaults for nodeCount (see (*Config).setDefaults). A nil
// input returns those defaults as-is. Otherwise input must be a non-nil
// map[string]interface{}; the optional keys "virtualPerPhysical",
// "desiredCount", "key", "strategy" and "function" override the respective
// defaults.
//
// DesiredVirtualCount is always derived as DesiredCount * VirtualPerPhysical,
// and ActualCount / ActualVirtualCount always mirror their desired
// counterparts; the corresponding keys in input are not read.
//
// The resulting Config is validated before it is returned. On error the
// returned Config may be only partially populated.
func ParseConfig(input interface{}, nodeCount int) (Config, error) {
	out := Config{}
	out.setDefaults(nodeCount)

	if input == nil {
		return out, nil
	}

	asMap, ok := input.(map[string]interface{})
	if !ok || asMap == nil {
		return out, fmt.Errorf("input must be a non-nil map")
	}

	// Both factors must be parsed before the virtual count can be derived.
	if err := optionalIntFromMap(asMap, "virtualPerPhysical", func(v int) {
		out.VirtualPerPhysical = v
	}); err != nil {
		return out, err
	}

	if err := optionalIntFromMap(asMap, "desiredCount", func(v int) {
		out.DesiredCount = v
	}); err != nil {
		return out, err
	}

	// NOTE(review): "desiredCount" used to be parsed a second time after this
	// line, which had no effect. If a user override for the virtual count was
	// intended instead, it is not implemented - confirm.
	out.DesiredVirtualCount = out.DesiredCount * out.VirtualPerPhysical

	if err := optionalStringFromMap(asMap, "key", func(v string) {
		out.Key = v
	}); err != nil {
		return out, err
	}

	if err := optionalStringFromMap(asMap, "strategy", func(v string) {
		out.Strategy = v
	}); err != nil {
		return out, err
	}

	if err := optionalStringFromMap(asMap, "function", func(v string) {
		out.Function = v
	}); err != nil {
		return out, err
	}

	// these will only differ once there is an async component through replication
	// or dynamic scaling. For now they have to be the same
	out.ActualCount = out.DesiredCount
	out.ActualVirtualCount = out.DesiredVirtualCount

	if err := out.validate(); err != nil {
		return out, err
	}

	return out, nil
}
// optionalIntFromMap looks up name in in and, if the key is present, converts
// its value to an int and hands it to setFn. A missing key is not an error; in
// that case setFn is not called.
//
// Accepted value types are json.Number, int, int64 and float64. A float64 is
// only accepted if it holds a whole number that fits into an int64. Any other
// type (including a nil value) yields an error instead of silently being
// treated as 0. setFn is never called when an error is returned.
func optionalIntFromMap(in map[string]interface{}, name string,
	setFn func(v int),
) error {
	value, ok := in[name]
	if !ok {
		return nil
	}

	var asInt64 int64

	// depending on whether we get the results from disk or from the REST API,
	// numbers may be represented slightly differently
	switch typed := value.(type) {
	case json.Number:
		parsed, err := typed.Int64()
		if err != nil {
			return fmt.Errorf("%s: %w", name, err)
		}
		asInt64 = parsed
	case int:
		asInt64 = int64(typed)
	case int64:
		asInt64 = typed
	case float64:
		// The range check comes first: it also rejects NaN (all comparisons
		// are false) and keeps the float->int conversion below well-defined.
		inRange := typed >= -(1<<63) && typed < 1<<63
		if !inRange || float64(int64(typed)) != typed {
			return fmt.Errorf("field %q must be a whole number, got: %v", name, typed)
		}
		asInt64 = int64(typed)
	default:
		return fmt.Errorf("field %q must be of type number, got: %T", name, value)
	}

	setFn(int(asInt64))
	return nil
}
// optionalStringFromMap looks up name in in and, if the key is present, passes
// its string value to setFn. A missing key is not an error; in that case setFn
// is not called. A present value of any type other than string results in an
// error and setFn is not called.
func optionalStringFromMap(in map[string]interface{}, name string,
	setFn func(v string),
) error {
	raw, present := in[name]
	if !present {
		return nil
	}

	if str, isString := raw.(string); isString {
		setFn(str)
		return nil
	}

	return errors.Errorf("field %q must be of type string, got: %T", name, raw)
}
|