//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package schema

import (
"context"
"fmt"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/replication"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/usecases/cluster"
"github.com/weaviate/weaviate/usecases/config"
"github.com/weaviate/weaviate/usecases/replica"
"github.com/weaviate/weaviate/usecases/scaler"
"github.com/weaviate/weaviate/usecases/schema/migrate"
"github.com/weaviate/weaviate/usecases/sharding"
)

// Manager manages schema changes at a use-case level, i.e. agnostic of the
// underlying databases or storage providers.
type Manager struct {
migrator migrate.Migrator
repo SchemaStore
callbacks []func(updatedSchema schema.Schema)
logger logrus.FieldLogger
Authorizer authorizer
config config.Config
vectorizerValidator VectorizerValidator
moduleConfig ModuleConfig
cluster *cluster.TxManager
clusterState clusterState
configParser VectorConfigParser
invertedConfigValidator InvertedConfigValidator
scaleOut scaleOut
RestoreStatus sync.Map
RestoreError sync.Map
sync.RWMutex
// As outlined in [*cluster.TxManager.TryResumeDanglingTxs] the current
// implementation isn't perfect. It does not actually know if a tx was meant
// to be committed or not. Instead we do a simple workaround. We check if the
// schema is out of sync and only then do we try to resume transactions.
shouldTryToResumeTx bool
schemaCache
}
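
// VectorConfigParser parses a raw vector index configuration into a typed
// schema.VectorIndexConfig for the given vector index type.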
type VectorConfigParser func(in interface{}, vectorIndexType string) (schema.VectorIndexConfig, error)
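
// InvertedConfigValidator validates an inverted index configuration.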
type InvertedConfigValidator func(in *models.InvertedIndexConfig) error
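
// SchemaGetter provides read access to the schema as well as to cluster and
// sharding metadata such as node names, shard owners and shard replicas.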
type SchemaGetter interface {
GetSchemaSkipAuth() schema.Schema
Nodes() []string
NodeName() string
ClusterHealthScore() int
ResolveParentNodes(string, string) (map[string]string, error)
CopyShardingState(class string) *sharding.State
ShardOwner(class, shard string) (string, error)
TenantShard(class, tenant string) (string, string)
ShardFromUUID(class string, uuid []byte) string
ShardReplicas(class, shard string) ([]string, error)
}
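
// VectorizerValidator validates a vectorizer module by name.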
type VectorizerValidator interface {
ValidateVectorizer(moduleName string) error
}
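
// ModuleConfig applies module-provided defaults to classes and properties and
// validates a class from the modules' perspective.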
type ModuleConfig interface {
SetClassDefaults(class *models.Class)
SetSinglePropertyDefaults(class *models.Class, prop *models.Property)
ValidateClass(ctx context.Context, class *models.Class) error
}
// SchemaStore is responsible for persisting the schema
// by providing support for both partial and complete schema updates
type SchemaStore interface {
// Save saves the complete schema to the persistent storage
Save(ctx context.Context, schema State) error
// Load loads the complete schema from the persistent storage
Load(context.Context) (State, error)
// NewClass creates a new class if it doesn't exist, otherwise it returns an error
NewClass(context.Context, ClassPayload) error
// UpdateClass updates an existing class, otherwise it returns an error
UpdateClass(context.Context, ClassPayload) error
// DeleteClass deletes class
DeleteClass(ctx context.Context, class string) error
// NewShards creates new shards of an existing class
NewShards(ctx context.Context, class string, shards []KeyValuePair) error
// UpdateShards updates (replaces) shards of an existing class
// Error is returned if class or shard does not exist
UpdateShards(ctx context.Context, class string, shards []KeyValuePair) error
// DeleteShards deletes shards from a class
// If the class or a shard does not exist then nothing is done and a nil error is returned
DeleteShards(ctx context.Context, class string, shards []string) error
}
// KeyValuePair is used to serialize shard updates
type KeyValuePair struct {
Key string
Value []byte
}
// ClassPayload is used to serialize class updates
type ClassPayload struct {
Name string
Metadata []byte
ShardingState []byte
Shards []KeyValuePair
ReplaceShards bool
Error error
}
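
// clusterState provides information about the nodes that make up the cluster,
// such as their names, hostnames, count and overall health.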
type clusterState interface {
// Hostnames initializes a broadcast
Hostnames() []string
// AllNames initializes shard distribution across nodes
AllNames() []string
Candidates() []string
LocalName() string
NodeCount() int
NodeHostname(nodeName string) (string, bool)
// ClusterHealthScore gets the whole cluster health; the lower the number, the better
ClusterHealthScore() int
SchemaSyncIgnored() bool
SkipSchemaRepair() bool
}
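
// scaleOut adjusts the number of replicas of a class when its replication
// factor changes.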
type scaleOut interface {
SetSchemaManager(sm scaler.SchemaManager)
Scale(ctx context.Context, className string,
updated sharding.Config, prevReplFactor, newReplFactor int64) (*sharding.State, error)
}
// NewManager creates a new manager
func NewManager(migrator migrate.Migrator, repo SchemaStore,
logger logrus.FieldLogger, authorizer authorizer, config config.Config,
configParser VectorConfigParser, vectorizerValidator VectorizerValidator,
invertedConfigValidator InvertedConfigValidator,
moduleConfig ModuleConfig, clusterState clusterState,
txClient cluster.Client, txPersistence cluster.Persistence,
scaleoutManager scaleOut,
) (*Manager, error) {
txBroadcaster := cluster.NewTxBroadcaster(clusterState, txClient)
m := &Manager{
config: config,
migrator: migrator,
repo: repo,
schemaCache: schemaCache{State: State{}},
logger: logger,
Authorizer: authorizer,
configParser: configParser,
vectorizerValidator: vectorizerValidator,
invertedConfigValidator: invertedConfigValidator,
moduleConfig: moduleConfig,
cluster: cluster.NewTxManager(txBroadcaster, txPersistence, logger),
clusterState: clusterState,
scaleOut: scaleoutManager,
}
m.scaleOut.SetSchemaManager(m)
m.cluster.SetCommitFn(m.handleCommit)
m.cluster.SetResponseFn(m.handleTxResponse)
m.cluster.SetAllowUnready(allowUnreadyTxs)
txBroadcaster.SetConsensusFunction(newReadConsensus(m.parseConfigs, m.logger))
err := m.loadOrInitializeSchema(context.Background())
if err != nil {
return nil, fmt.Errorf("could not load or initialize schema: %v", err)
}
return m, nil
}
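
// Shutdown shuts down the underlying transaction manager and waits until all
// open commits are done. If the context expires first, an error is returned.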
func (m *Manager) Shutdown(ctx context.Context) error {
allCommitsDone := make(chan struct{})
go func() {
m.cluster.Shutdown()
allCommitsDone <- struct{}{}
}()
select {
case <-ctx.Done():
return fmt.Errorf("waiting for transactions to commit: %w", ctx.Err())
case <-allCommitsDone:
return nil
}
}
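
// TxManager returns the cluster transaction manager used by this schema manager.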
func (m *Manager) TxManager() *cluster.TxManager {
return m.cluster
}
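
// authorizer checks whether a principal is allowed to perform a given verb on
// a given resource.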
type authorizer interface {
Authorize(principal *models.Principal, verb, resource string) error
}
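
// saveSchema persists the given schema state via the configured SchemaStore
// and then notifies all registered schema-update callbacks.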
func (m *Manager) saveSchema(ctx context.Context, st State) error {
m.logger.
WithField("action", "schema.save").
Debug("saving updated schema to configuration store")
if err := m.repo.Save(ctx, st); err != nil {
return err
}
m.triggerSchemaUpdateCallbacks()
return nil
}
// RegisterSchemaUpdateCallback allows other usecases to register a primitive
// type update callback. The callbacks will be called any time we persist a
// schema update
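//
// A minimal usage sketch (the caller and the callback body are hypothetical;
// only the signature comes from this method):
//
//	manager.RegisterSchemaUpdateCallback(func(updatedSchema schema.Schema) {
//		// e.g. refresh locally cached class definitions
//	})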
func (m *Manager) RegisterSchemaUpdateCallback(callback func(updatedSchema schema.Schema)) {
m.callbacks = append(m.callbacks, callback)
}
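
// triggerSchemaUpdateCallbacks calls every registered callback with the
// current schema.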
func (m *Manager) triggerSchemaUpdateCallbacks() {
schema := m.getSchema()
for _, cb := range m.callbacks {
cb(schema)
}
}
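
// loadOrInitializeSchema loads the schema from the persistent store, parses
// and migrates it if necessary, removes duplicated properties, syncs it with
// the other nodes in the cluster and finally persists the resulting state.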
func (m *Manager) loadOrInitializeSchema(ctx context.Context) error {
localSchema, err := m.repo.Load(ctx)
if err != nil {
return fmt.Errorf("could not load schema: %v", err)
}
if err := m.parseConfigs(ctx, &localSchema); err != nil {
return errors.Wrap(err, "load schema")
}
if err := m.migrateSchemaIfNecessary(ctx, &localSchema); err != nil {
return fmt.Errorf("migrate schema: %w", err)
}
// There was a bug that allowed adding the same prop multiple times. This
// leads to a race at startup. If an instance is already affected by this,
// this step can remove the duplicate ones.
//
// See https://github.com/weaviate/weaviate/issues/2609
for _, c := range localSchema.ObjectSchema.Classes {
c.Properties = m.deduplicateProps(c.Properties, c.Class)
}
// set internal state since it is used by startupClusterSync
m.schemaCache.setState(localSchema)
// make sure that all migrations have completed before checking sync,
// otherwise two identical schemas might fail the check based on form rather
// than content
if err := m.startupClusterSync(ctx); err != nil {
return errors.Wrap(err, "sync schema with other nodes in the cluster")
}
// store in persistent storage
// TODO: investigate if save() is redundant because it is called in startupClusterSync()
err = m.RLockGuard(func() error { return m.repo.Save(ctx, m.schemaCache.State) })
if err != nil {
return fmt.Errorf("store to persistent storage: %v", err)
}
return nil
}
// StartServing indicates that the schema manager is ready to accept incoming
// connections in cluster mode, i.e. it will accept opening transactions.
//
// Some transactions are exempt, such as ReadSchema which is required for nodes
// to start up.
//
// This method should be called when all backends, primarily the DB, are ready
// to serve.
func (m *Manager) StartServing(ctx context.Context) error {
if err := m.resumeDanglingTransactions(ctx); err != nil {
return err
}
// only start accepting incoming connections when dangling txs have been
// resumed, otherwise there is potential for conflict
m.cluster.StartAcceptIncoming()
return nil
}
// resumeDanglingTransactions iterates over any transactions that may have been
// left dangling after a restart and tries to commit them again if appropriate.
//
// This can only be called when all areas responding to the side effects of
// committing a transaction are ready. In practice this means the DB must be
// ready before this method is called.
func (m *Manager) resumeDanglingTransactions(ctx context.Context) error {
var shouldResume bool
m.RLockGuard(func() error {
shouldResume = m.shouldTryToResumeTx
return nil
})
if !shouldResume {
// nothing to do for us
return nil
}
ok, err := m.cluster.TryResumeDanglingTxs(ctx, resumableTxs)
if err != nil {
return fmt.Errorf("try resuming dangling transactions: %w", err)
}
if !ok {
// no tx was applied, we are done
return nil
}
// a tx was applied which means the previous schema check was skipped, we
// now need to check the schema again
err = m.validateSchemaCorruption(ctx)
if err == nil {
// all is fine, continue as normal
return nil
}
if m.clusterState.SchemaSyncIgnored() {
m.logger.WithError(err).WithFields(logrusStartupSyncFields()).
Warning("schema out of sync, but ignored because " +
"CLUSTER_IGNORE_SCHEMA_SYNC=true")
return nil
}
return fmt.Errorf(
"applied dangling tx, but schema still out of sync: %w", err)
}
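
// migrateSchemaIfNecessary applies the migrations required to bring a schema
// created by an older Weaviate version up to date.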
func (m *Manager) migrateSchemaIfNecessary(ctx context.Context, localSchema *State) error {
// introduced when Weaviate started supporting multiple shards per class in v1.8
if err := m.checkSingleShardMigration(ctx, localSchema); err != nil {
return errors.Wrap(err, "migrating sharding state from previous version")
}
// introduced when Weaviate started supporting replication in v1.17
if err := m.checkShardingStateForReplication(ctx, localSchema); err != nil {
return errors.Wrap(err, "migrating sharding state from previous version (before replication)")
}
// if other migrations become necessary in the future, you can add them here.
return nil
}
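
// checkSingleShardMigration initializes a single-shard sharding state for any
// class that was created before multi-shard indices were supported and
// therefore has no sharding state yet.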
func (m *Manager) checkSingleShardMigration(ctx context.Context, localSchema *State) error {
for _, c := range localSchema.ObjectSchema.Classes {
if _, ok := localSchema.ShardingState[c.Class]; ok { // there is sharding state for this class. Nothing to do
continue
}
m.logger.WithField("className", c.Class).WithField("action", "initialize_schema").
Warningf("No sharding state found for class %q, initializing new state. "+
"This is expected behavior if the schema was created with an older Weaviate "+
"version, prior to supporting multi-shard indices.", c.Class)
// there is no sharding state for this class, so let's create the correct
// config. This class must have been created prior to the sharding feature,
// so we know that shardCount==1 - we do not care about any of the other
// parameters and simply use the defaults for those
c.ShardingConfig = map[string]interface{}{
"desiredCount": 1,
}
if err := m.parseShardingConfig(ctx, c); err != nil {
return err
}
if err := replica.ValidateConfig(c, m.config.Replication); err != nil {
return fmt.Errorf("validate replication config: %w", err)
}
shardState, err := sharding.InitState(c.Class,
c.ShardingConfig.(sharding.Config),
m.clusterState, c.ReplicationConfig.Factor,
schema.MultiTenancyEnabled(c))
if err != nil {
return errors.Wrap(err, "init sharding state")
}
if localSchema.ShardingState == nil {
localSchema.ShardingState = map[string]*sharding.State{}
}
localSchema.ShardingState[c.Class] = shardState
}
return nil
}
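
// checkShardingStateForReplication migrates the sharding state of each class
// from the format used before replication was introduced.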
func (m *Manager) checkShardingStateForReplication(ctx context.Context, localSchema *State) error {
for _, classState := range localSchema.ShardingState {
classState.MigrateFromOldFormat()
}
return nil
}
func newSchema() *State {
return &State{
ObjectSchema: &models.Schema{
Classes: []*models.Class{},
},
ShardingState: map[string]*sharding.State{},
}
}
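
// parseConfigs applies property defaults and parses and validates the vector
// index, sharding and replication configuration of every class, then sets the
// local node name on all sharding states.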
func (m *Manager) parseConfigs(ctx context.Context, schema *State) error {
for _, class := range schema.ObjectSchema.Classes {
for _, prop := range class.Properties {
setPropertyDefaults(prop)
migratePropertySettings(prop)
}
if err := m.parseVectorIndexConfig(ctx, class); err != nil {
return errors.Wrapf(err, "class %s: vector index config", class.Class)
}
if err := m.parseShardingConfig(ctx, class); err != nil {
return errors.Wrapf(err, "class %s: sharding config", class.Class)
}
// Pass dummy replication config with minimum factor 1. Otherwise the
// setting is not backward-compatible. The user may have created a class
// with factor=1 before the change was introduced. Now their setup would no
// longer start up if the required minimum is now higher than 1. We want
// the required minimum to only apply to newly created classes - not block
// loading existing ones.
if err := replica.ValidateConfig(class, replication.GlobalConfig{MinimumFactor: 1}); err != nil {
return fmt.Errorf("replication config: %w", err)
}
}
m.schemaCache.LockGuard(func() {
for _, shardState := range schema.ShardingState {
shardState.SetLocalName(m.clusterState.LocalName())
}
})
return nil
}