Spaces:
Running
Running
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//
package rest | |
import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"os"
	goruntime "runtime"
	"runtime/debug"
	"strconv"
	"strings"
	"time"

	_ "net/http/pprof"

	"github.com/KimMachineGun/automemlimit/memlimit"
	openapierrors "github.com/go-openapi/errors"
	"github.com/go-openapi/runtime"
	"github.com/go-openapi/swag"
	"github.com/pbnjay/memory"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/sirupsen/logrus"
	"github.com/weaviate/weaviate/adapters/clients"
	"github.com/weaviate/weaviate/adapters/handlers/rest/clusterapi"
	"github.com/weaviate/weaviate/adapters/handlers/rest/operations"
	"github.com/weaviate/weaviate/adapters/handlers/rest/state"
	"github.com/weaviate/weaviate/adapters/repos/classifications"
	"github.com/weaviate/weaviate/adapters/repos/db"
	"github.com/weaviate/weaviate/adapters/repos/db/inverted"
	modulestorage "github.com/weaviate/weaviate/adapters/repos/modules"
	schemarepo "github.com/weaviate/weaviate/adapters/repos/schema"
	txstore "github.com/weaviate/weaviate/adapters/repos/transactions"
	"github.com/weaviate/weaviate/entities/moduletools"
	"github.com/weaviate/weaviate/entities/replication"
	vectorIndex "github.com/weaviate/weaviate/entities/vectorindex"
	modstgazure "github.com/weaviate/weaviate/modules/backup-azure"
	modstgfs "github.com/weaviate/weaviate/modules/backup-filesystem"
	modstggcs "github.com/weaviate/weaviate/modules/backup-gcs"
	modstgs3 "github.com/weaviate/weaviate/modules/backup-s3"
	modgenerativeanyscale "github.com/weaviate/weaviate/modules/generative-anyscale"
	modgenerativeaws "github.com/weaviate/weaviate/modules/generative-aws"
	modgenerativecohere "github.com/weaviate/weaviate/modules/generative-cohere"
	modgenerativeopenai "github.com/weaviate/weaviate/modules/generative-openai"
	modgenerativepalm "github.com/weaviate/weaviate/modules/generative-palm"
	modimage "github.com/weaviate/weaviate/modules/img2vec-neural"
	modbind "github.com/weaviate/weaviate/modules/multi2vec-bind"
	modclip "github.com/weaviate/weaviate/modules/multi2vec-clip"
	modner "github.com/weaviate/weaviate/modules/ner-transformers"
	modqnaopenai "github.com/weaviate/weaviate/modules/qna-openai"
	modqna "github.com/weaviate/weaviate/modules/qna-transformers"
	modcentroid "github.com/weaviate/weaviate/modules/ref2vec-centroid"
	modrerankercohere "github.com/weaviate/weaviate/modules/reranker-cohere"
	modrerankertransformers "github.com/weaviate/weaviate/modules/reranker-transformers"
	modsum "github.com/weaviate/weaviate/modules/sum-transformers"
	modspellcheck "github.com/weaviate/weaviate/modules/text-spellcheck"
	modtext2vecaws "github.com/weaviate/weaviate/modules/text2vec-aws"
	modcohere "github.com/weaviate/weaviate/modules/text2vec-cohere"
	modcontextionary "github.com/weaviate/weaviate/modules/text2vec-contextionary"
	modgpt4all "github.com/weaviate/weaviate/modules/text2vec-gpt4all"
	modhuggingface "github.com/weaviate/weaviate/modules/text2vec-huggingface"
	modjinaai "github.com/weaviate/weaviate/modules/text2vec-jinaai"
	modopenai "github.com/weaviate/weaviate/modules/text2vec-openai"
	modtext2vecpalm "github.com/weaviate/weaviate/modules/text2vec-palm"
	modtransformers "github.com/weaviate/weaviate/modules/text2vec-transformers"
	"github.com/weaviate/weaviate/usecases/auth/authentication/composer"
	"github.com/weaviate/weaviate/usecases/backup"
	"github.com/weaviate/weaviate/usecases/classification"
	"github.com/weaviate/weaviate/usecases/cluster"
	"github.com/weaviate/weaviate/usecases/config"
	"github.com/weaviate/weaviate/usecases/modules"
	"github.com/weaviate/weaviate/usecases/monitoring"
	"github.com/weaviate/weaviate/usecases/objects"
	"github.com/weaviate/weaviate/usecases/replica"
	"github.com/weaviate/weaviate/usecases/scaler"
	schemaUC "github.com/weaviate/weaviate/usecases/schema"
	"github.com/weaviate/weaviate/usecases/schema/migrate"
	"github.com/weaviate/weaviate/usecases/sharding"
	"github.com/weaviate/weaviate/usecases/traverser"
)
// MinimumRequiredContextionaryVersion holds the version string "1.0.2".
// NOTE(review): judging by the name this is presumably the oldest
// text2vec-contextionary version this server accepts; nothing in this part of
// the file reads it, so confirm against its users before changing it.
const MinimumRequiredContextionaryVersion = "1.0.2"
func makeConfigureServer(appState *state.State) func(*http.Server, string, string) { | |
return func(s *http.Server, scheme, addr string) { | |
// Add properties to the config | |
appState.ServerConfig.Hostname = addr | |
appState.ServerConfig.Scheme = scheme | |
} | |
} | |
type vectorRepo interface { | |
objects.BatchVectorRepo | |
traverser.VectorSearcher | |
classification.VectorRepo | |
scaler.BackUpper | |
SetSchemaGetter(schemaUC.SchemaGetter) | |
WaitForStartup(ctx context.Context) error | |
Shutdown(ctx context.Context) error | |
} | |
// getCores returns the number of CPUs listed in the cgroup v1 cpuset file
// /sys/fs/cgroup/cpuset/cpuset.cpus.
//
// An error is returned when the file cannot be read or when its content is
// empty or not a valid CPU list, so that callers can fall back to another
// source for the core count.
func getCores() (int, error) {
	cpuset, err := os.ReadFile("/sys/fs/cgroup/cpuset/cpuset.cpus")
	if err != nil {
		return 0, fmt.Errorf("read cpuset: %w", err)
	}
	return countCPUsInSet(string(cpuset))
}

// countCPUsInSet counts the CPUs described by a cpuset list such as "0-3,8".
// Entries are comma separated; each entry is either a single CPU id or an
// inclusive "first-last" range. Counting the entries alone is not enough: the
// very common "0-7" is one entry but eight CPUs.
func countCPUsInSet(cpuset string) (int, error) {
	cpuset = strings.TrimSpace(cpuset)
	if cpuset == "" {
		return 0, fmt.Errorf("parse cpuset: empty cpu list")
	}

	total := 0
	for _, entry := range strings.Split(cpuset, ",") {
		entry = strings.TrimSpace(entry)
		first, last, isRange := strings.Cut(entry, "-")

		lo, err := strconv.Atoi(first)
		if err != nil {
			return 0, fmt.Errorf("parse cpuset entry %q: %w", entry, err)
		}

		hi := lo
		if isRange {
			if hi, err = strconv.Atoi(last); err != nil {
				return 0, fmt.Errorf("parse cpuset entry %q: %w", entry, err)
			}
		}
		if lo < 0 || hi < lo {
			return 0, fmt.Errorf("parse cpuset entry %q: invalid range", entry)
		}

		total += hi - lo + 1
	}
	return total, nil
}
func MakeAppState(ctx context.Context, options *swag.CommandLineOptionsGroup) *state.State { | |
appState := startupRoutine(ctx, options) | |
setupGoProfiling(appState.ServerConfig.Config) | |
if appState.ServerConfig.Config.Monitoring.Enabled { | |
// only monitoring tool supported at the moment is prometheus | |
go func() { | |
mux := http.NewServeMux() | |
mux.Handle("/metrics", promhttp.Handler()) | |
http.ListenAndServe(fmt.Sprintf(":%d", appState.ServerConfig.Config.Monitoring.Port), mux) | |
}() | |
} | |
limitResources(appState) | |
err := registerModules(appState) | |
if err != nil { | |
appState.Logger. | |
WithField("action", "startup").WithError(err). | |
Fatal("modules didn't load") | |
} | |
// now that modules are loaded we can run the remaining config validation | |
// which is module dependent | |
if err := appState.ServerConfig.Config.Validate(appState.Modules); err != nil { | |
appState.Logger. | |
WithField("action", "startup").WithError(err). | |
Fatal("invalid config") | |
} | |
appState.ClusterHttpClient = reasonableHttpClient(appState.ServerConfig.Config.Cluster.AuthConfig) | |
var vectorRepo vectorRepo | |
var vectorMigrator migrate.Migrator | |
var migrator migrate.Migrator | |
if appState.ServerConfig.Config.Monitoring.Enabled { | |
promMetrics := monitoring.GetMetrics() | |
appState.Metrics = promMetrics | |
} | |
// TODO: configure http transport for efficient intra-cluster comm | |
remoteIndexClient := clients.NewRemoteIndex(appState.ClusterHttpClient) | |
remoteNodesClient := clients.NewRemoteNode(appState.ClusterHttpClient) | |
replicationClient := clients.NewReplicationClient(appState.ClusterHttpClient) | |
repo, err := db.New(appState.Logger, db.Config{ | |
ServerVersion: config.ServerVersion, | |
GitHash: config.GitHash, | |
MemtablesFlushIdleAfter: appState.ServerConfig.Config.Persistence.FlushIdleMemtablesAfter, | |
MemtablesInitialSizeMB: 10, | |
MemtablesMaxSizeMB: appState.ServerConfig.Config.Persistence.MemtablesMaxSizeMB, | |
MemtablesMinActiveSeconds: appState.ServerConfig.Config.Persistence.MemtablesMinActiveDurationSeconds, | |
MemtablesMaxActiveSeconds: appState.ServerConfig.Config.Persistence.MemtablesMaxActiveDurationSeconds, | |
RootPath: appState.ServerConfig.Config.Persistence.DataPath, | |
QueryLimit: appState.ServerConfig.Config.QueryDefaults.Limit, | |
QueryMaximumResults: appState.ServerConfig.Config.QueryMaximumResults, | |
QueryNestedRefLimit: appState.ServerConfig.Config.QueryNestedCrossReferenceLimit, | |
MaxImportGoroutinesFactor: appState.ServerConfig.Config.MaxImportGoroutinesFactor, | |
TrackVectorDimensions: appState.ServerConfig.Config.TrackVectorDimensions, | |
ResourceUsage: appState.ServerConfig.Config.ResourceUsage, | |
AvoidMMap: appState.ServerConfig.Config.AvoidMmap, | |
DisableLazyLoadShards: appState.ServerConfig.Config.DisableLazyLoadShards, | |
// Pass dummy replication config with minimum factor 1. Otherwise the | |
// setting is not backward-compatible. The user may have created a class | |
// with factor=1 before the change was introduced. Now their setup would no | |
// longer start up if the required minimum is now higher than 1. We want | |
// the required minimum to only apply to newly created classes - not block | |
// loading existing ones. | |
Replication: replication.GlobalConfig{MinimumFactor: 1}, | |
}, remoteIndexClient, appState.Cluster, remoteNodesClient, replicationClient, appState.Metrics) // TODO client | |
if err != nil { | |
appState.Logger. | |
WithField("action", "startup").WithError(err). | |
Fatal("invalid new DB") | |
} | |
appState.DB = repo | |
vectorMigrator = db.NewMigrator(repo, appState.Logger) | |
vectorRepo = repo | |
migrator = vectorMigrator | |
explorer := traverser.NewExplorer(repo, appState.Logger, appState.Modules, traverser.NewMetrics(appState.Metrics), appState.ServerConfig.Config) | |
schemaRepo := schemarepo.NewStore(appState.ServerConfig.Config.Persistence.DataPath, appState.Logger) | |
if err = schemaRepo.Open(); err != nil { | |
appState.Logger. | |
WithField("action", "startup").WithError(err). | |
Fatal("could not initialize schema repo") | |
os.Exit(1) | |
} | |
localClassifierRepo, err := classifications.NewRepo( | |
appState.ServerConfig.Config.Persistence.DataPath, appState.Logger) | |
if err != nil { | |
appState.Logger. | |
WithField("action", "startup").WithError(err). | |
Fatal("could not initialize classifications repo") | |
os.Exit(1) | |
} | |
// TODO: configure http transport for efficient intra-cluster comm | |
classificationsTxClient := clients.NewClusterClassifications(appState.ClusterHttpClient) | |
classifierRepo := classifications.NewDistributeRepo(classificationsTxClient, | |
appState.Cluster, localClassifierRepo, appState.Logger) | |
appState.ClassificationRepo = classifierRepo | |
scaler := scaler.New(appState.Cluster, vectorRepo, | |
remoteIndexClient, appState.Logger, appState.ServerConfig.Config.Persistence.DataPath) | |
appState.Scaler = scaler | |
// TODO: configure http transport for efficient intra-cluster comm | |
schemaTxClient := clients.NewClusterSchema(appState.ClusterHttpClient) | |
schemaTxPersistence := txstore.NewStore( | |
appState.ServerConfig.Config.Persistence.DataPath, appState.Logger) | |
schemaTxPersistence.SetUmarshalFn(schemaUC.UnmarshalTransaction) | |
if err := schemaTxPersistence.Open(); err != nil { | |
appState.Logger. | |
WithField("action", "startup").WithError(err). | |
Fatal("could not open tx repo") | |
os.Exit(1) | |
} | |
schemaManager, err := schemaUC.NewManager(migrator, schemaRepo, | |
appState.Logger, appState.Authorizer, appState.ServerConfig.Config, | |
vectorIndex.ParseAndValidateConfig, appState.Modules, inverted.ValidateConfig, | |
appState.Modules, appState.Cluster, schemaTxClient, | |
schemaTxPersistence, scaler, | |
) | |
if err != nil { | |
appState.Logger. | |
WithField("action", "startup").WithError(err). | |
Fatal("could not initialize schema manager") | |
os.Exit(1) | |
} | |
appState.SchemaManager = schemaManager | |
appState.RemoteIndexIncoming = sharding.NewRemoteIndexIncoming(repo) | |
appState.RemoteNodeIncoming = sharding.NewRemoteNodeIncoming(repo) | |
appState.RemoteReplicaIncoming = replica.NewRemoteReplicaIncoming(repo) | |
backupManager := backup.NewHandler(appState.Logger, appState.Authorizer, | |
schemaManager, repo, appState.Modules) | |
appState.BackupManager = backupManager | |
go clusterapi.Serve(appState) | |
vectorRepo.SetSchemaGetter(schemaManager) | |
explorer.SetSchemaGetter(schemaManager) | |
appState.Modules.SetSchemaGetter(schemaManager) | |
err = vectorRepo.WaitForStartup(ctx) | |
if err != nil { | |
appState.Logger. | |
WithError(err). | |
WithField("action", "startup"). | |
Fatal("db didn't start up") | |
os.Exit(1) | |
} | |
if err := schemaManager.StartServing(ctx); err != nil { | |
appState.Logger. | |
WithError(err). | |
WithField("action", "startup"). | |
Fatal("schema manager: resume dangling txs") | |
os.Exit(1) | |
} | |
batchManager := objects.NewBatchManager(vectorRepo, appState.Modules, | |
appState.Locks, schemaManager, appState.ServerConfig, appState.Logger, | |
appState.Authorizer, appState.Metrics) | |
appState.BatchManager = batchManager | |
objectsTraverser := traverser.NewTraverser(appState.ServerConfig, appState.Locks, | |
appState.Logger, appState.Authorizer, vectorRepo, explorer, schemaManager, | |
appState.Modules, traverser.NewMetrics(appState.Metrics), | |
appState.ServerConfig.Config.MaximumConcurrentGetRequests) | |
appState.Traverser = objectsTraverser | |
updateSchemaCallback := makeUpdateSchemaCall(appState.Logger, appState, objectsTraverser) | |
schemaManager.RegisterSchemaUpdateCallback(updateSchemaCallback) | |
err = migrator.AdjustFilterablePropSettings(ctx) | |
if err != nil { | |
appState.Logger. | |
WithError(err). | |
WithField("action", "adjustFilterablePropSettings"). | |
Fatal("migration failed") | |
os.Exit(1) | |
} | |
// FIXME to avoid import cycles, tasks are passed as strings | |
reindexTaskNames := []string{} | |
var reindexCtx context.Context | |
reindexCtx, appState.ReindexCtxCancel = context.WithCancel(context.Background()) | |
reindexFinished := make(chan error, 1) | |
if appState.ServerConfig.Config.ReindexSetToRoaringsetAtStartup { | |
reindexTaskNames = append(reindexTaskNames, "ShardInvertedReindexTaskSetToRoaringSet") | |
} | |
if appState.ServerConfig.Config.IndexMissingTextFilterableAtStartup { | |
reindexTaskNames = append(reindexTaskNames, "ShardInvertedReindexTaskMissingTextFilterable") | |
} | |
if len(reindexTaskNames) > 0 { | |
// start reindexing inverted indexes (if requested by user) in the background | |
// allowing db to complete api configuration and start handling requests | |
go func() { | |
appState.Logger. | |
WithField("action", "startup"). | |
Info("Reindexing inverted indexes") | |
reindexFinished <- migrator.InvertedReindex(reindexCtx, reindexTaskNames...) | |
}() | |
} | |
configureServer = makeConfigureServer(appState) | |
// while we accept an overall longer startup, e.g. due to a recovery, we | |
// still want to limit the module startup context, as that's mostly service | |
// discovery / dependency checking | |
moduleCtx, cancel := context.WithTimeout(ctx, 120*time.Second) | |
defer cancel() | |
err = initModules(moduleCtx, appState) | |
if err != nil { | |
appState.Logger. | |
WithField("action", "startup").WithError(err). | |
Fatal("modules didn't initialize") | |
} | |
// manually update schema once | |
schema := schemaManager.GetSchemaSkipAuth() | |
updateSchemaCallback(schema) | |
// Add dimensions to all the objects in the database, if requested by the user | |
if appState.ServerConfig.Config.ReindexVectorDimensionsAtStartup { | |
appState.Logger. | |
WithField("action", "startup"). | |
Info("Reindexing dimensions") | |
migrator.RecalculateVectorDimensions(ctx) | |
} | |
// Add recount properties of all the objects in the database, if requested by the user | |
if appState.ServerConfig.Config.RecountPropertiesAtStartup { | |
migrator.RecountProperties(ctx) | |
} | |
return appState | |
} | |
func configureAPI(api *operations.WeaviateAPI) http.Handler { | |
ctx := context.Background() | |
ctx, cancel := context.WithTimeout(ctx, 60*time.Minute) | |
defer cancel() | |
config.ServerVersion = parseVersionFromSwaggerSpec() | |
appState := MakeAppState(ctx, connectorOptionGroup) | |
api.ServeError = openapierrors.ServeError | |
api.JSONConsumer = runtime.JSONConsumer() | |
api.OidcAuth = composer.New( | |
appState.ServerConfig.Config.Authentication, | |
appState.APIKey, appState.OIDC) | |
api.Logger = func(msg string, args ...interface{}) { | |
appState.Logger.WithField("action", "restapi_management").Infof(msg, args...) | |
} | |
classifier := classification.New(appState.SchemaManager, appState.ClassificationRepo, appState.DB, // the DB is the vectorrepo | |
appState.Authorizer, | |
appState.Logger, appState.Modules) | |
setupSchemaHandlers(api, appState.SchemaManager, appState.Metrics, appState.Logger) | |
objectsManager := objects.NewManager(appState.Locks, | |
appState.SchemaManager, appState.ServerConfig, appState.Logger, | |
appState.Authorizer, appState.DB, appState.Modules, | |
objects.NewMetrics(appState.Metrics)) | |
setupObjectHandlers(api, objectsManager, appState.ServerConfig.Config, appState.Logger, | |
appState.Modules, appState.Metrics) | |
setupObjectBatchHandlers(api, appState.BatchManager, appState.Metrics, appState.Logger) | |
setupGraphQLHandlers(api, appState, appState.SchemaManager, appState.ServerConfig.Config.DisableGraphQL, | |
appState.Metrics, appState.Logger) | |
setupMiscHandlers(api, appState.ServerConfig, appState.SchemaManager, appState.Modules, | |
appState.Metrics, appState.Logger) | |
setupClassificationHandlers(api, classifier, appState.Metrics, appState.Logger) | |
backupScheduler := backup.NewScheduler( | |
appState.Authorizer, | |
clients.NewClusterBackups(appState.ClusterHttpClient), | |
appState.DB, appState.Modules, | |
appState.Cluster, | |
appState.Logger) | |
setupBackupHandlers(api, backupScheduler, appState.Metrics, appState.Logger) | |
setupNodesHandlers(api, appState.SchemaManager, appState.DB, appState) | |
grpcServer := createGrpcServer(appState) | |
setupMiddlewares := makeSetupMiddlewares(appState) | |
setupGlobalMiddleware := makeSetupGlobalMiddleware(appState) | |
api.ServerShutdown = func() { | |
// stop reindexing on server shutdown | |
appState.ReindexCtxCancel() | |
// gracefully stop gRPC server | |
grpcServer.GracefulStop() | |
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) | |
defer cancel() | |
if err := appState.SchemaManager.Shutdown(ctx); err != nil { | |
panic(err) | |
} | |
if err := appState.DB.Shutdown(ctx); err != nil { | |
panic(err) | |
} | |
} | |
startGrpcServer(grpcServer, appState) | |
return setupGlobalMiddleware(api.Serve(setupMiddlewares)) | |
} | |
// TODO: Split up and don't write into global variables. Instead return an appState | |
func startupRoutine(ctx context.Context, options *swag.CommandLineOptionsGroup) *state.State { | |
appState := &state.State{} | |
logger := logger() | |
appState.Logger = logger | |
logger.WithField("action", "startup").WithField("startup_time_left", timeTillDeadline(ctx)). | |
Debug("created startup context, nothing done so far") | |
// Load the config using the flags | |
serverConfig := &config.WeaviateConfig{} | |
appState.ServerConfig = serverConfig | |
err := serverConfig.LoadConfig(options, logger) | |
if err != nil { | |
logger.WithField("action", "startup").WithError(err).Error("could not load config") | |
logger.Exit(1) | |
} | |
monitoring.InitConfig(serverConfig.Config.Monitoring) | |
if serverConfig.Config.DisableGraphQL { | |
logger.WithFields(logrus.Fields{ | |
"action": "startup", | |
"disable_graphql": true, | |
}).Warnf("GraphQL API disabled, relying only on gRPC API for querying. " + | |
"This is considered experimental and will likely experience breaking changes " + | |
"before reaching general availability") | |
} | |
logger.WithFields(logrus.Fields{ | |
"action": "startup", | |
"default_vectorizer_module": serverConfig.Config.DefaultVectorizerModule, | |
}).Infof("the default vectorizer modules is set to %q, as a result all new "+ | |
"schema classes without an explicit vectorizer setting, will use this "+ | |
"vectorizer", serverConfig.Config.DefaultVectorizerModule) | |
logger.WithFields(logrus.Fields{ | |
"action": "startup", | |
"auto_schema_enabled": serverConfig.Config.AutoSchema.Enabled, | |
}).Infof("auto schema enabled setting is set to \"%v\"", serverConfig.Config.AutoSchema.Enabled) | |
logger.WithField("action", "startup").WithField("startup_time_left", timeTillDeadline(ctx)). | |
Debug("config loaded") | |
appState.OIDC = configureOIDC(appState) | |
appState.APIKey = configureAPIKey(appState) | |
appState.AnonymousAccess = configureAnonymousAccess(appState) | |
appState.Authorizer = configureAuthorizer(appState) | |
logger.WithField("action", "startup").WithField("startup_time_left", timeTillDeadline(ctx)). | |
Debug("configured OIDC and anonymous access client") | |
appState.Locks = &dummyLock{} | |
logger.WithField("action", "startup").WithField("startup_time_left", timeTillDeadline(ctx)). | |
Debug("initialized schema") | |
clusterState, err := cluster.Init(serverConfig.Config.Cluster, serverConfig.Config.Persistence.DataPath, logger) | |
if err != nil { | |
logger.WithField("action", "startup").WithError(err). | |
Error("could not init cluster state") | |
logger.Exit(1) | |
} | |
appState.Cluster = clusterState | |
appState.Logger. | |
WithField("action", "startup"). | |
Debug("startup routine complete") | |
return appState | |
} | |
// logger does not parse the regular config object, as logging needs to be | |
// configured before the configuration is even loaded/parsed. We are thus | |
// "manually" reading the desired env vars and set reasonable defaults if they | |
// are not set. | |
// | |
// Defaults to log level info and json format | |
func logger() *logrus.Logger { | |
logger := logrus.New() | |
if os.Getenv("LOG_FORMAT") != "text" { | |
logger.SetFormatter(&logrus.JSONFormatter{}) | |
} | |
switch os.Getenv("LOG_LEVEL") { | |
case "debug": | |
logger.SetLevel(logrus.DebugLevel) | |
case "trace": | |
logger.SetLevel(logrus.TraceLevel) | |
default: | |
logger.SetLevel(logrus.InfoLevel) | |
} | |
return logger | |
} | |
// dummyLock is a lock implementation that never blocks and never fails: both
// lock methods succeed immediately and hand back an unlock function that does
// nothing.
type dummyLock struct{}

// granted is the shared result of every lock request: a no-op unlock function
// and no error.
func (d *dummyLock) granted() (func() error, error) {
	release := func() error { return nil }
	return release, nil
}

// LockConnector always succeeds and returns a no-op unlock function.
func (d *dummyLock) LockConnector() (func() error, error) {
	return d.granted()
}

// LockSchema always succeeds and returns a no-op unlock function.
func (d *dummyLock) LockSchema() (func() error, error) {
	return d.granted()
}
// everything hard-coded right now, to be made dynamic (from go plugins later) | |
func registerModules(appState *state.State) error { | |
appState.Logger. | |
WithField("action", "startup"). | |
Debug("start registering modules") | |
appState.Modules = modules.NewProvider() | |
enabledModules := map[string]bool{} | |
if len(appState.ServerConfig.Config.EnableModules) > 0 { | |
modules := strings.Split(appState.ServerConfig.Config.EnableModules, ",") | |
for _, module := range modules { | |
enabledModules[strings.TrimSpace(module)] = true | |
} | |
} | |
if _, ok := enabledModules["text2vec-contextionary"]; ok { | |
appState.Modules.Register(modcontextionary.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", "text2vec-contextionary"). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules["text2vec-transformers"]; ok { | |
appState.Modules.Register(modtransformers.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", "text2vec-transformers"). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modgpt4all.Name]; ok { | |
appState.Modules.Register(modgpt4all.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modgpt4all.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modrerankertransformers.Name]; ok { | |
appState.Modules.Register(modrerankertransformers.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modrerankertransformers.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modrerankercohere.Name]; ok { | |
appState.Modules.Register(modrerankercohere.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modrerankercohere.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules["qna-transformers"]; ok { | |
appState.Modules.Register(modqna.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", "qna-transformers"). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules["sum-transformers"]; ok { | |
appState.Modules.Register(modsum.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", "sum-transformers"). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules["img2vec-neural"]; ok { | |
appState.Modules.Register(modimage.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", "img2vec-neural"). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules["ner-transformers"]; ok { | |
appState.Modules.Register(modner.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", "ner-transformers"). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules["text-spellcheck"]; ok { | |
appState.Modules.Register(modspellcheck.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", "text-spellcheck"). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules["multi2vec-clip"]; ok { | |
appState.Modules.Register(modclip.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", "multi2vec-clip"). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules["text2vec-openai"]; ok { | |
appState.Modules.Register(modopenai.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", "text2vec-openai"). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules["qna-openai"]; ok { | |
appState.Modules.Register(modqnaopenai.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", "qna-openai"). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modgenerativecohere.Name]; ok { | |
appState.Modules.Register(modgenerativecohere.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modgenerativecohere.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modgenerativeopenai.Name]; ok { | |
appState.Modules.Register(modgenerativeopenai.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modgenerativeopenai.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modgenerativeaws.Name]; ok { | |
appState.Modules.Register(modgenerativeaws.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modgenerativeaws.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modhuggingface.Name]; ok { | |
appState.Modules.Register(modhuggingface.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modhuggingface.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modgenerativepalm.Name]; ok { | |
appState.Modules.Register(modgenerativepalm.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modgenerativepalm.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modgenerativeanyscale.Name]; ok { | |
appState.Modules.Register(modgenerativeanyscale.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modgenerativeanyscale.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modtext2vecpalm.Name]; ok { | |
appState.Modules.Register(modtext2vecpalm.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modtext2vecpalm.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modtext2vecaws.Name]; ok { | |
appState.Modules.Register(modtext2vecaws.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modtext2vecaws.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modstgfs.Name]; ok { | |
appState.Modules.Register(modstgfs.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modstgfs.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modstgs3.Name]; ok { | |
appState.Modules.Register(modstgs3.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modstgs3.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modstggcs.Name]; ok { | |
appState.Modules.Register(modstggcs.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modstggcs.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modstgazure.Name]; ok { | |
appState.Modules.Register(modstgazure.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modstgazure.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modcentroid.Name]; ok { | |
appState.Modules.Register(modcentroid.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modcentroid.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modcohere.Name]; ok { | |
appState.Modules.Register(modcohere.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modcohere.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modbind.Name]; ok { | |
appState.Modules.Register(modbind.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modbind.Name). | |
Debug("enabled module") | |
} | |
if _, ok := enabledModules[modjinaai.Name]; ok { | |
appState.Modules.Register(modjinaai.New()) | |
appState.Logger. | |
WithField("action", "startup"). | |
WithField("module", modjinaai.Name). | |
Debug("enabled module") | |
} | |
appState.Logger. | |
WithField("action", "startup"). | |
Debug("completed registering modules") | |
return nil | |
} | |
func initModules(ctx context.Context, appState *state.State) error { | |
storageProvider, err := modulestorage.NewRepo( | |
appState.ServerConfig.Config.Persistence.DataPath, appState.Logger) | |
if err != nil { | |
return errors.Wrap(err, "init storage provider") | |
} | |
// TODO: gh-1481 don't pass entire appState in, but only what's needed. Probably only | |
// config? | |
moduleParams := moduletools.NewInitParams(storageProvider, appState, | |
appState.ServerConfig.Config, appState.Logger) | |
appState.Logger. | |
WithField("action", "startup"). | |
Debug("start initializing modules") | |
if err := appState.Modules.Init(ctx, moduleParams, appState.Logger); err != nil { | |
return errors.Wrap(err, "init modules") | |
} | |
appState.Logger. | |
WithField("action", "startup"). | |
Debug("finished initializing modules") | |
return nil | |
} | |
type clientWithAuth struct { | |
r http.RoundTripper | |
basicAuth cluster.BasicAuth | |
} | |
func (c clientWithAuth) RoundTrip(r *http.Request) (*http.Response, error) { | |
r.SetBasicAuth(c.basicAuth.Username, c.basicAuth.Password) | |
return c.r.RoundTrip(r) | |
} | |
func reasonableHttpClient(authConfig cluster.AuthConfig) *http.Client { | |
t := &http.Transport{ | |
Proxy: http.ProxyFromEnvironment, | |
DialContext: (&net.Dialer{ | |
Timeout: 30 * time.Second, | |
KeepAlive: 120 * time.Second, | |
}).DialContext, | |
MaxIdleConnsPerHost: 100, | |
MaxIdleConns: 100, | |
IdleConnTimeout: 90 * time.Second, | |
TLSHandshakeTimeout: 10 * time.Second, | |
ExpectContinueTimeout: 1 * time.Second, | |
} | |
if authConfig.BasicAuth.Enabled() { | |
return &http.Client{Transport: clientWithAuth{r: t, basicAuth: authConfig.BasicAuth}} | |
} | |
return &http.Client{Transport: t} | |
} | |
func setupGoProfiling(config config.Config) { | |
go func() { | |
fmt.Println(http.ListenAndServe(":6060", nil)) | |
}() | |
if config.Profiling.BlockProfileRate > 0 { | |
goruntime.SetBlockProfileRate(config.Profiling.BlockProfileRate) | |
} | |
if config.Profiling.MutexProfileFraction > 0 { | |
goruntime.SetMutexProfileFraction(config.Profiling.MutexProfileFraction) | |
} | |
} | |
func parseVersionFromSwaggerSpec() string { | |
spec := struct { | |
Info struct { | |
Version string `json:"version"` | |
} `json:"info"` | |
}{} | |
err := json.Unmarshal(SwaggerJSON, &spec) | |
if err != nil { | |
panic(err) | |
} | |
return spec.Info.Version | |
} | |
func limitResources(appState *state.State) { | |
if os.Getenv("LIMIT_RESOURCES") == "true" { | |
appState.Logger.Info("Limiting resources: memory: 80%, cores: all but one") | |
if os.Getenv("GOMAXPROCS") == "" { | |
// Fetch the number of cores from the cgroups cpuset | |
// and parse it into an int | |
cores, err := getCores() | |
if err == nil { | |
appState.Logger.WithField("cores", cores). | |
Warn("GOMAXPROCS not set, and unable to read from cgroups, setting to number of cores") | |
goruntime.GOMAXPROCS(cores) | |
} else { | |
cores = goruntime.NumCPU() - 1 | |
if cores > 0 { | |
appState.Logger.WithField("cores", cores). | |
Warnf("Unable to read from cgroups: %v, setting to max cores to: %v", err, cores) | |
goruntime.GOMAXPROCS(cores) | |
} | |
} | |
} | |
limit, err := memlimit.SetGoMemLimit(0.8) | |
if err != nil { | |
appState.Logger.WithError(err).Warnf("Unable to set memory limit from cgroups: %v", err) | |
// Set memory limit to 90% of the available memory | |
limit := int64(float64(memory.TotalMemory()) * 0.8) | |
debug.SetMemoryLimit(limit) | |
appState.Logger.WithField("limit", limit).Info("Set memory limit based on available memory") | |
} else { | |
appState.Logger.WithField("limit", limit).Info("Set memory limit") | |
} | |
} else { | |
appState.Logger.Info("No resource limits set, weaviate will use all available memory and CPU. " + | |
"To limit resources, set LIMIT_RESOURCES=true") | |
} | |
} | |