Spaces:
Running
Running
File size: 3,400 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package classifications
import (
"context"
"sync"
"time"
"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/usecases/classification"
"github.com/weaviate/weaviate/usecases/cluster"
)
const DefaultTxTTL = 60 * time.Second
type DistributedRepo struct {
sync.RWMutex
txRemote *cluster.TxManager
localRepo localRepo
}
type localRepo interface {
Get(ctx context.Context, id strfmt.UUID) (*models.Classification, error)
Put(ctx context.Context, classification models.Classification) error
}
func NewDistributeRepo(remoteClient cluster.Client,
memberLister cluster.MemberLister, localRepo localRepo,
logger logrus.FieldLogger,
) *DistributedRepo {
broadcaster := cluster.NewTxBroadcaster(memberLister, remoteClient)
txRemote := cluster.NewTxManager(broadcaster, &dummyTxPersistence{}, logger)
txRemote.StartAcceptIncoming()
repo := &DistributedRepo{
txRemote: txRemote,
localRepo: localRepo,
}
repo.txRemote.SetCommitFn(repo.incomingCommit)
return repo
}
func (r *DistributedRepo) Get(ctx context.Context,
id strfmt.UUID,
) (*models.Classification, error) {
r.RLock()
defer r.RUnlock()
return r.localRepo.Get(ctx, id)
}
func (r *DistributedRepo) Put(ctx context.Context,
pl models.Classification,
) error {
r.Lock()
defer r.Unlock()
tx, err := r.txRemote.BeginTransaction(ctx, classification.TransactionPut,
classification.TransactionPutPayload{
Classification: pl,
}, DefaultTxTTL)
if err != nil {
return errors.Wrap(err, "open cluster-wide transaction")
}
err = r.txRemote.CommitWriteTransaction(ctx, tx)
if err != nil {
return errors.Wrap(err, "commit cluster-wide transaction")
}
return r.localRepo.Put(ctx, pl)
}
func (r *DistributedRepo) incomingCommit(ctx context.Context,
tx *cluster.Transaction,
) error {
if tx.Type != classification.TransactionPut {
return errors.Errorf("unrecognized tx type: %s", tx.Type)
}
return r.localRepo.Put(ctx, tx.Payload.(classification.TransactionPutPayload).
Classification)
}
func (r *DistributedRepo) TxManager() *cluster.TxManager {
return r.txRemote
}
// NOTE: classifications do not yet make use of the new durability guarantees
// introduced by the txManager as part of v1.21.3. The reasoning behind this is
// that the classification itself is not crash-safe anyway, so there is no
// point. We need to decide down the line what to do with this? It is a rarely
// used, but not used feature. For now we are not aware of anyone having any
// issues with its stability.
type dummyTxPersistence struct{}
func (d *dummyTxPersistence) StoreTx(ctx context.Context, tx *cluster.Transaction) error {
return nil
}
func (d *dummyTxPersistence) DeleteTx(ctx context.Context, txID string) error {
return nil
}
func (d *dummyTxPersistence) IterateAll(ctx context.Context, cb func(tx *cluster.Transaction)) error {
return nil
}
|