KevinStephenson
Adding in weaviate code
b110593
raw
history blame
4.18 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package hnsw
import (
"os"
"sync/atomic"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/schema"
ent "github.com/weaviate/weaviate/entities/vectorindex/hnsw"
"github.com/weaviate/weaviate/usecases/config"
)
// ValidateUserConfigUpdate rejects config updates that try to change an HNSW
// setting which cannot be altered on an existing index.
//
// Both initial and updated must hold an ent.UserConfig; otherwise an error
// naming the unexpected dynamic type is returned. The immutable settings are
// checked in a fixed order and the first one that differs between the two
// configs produces the returned error. A nil result means the update is
// acceptable as far as immutable settings are concerned.
func ValidateUserConfigUpdate(initial, updated schema.VectorIndexConfig) error {
	prev, isUserConfig := initial.(ent.UserConfig)
	if !isUserConfig {
		return errors.Errorf("initial is not UserConfig, but %T", initial)
	}

	next, isUserConfig := updated.(ent.UserConfig)
	if !isUserConfig {
		return errors.Errorf("updated is not UserConfig, but %T", updated)
	}

	checks := []immutableParameter{
		{name: "efConstruction", accessor: func(c ent.UserConfig) interface{} { return c.EFConstruction }},
		{name: "maxConnections", accessor: func(c ent.UserConfig) interface{} { return c.MaxConnections }},
		// NOTE: There isn't a technical reason for this to be immutable, it
		// simply hasn't been implemented yet. It would require to stop the
		// current timer and start a new one. Certainly possible, but let's see
		// if anyone actually needs this before implementing it.
		{name: "cleanupIntervalSeconds", accessor: func(c ent.UserConfig) interface{} { return c.CleanupIntervalSeconds }},
		{name: "distance", accessor: func(c ent.UserConfig) interface{} { return c.Distance }},
	}

	for i := range checks {
		if err := validateImmutableField(checks[i], prev, next); err != nil {
			return err
		}
	}

	return nil
}
// immutableParameter names one user-config setting that must stay the same
// between an index's existing config and an update to it.
type immutableParameter struct {
	// accessor pulls the setting's current value out of a UserConfig so the
	// old and new values can be compared.
	accessor func(ent.UserConfig) interface{}
	// name is the user-facing setting name reported in validation errors.
	name string
}
// validateImmutableField returns an error when the value extracted by
// u.accessor differs between previous and next, and nil when it is unchanged.
//
// The values are compared with the interface == operator, so accessors must
// return comparable types: per the Go spec, comparing interfaces whose dynamic
// values are slices, maps or funcs panics at run time.
func validateImmutableField(u immutableParameter,
	previous, next ent.UserConfig,
) error {
	before, after := u.accessor(previous), u.accessor(next)
	if before == after {
		return nil
	}

	return errors.Errorf("%s is immutable: attempted change from \"%v\" to \"%v\"",
		u.name, before, after)
}
// UpdateUserConfig applies the mutable parts of an updated user config to a
// live index.
//
// The search-time settings (ef, dynamic ef min/max/factor and
// flatSearchCutoff) are stored immediately. If neither PQ nor BQ is enabled
// in the update, nothing else happens. Otherwise the PQ config is recorded.
// Then:
//   - with async indexing enabled, nothing further is done here;
//   - if the index is not yet compressed, compression is started;
//   - if it is already compressed, only the compressor's cache size is
//     updated from VectorCacheMaxObjects.
//
// callback is invoked exactly once on every path. It runs synchronously before
// returning, except when compression is started, in which case the
// compression routine fires it once it completes.
//
// An error is returned if updated is not an ent.UserConfig, or if starting
// compression fails.
func (h *hnsw) UpdateUserConfig(updated schema.VectorIndexConfig, callback func()) error {
	parsed, ok := updated.(ent.UserConfig)
	if !ok {
		callback()
		return errors.Errorf("config is not UserConfig, but %T", updated)
	}

	// Store atomically as a lock here would be very expensive, this value is
	// read on every single user-facing search, which can be highly concurrent
	atomic.StoreInt64(&h.ef, int64(parsed.EF))
	atomic.StoreInt64(&h.efMin, int64(parsed.DynamicEFMin))
	atomic.StoreInt64(&h.efMax, int64(parsed.DynamicEFMax))
	atomic.StoreInt64(&h.efFactor, int64(parsed.DynamicEFFactor))
	atomic.StoreInt64(&h.flatSearchCutoff, int64(parsed.FlatSearchCutoff))

	if !parsed.PQ.Enabled && !parsed.BQ.Enabled {
		callback()
		return nil
	}

	h.pqConfig = parsed.PQ
	if asyncEnabled() {
		// NOTE(review): with async indexing only the PQ config is recorded
		// here; compression is presumably triggered elsewhere — confirm.
		callback()
		return nil
	}

	if !h.compressed.Load() {
		// the compression will fire the callback once it's complete
		return h.TurnOnCompression(callback)
	}

	h.compressor.SetCacheMaxSize(int64(parsed.VectorCacheMaxObjects))
	callback()
	return nil
}
// asyncEnabled reports whether the ASYNC_INDEXING environment variable turns
// asynchronous indexing on, as interpreted by config.Enabled.
func asyncEnabled() bool {
	setting := os.Getenv("ASYNC_INDEXING")
	return config.Enabled(setting)
}
// TurnOnCompression validates the index's current PQ config and, if it is
// valid, starts compressing the index's vectors in a background goroutine.
//
// callback is invoked exactly once in either case:
//   - if validation fails, it is called immediately and the validation error
//     is returned;
//   - otherwise the background goroutine calls it after compression finishes,
//     whether or not compression succeeded, and nil is returned.
//
// Compression failures are not reported through the return value; the
// goroutine logs them instead.
func (h *hnsw) TurnOnCompression(callback func()) error {
	if err := ent.ValidatePQConfig(h.pqConfig); err != nil {
		callback()
		return err
	}

	// Announce the switch only once validation has passed, so a rejected
	// config does not log a compression that never starts.
	h.logger.WithField("action", "compress").Info("switching to compressed vectors")
	go h.compressThenCallback(callback)
	return nil
}
// compressThenCallback compresses the index's vectors and always invokes
// callback when it returns, regardless of the outcome.
//
// PQ is used when h.pqConfig is enabled; otherwise BQ is selected instead.
// TurnOnCompression starts this in its own goroutine, so a failure cannot be
// returned to the caller and is logged instead.
func (h *hnsw) compressThenCallback(callback func()) {
	defer callback()

	uc := ent.UserConfig{
		PQ: h.pqConfig,
		BQ: ent.BQConfig{
			// Exactly one of PQ/BQ ends up enabled: BQ is the fallback
			// whenever PQ is not enabled.
			Enabled: !h.pqConfig.Enabled,
		},
	}
	if err := h.compress(uc); err != nil {
		// Tag with the same action field as the other compression logs so
		// failures can be filtered alongside them.
		h.logger.WithField("action", "compress").Error(err)
		return
	}

	h.logger.WithField("action", "compress").Info("vector compression complete")
}