Spaces:
Sleeping
Sleeping
| // _ _ | |
| // __ _____ __ ___ ___ __ _| |_ ___ | |
| // \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
| // \ V V / __/ (_| |\ V /| | (_| | || __/ | |
| // \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
| // | |
| // Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
| // | |
| // CONTACT: [email protected] | |
| // | |
| package test | |
| import ( | |
| "context" | |
| "encoding/json" | |
| "fmt" | |
| "os" | |
| "path/filepath" | |
| "testing" | |
| "time" | |
| "github.com/pkg/errors" | |
| "github.com/sirupsen/logrus" | |
| logrustest "github.com/sirupsen/logrus/hooks/test" | |
| "github.com/stretchr/testify/assert" | |
| "github.com/stretchr/testify/require" | |
| "github.com/weaviate/weaviate/entities/backup" | |
| "github.com/weaviate/weaviate/entities/moduletools" | |
| mod "github.com/weaviate/weaviate/modules/backup-gcs" | |
| "github.com/weaviate/weaviate/test/docker" | |
| moduleshelper "github.com/weaviate/weaviate/test/helper/modules" | |
| ubak "github.com/weaviate/weaviate/usecases/backup" | |
| "github.com/weaviate/weaviate/usecases/config" | |
| ) | |
| func Test_GCSBackend_Backup(t *testing.T) { | |
| ctx := context.Background() | |
| compose, err := docker.New().WithGCS().Start(ctx) | |
| if err != nil { | |
| t.Fatal(errors.Wrapf(err, "cannot start")) | |
| } | |
| t.Setenv(envGCSEndpoint, compose.GetGCS().URI()) | |
| t.Run("store backup meta", moduleLevelStoreBackupMeta) | |
| t.Run("copy objects", moduleLevelCopyObjects) | |
| t.Run("copy files", moduleLevelCopyFiles) | |
| if err := compose.Terminate(ctx); err != nil { | |
| t.Fatal(errors.Wrapf(err, "failed to terminate test containers")) | |
| } | |
| } | |
| func moduleLevelStoreBackupMeta(t *testing.T) { | |
| testCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
| defer cancel() | |
| dataDir := t.TempDir() | |
| className := "BackupClass" | |
| backupID := "backup_id" | |
| bucketName := "bucket" | |
| projectID := "project-id" | |
| endpoint := os.Getenv(envGCSEndpoint) | |
| metadataFilename := "backup.json" | |
| gcsUseAuth := "false" | |
| t.Log("setup env") | |
| t.Setenv(envGCSEndpoint, endpoint) | |
| t.Setenv(envGCSStorageEmulatorHost, endpoint) | |
| t.Setenv(envGCSCredentials, "") | |
| t.Setenv(envGCSProjectID, projectID) | |
| t.Setenv(envGCSBucket, bucketName) | |
| t.Setenv(envGCSUseAuth, gcsUseAuth) | |
| moduleshelper.CreateGCSBucket(testCtx, t, projectID, bucketName) | |
| t.Run("store backup meta in gcs", func(t *testing.T) { | |
| t.Setenv("BACKUP_GCS_BUCKET", bucketName) | |
| gcs := mod.New() | |
| err := gcs.Init(testCtx, newFakeModuleParams(dataDir)) | |
| require.Nil(t, err) | |
| t.Run("access permissions", func(t *testing.T) { | |
| err := gcs.Initialize(testCtx, backupID) | |
| assert.Nil(t, err) | |
| }) | |
| t.Run("backup meta does not exist yet", func(t *testing.T) { | |
| meta, err := gcs.GetObject(testCtx, backupID, metadataFilename) | |
| assert.Nil(t, meta) | |
| assert.NotNil(t, err) | |
| assert.IsType(t, backup.ErrNotFound{}, err) | |
| }) | |
| t.Run("put backup meta on backend", func(t *testing.T) { | |
| desc := &backup.BackupDescriptor{ | |
| StartedAt: time.Now(), | |
| CompletedAt: time.Time{}, | |
| ID: backupID, | |
| Classes: []backup.ClassDescriptor{ | |
| { | |
| Name: className, | |
| }, | |
| }, | |
| Status: string(backup.Started), | |
| Version: ubak.Version, | |
| } | |
| b, err := json.Marshal(desc) | |
| require.Nil(t, err) | |
| err = gcs.PutObject(testCtx, backupID, metadataFilename, b) | |
| require.Nil(t, err) | |
| dest := gcs.HomeDir(backupID) | |
| expected := fmt.Sprintf("gs://%s/%s", bucketName, backupID) | |
| assert.Equal(t, expected, dest) | |
| }) | |
| t.Run("assert backup meta contents", func(t *testing.T) { | |
| obj, err := gcs.GetObject(testCtx, backupID, metadataFilename) | |
| require.Nil(t, err) | |
| var meta backup.BackupDescriptor | |
| err = json.Unmarshal(obj, &meta) | |
| require.Nil(t, err) | |
| assert.NotEmpty(t, meta.StartedAt) | |
| assert.Empty(t, meta.CompletedAt) | |
| assert.Equal(t, meta.Status, string(backup.Started)) | |
| assert.Empty(t, meta.Error) | |
| assert.Len(t, meta.Classes, 1) | |
| assert.Equal(t, meta.Classes[0].Name, className) | |
| assert.Equal(t, meta.Version, ubak.Version) | |
| assert.Nil(t, meta.Classes[0].Error) | |
| }) | |
| }) | |
| } | |
| func moduleLevelCopyObjects(t *testing.T) { | |
| testCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
| defer cancel() | |
| dataDir := t.TempDir() | |
| key := "moduleLevelCopyObjects" | |
| backupID := "backup_id" | |
| bucketName := "bucket" | |
| projectID := "project-id" | |
| endpoint := os.Getenv(envGCSEndpoint) | |
| gcsUseAuth := "false" | |
| t.Log("setup env") | |
| t.Setenv(envGCSEndpoint, endpoint) | |
| t.Setenv(envGCSStorageEmulatorHost, endpoint) | |
| t.Setenv(envGCSCredentials, "") | |
| t.Setenv(envGCSProjectID, projectID) | |
| t.Setenv(envGCSBucket, bucketName) | |
| t.Setenv(envGCSUseAuth, gcsUseAuth) | |
| moduleshelper.CreateGCSBucket(testCtx, t, projectID, bucketName) | |
| t.Run("copy objects", func(t *testing.T) { | |
| t.Setenv("BACKUP_GCS_BUCKET", bucketName) | |
| gcs := mod.New() | |
| err := gcs.Init(testCtx, newFakeModuleParams(dataDir)) | |
| require.Nil(t, err) | |
| t.Run("put object to bucket", func(t *testing.T) { | |
| err := gcs.PutObject(testCtx, backupID, key, []byte("hello")) | |
| assert.Nil(t, err) | |
| }) | |
| t.Run("get object from bucket", func(t *testing.T) { | |
| meta, err := gcs.GetObject(testCtx, backupID, key) | |
| assert.Nil(t, err) | |
| assert.Equal(t, []byte("hello"), meta) | |
| }) | |
| }) | |
| } | |
| func moduleLevelCopyFiles(t *testing.T) { | |
| testCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) | |
| defer cancel() | |
| dataDir := t.TempDir() | |
| key := "moduleLevelCopyFiles" | |
| backupID := "backup_id" | |
| bucketName := "bucket" | |
| projectID := "project-id" | |
| endpoint := os.Getenv(envGCSEndpoint) | |
| gcsUseAuth := "false" | |
| t.Log("setup env") | |
| t.Setenv(envGCSEndpoint, endpoint) | |
| t.Setenv(envGCSStorageEmulatorHost, endpoint) | |
| t.Setenv(envGCSCredentials, "") | |
| t.Setenv(envGCSProjectID, projectID) | |
| t.Setenv(envGCSBucket, bucketName) | |
| t.Setenv(envGCSUseAuth, gcsUseAuth) | |
| moduleshelper.CreateGCSBucket(testCtx, t, projectID, bucketName) | |
| t.Run("copy files", func(t *testing.T) { | |
| fpaths := moduleshelper.CreateTestFiles(t, dataDir) | |
| fpath := fpaths[0] | |
| expectedContents, err := os.ReadFile(fpath) | |
| require.Nil(t, err) | |
| require.NotNil(t, expectedContents) | |
| t.Setenv("BACKUP_GCS_BUCKET", bucketName) | |
| gcs := mod.New() | |
| err = gcs.Init(testCtx, newFakeModuleParams(dataDir)) | |
| require.Nil(t, err) | |
| t.Run("verify source data path", func(t *testing.T) { | |
| assert.Equal(t, dataDir, gcs.SourceDataPath()) | |
| }) | |
| t.Run("copy file to backend", func(t *testing.T) { | |
| srcPath, _ := filepath.Rel(dataDir, fpath) | |
| err := gcs.PutFile(testCtx, backupID, key, srcPath) | |
| require.Nil(t, err) | |
| contents, err := gcs.GetObject(testCtx, backupID, key) | |
| require.Nil(t, err) | |
| assert.Equal(t, expectedContents, contents) | |
| }) | |
| t.Run("fetch file from backend", func(t *testing.T) { | |
| destPath := dataDir + "/file_0.copy.db" | |
| err := gcs.WriteToFile(testCtx, backupID, key, destPath) | |
| require.Nil(t, err) | |
| contents, err := os.ReadFile(destPath) | |
| require.Nil(t, err) | |
| assert.Equal(t, expectedContents, contents) | |
| }) | |
| }) | |
| } | |
| type fakeModuleParams struct { | |
| logger logrus.FieldLogger | |
| provider fakeStorageProvider | |
| config config.Config | |
| } | |
| func newFakeModuleParams(dataPath string) *fakeModuleParams { | |
| logger, _ := logrustest.NewNullLogger() | |
| return &fakeModuleParams{ | |
| logger: logger, | |
| provider: fakeStorageProvider{dataPath: dataPath}, | |
| } | |
| } | |
| func (f *fakeModuleParams) GetStorageProvider() moduletools.StorageProvider { | |
| return &f.provider | |
| } | |
| func (f *fakeModuleParams) GetAppState() interface{} { | |
| return nil | |
| } | |
| func (f *fakeModuleParams) GetLogger() logrus.FieldLogger { | |
| return f.logger | |
| } | |
| func (f *fakeModuleParams) GetConfig() config.Config { | |
| return f.config | |
| } | |
| type fakeStorageProvider struct { | |
| dataPath string | |
| } | |
| func (f *fakeStorageProvider) Storage(name string) (moduletools.Storage, error) { | |
| return nil, nil | |
| } | |
| func (f *fakeStorageProvider) DataPath() string { | |
| return f.dataPath | |
| } | |