Spaces:
Sleeping
Sleeping
| // _ _ | |
| // __ _____ __ ___ ___ __ _| |_ ___ | |
| // \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
| // \ V V / __/ (_| |\ V /| | (_| | || __/ | |
| // \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
| // | |
| // Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
| // | |
| // CONTACT: [email protected] | |
| // | |
| package schema | |
| import ( | |
| "context" | |
| "fmt" | |
| "reflect" | |
| "github.com/pkg/errors" | |
| "github.com/weaviate/weaviate/entities/models" | |
| "github.com/weaviate/weaviate/entities/schema" | |
| "github.com/weaviate/weaviate/usecases/replica" | |
| "github.com/weaviate/weaviate/usecases/sharding" | |
| ) | |
| func (m *Manager) UpdateClass(ctx context.Context, principal *models.Principal, | |
| className string, updated *models.Class, | |
| ) error { | |
| m.Lock() | |
| defer m.Unlock() | |
| err := m.Authorizer.Authorize(principal, "update", "schema/objects") | |
| if err != nil { | |
| return err | |
| } | |
| initial := m.getClassByName(className) | |
| if initial == nil { | |
| return ErrNotFound | |
| } | |
| mtEnabled, err := validateUpdatingMT(initial, updated) | |
| if err != nil { | |
| return err | |
| } | |
| // make sure unset optionals on 'updated' don't lead to an error, as all | |
| // optionals would have been set with defaults on the initial already | |
| m.setClassDefaults(updated) | |
| if err := m.validateImmutableFields(initial, updated); err != nil { | |
| return err | |
| } | |
| if err := m.parseVectorIndexConfig(ctx, updated); err != nil { | |
| return err | |
| } | |
| if err := m.parseShardingConfig(ctx, updated); err != nil { | |
| return err | |
| } | |
| if err := m.migrator.ValidateVectorIndexConfigUpdate(ctx, | |
| initial.VectorIndexConfig.(schema.VectorIndexConfig), | |
| updated.VectorIndexConfig.(schema.VectorIndexConfig)); err != nil { | |
| return errors.Wrap(err, "vector index config") | |
| } | |
| if err := m.migrator.ValidateInvertedIndexConfigUpdate(ctx, | |
| initial.InvertedIndexConfig, updated.InvertedIndexConfig); err != nil { | |
| return errors.Wrap(err, "inverted index config") | |
| } | |
| if err := validateShardingConfig(initial, updated, mtEnabled, m.clusterState); err != nil { | |
| return fmt.Errorf("validate sharding config: %w", err) | |
| } | |
| if err := replica.ValidateConfigUpdate(initial, updated, m.clusterState); err != nil { | |
| return fmt.Errorf("replication config: %w", err) | |
| } | |
| updatedSharding := updated.ShardingConfig.(sharding.Config) | |
| initialRF := initial.ReplicationConfig.Factor | |
| updatedRF := updated.ReplicationConfig.Factor | |
| var updatedState *sharding.State | |
| if initialRF != updatedRF { | |
| uss, err := m.scaleOut.Scale(ctx, className, updatedSharding, initialRF, updatedRF) | |
| if err != nil { | |
| return errors.Wrapf(err, "scale out from %d to %d replicas", | |
| initialRF, updatedRF) | |
| } | |
| updatedState = uss | |
| } | |
| tx, err := m.cluster.BeginTransaction(ctx, UpdateClass, | |
| UpdateClassPayload{className, updated, updatedState}, DefaultTxTTL) | |
| if err != nil { | |
| // possible causes for errors could be nodes down (we expect every node to | |
| // the up for a schema transaction) or concurrent transactions from other | |
| // nodes | |
| return errors.Wrap(err, "open cluster-wide transaction") | |
| } | |
| if err := m.cluster.CommitWriteTransaction(ctx, tx); err != nil { | |
| return errors.Wrap(err, "commit cluster-wide transaction") | |
| } | |
| return m.updateClassApplyChanges(ctx, className, updated, updatedState) | |
| } | |
| // validateUpdatingMT validates toggling MT and returns whether mt is enabled | |
| func validateUpdatingMT(current, update *models.Class) (enabled bool, err error) { | |
| enabled = schema.MultiTenancyEnabled(current) | |
| if schema.MultiTenancyEnabled(update) != enabled { | |
| if enabled { | |
| err = fmt.Errorf("disabling multi-tenancy for an existing class is not supported") | |
| } else { | |
| err = fmt.Errorf("enabling multi-tenancy for an existing class is not supported") | |
| } | |
| } | |
| return | |
| } | |
| func validateShardingConfig(current, update *models.Class, mtEnabled bool, cl clusterState) error { | |
| if mtEnabled { | |
| return nil | |
| } | |
| first, ok := current.ShardingConfig.(sharding.Config) | |
| if !ok { | |
| return fmt.Errorf("current config is not well-formed") | |
| } | |
| second, ok := update.ShardingConfig.(sharding.Config) | |
| if !ok { | |
| return fmt.Errorf("updated config is not well-formed") | |
| } | |
| if err := sharding.ValidateConfigUpdate(first, second, cl); err != nil { | |
| return err | |
| } | |
| return nil | |
| } | |
| func (m *Manager) updateClassApplyChanges(ctx context.Context, className string, | |
| updated *models.Class, updatedShardingState *sharding.State, | |
| ) error { | |
| if updatedShardingState != nil { | |
| // the sharding state caches the node name, we must therefore set this | |
| // explicitly now. | |
| updatedShardingState.SetLocalName(m.clusterState.LocalName()) | |
| } | |
| if err := m.migrator.UpdateVectorIndexConfig(ctx, | |
| className, updated.VectorIndexConfig.(schema.VectorIndexConfig)); err != nil { | |
| return errors.Wrap(err, "vector index config") | |
| } | |
| if err := m.migrator.UpdateInvertedIndexConfig(ctx, className, | |
| updated.InvertedIndexConfig); err != nil { | |
| return errors.Wrap(err, "inverted index config") | |
| } | |
| if !m.schemaCache.classExist(className) { | |
| return ErrNotFound | |
| } | |
| payload, err := CreateClassPayload(updated, updatedShardingState) | |
| if err != nil { | |
| return err | |
| } | |
| payload.ReplaceShards = updatedShardingState != nil | |
| // can be improved by updating the diff | |
| m.schemaCache.updateClass(updated, updatedShardingState) | |
| m.logger. | |
| WithField("action", "schema.update_class"). | |
| Debug("saving updated schema to configuration store") | |
| // payload.Shards | |
| if err := m.repo.UpdateClass(ctx, payload); err != nil { | |
| return err | |
| } | |
| m.triggerSchemaUpdateCallbacks() | |
| return nil | |
| } | |
| func (m *Manager) validateImmutableFields(initial, updated *models.Class) error { | |
| immutableFields := []immutableText{ | |
| { | |
| name: "class name", | |
| accessor: func(c *models.Class) string { return c.Class }, | |
| }, | |
| { | |
| name: "vectorizer", | |
| accessor: func(c *models.Class) string { return c.Vectorizer }, | |
| }, | |
| { | |
| name: "vector index type", | |
| accessor: func(c *models.Class) string { return c.VectorIndexType }, | |
| }, | |
| } | |
| for _, u := range immutableFields { | |
| if err := m.validateImmutableTextField(u, initial, updated); err != nil { | |
| return err | |
| } | |
| } | |
| if !reflect.DeepEqual(initial.Properties, updated.Properties) { | |
| return errors.Errorf( | |
| "properties cannot be updated through updating the class. Use the add " + | |
| "property feature (e.g. \"POST /v1/schema/{className}/properties\") " + | |
| "to add additional properties") | |
| } | |
| if !reflect.DeepEqual(initial.ModuleConfig, updated.ModuleConfig) { | |
| return errors.Errorf("module config is immutable") | |
| } | |
| return nil | |
| } | |
| type immutableText struct { | |
| accessor func(c *models.Class) string | |
| name string | |
| } | |
| func (m *Manager) validateImmutableTextField(u immutableText, | |
| previous, next *models.Class, | |
| ) error { | |
| oldField := u.accessor(previous) | |
| newField := u.accessor(next) | |
| if oldField != newField { | |
| return errors.Errorf("%s is immutable: attempted change from %q to %q", | |
| u.name, oldField, newField) | |
| } | |
| return nil | |
| } | |
| func (m *Manager) UpdateShardStatus(ctx context.Context, principal *models.Principal, | |
| className, shardName, targetStatus string, | |
| ) error { | |
| err := m.Authorizer.Authorize(principal, "update", | |
| fmt.Sprintf("schema/%s/shards/%s", className, shardName)) | |
| if err != nil { | |
| return err | |
| } | |
| return m.migrator.UpdateShardStatus(ctx, className, shardName, targetStatus) | |
| } | |