Spaces:
Running
Running
File size: 4,761 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package schema
import (
"encoding/json"
"time"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/usecases/cluster"
"github.com/weaviate/weaviate/usecases/sharding"
)
const (
// write-only
AddClass cluster.TransactionType = "add_class"
AddProperty cluster.TransactionType = "add_property"
mergeObjectProperty cluster.TransactionType = "merge_object_property"
// tenant types
addTenants cluster.TransactionType = "add_tenants"
updateTenants cluster.TransactionType = "update_tenants"
deleteTenants cluster.TransactionType = "delete_tenants"
DeleteClass cluster.TransactionType = "delete_class"
UpdateClass cluster.TransactionType = "update_class"
// read-only
ReadSchema cluster.TransactionType = "read_schema"
// repairs
RepairClass cluster.TransactionType = "repair_class"
RepairProperty cluster.TransactionType = "repair_property"
RepairTenant cluster.TransactionType = "repair_tenant"
DefaultTxTTL = 60 * time.Second
)
// any tx that is listed here will be tried to commit if it is still open on
// startup
var resumableTxs = []cluster.TransactionType{
addTenants,
}
// any tx that is listed here will bypass the ready-check, i.e. they will
// execute even if the DB is unready.
var allowUnreadyTxs = []cluster.TransactionType{
ReadSchema, // required at startup, does not write
RepairClass,
RepairProperty,
RepairTenant,
}
type AddClassPayload struct {
Class *models.Class `json:"class"`
State *sharding.State `json:"state"`
}
type AddPropertyPayload struct {
ClassName string `json:"className"`
Property *models.Property `json:"property"`
}
type MergeObjectPropertyPayload struct {
ClassName string `json:"className"`
Property *models.Property `json:"property"`
}
// TenantCreate represents properties of a specific tenant (physical shard)
type TenantCreate struct {
Name string `json:"name"`
Nodes []string `json:"nodes"`
Status string `json:"status"`
}
type TenantUpdate struct {
Name string `json:"name"`
Status string `json:"status"`
}
// AddTenantsPayload allows for adding multiple tenants to a class
type AddTenantsPayload struct {
Class string `json:"class_name"`
Tenants []TenantCreate `json:"tenants"`
}
type UpdateTenantsPayload struct {
Class string `json:"class_name"`
Tenants []TenantUpdate `json:"tenants"`
}
// DeleteTenantsPayload allows for removing multiple tenants from a class
type DeleteTenantsPayload struct {
Class string `json:"class_name"`
Tenants []string `json:"tenants"`
}
type DeleteClassPayload struct {
ClassName string `json:"className"`
}
type UpdateClassPayload struct {
ClassName string `json:"className"`
Class *models.Class `json:"class"`
// For now, the state cannot be updated yet, but this will be a requirement
// in the future, for example, with dynamically changing replication, so we
// should already make sure that state is part of the transaction payload
State *sharding.State `json:"state"`
}
type ReadSchemaPayload struct {
Schema *State `json:"schema"`
}
func UnmarshalTransaction(txType cluster.TransactionType,
payload json.RawMessage,
) (interface{}, error) {
switch txType {
case AddClass, RepairClass:
return unmarshalRawJson[AddClassPayload](payload)
case AddProperty, RepairProperty:
return unmarshalRawJson[AddPropertyPayload](payload)
case mergeObjectProperty:
return unmarshalRawJson[MergeObjectPropertyPayload](payload)
case DeleteClass:
return unmarshalRawJson[DeleteClassPayload](payload)
case UpdateClass:
return unmarshalRawJson[UpdateClassPayload](payload)
case ReadSchema:
return unmarshalRawJson[ReadSchemaPayload](payload)
case addTenants, RepairTenant:
return unmarshalRawJson[AddTenantsPayload](payload)
case updateTenants:
return unmarshalRawJson[UpdateTenantsPayload](payload)
case deleteTenants:
return unmarshalRawJson[DeleteTenantsPayload](payload)
default:
return nil, errors.Errorf("unrecognized schema transaction type %q", txType)
}
}
// unmarshalRawJson returns the result of marshalling json payload
func unmarshalRawJson[T any](payload json.RawMessage) (T, error) {
var v T
err := json.Unmarshal(payload, &v)
return v, err
}
|