SemanticSearchPOC / usecases /schema /incoming_commit.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.62 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package schema
import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/usecases/cluster"
)
func (m *Manager) handleCommit(ctx context.Context, tx *cluster.Transaction) error {
switch tx.Type {
case AddClass, RepairClass:
return m.handleAddClassCommit(ctx, tx)
case AddProperty, RepairProperty:
return m.handleAddPropertyCommit(ctx, tx)
case mergeObjectProperty:
return m.handleMergeObjectPropertyCommit(ctx, tx)
case DeleteClass:
return m.handleDeleteClassCommit(ctx, tx)
case UpdateClass:
return m.handleUpdateClassCommit(ctx, tx)
case addTenants, RepairTenant:
return m.handleAddTenantsCommit(ctx, tx)
case updateTenants:
return m.handleUpdateTenantsCommit(ctx, tx)
case deleteTenants:
return m.handleDeleteTenantsCommit(ctx, tx)
case ReadSchema:
return nil
default:
return errors.Errorf("unrecognized commit type %q", tx.Type)
}
}
func (m *Manager) handleTxResponse(ctx context.Context,
tx *cluster.Transaction,
) (data []byte, err error) {
if tx.Type != ReadSchema {
return nil, nil
}
m.schemaCache.RLockGuard(func() error {
tx.Payload = ReadSchemaPayload{
Schema: &m.schemaCache.State,
}
data, err = json.Marshal(tx)
tx.Payload = ReadSchemaPayload{}
return err
})
return
}
func (m *Manager) handleAddClassCommit(ctx context.Context,
tx *cluster.Transaction,
) error {
m.Lock()
pl, ok := tx.Payload.(AddClassPayload)
if !ok {
m.Unlock()
return errors.Errorf("expected commit payload to be AddClassPayload, but got %T",
tx.Payload)
}
err := m.handleAddClassCommitAndParse(ctx, &pl)
m.Unlock()
if err != nil {
return err
}
// call to migrator needs to be outside the lock that is set in addClass
return m.migrator.AddClass(ctx, pl.Class, pl.State)
}
func (m *Manager) handleAddClassCommitAndParse(ctx context.Context, pl *AddClassPayload) error {
if pl.Class == nil {
return fmt.Errorf("invalid tx: class is nil")
}
if pl.State == nil {
return fmt.Errorf("invalid tx: state is nil")
}
err := m.parseShardingConfig(ctx, pl.Class)
if err != nil {
return err
}
err = m.parseVectorIndexConfig(ctx, pl.Class)
if err != nil {
return err
}
pl.State.SetLocalName(m.clusterState.LocalName())
return m.addClassApplyChanges(ctx, pl.Class, pl.State)
}
func (m *Manager) handleAddPropertyCommit(ctx context.Context,
tx *cluster.Transaction,
) error {
m.Lock()
defer m.Unlock()
pl, ok := tx.Payload.(AddPropertyPayload)
if !ok {
return errors.Errorf("expected commit payload to be AddPropertyPayload, but got %T",
tx.Payload)
}
if pl.Property == nil {
return fmt.Errorf("invalid tx: property is nil")
}
return m.addClassPropertyApplyChanges(ctx, pl.ClassName, pl.Property)
}
func (m *Manager) handleMergeObjectPropertyCommit(ctx context.Context,
tx *cluster.Transaction,
) error {
m.Lock()
defer m.Unlock()
pl, ok := tx.Payload.(MergeObjectPropertyPayload)
if !ok {
return errors.Errorf("expected commit payload to be MergeObjectPropertyPayload, but got %T",
tx.Payload)
}
if pl.Property == nil {
return fmt.Errorf("invalid tx: property is nil")
}
return m.mergeClassObjectPropertyApplyChanges(ctx, pl.ClassName, pl.Property)
}
func (m *Manager) handleDeleteClassCommit(ctx context.Context,
tx *cluster.Transaction,
) error {
m.Lock()
defer m.Unlock()
pl, ok := tx.Payload.(DeleteClassPayload)
if !ok {
return errors.Errorf("expected commit payload to be DeleteClassPayload, but got %T",
tx.Payload)
}
return m.deleteClassApplyChanges(ctx, pl.ClassName)
}
func (m *Manager) handleUpdateClassCommit(ctx context.Context,
tx *cluster.Transaction,
) error {
m.Lock()
defer m.Unlock()
pl, ok := tx.Payload.(UpdateClassPayload)
if !ok {
return errors.Errorf("expected commit payload to be UpdateClassPayload, but got %T",
tx.Payload)
}
if pl.Class == nil {
return fmt.Errorf("invalid tx: class is nil")
}
// note that a nil state may be valid on an update_class tx, whereas it's not
// valid on a add_class. That's why we're not validating whether state is set
// here
if err := m.parseVectorIndexConfig(ctx, pl.Class); err != nil {
return err
}
if err := m.parseShardingConfig(ctx, pl.Class); err != nil {
return err
}
return m.updateClassApplyChanges(ctx, pl.ClassName, pl.Class, pl.State)
}
func (m *Manager) handleAddTenantsCommit(ctx context.Context,
tx *cluster.Transaction,
) error {
m.Lock()
defer m.Unlock()
req, ok := tx.Payload.(AddTenantsPayload)
if !ok {
return errors.Errorf("expected commit payload to be AddTenants, but got %T",
tx.Payload)
}
cls := m.getClassByName(req.Class)
if cls == nil {
return fmt.Errorf("class %q: %w", req.Class, ErrNotFound)
}
err := m.onAddTenants(ctx, cls, req)
if err != nil {
m.logger.WithField("action", "on_add_tenants").
WithField("n", len(req.Tenants)).
WithField("class", cls.Class).Error(err)
}
return err
}
func (m *Manager) handleUpdateTenantsCommit(ctx context.Context,
tx *cluster.Transaction,
) error {
m.Lock()
defer m.Unlock()
req, ok := tx.Payload.(UpdateTenantsPayload)
if !ok {
return errors.Errorf("expected commit payload to be UpdateTenants, but got %T",
tx.Payload)
}
cls := m.getClassByName(req.Class)
if cls == nil {
return fmt.Errorf("class %q: %w", req.Class, ErrNotFound)
}
err := m.onUpdateTenants(ctx, cls, req)
if err != nil {
m.logger.WithField("action", "on_add_tenants").
WithField("n", len(req.Tenants)).
WithField("class", cls.Class).Error(err)
}
return err
}
func (m *Manager) handleDeleteTenantsCommit(ctx context.Context,
tx *cluster.Transaction,
) error {
m.Lock()
defer m.Unlock()
req, ok := tx.Payload.(DeleteTenantsPayload)
if !ok {
return errors.Errorf("expected commit payload to be DeleteTenants, but got %T",
tx.Payload)
}
cls := m.getClassByName(req.Class)
if cls == nil {
m.logger.WithField("action", "delete_tenants").
WithField("class", req.Class).Warn("class not found")
return nil
}
return m.onDeleteTenants(ctx, cls, req)
}