KevinStephenson
Adding in weaviate code
b110593
raw
history blame
7.39 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package schema
import (
"errors"
"fmt"
"sync"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/usecases/sharding"
)
// Sentinel errors returned by the schema cache methods in this file.
var (
// errPropertyNotFound reports that a class has no property with the requested name.
errPropertyNotFound = errors.New("property not found")
// errClassNotFound reports that no class or sharding state exists for the requested class name.
errClassNotFound = errors.New("class not found")
// errShardNotFound reports that a class's sharding state has no physical shard with the requested name.
errShardNotFound = errors.New("shard not found")
)
// State is a cached copy of the schema that can also be saved into a remote
// storage, as specified by Repo
type State struct {
// ObjectSchema holds the class definitions; it is serialized under the JSON key "object".
ObjectSchema *models.Schema `json:"object"`
// ShardingState maps a class name to the sharding state of that class.
ShardingState map[string]*sharding.State
}
// NewState builds an empty State whose class list and sharding-state map are
// both pre-sized for nClasses entries.
func NewState(nClasses int) State {
	objects := &models.Schema{
		Classes: make([]*models.Class, 0, nClasses),
	}
	shardingByClass := make(map[string]*sharding.State, nClasses)

	return State{
		ObjectSchema:  objects,
		ShardingState: shardingByClass,
	}
}
// schemaCache embeds a State together with the sync.RWMutex that the methods
// in this file lock around reads and writes of that state. Methods whose name
// starts with "unsafe" do not acquire the lock themselves.
type schemaCache struct {
sync.RWMutex
State
}
// ShardOwner looks up the first node listed for shard within class.
//
// It returns errClassNotFound when class has no sharding state,
// errShardNotFound when the shard is unknown, and an "owner node not found"
// error when the shard lists no nodes or its first node name is empty.
func (s *schemaCache) ShardOwner(class, shard string) (string, error) {
	s.RLock()
	defer s.RUnlock()

	state := s.ShardingState[class]
	if state == nil {
		return "", errClassNotFound
	}

	physical, found := state.Physical[shard]
	if !found {
		return "", errShardNotFound
	}

	nodes := physical.BelongsToNodes
	if len(nodes) == 0 || nodes[0] == "" {
		return "", fmt.Errorf("owner node not found")
	}
	return nodes[0], nil
}
// ShardReplicas returns the names of the nodes that own shard in class.
//
// It returns errClassNotFound when class has no sharding state and
// errShardNotFound when the class has no physical shard with that name.
//
// The returned slice is a copy: callers may keep or modify it after this
// method returns without touching the cached sharding state. A nil node list
// is returned as nil.
func (s *schemaCache) ShardReplicas(class, shard string) ([]string, error) {
	s.RLock()
	defer s.RUnlock()

	cls := s.ShardingState[class]
	if cls == nil {
		return nil, errClassNotFound
	}
	x, ok := cls.Physical[shard]
	if !ok {
		return nil, errShardNotFound
	}
	if x.BelongsToNodes == nil {
		return nil, nil
	}

	// Copy while the read lock is still held; handing out the internal slice
	// would let callers alias lock-guarded state after the lock is released.
	replicas := make([]string, len(x.BelongsToNodes))
	copy(replicas, x.BelongsToNodes)
	return replicas, nil
}
// TenantShard resolves the shard that serves tenant in class.
//
// When the class has partitioning enabled and a physical shard named after the
// tenant exists, it returns the tenant name (which is the shard name) together
// with that shard's activity status. In every other case both results are
// empty strings.
func (s *schemaCache) TenantShard(class, tenant string) (string, string) {
	s.RLock()
	defer s.RUnlock()

	state := s.ShardingState[class]
	if state == nil {
		return "", ""
	}
	if !state.PartitioningEnabled {
		return "", ""
	}

	shard, exists := state.Physical[tenant]
	if !exists {
		return "", ""
	}
	return tenant, shard.ActivityStatus()
}
// ShardFromUUID asks the sharding state of class which physical shard the
// given uuid maps to. It returns an empty string when the class has no
// sharding state.
func (s *schemaCache) ShardFromUUID(class string, uuid []byte) string {
	s.RLock()
	defer s.RUnlock()

	if state := s.ShardingState[class]; state != nil {
		return state.PhysicalShard(uuid)
	}
	return ""
}
// CopyShardingState returns a pointer to the value produced by calling
// DeepCopy on the sharding state of className, or nil when the class has no
// sharding state. The copy is made while the read lock is held.
func (s *schemaCache) CopyShardingState(className string) *sharding.State {
	s.RLock()
	defer s.RUnlock()

	current := s.ShardingState[className]
	if current == nil {
		return nil
	}
	clone := current.DeepCopy()
	return &clone
}
// LockGuard runs mutate while holding the write lock. The lock is released
// when mutate returns, including when it panics.
func (s *schemaCache) LockGuard(mutate func()) {
	s.RWMutex.Lock()
	defer s.RWMutex.Unlock()

	mutate()
}
// RLockGuard runs reader, a function that does not mutate the state, while
// holding the read lock and returns whatever error reader returns.
func (s *schemaCache) RLockGuard(reader func() error) error {
	s.RWMutex.RLock()
	defer s.RWMutex.RUnlock()

	err := reader()
	return err
}
// isEmpty reports whether the cached state has no object schema or an object
// schema without any classes. It takes the read lock.
func (s *schemaCache) isEmpty() bool {
	s.RLock()
	defer s.RUnlock()

	objects := s.State.ObjectSchema
	if objects == nil {
		return true
	}
	return len(objects.Classes) == 0
}
// setState replaces the whole cached state with st while holding the write
// lock.
func (s *schemaCache) setState(st State) {
	s.RWMutex.Lock()
	defer s.RWMutex.Unlock()

	s.State = st
}
// detachClass removes the first class named name from the object schema and
// reports whether such a class was found. It takes the write lock.
//
// Removal does not preserve order: the last class is moved into the removed
// class's position and the list is shortened by one.
func (s *schemaCache) detachClass(name string) bool {
	s.Lock()
	defer s.Unlock()

	objects := s.ObjectSchema
	last := len(objects.Classes) - 1
	for idx := 0; idx <= last; idx++ {
		if objects.Classes[idx].Class != name {
			continue
		}
		objects.Classes[idx] = objects.Classes[last]
		// Clear the vacated tail slot so the backing array does not keep the
		// pointer alive.
		objects.Classes[last] = nil
		objects.Classes = objects.Classes[:last]
		return true
	}
	return false
}
// deleteClassState drops the sharding state stored for class name while
// holding the write lock. Deleting a name that is not present is a no-op.
func (s *schemaCache) deleteClassState(name string) {
	s.RWMutex.Lock()
	defer s.RWMutex.Unlock()

	delete(s.State.ShardingState, name)
}
// unsafeFindClassIf returns the first class in the object schema for which
// pred returns true, or nil when no class matches.
//
// It does not acquire the lock itself.
func (s *schemaCache) unsafeFindClassIf(pred func(*models.Class) bool) *models.Class {
	classes := s.ObjectSchema.Classes
	for i := range classes {
		if candidate := classes[i]; pred(candidate) {
			return candidate
		}
	}
	return nil
}
// unsafeFindClass returns the first class in the object schema whose Class
// field equals className, or nil when there is none.
//
// It does not acquire the lock itself.
func (s *schemaCache) unsafeFindClass(className string) *models.Class {
	classes := s.ObjectSchema.Classes
	for i := 0; i < len(classes); i++ {
		if classes[i].Class == className {
			return classes[i]
		}
	}
	return nil
}
// addClass registers ss as the sharding state for c.Class and appends c to the
// object schema's class list, both under the write lock. It does not check
// whether a class with the same name is already present.
func (s *schemaCache) addClass(c *models.Class, ss *sharding.State) {
	s.RWMutex.Lock()
	defer s.RWMutex.Unlock()

	name := c.Class
	s.ShardingState[name] = ss

	objects := s.ObjectSchema
	objects.Classes = append(objects.Classes, c)
}
// updateClass overwrites the cached class named u.Class with the contents of u
// and, when ss is non-nil, replaces that class's sharding state with ss.
//
// It returns errClassNotFound, leaving the cache untouched, when no class with
// that name exists. The write lock is held for the whole operation.
func (s *schemaCache) updateClass(u *models.Class, ss *sharding.State) error {
	s.Lock()
	defer s.Unlock()

	existing := s.unsafeFindClass(u.Class)
	if existing == nil {
		return errClassNotFound
	}
	// Overwrite in place so the pointer stored in the class list stays the same.
	*existing = *u

	if ss != nil {
		s.ShardingState[u.Class] = ss
	}
	return nil
}
// addProperty appends p to the properties of the class named class and returns
// a by-value copy of the updated class. It returns errClassNotFound when the
// class does not exist. The write lock is held for the whole operation.
func (s *schemaCache) addProperty(class string, p *models.Property) (models.Class, error) {
	s.Lock()
	defer s.Unlock()

	target := s.unsafeFindClass(class)
	if target == nil {
		return models.Class{}, errClassNotFound
	}

	// Build the extended list in a fresh slice and swap it in with a single
	// assignment, so the previous slice is never modified; this prevents a
	// race condition with concurrent readers.
	extended := make([]*models.Property, 0, len(target.Properties)+1)
	extended = append(extended, target.Properties...)
	extended = append(extended, p)
	target.Properties = extended

	return *target, nil
}
// mergeObjectProperty merges p.NestedProperties into the nested properties of
// the first property of class whose name equals p.Name, and returns a by-value
// copy of the updated class.
//
// It returns errClassNotFound when the class does not exist and
// errPropertyNotFound when the class has no property with that name. The write
// lock is held for the whole operation.
func (s *schemaCache) mergeObjectProperty(class string, p *models.Property) (models.Class, error) {
	s.Lock()
	defer s.Unlock()

	target := s.unsafeFindClass(class)
	if target == nil {
		return models.Class{}, errClassNotFound
	}

	for _, existing := range target.Properties {
		if existing.Name != p.Name {
			continue
		}
		// NOTE(review): the second result of MergeRecursivelyNestedProperties
		// is discarded here; its meaning is not visible from this file.
		existing.NestedProperties, _ = schema.MergeRecursivelyNestedProperties(existing.NestedProperties, p.NestedProperties)
		return *target, nil
	}
	return models.Class{}, errPropertyNotFound
}
// readOnlySchema returns a shallow copy of the object schema, taken under the
// read lock, that callers must treat as read-only.
// Changing the schema outside this package might lead to undefined behavior.
func (s *schemaCache) readOnlySchema() *models.Schema {
	s.RWMutex.RLock()
	defer s.RWMutex.RUnlock()

	snapshot := shallowCopySchema(s.ObjectSchema)
	return snapshot
}
// readOnlyClass returns a pointer to a by-value copy of the class called name,
// taken under the read lock, or errClassNotFound when no such class exists.
// Only the class struct itself is copied.
func (s *schemaCache) readOnlyClass(name string) (*models.Class, error) {
	s.RLock()
	defer s.RUnlock()

	if found := s.unsafeFindClass(name); found != nil {
		clone := *found
		return &clone, nil
	}
	return nil, errClassNotFound
}
// classExist reports whether the object schema contains a class called name.
// It takes the read lock.
func (s *schemaCache) classExist(name string) bool {
	s.RLock()
	defer s.RUnlock()

	found := s.unsafeFindClass(name)
	return found != nil
}
// shallowCopySchema returns a new schema whose class list holds a by-value
// copy of every class in m.
//
// Only the schema struct and each class struct are copied; anything a class
// refers to through a pointer or slice (such as its properties) is still
// shared with m. This is sufficient under the assumption that class attributes
// and individual properties are overwritten rather than partially updated, and
// that the properties list is the only attribute that may change in size.
func shallowCopySchema(m *models.Schema) *models.Schema {
	dup := *m
	dup.Classes = make([]*models.Class, 0, len(m.Classes))
	for _, class := range m.Classes {
		classCopy := *class
		dup.Classes = append(dup.Classes, &classCopy)
	}
	return &dup
}