KevinStephenson
Adding in weaviate code
b110593
raw
history blame
2.34 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package connstate
import (
"context"
"encoding/json"
"fmt"
"github.com/sirupsen/logrus"
)
// Manager can save and load a connector's internal state into a remote
// storage. It keeps the most recent state in memory and writes it to the
// configured Repo whenever the state is initialized or updated.
type Manager struct {
// repo is the external storage the state is loaded from and saved to.
repo Repo
// state is the connector state as last loaded, initialized, or set via
// SetState. It is stored as raw JSON and is not interpreted by the Manager.
state json.RawMessage
// logger receives a debug entry each time the state is saved.
logger logrus.FieldLogger
}
// Repo describes the dependencies of the connector state manager to an
// external storage.
type Repo interface {
// Save persists the given raw JSON state.
Save(ctx context.Context, state json.RawMessage) error
// Load returns the stored raw JSON state. The Manager treats a nil result
// with a nil error as "no state stored yet" and initializes a fresh one.
Load(ctx context.Context) (json.RawMessage, error)
}
// NewManager creates a Manager for connector state backed by repo. During
// construction it loads the stored state from the repo or, if the repo
// returns no state, initializes an empty JSON object ("{}") and saves it.
//
// The initial load runs with context.Background(), so it is not bound to any
// caller-provided deadline or cancellation.
//
// It returns a nil Manager and an error wrapping the underlying cause if the
// state could be neither loaded nor initialized.
func NewManager(repo Repo, logger logrus.FieldLogger) (*Manager, error) {
	m := &Manager{repo: repo, logger: logger}

	if err := m.loadOrInitialize(context.Background()); err != nil {
		// Wrap with %w (not %v) so the cause stays reachable through
		// errors.Is / errors.As; the message text is unchanged.
		return nil, fmt.Errorf("could not load or initialize: %w", err)
	}

	return m, nil
}
// GetInitialState returns the state currently held by the Manager. It is only
// supposed to be used during initialization of the connector.
//
// The returned value is the Manager's own raw JSON slice, not a copy.
func (m *Manager) GetInitialState() json.RawMessage {
	current := m.state
	return current
}
// SetState replaces the connector state from outside (i.e. from the
// connector) and saves it to the repo.
//
// The in-memory state is updated before the save is attempted, so it holds the
// new value even when an error is returned.
func (m *Manager) SetState(ctx context.Context, state json.RawMessage) error {
	m.state = state

	if err := m.save(ctx); err != nil {
		return err
	}

	return nil
}
// func (l *etcdSchemaManager) SetStateConnector(stateConnector connector_state.Connector) {
// l.connectorStateSetter = stateConnector
// }
// loadOrInitialize populates the in-memory state from the repo. If the repo
// returns a nil state, the state is initialized to an empty JSON object
// ("{}") and saved immediately; otherwise the loaded state is kept as is and
// nothing is written.
//
// A failure to load is returned wrapped, so callers can inspect the cause
// with errors.Is / errors.As.
func (m *Manager) loadOrInitialize(ctx context.Context) error {
	state, err := m.repo.Load(ctx)
	if err != nil {
		// %w instead of %v: same message, but the cause remains unwrappable.
		return fmt.Errorf("could not load connector state: %w", err)
	}

	if state == nil {
		// Nothing stored yet: start from an empty object and persist it
		// right away.
		m.state = json.RawMessage("{}")
		return m.save(ctx)
	}

	m.state = state
	return nil
}
// save writes the current in-memory state to the repo, emitting a debug log
// entry before the write is attempted.
//
// A failure to save is returned wrapped, so callers can inspect the cause
// with errors.Is / errors.As.
func (m *Manager) save(ctx context.Context) error {
	m.logger.
		WithField("action", "connector_state_update").
		WithField("configuration_store", "etcd").
		Debug("saving updated connector state to configuration store")

	if err := m.repo.Save(ctx, m.state); err != nil {
		// %w instead of %v: same message, but the cause remains unwrappable.
		return fmt.Errorf("could not save connector state: %w", err)
	}

	return nil
}