Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package classifications | |
import ( | |
"context" | |
"sync" | |
"time" | |
"github.com/go-openapi/strfmt" | |
"github.com/pkg/errors" | |
"github.com/sirupsen/logrus" | |
"github.com/weaviate/weaviate/entities/models" | |
"github.com/weaviate/weaviate/usecases/classification" | |
"github.com/weaviate/weaviate/usecases/cluster" | |
) | |
const DefaultTxTTL = 60 * time.Second | |
type DistributedRepo struct { | |
sync.RWMutex | |
txRemote *cluster.TxManager | |
localRepo localRepo | |
} | |
type localRepo interface { | |
Get(ctx context.Context, id strfmt.UUID) (*models.Classification, error) | |
Put(ctx context.Context, classification models.Classification) error | |
} | |
func NewDistributeRepo(remoteClient cluster.Client, | |
memberLister cluster.MemberLister, localRepo localRepo, | |
logger logrus.FieldLogger, | |
) *DistributedRepo { | |
broadcaster := cluster.NewTxBroadcaster(memberLister, remoteClient) | |
txRemote := cluster.NewTxManager(broadcaster, &dummyTxPersistence{}, logger) | |
txRemote.StartAcceptIncoming() | |
repo := &DistributedRepo{ | |
txRemote: txRemote, | |
localRepo: localRepo, | |
} | |
repo.txRemote.SetCommitFn(repo.incomingCommit) | |
return repo | |
} | |
func (r *DistributedRepo) Get(ctx context.Context, | |
id strfmt.UUID, | |
) (*models.Classification, error) { | |
r.RLock() | |
defer r.RUnlock() | |
return r.localRepo.Get(ctx, id) | |
} | |
func (r *DistributedRepo) Put(ctx context.Context, | |
pl models.Classification, | |
) error { | |
r.Lock() | |
defer r.Unlock() | |
tx, err := r.txRemote.BeginTransaction(ctx, classification.TransactionPut, | |
classification.TransactionPutPayload{ | |
Classification: pl, | |
}, DefaultTxTTL) | |
if err != nil { | |
return errors.Wrap(err, "open cluster-wide transaction") | |
} | |
err = r.txRemote.CommitWriteTransaction(ctx, tx) | |
if err != nil { | |
return errors.Wrap(err, "commit cluster-wide transaction") | |
} | |
return r.localRepo.Put(ctx, pl) | |
} | |
func (r *DistributedRepo) incomingCommit(ctx context.Context, | |
tx *cluster.Transaction, | |
) error { | |
if tx.Type != classification.TransactionPut { | |
return errors.Errorf("unrecognized tx type: %s", tx.Type) | |
} | |
return r.localRepo.Put(ctx, tx.Payload.(classification.TransactionPutPayload). | |
Classification) | |
} | |
func (r *DistributedRepo) TxManager() *cluster.TxManager { | |
return r.txRemote | |
} | |
// NOTE: classifications do not yet make use of the new durability guarantees | |
// introduced by the txManager as part of v1.21.3. The reasoning behind this is | |
// that the classification itself is not crash-safe anyway, so there is no | |
// point. We need to decide down the line what to do with this? It is a rarely | |
// used, but not used feature. For now we are not aware of anyone having any | |
// issues with its stability. | |
type dummyTxPersistence struct{} | |
func (d *dummyTxPersistence) StoreTx(ctx context.Context, tx *cluster.Transaction) error { | |
return nil | |
} | |
func (d *dummyTxPersistence) DeleteTx(ctx context.Context, txID string) error { | |
return nil | |
} | |
func (d *dummyTxPersistence) IterateAll(ctx context.Context, cb func(tx *cluster.Transaction)) error { | |
return nil | |
} | |