KevinStephenson
Adding in weaviate code
b110593
raw
history blame
33.1 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package rest
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
goruntime "runtime"
"runtime/debug"
"strings"
"time"
_ "net/http/pprof"
"github.com/KimMachineGun/automemlimit/memlimit"
openapierrors "github.com/go-openapi/errors"
"github.com/go-openapi/runtime"
"github.com/go-openapi/swag"
"github.com/pbnjay/memory"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/adapters/clients"
"github.com/weaviate/weaviate/adapters/handlers/rest/clusterapi"
"github.com/weaviate/weaviate/adapters/handlers/rest/operations"
"github.com/weaviate/weaviate/adapters/handlers/rest/state"
"github.com/weaviate/weaviate/adapters/repos/classifications"
"github.com/weaviate/weaviate/adapters/repos/db"
"github.com/weaviate/weaviate/adapters/repos/db/inverted"
modulestorage "github.com/weaviate/weaviate/adapters/repos/modules"
schemarepo "github.com/weaviate/weaviate/adapters/repos/schema"
txstore "github.com/weaviate/weaviate/adapters/repos/transactions"
"github.com/weaviate/weaviate/entities/moduletools"
"github.com/weaviate/weaviate/entities/replication"
vectorIndex "github.com/weaviate/weaviate/entities/vectorindex"
modstgazure "github.com/weaviate/weaviate/modules/backup-azure"
modstgfs "github.com/weaviate/weaviate/modules/backup-filesystem"
modstggcs "github.com/weaviate/weaviate/modules/backup-gcs"
modstgs3 "github.com/weaviate/weaviate/modules/backup-s3"
modgenerativeanyscale "github.com/weaviate/weaviate/modules/generative-anyscale"
modgenerativeaws "github.com/weaviate/weaviate/modules/generative-aws"
modgenerativecohere "github.com/weaviate/weaviate/modules/generative-cohere"
modgenerativeopenai "github.com/weaviate/weaviate/modules/generative-openai"
modgenerativepalm "github.com/weaviate/weaviate/modules/generative-palm"
modimage "github.com/weaviate/weaviate/modules/img2vec-neural"
modbind "github.com/weaviate/weaviate/modules/multi2vec-bind"
modclip "github.com/weaviate/weaviate/modules/multi2vec-clip"
modner "github.com/weaviate/weaviate/modules/ner-transformers"
modqnaopenai "github.com/weaviate/weaviate/modules/qna-openai"
modqna "github.com/weaviate/weaviate/modules/qna-transformers"
modcentroid "github.com/weaviate/weaviate/modules/ref2vec-centroid"
modrerankercohere "github.com/weaviate/weaviate/modules/reranker-cohere"
modrerankertransformers "github.com/weaviate/weaviate/modules/reranker-transformers"
modsum "github.com/weaviate/weaviate/modules/sum-transformers"
modspellcheck "github.com/weaviate/weaviate/modules/text-spellcheck"
modtext2vecaws "github.com/weaviate/weaviate/modules/text2vec-aws"
modcohere "github.com/weaviate/weaviate/modules/text2vec-cohere"
modcontextionary "github.com/weaviate/weaviate/modules/text2vec-contextionary"
modgpt4all "github.com/weaviate/weaviate/modules/text2vec-gpt4all"
modhuggingface "github.com/weaviate/weaviate/modules/text2vec-huggingface"
modjinaai "github.com/weaviate/weaviate/modules/text2vec-jinaai"
modopenai "github.com/weaviate/weaviate/modules/text2vec-openai"
modtext2vecpalm "github.com/weaviate/weaviate/modules/text2vec-palm"
modtransformers "github.com/weaviate/weaviate/modules/text2vec-transformers"
"github.com/weaviate/weaviate/usecases/auth/authentication/composer"
"github.com/weaviate/weaviate/usecases/backup"
"github.com/weaviate/weaviate/usecases/classification"
"github.com/weaviate/weaviate/usecases/cluster"
"github.com/weaviate/weaviate/usecases/config"
"github.com/weaviate/weaviate/usecases/modules"
"github.com/weaviate/weaviate/usecases/monitoring"
"github.com/weaviate/weaviate/usecases/objects"
"github.com/weaviate/weaviate/usecases/replica"
"github.com/weaviate/weaviate/usecases/scaler"
schemaUC "github.com/weaviate/weaviate/usecases/schema"
"github.com/weaviate/weaviate/usecases/schema/migrate"
"github.com/weaviate/weaviate/usecases/sharding"
"github.com/weaviate/weaviate/usecases/traverser"
)
const MinimumRequiredContextionaryVersion = "1.0.2"
func makeConfigureServer(appState *state.State) func(*http.Server, string, string) {
return func(s *http.Server, scheme, addr string) {
// Add properties to the config
appState.ServerConfig.Hostname = addr
appState.ServerConfig.Scheme = scheme
}
}
type vectorRepo interface {
objects.BatchVectorRepo
traverser.VectorSearcher
classification.VectorRepo
scaler.BackUpper
SetSchemaGetter(schemaUC.SchemaGetter)
WaitForStartup(ctx context.Context) error
Shutdown(ctx context.Context) error
}
func getCores() (int, error) {
cpuset, err := os.ReadFile("/sys/fs/cgroup/cpuset/cpuset.cpus")
if err != nil {
return 0, errors.Wrap(err, "read cpuset")
}
cores := strings.Split(strings.TrimSpace(string(cpuset)), ",")
return len(cores), nil
}
func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *state.State {
appState := startupRoutine(ctx, options)
setupGoProfiling(appState.ServerConfig.Config)
if appState.ServerConfig.Config.Monitoring.Enabled {
// only monitoring tool supported at the moment is prometheus
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(fmt.Sprintf(":%d", appState.ServerConfig.Config.Monitoring.Port), mux)
}()
}
limitResources(appState)
err := registerModules(appState)
if err != nil {
appState.Logger.
WithField("action", "startup").WithError(err).
Fatal("modules didn't load")
}
// now that modules are loaded we can run the remaining config validation
// which is module dependent
if err := appState.ServerConfig.Config.Validate(appState.Modules); err != nil {
appState.Logger.
WithField("action", "startup").WithError(err).
Fatal("invalid config")
}
appState.ClusterHttpClient = reasonableHttpClient(appState.ServerConfig.Config.Cluster.AuthConfig)
var vectorRepo vectorRepo
var vectorMigrator migrate.Migrator
var migrator migrate.Migrator
if appState.ServerConfig.Config.Monitoring.Enabled {
promMetrics := monitoring.GetMetrics()
appState.Metrics = promMetrics
}
// TODO: configure http transport for efficient intra-cluster comm
remoteIndexClient := clients.NewRemoteIndex(appState.ClusterHttpClient)
remoteNodesClient := clients.NewRemoteNode(appState.ClusterHttpClient)
replicationClient := clients.NewReplicationClient(appState.ClusterHttpClient)
repo, err := db.New(appState.Logger, db.Config{
ServerVersion: config.ServerVersion,
GitHash: config.GitHash,
MemtablesFlushIdleAfter: appState.ServerConfig.Config.Persistence.FlushIdleMemtablesAfter,
MemtablesInitialSizeMB: 10,
MemtablesMaxSizeMB: appState.ServerConfig.Config.Persistence.MemtablesMaxSizeMB,
MemtablesMinActiveSeconds: appState.ServerConfig.Config.Persistence.MemtablesMinActiveDurationSeconds,
MemtablesMaxActiveSeconds: appState.ServerConfig.Config.Persistence.MemtablesMaxActiveDurationSeconds,
RootPath: appState.ServerConfig.Config.Persistence.DataPath,
QueryLimit: appState.ServerConfig.Config.QueryDefaults.Limit,
QueryMaximumResults: appState.ServerConfig.Config.QueryMaximumResults,
QueryNestedRefLimit: appState.ServerConfig.Config.QueryNestedCrossReferenceLimit,
MaxImportGoroutinesFactor: appState.ServerConfig.Config.MaxImportGoroutinesFactor,
TrackVectorDimensions: appState.ServerConfig.Config.TrackVectorDimensions,
ResourceUsage: appState.ServerConfig.Config.ResourceUsage,
AvoidMMap: appState.ServerConfig.Config.AvoidMmap,
DisableLazyLoadShards: appState.ServerConfig.Config.DisableLazyLoadShards,
// Pass dummy replication config with minimum factor 1. Otherwise the
// setting is not backward-compatible. The user may have created a class
// with factor=1 before the change was introduced. Now their setup would no
// longer start up if the required minimum is now higher than 1. We want
// the required minimum to only apply to newly created classes - not block
// loading existing ones.
Replication: replication.GlobalConfig{MinimumFactor: 1},
}, remoteIndexClient, appState.Cluster, remoteNodesClient, replicationClient, appState.Metrics) // TODO client
if err != nil {
appState.Logger.
WithField("action", "startup").WithError(err).
Fatal("invalid new DB")
}
appState.DB = repo
vectorMigrator = db.NewMigrator(repo, appState.Logger)
vectorRepo = repo
migrator = vectorMigrator
explorer := traverser.NewExplorer(repo, appState.Logger, appState.Modules, traverser.NewMetrics(appState.Metrics), appState.ServerConfig.Config)
schemaRepo := schemarepo.NewStore(appState.ServerConfig.Config.Persistence.DataPath, appState.Logger)
if err = schemaRepo.Open(); err != nil {
appState.Logger.
WithField("action", "startup").WithError(err).
Fatal("could not initialize schema repo")
os.Exit(1)
}
localClassifierRepo, err := classifications.NewRepo(
appState.ServerConfig.Config.Persistence.DataPath, appState.Logger)
if err != nil {
appState.Logger.
WithField("action", "startup").WithError(err).
Fatal("could not initialize classifications repo")
os.Exit(1)
}
// TODO: configure http transport for efficient intra-cluster comm
classificationsTxClient := clients.NewClusterClassifications(appState.ClusterHttpClient)
classifierRepo := classifications.NewDistributeRepo(classificationsTxClient,
appState.Cluster, localClassifierRepo, appState.Logger)
appState.ClassificationRepo = classifierRepo
scaler := scaler.New(appState.Cluster, vectorRepo,
remoteIndexClient, appState.Logger, appState.ServerConfig.Config.Persistence.DataPath)
appState.Scaler = scaler
// TODO: configure http transport for efficient intra-cluster comm
schemaTxClient := clients.NewClusterSchema(appState.ClusterHttpClient)
schemaTxPersistence := txstore.NewStore(
appState.ServerConfig.Config.Persistence.DataPath, appState.Logger)
schemaTxPersistence.SetUmarshalFn(schemaUC.UnmarshalTransaction)
if err := schemaTxPersistence.Open(); err != nil {
appState.Logger.
WithField("action", "startup").WithError(err).
Fatal("could not open tx repo")
os.Exit(1)
}
schemaManager, err := schemaUC.NewManager(migrator, schemaRepo,
appState.Logger, appState.Authorizer, appState.ServerConfig.Config,
vectorIndex.ParseAndValidateConfig, appState.Modules, inverted.ValidateConfig,
appState.Modules, appState.Cluster, schemaTxClient,
schemaTxPersistence, scaler,
)
if err != nil {
appState.Logger.
WithField("action", "startup").WithError(err).
Fatal("could not initialize schema manager")
os.Exit(1)
}
appState.SchemaManager = schemaManager
appState.RemoteIndexIncoming = sharding.NewRemoteIndexIncoming(repo)
appState.RemoteNodeIncoming = sharding.NewRemoteNodeIncoming(repo)
appState.RemoteReplicaIncoming = replica.NewRemoteReplicaIncoming(repo)
backupManager := backup.NewHandler(appState.Logger, appState.Authorizer,
schemaManager, repo, appState.Modules)
appState.BackupManager = backupManager
go clusterapi.Serve(appState)
vectorRepo.SetSchemaGetter(schemaManager)
explorer.SetSchemaGetter(schemaManager)
appState.Modules.SetSchemaGetter(schemaManager)
err = vectorRepo.WaitForStartup(ctx)
if err != nil {
appState.Logger.
WithError(err).
WithField("action", "startup").
Fatal("db didn't start up")
os.Exit(1)
}
if err := schemaManager.StartServing(ctx); err != nil {
appState.Logger.
WithError(err).
WithField("action", "startup").
Fatal("schema manager: resume dangling txs")
os.Exit(1)
}
batchManager := objects.NewBatchManager(vectorRepo, appState.Modules,
appState.Locks, schemaManager, appState.ServerConfig, appState.Logger,
appState.Authorizer, appState.Metrics)
appState.BatchManager = batchManager
objectsTraverser := traverser.NewTraverser(appState.ServerConfig, appState.Locks,
appState.Logger, appState.Authorizer, vectorRepo, explorer, schemaManager,
appState.Modules, traverser.NewMetrics(appState.Metrics),
appState.ServerConfig.Config.MaximumConcurrentGetRequests)
appState.Traverser = objectsTraverser
updateSchemaCallback := makeUpdateSchemaCall(appState.Logger, appState, objectsTraverser)
schemaManager.RegisterSchemaUpdateCallback(updateSchemaCallback)
err = migrator.AdjustFilterablePropSettings(ctx)
if err != nil {
appState.Logger.
WithError(err).
WithField("action", "adjustFilterablePropSettings").
Fatal("migration failed")
os.Exit(1)
}
// FIXME to avoid import cycles, tasks are passed as strings
reindexTaskNames := []string{}
var reindexCtx context.Context
reindexCtx, appState.ReindexCtxCancel = context.WithCancel(context.Background())
reindexFinished := make(chan error, 1)
if appState.ServerConfig.Config.ReindexSetToRoaringsetAtStartup {
reindexTaskNames = append(reindexTaskNames, "ShardInvertedReindexTaskSetToRoaringSet")
}
if appState.ServerConfig.Config.IndexMissingTextFilterableAtStartup {
reindexTaskNames = append(reindexTaskNames, "ShardInvertedReindexTaskMissingTextFilterable")
}
if len(reindexTaskNames) > 0 {
// start reindexing inverted indexes (if requested by user) in the background
// allowing db to complete api configuration and start handling requests
go func() {
appState.Logger.
WithField("action", "startup").
Info("Reindexing inverted indexes")
reindexFinished <- migrator.InvertedReindex(reindexCtx, reindexTaskNames...)
}()
}
configureServer = makeConfigureServer(appState)
// while we accept an overall longer startup, e.g. due to a recovery, we
// still want to limit the module startup context, as that's mostly service
// discovery / dependency checking
moduleCtx, cancel := context.WithTimeout(ctx, 120*time.Second)
defer cancel()
err = initModules(moduleCtx, appState)
if err != nil {
appState.Logger.
WithField("action", "startup").WithError(err).
Fatal("modules didn't initialize")
}
// manually update schema once
schema := schemaManager.GetSchemaSkipAuth()
updateSchemaCallback(schema)
// Add dimensions to all the objects in the database, if requested by the user
if appState.ServerConfig.Config.ReindexVectorDimensionsAtStartup {
appState.Logger.
WithField("action", "startup").
Info("Reindexing dimensions")
migrator.RecalculateVectorDimensions(ctx)
}
// Add recount properties of all the objects in the database, if requested by the user
if appState.ServerConfig.Config.RecountPropertiesAtStartup {
migrator.RecountProperties(ctx)
}
return appState
}
func configureAPI(api *operations.WeaviateAPI) http.Handler {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 60*time.Minute)
defer cancel()
config.ServerVersion = parseVersionFromSwaggerSpec()
appState := MakeAppState(ctx, connectorOptionGroup)
api.ServeError = openapierrors.ServeError
api.JSONConsumer = runtime.JSONConsumer()
api.OidcAuth = composer.New(
appState.ServerConfig.Config.Authentication,
appState.APIKey, appState.OIDC)
api.Logger = func(msg string, args ...interface{}) {
appState.Logger.WithField("action", "restapi_management").Infof(msg, args...)
}
classifier := classification.New(appState.SchemaManager, appState.ClassificationRepo, appState.DB, // the DB is the vectorrepo
appState.Authorizer,
appState.Logger, appState.Modules)
setupSchemaHandlers(api, appState.SchemaManager, appState.Metrics, appState.Logger)
objectsManager := objects.NewManager(appState.Locks,
appState.SchemaManager, appState.ServerConfig, appState.Logger,
appState.Authorizer, appState.DB, appState.Modules,
objects.NewMetrics(appState.Metrics))
setupObjectHandlers(api, objectsManager, appState.ServerConfig.Config, appState.Logger,
appState.Modules, appState.Metrics)
setupObjectBatchHandlers(api, appState.BatchManager, appState.Metrics, appState.Logger)
setupGraphQLHandlers(api, appState, appState.SchemaManager, appState.ServerConfig.Config.DisableGraphQL,
appState.Metrics, appState.Logger)
setupMiscHandlers(api, appState.ServerConfig, appState.SchemaManager, appState.Modules,
appState.Metrics, appState.Logger)
setupClassificationHandlers(api, classifier, appState.Metrics, appState.Logger)
backupScheduler := backup.NewScheduler(
appState.Authorizer,
clients.NewClusterBackups(appState.ClusterHttpClient),
appState.DB, appState.Modules,
appState.Cluster,
appState.Logger)
setupBackupHandlers(api, backupScheduler, appState.Metrics, appState.Logger)
setupNodesHandlers(api, appState.SchemaManager, appState.DB, appState)
grpcServer := createGrpcServer(appState)
setupMiddlewares := makeSetupMiddlewares(appState)
setupGlobalMiddleware := makeSetupGlobalMiddleware(appState)
api.ServerShutdown = func() {
// stop reindexing on server shutdown
appState.ReindexCtxCancel()
// gracefully stop gRPC server
grpcServer.GracefulStop()
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
if err := appState.SchemaManager.Shutdown(ctx); err != nil {
panic(err)
}
if err := appState.DB.Shutdown(ctx); err != nil {
panic(err)
}
}
startGrpcServer(grpcServer, appState)
return setupGlobalMiddleware(api.Serve(setupMiddlewares))
}
// TODO: Split up and don't write into global variables. Instead return an appState
func startupRoutine(ctx context.Context, options *swag.CommandLineOptionsGroup) *state.State {
appState := &state.State{}
logger := logger()
appState.Logger = logger
logger.WithField("action", "startup").WithField("startup_time_left", timeTillDeadline(ctx)).
Debug("created startup context, nothing done so far")
// Load the config using the flags
serverConfig := &config.WeaviateConfig{}
appState.ServerConfig = serverConfig
err := serverConfig.LoadConfig(options, logger)
if err != nil {
logger.WithField("action", "startup").WithError(err).Error("could not load config")
logger.Exit(1)
}
monitoring.InitConfig(serverConfig.Config.Monitoring)
if serverConfig.Config.DisableGraphQL {
logger.WithFields(logrus.Fields{
"action": "startup",
"disable_graphql": true,
}).Warnf("GraphQL API disabled, relying only on gRPC API for querying. " +
"This is considered experimental and will likely experience breaking changes " +
"before reaching general availability")
}
logger.WithFields(logrus.Fields{
"action": "startup",
"default_vectorizer_module": serverConfig.Config.DefaultVectorizerModule,
}).Infof("the default vectorizer modules is set to %q, as a result all new "+
"schema classes without an explicit vectorizer setting, will use this "+
"vectorizer", serverConfig.Config.DefaultVectorizerModule)
logger.WithFields(logrus.Fields{
"action": "startup",
"auto_schema_enabled": serverConfig.Config.AutoSchema.Enabled,
}).Infof("auto schema enabled setting is set to \"%v\"", serverConfig.Config.AutoSchema.Enabled)
logger.WithField("action", "startup").WithField("startup_time_left", timeTillDeadline(ctx)).
Debug("config loaded")
appState.OIDC = configureOIDC(appState)
appState.APIKey = configureAPIKey(appState)
appState.AnonymousAccess = configureAnonymousAccess(appState)
appState.Authorizer = configureAuthorizer(appState)
logger.WithField("action", "startup").WithField("startup_time_left", timeTillDeadline(ctx)).
Debug("configured OIDC and anonymous access client")
appState.Locks = &dummyLock{}
logger.WithField("action", "startup").WithField("startup_time_left", timeTillDeadline(ctx)).
Debug("initialized schema")
clusterState, err := cluster.Init(serverConfig.Config.Cluster, serverConfig.Config.Persistence.DataPath, logger)
if err != nil {
logger.WithField("action", "startup").WithError(err).
Error("could not init cluster state")
logger.Exit(1)
}
appState.Cluster = clusterState
appState.Logger.
WithField("action", "startup").
Debug("startup routine complete")
return appState
}
// logger does not parse the regular config object, as logging needs to be
// configured before the configuration is even loaded/parsed. We are thus
// "manually" reading the desired env vars and set reasonable defaults if they
// are not set.
//
// Defaults to log level info and json format
func logger() *logrus.Logger {
logger := logrus.New()
if os.Getenv("LOG_FORMAT") != "text" {
logger.SetFormatter(&logrus.JSONFormatter{})
}
switch os.Getenv("LOG_LEVEL") {
case "debug":
logger.SetLevel(logrus.DebugLevel)
case "trace":
logger.SetLevel(logrus.TraceLevel)
default:
logger.SetLevel(logrus.InfoLevel)
}
return logger
}
type dummyLock struct{}
func (d *dummyLock) LockConnector() (func() error, error) {
return func() error { return nil }, nil
}
func (d *dummyLock) LockSchema() (func() error, error) {
return func() error { return nil }, nil
}
// everything hard-coded right now, to be made dynamic (from go plugins later)
func registerModules(appState *state.State) error {
appState.Logger.
WithField("action", "startup").
Debug("start registering modules")
appState.Modules = modules.NewProvider()
enabledModules := map[string]bool{}
if len(appState.ServerConfig.Config.EnableModules) > 0 {
modules := strings.Split(appState.ServerConfig.Config.EnableModules, ",")
for _, module := range modules {
enabledModules[strings.TrimSpace(module)] = true
}
}
if _, ok := enabledModules["text2vec-contextionary"]; ok {
appState.Modules.Register(modcontextionary.New())
appState.Logger.
WithField("action", "startup").
WithField("module", "text2vec-contextionary").
Debug("enabled module")
}
if _, ok := enabledModules["text2vec-transformers"]; ok {
appState.Modules.Register(modtransformers.New())
appState.Logger.
WithField("action", "startup").
WithField("module", "text2vec-transformers").
Debug("enabled module")
}
if _, ok := enabledModules[modgpt4all.Name]; ok {
appState.Modules.Register(modgpt4all.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modgpt4all.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modrerankertransformers.Name]; ok {
appState.Modules.Register(modrerankertransformers.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modrerankertransformers.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modrerankercohere.Name]; ok {
appState.Modules.Register(modrerankercohere.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modrerankercohere.Name).
Debug("enabled module")
}
if _, ok := enabledModules["qna-transformers"]; ok {
appState.Modules.Register(modqna.New())
appState.Logger.
WithField("action", "startup").
WithField("module", "qna-transformers").
Debug("enabled module")
}
if _, ok := enabledModules["sum-transformers"]; ok {
appState.Modules.Register(modsum.New())
appState.Logger.
WithField("action", "startup").
WithField("module", "sum-transformers").
Debug("enabled module")
}
if _, ok := enabledModules["img2vec-neural"]; ok {
appState.Modules.Register(modimage.New())
appState.Logger.
WithField("action", "startup").
WithField("module", "img2vec-neural").
Debug("enabled module")
}
if _, ok := enabledModules["ner-transformers"]; ok {
appState.Modules.Register(modner.New())
appState.Logger.
WithField("action", "startup").
WithField("module", "ner-transformers").
Debug("enabled module")
}
if _, ok := enabledModules["text-spellcheck"]; ok {
appState.Modules.Register(modspellcheck.New())
appState.Logger.
WithField("action", "startup").
WithField("module", "text-spellcheck").
Debug("enabled module")
}
if _, ok := enabledModules["multi2vec-clip"]; ok {
appState.Modules.Register(modclip.New())
appState.Logger.
WithField("action", "startup").
WithField("module", "multi2vec-clip").
Debug("enabled module")
}
if _, ok := enabledModules["text2vec-openai"]; ok {
appState.Modules.Register(modopenai.New())
appState.Logger.
WithField("action", "startup").
WithField("module", "text2vec-openai").
Debug("enabled module")
}
if _, ok := enabledModules["qna-openai"]; ok {
appState.Modules.Register(modqnaopenai.New())
appState.Logger.
WithField("action", "startup").
WithField("module", "qna-openai").
Debug("enabled module")
}
if _, ok := enabledModules[modgenerativecohere.Name]; ok {
appState.Modules.Register(modgenerativecohere.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modgenerativecohere.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modgenerativeopenai.Name]; ok {
appState.Modules.Register(modgenerativeopenai.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modgenerativeopenai.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modgenerativeaws.Name]; ok {
appState.Modules.Register(modgenerativeaws.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modgenerativeaws.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modhuggingface.Name]; ok {
appState.Modules.Register(modhuggingface.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modhuggingface.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modgenerativepalm.Name]; ok {
appState.Modules.Register(modgenerativepalm.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modgenerativepalm.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modgenerativeanyscale.Name]; ok {
appState.Modules.Register(modgenerativeanyscale.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modgenerativeanyscale.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modtext2vecpalm.Name]; ok {
appState.Modules.Register(modtext2vecpalm.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modtext2vecpalm.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modtext2vecaws.Name]; ok {
appState.Modules.Register(modtext2vecaws.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modtext2vecaws.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modstgfs.Name]; ok {
appState.Modules.Register(modstgfs.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modstgfs.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modstgs3.Name]; ok {
appState.Modules.Register(modstgs3.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modstgs3.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modstggcs.Name]; ok {
appState.Modules.Register(modstggcs.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modstggcs.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modstgazure.Name]; ok {
appState.Modules.Register(modstgazure.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modstgazure.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modcentroid.Name]; ok {
appState.Modules.Register(modcentroid.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modcentroid.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modcohere.Name]; ok {
appState.Modules.Register(modcohere.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modcohere.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modbind.Name]; ok {
appState.Modules.Register(modbind.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modbind.Name).
Debug("enabled module")
}
if _, ok := enabledModules[modjinaai.Name]; ok {
appState.Modules.Register(modjinaai.New())
appState.Logger.
WithField("action", "startup").
WithField("module", modjinaai.Name).
Debug("enabled module")
}
appState.Logger.
WithField("action", "startup").
Debug("completed registering modules")
return nil
}
func initModules(ctx context.Context, appState *state.State) error {
storageProvider, err := modulestorage.NewRepo(
appState.ServerConfig.Config.Persistence.DataPath, appState.Logger)
if err != nil {
return errors.Wrap(err, "init storage provider")
}
// TODO: gh-1481 don't pass entire appState in, but only what's needed. Probably only
// config?
moduleParams := moduletools.NewInitParams(storageProvider, appState,
appState.ServerConfig.Config, appState.Logger)
appState.Logger.
WithField("action", "startup").
Debug("start initializing modules")
if err := appState.Modules.Init(ctx, moduleParams, appState.Logger); err != nil {
return errors.Wrap(err, "init modules")
}
appState.Logger.
WithField("action", "startup").
Debug("finished initializing modules")
return nil
}
type clientWithAuth struct {
r http.RoundTripper
basicAuth cluster.BasicAuth
}
func (c clientWithAuth) RoundTrip(r *http.Request) (*http.Response, error) {
r.SetBasicAuth(c.basicAuth.Username, c.basicAuth.Password)
return c.r.RoundTrip(r)
}
func reasonableHttpClient(authConfig cluster.AuthConfig) *http.Client {
t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 120 * time.Second,
}).DialContext,
MaxIdleConnsPerHost: 100,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
if authConfig.BasicAuth.Enabled() {
return &http.Client{Transport: clientWithAuth{r: t, basicAuth: authConfig.BasicAuth}}
}
return &http.Client{Transport: t}
}
func setupGoProfiling(config config.Config) {
go func() {
fmt.Println(http.ListenAndServe(":6060", nil))
}()
if config.Profiling.BlockProfileRate > 0 {
goruntime.SetBlockProfileRate(config.Profiling.BlockProfileRate)
}
if config.Profiling.MutexProfileFraction > 0 {
goruntime.SetMutexProfileFraction(config.Profiling.MutexProfileFraction)
}
}
func parseVersionFromSwaggerSpec() string {
spec := struct {
Info struct {
Version string `json:"version"`
} `json:"info"`
}{}
err := json.Unmarshal(SwaggerJSON, &spec)
if err != nil {
panic(err)
}
return spec.Info.Version
}
func limitResources(appState *state.State) {
if os.Getenv("LIMIT_RESOURCES") == "true" {
appState.Logger.Info("Limiting resources: memory: 80%, cores: all but one")
if os.Getenv("GOMAXPROCS") == "" {
// Fetch the number of cores from the cgroups cpuset
// and parse it into an int
cores, err := getCores()
if err == nil {
appState.Logger.WithField("cores", cores).
Warn("GOMAXPROCS not set, and unable to read from cgroups, setting to number of cores")
goruntime.GOMAXPROCS(cores)
} else {
cores = goruntime.NumCPU() - 1
if cores > 0 {
appState.Logger.WithField("cores", cores).
Warnf("Unable to read from cgroups: %v, setting to max cores to: %v", err, cores)
goruntime.GOMAXPROCS(cores)
}
}
}
limit, err := memlimit.SetGoMemLimit(0.8)
if err != nil {
appState.Logger.WithError(err).Warnf("Unable to set memory limit from cgroups: %v", err)
// Set memory limit to 90% of the available memory
limit := int64(float64(memory.TotalMemory()) * 0.8)
debug.SetMemoryLimit(limit)
appState.Logger.WithField("limit", limit).Info("Set memory limit based on available memory")
} else {
appState.Logger.WithField("limit", limit).Info("Set memory limit")
}
} else {
appState.Logger.Info("No resource limits set, weaviate will use all available memory and CPU. " +
"To limit resources, set LIMIT_RESOURCES=true")
}
}