File size: 4,761 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//

package schema

import (
	"encoding/json"
	"time"

	"github.com/pkg/errors"
	"github.com/weaviate/weaviate/entities/models"
	"github.com/weaviate/weaviate/usecases/cluster"
	"github.com/weaviate/weaviate/usecases/sharding"
)

// Schema transaction types. Each constant is the string identifier of a
// cluster.TransactionType; UnmarshalTransaction in this file maps every one of
// them to the payload struct it is decoded into.
const (
	// write-only: transactions that add to the schema
	AddClass            cluster.TransactionType = "add_class"
	AddProperty         cluster.TransactionType = "add_property"
	mergeObjectProperty cluster.TransactionType = "merge_object_property"

	// tenant types: add, update, or delete tenants of a class (unexported)
	addTenants    cluster.TransactionType = "add_tenants"
	updateTenants cluster.TransactionType = "update_tenants"
	deleteTenants cluster.TransactionType = "delete_tenants"

	// class-level delete and update
	DeleteClass cluster.TransactionType = "delete_class"
	UpdateClass cluster.TransactionType = "update_class"

	// read-only
	ReadSchema cluster.TransactionType = "read_schema"

	// repairs: these are decoded with the same payload structs as their
	// add_* counterparts (see UnmarshalTransaction)
	RepairClass    cluster.TransactionType = "repair_class"
	RepairProperty cluster.TransactionType = "repair_property"
	RepairTenant   cluster.TransactionType = "repair_tenant"

	// DefaultTxTTL is 60 seconds. Presumably the default time-to-live of a
	// schema transaction; its use is not visible in this file, so confirm
	// against the caller.
	DefaultTxTTL = 60 * time.Second
)

// resumableTxs lists the transaction types that are resumable: if a
// transaction of one of these types is still open on startup, a commit will be
// attempted for it. Currently only addTenants is listed.
var resumableTxs = []cluster.TransactionType{
	addTenants,
}

// allowUnreadyTxs lists the transaction types that bypass the ready-check,
// i.e. they will execute even if the DB is unready. It contains the read-only
// schema read and all repair transaction types.
var allowUnreadyTxs = []cluster.TransactionType{
	ReadSchema, // required at startup, does not write
	RepairClass,
	RepairProperty,
	RepairTenant,
}

// AddClassPayload is the payload that UnmarshalTransaction decodes for the
// AddClass and RepairClass transaction types. It carries the class definition
// together with its sharding state.
type AddClassPayload struct {
	Class *models.Class   `json:"class"`
	State *sharding.State `json:"state"`
}

// AddPropertyPayload is the payload that UnmarshalTransaction decodes for the
// AddProperty and RepairProperty transaction types. It names a class and
// carries a single property.
type AddPropertyPayload struct {
	ClassName string           `json:"className"`
	Property  *models.Property `json:"property"`
}

// MergeObjectPropertyPayload is the payload that UnmarshalTransaction decodes
// for the mergeObjectProperty transaction type. Its fields and JSON keys are
// the same as those of AddPropertyPayload.
type MergeObjectPropertyPayload struct {
	ClassName string           `json:"className"`
	Property  *models.Property `json:"property"`
}

// TenantCreate represents properties of a specific tenant (physical shard).
// It is the element type of AddTenantsPayload.Tenants.
type TenantCreate struct {
	Name   string   `json:"name"`
	Nodes  []string `json:"nodes"`
	Status string   `json:"status"`
}

// TenantUpdate carries the name and status of a single tenant. It is the
// element type of UpdateTenantsPayload.Tenants and, unlike TenantCreate, has
// no Nodes field.
type TenantUpdate struct {
	Name   string `json:"name"`
	Status string `json:"status"`
}

// AddTenantsPayload allows for adding multiple tenants to a class. It is the
// payload that UnmarshalTransaction decodes for the addTenants and
// RepairTenant transaction types. Note that the class is serialized under the
// JSON key "class_name", whereas the class-level payloads in this file use
// "className".
type AddTenantsPayload struct {
	Class   string         `json:"class_name"`
	Tenants []TenantCreate `json:"tenants"`
}

// UpdateTenantsPayload is the payload that UnmarshalTransaction decodes for
// the updateTenants transaction type. It names a class (JSON key
// "class_name") and lists the tenants to update.
type UpdateTenantsPayload struct {
	Class   string         `json:"class_name"`
	Tenants []TenantUpdate `json:"tenants"`
}

// DeleteTenantsPayload allows for removing multiple tenants from a class. It
// is the payload that UnmarshalTransaction decodes for the deleteTenants
// transaction type. Tenants is a plain list of strings (presumably the tenant
// names; confirm against the caller).
type DeleteTenantsPayload struct {
	Class   string   `json:"class_name"`
	Tenants []string `json:"tenants"`
}

// DeleteClassPayload is the payload that UnmarshalTransaction decodes for the
// DeleteClass transaction type. It carries only the name of the class.
type DeleteClassPayload struct {
	ClassName string `json:"className"`
}

// UpdateClassPayload is the payload that UnmarshalTransaction decodes for the
// UpdateClass transaction type. It carries the class name, a class
// definition, and a sharding state.
type UpdateClassPayload struct {
	ClassName string        `json:"className"`
	Class     *models.Class `json:"class"`

	// For now, the state cannot be updated yet, but this will be a requirement
	// in the future, for example, with dynamically changing replication, so we
	// should already make sure that state is part of the transaction payload
	State *sharding.State `json:"state"`
}

// ReadSchemaPayload is the payload that UnmarshalTransaction decodes for the
// ReadSchema transaction type. It wraps a pointer to a State, a type declared
// elsewhere in this package.
type ReadSchemaPayload struct {
	Schema *State `json:"schema"`
}

// UnmarshalTransaction decodes the raw JSON payload of a schema transaction
// into the payload struct that belongs to txType.
//
// The returned value holds the payload struct by value (not as a pointer),
// e.g. an AddClassPayload for AddClass and RepairClass. The accompanying error
// is whatever the JSON decoding reported. For a transaction type that is not
// handled here, the result is nil together with an "unrecognized schema
// transaction type" error.
func UnmarshalTransaction(txType cluster.TransactionType,
	payload json.RawMessage,
) (interface{}, error) {
	var (
		decoded interface{}
		err     error
	)

	// Repair transactions share the payload struct of their add counterpart.
	switch txType {
	case AddClass, RepairClass:
		decoded, err = unmarshalRawJson[AddClassPayload](payload)
	case AddProperty, RepairProperty:
		decoded, err = unmarshalRawJson[AddPropertyPayload](payload)
	case mergeObjectProperty:
		decoded, err = unmarshalRawJson[MergeObjectPropertyPayload](payload)
	case DeleteClass:
		decoded, err = unmarshalRawJson[DeleteClassPayload](payload)
	case UpdateClass:
		decoded, err = unmarshalRawJson[UpdateClassPayload](payload)
	case ReadSchema:
		decoded, err = unmarshalRawJson[ReadSchemaPayload](payload)
	case addTenants, RepairTenant:
		decoded, err = unmarshalRawJson[AddTenantsPayload](payload)
	case updateTenants:
		decoded, err = unmarshalRawJson[UpdateTenantsPayload](payload)
	case deleteTenants:
		decoded, err = unmarshalRawJson[DeleteTenantsPayload](payload)
	default:
		return nil, errors.Errorf("unrecognized schema transaction type %q", txType)
	}

	return decoded, err
}

// unmarshalRawJson decodes the JSON payload into a fresh value of type T.
//
// On success it returns the decoded value and a nil error. If decoding fails
// it returns the zero value of T together with the unmodified error from
// json.Unmarshal, so callers never observe a partially populated value.
func unmarshalRawJson[T any](payload json.RawMessage) (T, error) {
	var v T
	if err := json.Unmarshal(payload, &v); err != nil {
		// json.Unmarshal keeps going after e.g. a type mismatch and may have
		// filled in some fields already; discard them and return a clean zero
		// value alongside the error.
		var zero T
		return zero, err
	}

	return v, nil
}