KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.74 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package modstgfs
import (
"context"
"fmt"
"io"
"os"
"path"
"path/filepath"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/backup"
"github.com/weaviate/weaviate/usecases/monitoring"
)
// GetObject returns the full contents of the object stored under key inside
// the backup identified by backupID.
//
// The path is resolved and validated by getObjectPath, whose error is returned
// unchanged. A failure to read the file is reported as a backup internal
// error. On success the number of bytes read is added to the
// BackupRestoreDataTransferred metric; a failure to obtain that metric is
// ignored.
func (m *Module) GetObject(ctx context.Context, backupID, key string) ([]byte, error) {
	objectPath, err := m.getObjectPath(ctx, backupID, key)
	if err != nil {
		return nil, err
	}

	data, readErr := os.ReadFile(objectPath)
	if readErr != nil {
		return nil, backup.NewErrInternal(errors.Wrapf(readErr, "get object '%s'", objectPath))
	}

	// Metrics are best-effort: the read already succeeded.
	transferred := monitoring.GetMetrics().BackupRestoreDataTransferred
	if counter, metricErr := transferred.GetMetricWithLabelValues(m.Name(), "class"); metricErr == nil {
		counter.Add(float64(len(data)))
	}
	return data, nil
}
// getObjectPath builds the on-disk path for key inside the backup backupID
// (m.backupsPath/backupID/key) and verifies that it can be stat'ed.
//
// It returns a context-expired backup error when ctx is already done, a
// not-found backup error when the path does not exist, and an internal backup
// error for any other os.Stat failure.
func (m *Module) getObjectPath(ctx context.Context, backupID, key string) (string, error) {
	objectPath := filepath.Join(m.backupsPath, backupID, key)

	if ctxErr := ctx.Err(); ctxErr != nil {
		return "", backup.NewErrContextExpired(errors.Wrapf(ctxErr, "get object '%s'", objectPath))
	}

	_, statErr := os.Stat(objectPath)
	switch {
	case statErr == nil:
		return objectPath, nil
	case errors.Is(statErr, os.ErrNotExist):
		return "", backup.NewErrNotFound(errors.Wrapf(statErr, "get object '%s'", objectPath))
	default:
		return "", backup.NewErrInternal(errors.Wrapf(statErr, "get object '%s'", objectPath))
	}
}
// PutFile copies the file at srcPath, resolved relative to m.dataPath, into
// the directory of backup backupID under the name key.
//
// Errors from copyFile are returned unchanged. On success the number of bytes
// copied is added to the BackupStoreDataTransferred metric; a failure to
// obtain that metric is ignored. ctx is not consulted by this method.
func (m *Module) PutFile(ctx context.Context, backupID, key, srcPath string) error {
	from := path.Join(m.dataPath, srcPath)
	to := path.Join(m.makeBackupDirPath(backupID), key)

	copied, err := m.copyFile(from, to)
	if err != nil {
		return err
	}

	// Metrics are best-effort: the copy already succeeded.
	transferred := monitoring.GetMetrics().BackupStoreDataTransferred
	if counter, metricErr := transferred.GetMetricWithLabelValues(m.Name(), "class"); metricErr == nil {
		counter.Add(float64(copied))
	}
	return nil
}
// copyFile copies the file at sourcePath to destinationPath and returns the
// number of bytes written.
//
// When destinationPath cannot be stat'ed, its parent directory is created
// first. The destination is opened with os.Create, so an existing file is
// truncated. On any error the returned byte count is 0.
//
// A failure to close the destination is reported as an error, because a
// failed close can mean the copied data never reached the disk.
func (m *Module) copyFile(sourcePath, destinationPath string) (written int64, err error) {
	source, err := os.Open(sourcePath)
	if err != nil {
		return 0, errors.Wrapf(err, "open file '%s'", sourcePath)
	}
	// Register the close only after a successful open. The handle is
	// read-only, so a close failure cannot lose data and is ignored.
	defer source.Close()

	if _, statErr := os.Stat(destinationPath); statErr != nil {
		// destinationPath is an OS path, so split it with filepath, not path.
		dir := filepath.Dir(destinationPath)
		if mkErr := os.MkdirAll(dir, os.ModePerm); mkErr != nil {
			return 0, errors.Wrapf(mkErr, "make dir '%s'", dir)
		}
	}

	destination, err := os.Create(destinationPath)
	if err != nil {
		return 0, errors.Wrapf(err, "create destination file '%s'", destinationPath)
	}
	defer func() {
		// Surface the close error unless an earlier error is already
		// being returned.
		if closeErr := destination.Close(); closeErr != nil && err == nil {
			written = 0
			err = errors.Wrapf(closeErr, "close destination file '%s'", destinationPath)
		}
	}()

	written, err = io.Copy(destination, source)
	if err != nil {
		return 0, errors.Wrapf(err, "copy file from '%s' to '%s'", sourcePath, destinationPath)
	}
	return written, nil
}
// PutObject writes byes to the file named key inside the directory of backup
// backupID, creating the parent directory when needed.
//
// On success the number of bytes written is added to the
// BackupStoreDataTransferred metric; a failure to obtain that metric is
// ignored. ctx is not consulted by this method.
func (m *Module) PutObject(ctx context.Context, backupID, key string, byes []byte) error {
	target := path.Join(m.makeBackupDirPath(backupID), key)
	parent := path.Dir(target)

	if mkErr := os.MkdirAll(parent, os.ModePerm); mkErr != nil {
		return errors.Wrapf(mkErr, "make dir '%s'", parent)
	}

	if writeErr := os.WriteFile(target, byes, os.ModePerm); writeErr != nil {
		return errors.Wrapf(writeErr, "write file '%s'", target)
	}

	// Metrics are best-effort: the write already succeeded.
	transferred := monitoring.GetMetrics().BackupStoreDataTransferred
	if counter, metricErr := transferred.GetMetricWithLabelValues(m.Name(), "class"); metricErr == nil {
		counter.Add(float64(len(byes)))
	}
	return nil
}
// Initialize is currently a no-op: it ignores ctx and backupID and always
// returns nil.
func (m *Module) Initialize(ctx context.Context, backupID string) error {
// TODO: does anything need to be done here?
return nil
}
// WriteToFile copies the object stored under key in backup backupID to
// destPath on the local filesystem.
//
// Errors from getObjectPath and copyFile are returned unchanged. On success
// the number of bytes copied is added to the BackupRestoreDataTransferred
// metric; a failure to obtain that metric is ignored.
func (m *Module) WriteToFile(ctx context.Context, backupID, key, destPath string) error {
	objectPath, err := m.getObjectPath(ctx, backupID, key)
	if err != nil {
		return err
	}

	copied, copyErr := m.copyFile(objectPath, destPath)
	if copyErr != nil {
		return copyErr
	}

	// Metrics are best-effort: the copy already succeeded.
	transferred := monitoring.GetMetrics().BackupRestoreDataTransferred
	if counter, metricErr := transferred.GetMetricWithLabelValues(m.Name(), "class"); metricErr == nil {
		counter.Add(float64(copied))
	}
	return nil
}
// Write streams r into the file named key inside the directory of backup
// backupID and returns the number of bytes written. r is always closed before
// Write returns.
//
// The parent directory is created when needed. The file is opened with
// O_TRUNC so that overwriting an existing, longer file leaves no stale
// trailing bytes; this matches PutObject (os.WriteFile) and copyFile
// (os.Create), which both truncate. On any error the returned byte count is 0.
//
// A failure to close the written file is reported as an error, because a
// failed close can mean the data never reached the disk. On success the byte
// count is added to the BackupStoreDataTransferred metric; a failure to obtain
// that metric is ignored. ctx is not consulted by this method.
func (m *Module) Write(ctx context.Context, backupID, key string, r io.ReadCloser) (written int64, err error) {
	defer r.Close()

	backupPath := path.Join(m.makeBackupDirPath(backupID), key)
	dir := path.Dir(backupPath)
	if mkErr := os.MkdirAll(dir, os.ModePerm); mkErr != nil {
		return 0, fmt.Errorf("make dir %q: %w", dir, mkErr)
	}

	f, err := os.OpenFile(backupPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		return 0, fmt.Errorf("open file %q: %w", backupPath, err)
	}
	defer func() {
		// Surface the close error unless an earlier error is already
		// being returned.
		if closeErr := f.Close(); closeErr != nil && err == nil {
			written = 0
			err = fmt.Errorf("close file %q: %w", backupPath, closeErr)
		}
	}()

	written, err = io.Copy(f, r)
	if err != nil {
		return 0, fmt.Errorf("write file %q: %w", backupPath, err)
	}

	// Metrics are best-effort: the data is already written.
	if metric, metricErr := monitoring.GetMetrics().BackupStoreDataTransferred.
		GetMetricWithLabelValues(m.Name(), "class"); metricErr == nil {
		metric.Add(float64(written))
	}
	return written, nil
}
// Read streams the object stored under key in backup backupID into w and
// returns the number of bytes copied. w is always closed before Read returns.
//
// Errors from resolving the path, opening the file and copying are wrapped and
// returned with a byte count of 0. On success the byte count is added to the
// BackupRestoreDataTransferred metric; a failure to obtain that metric is
// ignored.
func (m *Module) Read(ctx context.Context, backupID, key string, w io.WriteCloser) (int64, error) {
	defer w.Close()

	objectPath, pathErr := m.getObjectPath(ctx, backupID, key)
	if pathErr != nil {
		return 0, fmt.Errorf("source path %s/%s: %w", backupID, key, pathErr)
	}

	src, openErr := os.Open(objectPath)
	if openErr != nil {
		return 0, fmt.Errorf("open file %q: %w", objectPath, openErr)
	}
	defer src.Close()

	copied, copyErr := io.Copy(w, src)
	if copyErr != nil {
		return 0, fmt.Errorf("write : %w", copyErr)
	}

	// Metrics are best-effort: the copy already succeeded.
	transferred := monitoring.GetMetrics().BackupRestoreDataTransferred
	if counter, metricErr := transferred.GetMetricWithLabelValues(m.Name(), "class"); metricErr == nil {
		counter.Add(float64(copied))
	}
	return copied, nil
}
// SourceDataPath returns the module's dataPath field, the same directory
// PutFile resolves its srcPath argument against.
func (m *Module) SourceDataPath() string {
return m.dataPath
}
// initBackupBackend validates backupsPath, creates the directory through
// createBackupsDir and stores the cleaned path in m.backupsPath.
//
// An empty path and a path that is still relative after filepath.Clean are
// rejected. m.backupsPath is assigned only after the directory was created
// successfully. ctx is not consulted by this method.
func (m *Module) initBackupBackend(ctx context.Context, backupsPath string) error {
	if len(backupsPath) == 0 {
		return fmt.Errorf("empty backup path provided")
	}

	cleaned := filepath.Clean(backupsPath)
	if !filepath.IsAbs(cleaned) {
		return fmt.Errorf("relative backup path provided")
	}

	createErr := m.createBackupsDir(cleaned)
	if createErr != nil {
		return errors.Wrap(createErr, "invalid backup path provided")
	}

	m.backupsPath = cleaned
	return nil
}
// createBackupsDir creates backupsPath, including any missing parents, with
// os.MkdirAll. On failure it logs the error through m.logger and returns it
// wrapped as a backup internal error.
func (m *Module) createBackupsDir(backupsPath string) error {
	mkErr := os.MkdirAll(backupsPath, os.ModePerm)
	if mkErr == nil {
		return nil
	}

	entry := m.logger.WithField("module", m.Name()).
		WithField("action", "create_backups_dir").
		WithError(mkErr)
	entry.Errorf("failed creating backups directory %v", backupsPath)

	return backup.NewErrInternal(errors.Wrap(mkErr, "make backups dir"))
}