KevinStephenson
Adding in weaviate code
b110593
raw
history blame
4.75 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package schema
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/schema"
)
// AddClassProperty adds a new property to an existing class.
//
// The principal must be authorized for the "update" verb on "schema/objects";
// an authorization error is returned unchanged. The property must be non-nil
// and must carry both a name and a dataType; otherwise an error is returned
// before addClassProperty is called. When all checks pass the call is
// delegated to addClassProperty and its result is returned.
func (m *Manager) AddClassProperty(ctx context.Context, principal *models.Principal,
	class string, property *models.Property,
) error {
	if err := m.Authorizer.Authorize(principal, "update", "schema/objects"); err != nil {
		return err
	}

	// Guard against a nil property: the field accesses below would otherwise
	// panic with a nil pointer dereference instead of yielding a regular
	// validation error.
	if property == nil {
		return fmt.Errorf("property must not be nil")
	}
	if property.Name == "" {
		return fmt.Errorf("property must contain name")
	}
	if property.DataType == nil {
		return fmt.Errorf("property must contain dataType")
	}

	return m.addClassProperty(ctx, class, property)
}
// addClassProperty validates prop against the class named className and, when
// valid, announces it through a cluster-wide transaction before applying the
// change locally via addClassPropertyApplyChanges. The manager lock is held
// for the duration of the call.
//
// prop is modified in place: its name gets a lower-case first letter, defaults
// are filled in, and its settings are migrated once validation has passed.
func (m *Manager) addClassProperty(ctx context.Context,
	className string, prop *models.Property,
) error {
	m.Lock()
	defer m.Unlock()

	cls, err := m.schemaCache.readOnlyClass(className)
	if err != nil {
		return err
	}

	prop.Name = schema.LowercaseFirstLetter(prop.Name)

	// Lower-cased names of the properties already on the class; handed to
	// validateProperty below.
	taken := make(map[string]bool, len(cls.Properties))
	for _, p := range cls.Properties {
		taken[strings.ToLower(p.Name)] = true
	}

	err = m.setNewPropDefaults(cls, prop)
	if err != nil {
		return err
	}

	err = m.validateProperty(prop, className, taken, false)
	if err != nil {
		return err
	}

	// Migrate only after validation is completed.
	migratePropertySettings(prop)

	payload := AddPropertyPayload{className, prop}
	tx, err := m.cluster.BeginTransaction(ctx, AddProperty, payload, DefaultTxTTL)
	if err != nil {
		// Possible causes are nodes being down (every node is expected to be
		// up for a schema transaction) or concurrent transactions started by
		// other nodes.
		return fmt.Errorf("open cluster-wide transaction: %w", err)
	}

	if commitErr := m.cluster.CommitWriteTransaction(ctx, tx); commitErr != nil {
		// Log the commit error only; do not abort the local changes. Once the
		// other nodes have been told to commit, this node has to commit too.
		//
		// Aborting here would guarantee an inconsistency as soon as any other
		// node honored the commit. Example: in a 3-node cluster node 1 is the
		// coordinator, node 2 honored the commit and node 3 died during the
		// commit phase.
		//
		// In that scenario it is far more desirable to keep node 1 and node 2
		// in sync. Node 3 - which may or may not have missed the update - can
		// replay any missing changes from the local WAL written in the first
		// TX phase once it is back.
		m.logger.WithError(commitErr).Errorf("not every node was able to commit")
	}

	return m.addClassPropertyApplyChanges(ctx, className, prop)
}
// setNewPropDefaults fills in defaults on a property that is about to be
// added to class. It runs three steps in order: generic property defaults,
// a check of the user-supplied module config (validateUserProp), and finally
// the module-specific defaults. It stops with the validation error, before
// the module defaults are applied, if the user-supplied config is rejected.
func (m *Manager) setNewPropDefaults(class *models.Class, prop *models.Property) error {
	setPropertyDefaults(prop)

	err := validateUserProp(class, prop)
	if err != nil {
		return err
	}

	m.moduleConfig.SetSinglePropertyDefaults(class, prop)
	return nil
}
// validateUserProp checks the shape of a property's user-supplied module
// config. A property without module config is always accepted. Otherwise the
// config must be a map[string]interface{} that has an entry keyed by the
// class's vectorizer, and that entry must itself be a map[string]interface{}.
// The first violated condition is reported as an error.
func validateUserProp(class *models.Class, prop *models.Property) error {
	if prop.ModuleConfig == nil {
		return nil
	}

	perModule, isMap := prop.ModuleConfig.(map[string]interface{})
	if !isMap {
		return fmt.Errorf("%v property config invalid", prop.Name)
	}

	vecCfg, present := perModule[class.Vectorizer]
	if !present {
		return fmt.Errorf("%v vectorizer module not part of the property", class.Vectorizer)
	}

	if _, isMap := vecCfg.(map[string]interface{}); !isMap {
		return fmt.Errorf("vectorizer config for vectorizer %v, not of type map[string]interface{}", class.Vectorizer)
	}

	return nil
}
// addClassPropertyApplyChanges applies a property addition on this node. It
// adds prop to the class in the schema cache, serializes the resulting class
// to JSON, stores it through the repo, triggers the schema update callbacks,
// and finally asks the migrator to add the property. The first error
// encountered is returned; a JSON marshalling failure is wrapped with the
// class name.
func (m *Manager) addClassPropertyApplyChanges(ctx context.Context,
	className string, prop *models.Property,
) error {
	updated, err := m.schemaCache.addProperty(className, prop)
	if err != nil {
		return err
	}

	encoded, err := json.Marshal(&updated)
	if err != nil {
		return fmt.Errorf("marshal class %s: %w", className, err)
	}

	entry := m.logger.WithField("action", "schema.add_property")
	entry.Debug("saving updated schema to configuration store")

	update := ClassPayload{Name: className, Metadata: encoded}
	if err := m.repo.UpdateClass(ctx, update); err != nil {
		return err
	}

	m.triggerSchemaUpdateCallbacks()

	// If the migrator call below fails, the stored schema and the index will
	// be out of sync.
	return m.migrator.AddProperty(ctx, className, prop)
}