// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package modstggcs
import (
	"context"
	"fmt"
	"io"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	"cloud.google.com/go/storage"
	"github.com/googleapis/gax-go/v2"
	"github.com/pkg/errors"
	"github.com/weaviate/weaviate/entities/backup"
	"github.com/weaviate/weaviate/usecases/monitoring"
	"golang.org/x/oauth2/google"
	"google.golang.org/api/option"
)
// gcsClient wraps a Google Cloud Storage client with the configuration
// needed to read and write Weaviate backup artifacts in a single bucket.
type gcsClient struct {
	client    *storage.Client // underlying GCS SDK client (retry policy set in newClient)
	config    clientConfig    // bucket name and backup path prefix
	projectID string          // GCP project, resolved from env vars (may be empty)
	dataPath  string          // local filesystem root that backed-up files are read from
}
// newClient builds a gcsClient for the given bucket configuration.
//
// Authentication is enabled unless BACKUP_GCS_USE_AUTH is set to "false"
// (case-insensitive); with auth enabled, Application Default Credentials
// scoped to devstorage.read_write are required. The GCP project ID is
// resolved from the first non-empty of GOOGLE_CLOUD_PROJECT,
// GCLOUD_PROJECT, and GCP_PROJECT. A retry policy with exponential
// backoff is installed on the client for all operations.
func newClient(ctx context.Context, config *clientConfig, dataPath string) (*gcsClient, error) {
	var opts []option.ClientOption
	if strings.ToLower(os.Getenv("BACKUP_GCS_USE_AUTH")) == "false" {
		opts = append(opts, option.WithoutAuthentication())
	} else {
		creds, err := google.FindDefaultCredentials(ctx,
			"https://www.googleapis.com/auth/devstorage.read_write")
		if err != nil {
			return nil, errors.Wrap(err, "find default credentials")
		}
		opts = append(opts, option.WithCredentials(creds))
	}

	// First non-empty env var wins; projectID stays "" if none is set.
	var projectID string
	for _, key := range []string{"GOOGLE_CLOUD_PROJECT", "GCLOUD_PROJECT", "GCP_PROJECT"} {
		if projectID = os.Getenv(key); projectID != "" {
			break
		}
	}

	client, err := storage.NewClient(ctx, opts...)
	if err != nil {
		return nil, errors.Wrap(err, "create client")
	}
	client.SetRetry(
		storage.WithBackoff(gax.Backoff{
			Initial:    2 * time.Second, // Note: the client uses a jitter internally
			Max:        60 * time.Second,
			Multiplier: 3,
		}),
		storage.WithPolicy(storage.RetryAlways),
	)
	return &gcsClient{client: client, config: *config, projectID: projectID, dataPath: dataPath}, nil
}
// getObject downloads objectName from the given bucket and returns its
// full contents. storage.ErrObjectNotExist is returned unwrapped so
// callers can detect it with errors.Is; any other failure is wrapped
// with context. On success the transferred byte count is added to the
// restore-transfer metric (metric errors are deliberately ignored —
// monitoring must not fail a restore).
func (g *gcsClient) getObject(ctx context.Context, bucket *storage.BucketHandle,
	backupID, objectName string,
) ([]byte, error) {
	// Create bucket reader
	obj := bucket.Object(objectName)
	reader, err := obj.NewReader(ctx)
	if err != nil {
		if errors.Is(err, storage.ErrObjectNotExist) {
			return nil, err
		}
		return nil, errors.Wrapf(err, "new reader: %v", objectName)
	}
	// The reader holds the underlying connection; it was previously
	// leaked on every call.
	defer reader.Close()
	// Read file contents
	content, err := io.ReadAll(reader)
	if err != nil {
		return nil, errors.Wrapf(err, "read object: %v", objectName)
	}
	metric, err := monitoring.GetMetrics().BackupRestoreDataTransferred.GetMetricWithLabelValues(Name, "class")
	if err == nil {
		metric.Add(float64(len(content)))
	}
	return content, nil
}
// HomeDir returns the gs:// URI of the backup's root object prefix
// inside the configured bucket.
func (g *gcsClient) HomeDir(backupID string) string {
	object := g.makeObjectName(backupID)
	return "gs://" + path.Join(g.config.Bucket, object)
}
// findBucket returns a handle to the configured bucket, verifying that
// the bucket actually exists by fetching its attributes. The raw error
// (e.g. storage.ErrBucketNotExist) is returned unwrapped for callers
// to inspect.
func (g *gcsClient) findBucket(ctx context.Context) (*storage.BucketHandle, error) {
	handle := g.client.Bucket(g.config.Bucket)
	_, err := handle.Attrs(ctx)
	if err != nil {
		return nil, err
	}
	return handle, nil
}
// makeObjectName joins the configured backup path prefix with the given
// parts into a slash-separated object key.
func (g *gcsClient) makeObjectName(parts ...string) string {
	segments := append([]string{g.config.BackupPath}, parts...)
	return path.Join(segments...)
}
// GetObject fetches the object stored under backupID/key and returns
// its contents. Errors are classified for the backup layer:
// context expiry -> ErrContextExpired, missing bucket or object ->
// ErrNotFound, everything else -> ErrInternal.
func (g *gcsClient) GetObject(ctx context.Context, backupID, key string) ([]byte, error) {
	objectName := g.makeObjectName(backupID, key)
	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, backup.NewErrContextExpired(errors.Wrapf(ctxErr, "get object '%s'", objectName))
	}

	bucket, err := g.findBucket(ctx)
	switch {
	case err == nil:
		// bucket exists, continue below
	case errors.Is(err, storage.ErrBucketNotExist):
		return nil, backup.NewErrNotFound(errors.Wrapf(err, "get object '%s'", objectName))
	default:
		return nil, backup.NewErrInternal(errors.Wrapf(err, "get object '%s'", objectName))
	}

	contents, err := g.getObject(ctx, bucket, backupID, objectName)
	switch {
	case err == nil:
		return contents, nil
	case errors.Is(err, storage.ErrObjectNotExist):
		return nil, backup.NewErrNotFound(errors.Wrapf(err, "get object '%s'", objectName))
	default:
		return nil, backup.NewErrInternal(errors.Wrapf(err, "get object '%s'", objectName))
	}
}
// PutFile creates an object with contents from file at filePath.
// The source is resolved relative to the client's local data path and
// streamed into the object backupID/key. On success the transferred
// byte count is added to the store-transfer metric (metric errors are
// ignored — monitoring must not fail a backup).
func (g *gcsClient) PutFile(ctx context.Context, backupID, key, srcPath string) error {
	bucket, err := g.findBucket(ctx)
	if err != nil {
		return fmt.Errorf("find bucket: %w", err)
	}
	// open source file; filepath.Join (not path.Join) because this is a
	// local OS filesystem path, not an object key
	filePath := filepath.Join(g.dataPath, srcPath)
	file, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("os.open %q: %w", filePath, err)
	}
	defer file.Close()
	// create a new writer
	object := g.makeObjectName(backupID, key)
	writer := bucket.Object(object).NewWriter(ctx)
	writer.ContentType = "application/octet-stream"
	writer.Metadata = map[string]string{"backup-id": backupID}
	// if we return early make sure writer is closed
	closeWriter := true
	defer func() {
		if closeWriter {
			writer.Close()
		}
	}()
	nBytes, err := io.Copy(writer, file)
	if err != nil {
		return fmt.Errorf("io.copy %q %q: %w", object, filePath, err)
	}
	closeWriter = false
	if err := writer.Close(); err != nil {
		return fmt.Errorf("writer.close %q: %w", filePath, err)
	}
	// use the module's Name constant for the metric label, consistent
	// with Write/getObject (previously hard-coded "backup-gcs")
	metric, err := monitoring.GetMetrics().BackupStoreDataTransferred.GetMetricWithLabelValues(Name, "class")
	if err == nil {
		metric.Add(float64(nBytes))
	}
	return nil
}
// PutObject uploads data as the object backupID/key with octet-stream
// content type and a backup-id metadata tag. On success the byte count
// is added to the store-transfer metric (metric errors are ignored).
func (g *gcsClient) PutObject(ctx context.Context, backupID, key string, data []byte) error {
	bucket, err := g.findBucket(ctx)
	if err != nil {
		return errors.Wrap(err, "find bucket")
	}
	objectName := g.makeObjectName(backupID, key)
	obj := bucket.Object(objectName)
	writer := obj.NewWriter(ctx)
	writer.ContentType = "application/octet-stream"
	writer.Metadata = map[string]string{
		"backup-id": backupID,
	}
	if _, err := writer.Write(data); err != nil {
		// close to release the writer's resources; the write error is
		// the primary failure so the close error is intentionally dropped
		writer.Close()
		return errors.Wrapf(err, "write file: %v", objectName)
	}
	if err := writer.Close(); err != nil {
		return errors.Wrapf(err, "close writer for file: %v", objectName)
	}
	// use the module's Name constant for the metric label, consistent
	// with Write/getObject (previously hard-coded "backup-gcs")
	metric, err := monitoring.GetMetrics().BackupStoreDataTransferred.GetMetricWithLabelValues(Name, "class")
	if err == nil {
		metric.Add(float64(len(data)))
	}
	return nil
}
// Initialize verifies that the backup destination is writable by
// uploading a small probe object under backupID and deleting it again.
func (g *gcsClient) Initialize(ctx context.Context, backupID string) error {
	const key = "access-check"
	if err := g.PutObject(ctx, backupID, key, []byte("")); err != nil {
		return errors.Wrap(err, "failed to access-check gcs backup module")
	}

	bucket, err := g.findBucket(ctx)
	if err != nil {
		return errors.Wrap(err, "find bucket")
	}

	probe := g.makeObjectName(backupID, key)
	if err := bucket.Object(probe).Delete(ctx); err != nil {
		return errors.Wrap(err, "failed to remove access-check gcs backup module")
	}
	return nil
}
// WriteToFile downloads an object and stores its content in destPath.
// The file destPath will be created if it doesn't exist; parent
// directories are created as needed. On any failure after the file is
// created, the partial file is removed so a broken restore is not left
// behind.
func (g *gcsClient) WriteToFile(ctx context.Context, backupID, key, destPath string) (err error) {
	bucket, err := g.findBucket(ctx)
	if err != nil {
		return fmt.Errorf("find bucket: %w", err)
	}
	// validate destination path
	if st, err := os.Stat(destPath); err == nil {
		if st.IsDir() {
			return fmt.Errorf("file is a directory")
		}
	} else if !os.IsNotExist(err) {
		return err
	}
	// create empty file; filepath.Dir (not path.Dir) because destPath is
	// a local OS filesystem path, not an object key
	dir := filepath.Dir(destPath)
	if err := os.MkdirAll(dir, os.ModePerm); err != nil {
		return fmt.Errorf("os.mkdir %q: %w", dir, err)
	}
	file, err := os.Create(destPath)
	if err != nil {
		return fmt.Errorf("os.create %q: %w", destPath, err)
	}
	// make sure to close and delete in case we return early
	closeAndRemove := true
	defer func() {
		if closeAndRemove {
			file.Close()
			os.Remove(destPath)
		}
	}()
	// create reader
	object := g.makeObjectName(backupID, key)
	rc, err := bucket.Object(object).NewReader(ctx)
	if err != nil {
		return fmt.Errorf("find object %q: %w", object, err)
	}
	defer rc.Close()
	// transfer content to the file
	if _, err := io.Copy(file, rc); err != nil {
		return fmt.Errorf("io.Copy:%q %q: %w", destPath, object, err)
	}
	closeAndRemove = false
	if err = file.Close(); err != nil {
		return fmt.Errorf("f.Close %q: %w", destPath, err)
	}
	return nil
}
// Write streams r into the object backupID/key and returns the number
// of bytes written. r is always closed. On success the byte count is
// added to the store-transfer metric (metric errors are ignored).
func (g *gcsClient) Write(ctx context.Context, backupID, key string, r io.ReadCloser) (int64, error) {
	defer r.Close()

	bucket, err := g.findBucket(ctx)
	if err != nil {
		return 0, fmt.Errorf("find bucket: %w", err)
	}

	// objectName (rather than "path") avoids shadowing the path package
	objectName := g.makeObjectName(backupID, key)
	writer := bucket.Object(objectName).NewWriter(ctx)
	writer.ContentType = "application/octet-stream"
	writer.Metadata = map[string]string{"backup-id": backupID}

	// guarantee the writer is closed if we bail out before the explicit
	// Close below
	needClose := true
	defer func() {
		if needClose {
			writer.Close()
		}
	}()

	written, err := io.Copy(writer, r)
	if err != nil {
		return 0, fmt.Errorf("io.copy %q: %w", objectName, err)
	}
	needClose = false
	if err := writer.Close(); err != nil {
		return 0, fmt.Errorf("writer.close %q: %w", objectName, err)
	}

	if metric, err := monitoring.GetMetrics().BackupStoreDataTransferred.
		GetMetricWithLabelValues(Name, "class"); err == nil {
		metric.Add(float64(written))
	}
	return written, nil
}
// Read streams the object backupID/key into w and returns the number
// of bytes copied. w is always closed. A missing bucket or object is
// reported as backup.ErrNotFound; on success the byte count is added
// to the restore-transfer metric (metric errors are ignored).
func (g *gcsClient) Read(ctx context.Context, backupID, key string, w io.WriteCloser) (int64, error) {
	defer w.Close()
	bucket, err := g.findBucket(ctx)
	if err != nil {
		err = fmt.Errorf("find bucket: %w", err)
		// a failed bucket lookup surfaces ErrBucketNotExist, not
		// ErrObjectNotExist (the previous check could never match)
		if errors.Is(err, storage.ErrBucketNotExist) {
			err = backup.NewErrNotFound(err)
		}
		return 0, err
	}
	// create reader; avoid shadowing the path package
	objectName := g.makeObjectName(backupID, key)
	rc, err := bucket.Object(objectName).NewReader(ctx)
	if err != nil {
		// %w (not %v) keeps the chain intact so errors.Is below works
		err = fmt.Errorf("find object %s: %w", objectName, err)
		if errors.Is(err, storage.ErrObjectNotExist) {
			err = backup.NewErrNotFound(err)
		}
		return 0, err
	}
	defer rc.Close()
	// copy
	read, err := io.Copy(w, rc)
	if err != nil {
		return read, fmt.Errorf("io.copy %q: %w", objectName, err)
	}
	if metric, err := monitoring.GetMetrics().BackupRestoreDataTransferred.
		GetMetricWithLabelValues(Name, "class"); err == nil {
		metric.Add(float64(read))
	}
	return read, nil
}
// SourceDataPath reports the local filesystem root that backup source
// files are read from.
func (g *gcsClient) SourceDataPath() string {
	return g.dataPath
}