File size: 2,340 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//

package connstate

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/sirupsen/logrus"
)

// Manager can save and load a connector's internal state into a remote storage
type Manager struct {
	repo   Repo
	state  json.RawMessage
	logger logrus.FieldLogger
}

// Repo describes the dependencies of the connector state manager to an
// external storage
type Repo interface {
	Save(ctx context.Context, state json.RawMessage) error
	Load(ctx context.Context) (json.RawMessage, error)
}

// NewManager for Connector State
func NewManager(repo Repo, logger logrus.FieldLogger) (*Manager, error) {
	m := &Manager{repo: repo, logger: logger}
	if err := m.loadOrInitialize(context.Background()); err != nil {
		return nil, fmt.Errorf("could not load or initialize: %v", err)
	}

	return m, nil
}

// GetInitialState is only supposed to be used during initialization of the
// connector.
func (m *Manager) GetInitialState() json.RawMessage {
	return m.state
}

// SetState form outside (i.e. from the connector)
func (m *Manager) SetState(ctx context.Context, state json.RawMessage) error {
	m.state = state
	return m.save(ctx)
}

// func (l *etcdSchemaManager) SetStateConnector(stateConnector connector_state.Connector) {
// 	l.connectorStateSetter = stateConnector
// }

func (m *Manager) loadOrInitialize(ctx context.Context) error {
	state, err := m.repo.Load(ctx)
	if err != nil {
		return fmt.Errorf("could not load connector state: %v", err)
	}

	if state == nil {
		m.state = json.RawMessage([]byte("{}"))
		return m.save(ctx)
	}

	m.state = state
	return nil
}

func (m *Manager) save(ctx context.Context) error {
	m.logger.
		WithField("action", "connector_state_update").
		WithField("configuration_store", "etcd").
		Debug("saving updated connector state to configuration store")

	err := m.repo.Save(ctx, m.state)
	if err != nil {
		return fmt.Errorf("could not save connector state: %v", err)
	}

	return nil
}