Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package cluster | |
import ( | |
"context" | |
"github.com/pkg/errors" | |
"github.com/sirupsen/logrus" | |
) | |
func (c *TxManager) CloseReadTransaction(ctx context.Context, | |
tx *Transaction, | |
) error { | |
c.Lock() | |
if c.currentTransaction == nil || c.currentTransaction.ID != tx.ID { | |
c.Unlock() | |
return ErrInvalidTransaction | |
} | |
c.Unlock() | |
// now that we know we are dealing with a valid transaction: no matter the | |
// outcome, after this call, we should not have a local transaction anymore | |
defer func() { | |
c.Lock() | |
c.currentTransaction = nil | |
c.Unlock() | |
}() | |
if err := c.remote.BroadcastCommitTransaction(ctx, tx); err != nil { | |
// we could not open the transaction on every node, therefore we need to | |
// abort it everywhere. | |
if err := c.remote.BroadcastAbortTransaction(ctx, tx); err != nil { | |
c.logger.WithFields(logrus.Fields{ | |
"action": "broadcast_abort_transaction", | |
"id": tx.ID, | |
}).WithError(err).Errorf("broadcast tx abort failed") | |
} | |
return errors.Wrap(err, "broadcast commit transaction") | |
} | |
return nil | |
} | |