Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
//  CONTACT: hello@weaviate.io
// | |
/* Remark:
In the current implementation, schema updates are not guaranteed to be
consistent, because updating the actual index and updating the schema itself
is not a single atomic operation.
Resolving this issue is out of scope here; it will be addressed in a separate
task dedicated specifically to it.
*/
package schema | |
import ( | |
"context" | |
"encoding/json" | |
"fmt" | |
"strings" | |
"github.com/pkg/errors" | |
"github.com/prometheus/client_golang/prometheus" | |
"github.com/weaviate/weaviate/adapters/repos/db/inverted/stopwords" | |
"github.com/weaviate/weaviate/entities/backup" | |
"github.com/weaviate/weaviate/entities/models" | |
"github.com/weaviate/weaviate/entities/schema" | |
"github.com/weaviate/weaviate/usecases/config" | |
"github.com/weaviate/weaviate/usecases/monitoring" | |
"github.com/weaviate/weaviate/usecases/replica" | |
"github.com/weaviate/weaviate/usecases/sharding" | |
) | |
// AddClass to the schema | |
func (m *Manager) AddClass(ctx context.Context, principal *models.Principal, | |
class *models.Class, | |
) error { | |
err := m.Authorizer.Authorize(principal, "create", "schema/objects") | |
if err != nil { | |
return err | |
} | |
shardState, err := m.addClass(ctx, class) | |
if err != nil { | |
return err | |
} | |
// call to migrator needs to be outside the lock that is set in addClass | |
return m.migrator.AddClass(ctx, class, shardState) | |
// TODO gh-846: Rollback state update if migration fails | |
} | |
func (m *Manager) RestoreClass(ctx context.Context, d *backup.ClassDescriptor, nodeMapping map[string]string) error { | |
// get schema and sharding state | |
class := &models.Class{} | |
if err := json.Unmarshal(d.Schema, &class); err != nil { | |
return fmt.Errorf("marshal class schema: %w", err) | |
} | |
var shardingState sharding.State | |
if d.ShardingState != nil { | |
err := json.Unmarshal(d.ShardingState, &shardingState) | |
if err != nil { | |
return fmt.Errorf("marshal sharding state: %w", err) | |
} | |
} | |
m.Lock() | |
defer m.Unlock() | |
metric, err := monitoring.GetMetrics().BackupRestoreClassDurations.GetMetricWithLabelValues(class.Class) | |
if err == nil { | |
timer := prometheus.NewTimer(metric) | |
defer timer.ObserveDuration() | |
} | |
class.Class = schema.UppercaseClassName(class.Class) | |
class.Properties = schema.LowercaseAllPropertyNames(class.Properties) | |
m.setClassDefaults(class) | |
err = m.validateCanAddClass(ctx, class, true) | |
if err != nil { | |
return err | |
} | |
// migrate only after validation in completed | |
m.migrateClassSettings(class) | |
err = m.parseShardingConfig(ctx, class) | |
if err != nil { | |
return err | |
} | |
err = m.parseVectorIndexConfig(ctx, class) | |
if err != nil { | |
return err | |
} | |
err = m.invertedConfigValidator(class.InvertedIndexConfig) | |
if err != nil { | |
return err | |
} | |
shardingState.MigrateFromOldFormat() | |
shardingState.ApplyNodeMapping(nodeMapping) | |
payload, err := CreateClassPayload(class, &shardingState) | |
if err != nil { | |
return err | |
} | |
shardingState.SetLocalName(m.clusterState.LocalName()) | |
m.schemaCache.addClass(class, &shardingState) | |
if err := m.repo.NewClass(ctx, payload); err != nil { | |
return err | |
} | |
m.logger. | |
WithField("action", "schema_restore_class"). | |
Debugf("restore class %q from schema", class.Class) | |
m.triggerSchemaUpdateCallbacks() | |
out := m.migrator.AddClass(ctx, class, &shardingState) | |
return out | |
} | |
func (m *Manager) addClass(ctx context.Context, class *models.Class, | |
) (*sharding.State, error) { | |
m.Lock() | |
defer m.Unlock() | |
class.Class = schema.UppercaseClassName(class.Class) | |
class.Properties = schema.LowercaseAllPropertyNames(class.Properties) | |
if class.ShardingConfig != nil && schema.MultiTenancyEnabled(class) { | |
return nil, fmt.Errorf("cannot have both shardingConfig and multiTenancyConfig") | |
} else if class.MultiTenancyConfig == nil { | |
class.MultiTenancyConfig = &models.MultiTenancyConfig{} | |
} else if class.MultiTenancyConfig.Enabled { | |
class.ShardingConfig = sharding.Config{DesiredCount: 0} // tenant shards will be created dynamically | |
} | |
m.setClassDefaults(class) | |
err := m.validateCanAddClass(ctx, class, false) | |
if err != nil { | |
return nil, err | |
} | |
// migrate only after validation in completed | |
m.migrateClassSettings(class) | |
err = m.parseShardingConfig(ctx, class) | |
if err != nil { | |
return nil, err | |
} | |
err = m.parseVectorIndexConfig(ctx, class) | |
if err != nil { | |
return nil, err | |
} | |
err = m.invertedConfigValidator(class.InvertedIndexConfig) | |
if err != nil { | |
return nil, err | |
} | |
shardState, err := sharding.InitState(class.Class, | |
class.ShardingConfig.(sharding.Config), | |
m.clusterState, class.ReplicationConfig.Factor, | |
schema.MultiTenancyEnabled(class)) | |
if err != nil { | |
return nil, errors.Wrap(err, "init sharding state") | |
} | |
tx, err := m.cluster.BeginTransaction(ctx, AddClass, | |
AddClassPayload{class, shardState}, DefaultTxTTL) | |
if err != nil { | |
// possible causes for errors could be nodes down (we expect every node to | |
// the up for a schema transaction) or concurrent transactions from other | |
// nodes | |
return nil, errors.Wrap(err, "open cluster-wide transaction") | |
} | |
if err := m.cluster.CommitWriteTransaction(ctx, tx); err != nil { | |
// Only log the commit error, but do not abort the changes locally. Once | |
// we've told others to commit, we also need to commit ourselves! | |
// | |
// The idea is that if we abort our changes we are guaranteed to create an | |
// inconsistency as soon as any other node honored the commit. This would | |
// for example be the case in a 3-node cluster where node 1 is the | |
// coordinator, node 2 honored the commit and node 3 died during the commit | |
// phase. | |
// | |
// In this scenario it is far more desirable to make sure that node 1 and | |
// node 2 stay in sync, as node 3 - who may or may not have missed the | |
// update - can use a local WAL from the first TX phase to replay any | |
// missing changes once it's back. | |
m.logger.WithError(err).Errorf("not every node was able to commit") | |
} | |
if err := m.addClassApplyChanges(ctx, class, shardState); err != nil { | |
return nil, err | |
} | |
return shardState, nil | |
} | |
func (m *Manager) addClassApplyChanges(ctx context.Context, class *models.Class, | |
shardingState *sharding.State, | |
) error { | |
payload, err := CreateClassPayload(class, shardingState) | |
if err != nil { | |
return err | |
} | |
if err := m.repo.NewClass(ctx, payload); err != nil { | |
return err | |
} | |
m.logger. | |
WithField("action", "schema_add_class"). | |
Debugf("add class %q from schema", class.Class) | |
m.schemaCache.addClass(class, shardingState) | |
m.triggerSchemaUpdateCallbacks() | |
return nil | |
} | |
func (m *Manager) setClassDefaults(class *models.Class) { | |
if class.Vectorizer == "" { | |
class.Vectorizer = m.config.DefaultVectorizerModule | |
} | |
if class.VectorIndexType == "" { | |
class.VectorIndexType = "hnsw" | |
} | |
if m.config.DefaultVectorDistanceMetric != "" { | |
if class.VectorIndexConfig == nil { | |
class.VectorIndexConfig = map[string]interface{}{"distance": m.config.DefaultVectorDistanceMetric} | |
} else if class.VectorIndexConfig.(map[string]interface{})["distance"] == nil { | |
class.VectorIndexConfig.(map[string]interface{})["distance"] = m.config.DefaultVectorDistanceMetric | |
} | |
} | |
setInvertedConfigDefaults(class) | |
for _, prop := range class.Properties { | |
setPropertyDefaults(prop) | |
} | |
m.moduleConfig.SetClassDefaults(class) | |
} | |
func setPropertyDefaults(prop *models.Property) { | |
setPropertyDefaultTokenization(prop) | |
setPropertyDefaultIndexing(prop) | |
setNestedPropertiesDefaults(prop.NestedProperties) | |
} | |
func setPropertyDefaultTokenization(prop *models.Property) { | |
switch dataType, _ := schema.AsPrimitive(prop.DataType); dataType { | |
case schema.DataTypeString, schema.DataTypeStringArray: | |
// deprecated as of v1.19, default tokenization was word | |
// which will be migrated to text+whitespace | |
if prop.Tokenization == "" { | |
prop.Tokenization = models.PropertyTokenizationWord | |
} | |
case schema.DataTypeText, schema.DataTypeTextArray: | |
if prop.Tokenization == "" { | |
prop.Tokenization = models.PropertyTokenizationWord | |
} | |
default: | |
// tokenization not supported for other data types | |
} | |
} | |
func setPropertyDefaultIndexing(prop *models.Property) { | |
// if IndexInverted is set but IndexFilterable and IndexSearchable are not | |
// migrate IndexInverted later. | |
if prop.IndexInverted != nil && | |
prop.IndexFilterable == nil && | |
prop.IndexSearchable == nil { | |
return | |
} | |
vTrue := true | |
vFalse := false | |
if prop.IndexFilterable == nil { | |
prop.IndexFilterable = &vTrue | |
primitiveDataType, isPrimitive := schema.AsPrimitive(prop.DataType) | |
if isPrimitive && primitiveDataType == schema.DataTypeBlob { | |
prop.IndexFilterable = &vFalse | |
} | |
} | |
if prop.IndexSearchable == nil { | |
prop.IndexSearchable = &vFalse | |
if dataType, isPrimitive := schema.AsPrimitive(prop.DataType); isPrimitive { | |
switch dataType { | |
case schema.DataTypeString, schema.DataTypeStringArray: | |
// string/string[] are migrated to text/text[] later, | |
// at this point they are still valid data types, therefore should be handled here | |
prop.IndexSearchable = &vTrue | |
case schema.DataTypeText, schema.DataTypeTextArray: | |
prop.IndexSearchable = &vTrue | |
default: | |
// do nothing | |
} | |
} | |
} | |
} | |
func setNestedPropertiesDefaults(properties []*models.NestedProperty) { | |
for _, property := range properties { | |
primitiveDataType, isPrimitive := schema.AsPrimitive(property.DataType) | |
nestedDataType, isNested := schema.AsNested(property.DataType) | |
setNestedPropertyDefaultTokenization(property, primitiveDataType, nestedDataType, isPrimitive, isNested) | |
setNestedPropertyDefaultIndexing(property, primitiveDataType, nestedDataType, isPrimitive, isNested) | |
if isNested { | |
setNestedPropertiesDefaults(property.NestedProperties) | |
} | |
} | |
} | |
func setNestedPropertyDefaultTokenization(property *models.NestedProperty, | |
primitiveDataType, nestedDataType schema.DataType, | |
isPrimitive, isNested bool, | |
) { | |
if property.Tokenization == "" && isPrimitive { | |
switch primitiveDataType { | |
case schema.DataTypeText, schema.DataTypeTextArray: | |
property.Tokenization = models.NestedPropertyTokenizationWord | |
default: | |
// do nothing | |
} | |
} | |
} | |
func setNestedPropertyDefaultIndexing(property *models.NestedProperty, | |
primitiveDataType, nestedDataType schema.DataType, | |
isPrimitive, isNested bool, | |
) { | |
vTrue := true | |
vFalse := false | |
if property.IndexFilterable == nil { | |
property.IndexFilterable = &vTrue | |
if isPrimitive && primitiveDataType == schema.DataTypeBlob { | |
property.IndexFilterable = &vFalse | |
} | |
} | |
if property.IndexSearchable == nil { | |
property.IndexSearchable = &vFalse | |
if isPrimitive { | |
switch primitiveDataType { | |
case schema.DataTypeText, schema.DataTypeTextArray: | |
property.IndexSearchable = &vTrue | |
default: | |
// do nothing | |
} | |
} | |
} | |
} | |
func (m *Manager) migrateClassSettings(class *models.Class) { | |
for _, prop := range class.Properties { | |
migratePropertySettings(prop) | |
} | |
} | |
func migratePropertySettings(prop *models.Property) { | |
migratePropertyDataTypeAndTokenization(prop) | |
migratePropertyIndexInverted(prop) | |
} | |
// as of v1.19 DataTypeString and DataTypeStringArray are deprecated | |
// here both are changed to Text/TextArray | |
// and proper, backward compatible tokenization | |
func migratePropertyDataTypeAndTokenization(prop *models.Property) { | |
switch dataType, _ := schema.AsPrimitive(prop.DataType); dataType { | |
case schema.DataTypeString: | |
prop.DataType = schema.DataTypeText.PropString() | |
case schema.DataTypeStringArray: | |
prop.DataType = schema.DataTypeTextArray.PropString() | |
default: | |
// other types need no migration and do not support tokenization | |
return | |
} | |
switch prop.Tokenization { | |
case models.PropertyTokenizationWord: | |
prop.Tokenization = models.PropertyTokenizationWhitespace | |
case models.PropertyTokenizationField: | |
// stays field | |
} | |
} | |
// as of v1.19 IndexInverted is deprecated and replaced with | |
// IndexFilterable (set inverted index) | |
// and IndexSearchable (map inverted index with term frequencies; | |
// therefore applicable only to text/text[] data types) | |
func migratePropertyIndexInverted(prop *models.Property) { | |
// if none of new options is set, use inverted settings | |
if prop.IndexInverted != nil && | |
prop.IndexFilterable == nil && | |
prop.IndexSearchable == nil { | |
prop.IndexFilterable = prop.IndexInverted | |
switch dataType, _ := schema.AsPrimitive(prop.DataType); dataType { | |
// string/string[] are already migrated into text/text[], can be skipped here | |
case schema.DataTypeText, schema.DataTypeTextArray: | |
prop.IndexSearchable = prop.IndexInverted | |
default: | |
vFalse := false | |
prop.IndexSearchable = &vFalse | |
} | |
} | |
// new options have precedence so inverted can be reset | |
prop.IndexInverted = nil | |
} | |
func (m *Manager) validateCanAddClass( | |
ctx context.Context, class *models.Class, | |
relaxCrossRefValidation bool, | |
) error { | |
if err := m.validateClassNameUniqueness(class.Class); err != nil { | |
return err | |
} | |
if err := m.validateClassName(ctx, class.Class); err != nil { | |
return err | |
} | |
existingPropertyNames := map[string]bool{} | |
for _, property := range class.Properties { | |
if err := m.validateProperty(property, class.Class, existingPropertyNames, relaxCrossRefValidation); err != nil { | |
return err | |
} | |
existingPropertyNames[strings.ToLower(property.Name)] = true | |
} | |
if err := m.validateVectorSettings(ctx, class); err != nil { | |
return err | |
} | |
if err := m.moduleConfig.ValidateClass(ctx, class); err != nil { | |
return err | |
} | |
if err := replica.ValidateConfig(class, m.config.Replication); err != nil { | |
return err | |
} | |
// all is fine! | |
return nil | |
} | |
func (m *Manager) validateProperty( | |
property *models.Property, className string, | |
existingPropertyNames map[string]bool, relaxCrossRefValidation bool, | |
) error { | |
if _, err := schema.ValidatePropertyName(property.Name); err != nil { | |
return err | |
} | |
if err := schema.ValidateReservedPropertyName(property.Name); err != nil { | |
return err | |
} | |
if existingPropertyNames[strings.ToLower(property.Name)] { | |
return fmt.Errorf("class %q: conflict for property %q: already in use or provided multiple times", property.Name, className) | |
} | |
// Validate data type of property. | |
sch := m.getSchema() | |
propertyDataType, err := (&sch).FindPropertyDataTypeWithRefs(property.DataType, | |
relaxCrossRefValidation, schema.ClassName(className)) | |
if err != nil { | |
return fmt.Errorf("property '%s': invalid dataType: %v", property.Name, err) | |
} | |
if propertyDataType.IsNested() { | |
if err := validateNestedProperties(property.NestedProperties, property.Name); err != nil { | |
return err | |
} | |
} else { | |
if len(property.NestedProperties) > 0 { | |
return fmt.Errorf("property '%s': nestedProperties not allowed for data types other than object/object[]", | |
property.Name) | |
} | |
} | |
if err := m.validatePropertyTokenization(property.Tokenization, propertyDataType); err != nil { | |
return err | |
} | |
if err := m.validatePropertyIndexing(property); err != nil { | |
return err | |
} | |
// all is fine! | |
return nil | |
} | |
func (m *Manager) parseVectorIndexConfig(ctx context.Context, | |
class *models.Class, | |
) error { | |
if class.VectorIndexType != "hnsw" && class.VectorIndexType != "flat" { | |
return errors.Errorf( | |
"parse vector index config: unsupported vector index type: %q", | |
class.VectorIndexType) | |
} | |
parsed, err := m.configParser(class.VectorIndexConfig, class.VectorIndexType) | |
if err != nil { | |
return errors.Wrap(err, "parse vector index config") | |
} | |
class.VectorIndexConfig = parsed | |
return nil | |
} | |
func (m *Manager) parseShardingConfig(ctx context.Context, class *models.Class) (err error) { | |
// multiTenancyConfig and shardingConfig are mutually exclusive | |
cfg := sharding.Config{} // cfg is empty in case of MT | |
if !schema.MultiTenancyEnabled(class) { | |
cfg, err = sharding.ParseConfig(class.ShardingConfig, | |
m.clusterState.NodeCount()) | |
if err != nil { | |
return fmt.Errorf("parse sharding config: %w", err) | |
} | |
} | |
class.ShardingConfig = cfg | |
return nil | |
} | |
func setInvertedConfigDefaults(class *models.Class) { | |
if class.InvertedIndexConfig == nil { | |
class.InvertedIndexConfig = &models.InvertedIndexConfig{} | |
} | |
if class.InvertedIndexConfig.CleanupIntervalSeconds == 0 { | |
class.InvertedIndexConfig.CleanupIntervalSeconds = config.DefaultCleanupIntervalSeconds | |
} | |
if class.InvertedIndexConfig.Bm25 == nil { | |
class.InvertedIndexConfig.Bm25 = &models.BM25Config{ | |
K1: config.DefaultBM25k1, | |
B: config.DefaultBM25b, | |
} | |
} | |
if class.InvertedIndexConfig.Stopwords == nil { | |
class.InvertedIndexConfig.Stopwords = &models.StopwordConfig{ | |
Preset: stopwords.EnglishPreset, | |
} | |
} | |
} | |
func CreateClassPayload(class *models.Class, | |
shardingState *sharding.State, | |
) (pl ClassPayload, err error) { | |
pl.Name = class.Class | |
if pl.Metadata, err = json.Marshal(class); err != nil { | |
return pl, fmt.Errorf("marshal class %q metadata: %w", pl.Name, err) | |
} | |
if shardingState != nil { | |
ss := *shardingState | |
pl.Shards = make([]KeyValuePair, len(ss.Physical)) | |
i := 0 | |
for name, shard := range ss.Physical { | |
data, err := json.Marshal(shard) | |
if err != nil { | |
return pl, fmt.Errorf("marshal shard %q metadata: %w", name, err) | |
} | |
pl.Shards[i] = KeyValuePair{Key: name, Value: data} | |
i++ | |
} | |
ss.Physical = nil | |
if pl.ShardingState, err = json.Marshal(&ss); err != nil { | |
return pl, fmt.Errorf("marshal class %q sharding state: %w", pl.Name, err) | |
} | |
} | |
return pl, nil | |
} | |