KevinStephenson
Adding in weaviate code
b110593
raw
history blame
7.87 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package backup
import (
	"archive/tar"
	"bytes"
	"compress/gzip"
	"context"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
	"sync/atomic"
	"time"

	"github.com/weaviate/weaviate/entities/backup"
)
// CompressionLevel selects how strongly backup archives are gzip-compressed.
// It is translated into a compress/gzip level by zipLevel.
type CompressionLevel int

// Supported compression levels. The numeric values are significant because
// callers pass them around as plain ints.
const (
	DefaultCompression CompressionLevel = 0 // gzip's default trade-off
	BestSpeed          CompressionLevel = 1 // fastest, larger output
	BestCompression    CompressionLevel = 2 // slowest, smallest output
)
// zip streams a gzip-compressed tar archive into the write end of an
// io.Pipe; the matching read end is handed to the caller by NewZip.
type zip struct {
	// sourcePath is the directory that relative paths given to
	// WriteRegular/WriteRegulars are joined onto.
	sourcePath string
	// w is the tar writer; its output goes into gzw.
	w *tar.Writer
	// gzw compresses the tar stream and writes it into pipeWriter.
	gzw *gzip.Writer
	// pipeWriter is the write end of the pipe created in NewZip.
	pipeWriter *io.PipeWriter
	// counter reports how many bytes have been read from the pipe's read
	// end so far (installed by NewZip from readCloser.counter).
	counter func() int64
}
// NewZip wires up a streaming tar+gzip writer for files under sourcePath.
//
// The returned zip writes its compressed output into an io.Pipe, and the
// returned io.ReadCloser is the read end of that pipe. Pipe writes block
// until they are read, so the reader must be consumed while the zip is being
// written. Every byte read through the returned reader is counted and can be
// queried via zip.lastWritten.
//
// level is mapped through zipLevel. Values outside the supported
// CompressionLevel set fall back to gzip's default compression.
func NewZip(sourcePath string, level int) (zip, io.ReadCloser) {
	pr, pw := io.Pipe()
	// zipLevel only yields gzip.DefaultCompression, gzip.BestSpeed or
	// gzip.BestCompression. NewWriterLevel accepts all of them, so its
	// error is always nil here.
	gzw, _ := gzip.NewWriterLevel(pw, zipLevel(level))
	out := &readCloser{src: pr}
	z := zip{
		sourcePath: sourcePath,
		w:          tar.NewWriter(gzw),
		gzw:        gzw,
		pipeWriter: pw,
		counter:    out.counter(),
	}
	return z, out
}
// Close finalizes the archive and tears the pipeline down in writer order.
// First the tar trailer is written, then the gzip stream is terminated, and
// finally the pipe's write end is closed so the reader returned by NewZip
// sees EOF. Every step is attempted even if an earlier one fails. An
// io.ErrClosedPipe from closing the pipe is ignored.
//
// Only the steps that actually failed are reported. The returned error joins
// them (nil when all succeed), and errors.Is/As still match each cause.
func (z *zip) Close() error {
	var errs []error
	if err := z.w.Close(); err != nil {
		errs = append(errs, fmt.Errorf("tar: %w", err))
	}
	if err := z.gzw.Close(); err != nil {
		errs = append(errs, fmt.Errorf("gzip: %w", err))
	}
	if err := z.pipeWriter.Close(); err != nil && err != io.ErrClosedPipe {
		errs = append(errs, fmt.Errorf("pw: %w", err))
	}
	// A single fmt.Errorf with one %w per step renders "%!w(<nil>)" for the
	// steps that succeeded. errors.Join skips that and returns nil when errs
	// is empty.
	return errors.Join(errs...)
}
// WriteShard archives one shard. It first writes the three in-memory files
// carried by sd (DocIDCounter, PropLengthTracker and Version), each under its
// relative path. It then writes the on-disk files listed in sd.Files via
// WriteRegulars.
//
// It returns the total number of uncompressed bytes written. On error,
// written covers only the entries that completed. ctx is only consulted
// while writing sd.Files. The in-memory entries carry a zero modification
// time (see the TODO on vFileInfo).
func (z *zip) WriteShard(ctx context.Context, sd *backup.ShardDescriptor) (written int64, err error) {
	inMemory := []struct {
		relPath string
		data    []byte
	}{
		{sd.DocIDCounterPath, sd.DocIDCounter},
		{sd.PropLengthTrackerPath, sd.PropLengthTracker},
		{sd.ShardVersionPath, sd.Version},
	}
	for _, entry := range inMemory {
		info := vFileInfo{name: filepath.Base(entry.relPath), size: len(entry.data)}
		n, werr := z.writeOne(info, entry.relPath, bytes.NewReader(entry.data))
		if werr != nil {
			return written, werr
		}
		written += n
	}
	rest, err := z.WriteRegulars(ctx, sd.Files)
	return written + rest, err
}
// WriteRegulars archives each path in relPaths (relative to z.sourcePath) in
// order via WriteRegular. Stray ".DS_Store" files (macOS Finder metadata) are
// skipped. Cancellation of ctx is checked before each remaining file.
//
// It returns the bytes written by the files that completed, together with
// the first error encountered.
func (z *zip) WriteRegulars(ctx context.Context, relPaths []string) (written int64, err error) {
	for i := range relPaths {
		p := relPaths[i]
		if filepath.Base(p) == ".DS_Store" {
			continue
		}
		if cerr := ctx.Err(); cerr != nil {
			return written, cerr
		}
		n, werr := z.WriteRegular(p)
		if werr != nil {
			return written, werr
		}
		written += n
	}
	return written, nil
}
// WriteRegular appends the file at z.sourcePath/relPath to the archive under
// the name relPath and returns the number of content bytes written.
//
// Anything os.Stat does not report as a regular file (directories, devices,
// sockets, ...) is skipped silently with 0 bytes. Because os.Stat follows
// symlinks, a symlink to a regular file is archived with the target's
// contents.
func (z *zip) WriteRegular(relPath string) (written int64, err error) {
	absPath := filepath.Join(z.sourcePath, relPath)
	info, err := os.Stat(absPath)
	if err != nil {
		return 0, fmt.Errorf("stat: %w", err)
	}
	if mode := info.Mode(); !mode.IsRegular() {
		return 0, nil
	}
	src, err := os.Open(absPath)
	if err != nil {
		return 0, fmt.Errorf("open: %w", err)
	}
	defer src.Close()
	return z.writeOne(info, relPath, src)
}
// writeOne writes a single tar entry named relPath. The header is built from
// info, and the entry body is copied from r. It returns the number of body
// bytes copied.
//
// Header and write-header failures report 0 bytes. A copy failure reports the
// bytes copied before the error, for example archive/tar's write-too-long
// error when r yields more than info.Size() bytes.
func (z *zip) writeOne(info fs.FileInfo, relPath string, r io.Reader) (written int64, err error) {
	// The second argument to FileInfoHeader is a symlink target and is only
	// used when info describes a symlink. The callers in this file pass
	// regular-file infos.
	hdr, err := tar.FileInfoHeader(info, info.Name())
	if err != nil {
		return 0, fmt.Errorf("file header: %w", err)
	}
	hdr.Name = relPath
	// NOTE(review): hdr.Format is left unspecified, and archive/tar's
	// WriteHeader then ignores ChangeTime, so this currently has no effect.
	// Confirm whether a PAX/GNU format was intended.
	hdr.ChangeTime = info.ModTime()
	if err = z.w.WriteHeader(hdr); err != nil {
		return 0, fmt.Errorf("write header %s: %w", relPath, err)
	}
	written, err = io.Copy(z.w, r)
	if err != nil {
		return written, fmt.Errorf("copy: %s %w", relPath, err)
	}
	return written, nil
}
// lastWritten returns how many compressed bytes have been consumed so far
// from the reader returned by NewZip. It re-queries z.counter on every call.
func (z *zip) lastWritten() int64 {
	read := z.counter()
	return read
}
// unzip extracts a gzip-compressed tar stream, received through an io.Pipe,
// into destPath.
type unzip struct {
	// destPath is the directory that archive entry names are joined onto.
	destPath string
	// gzr decompresses pipeReader. It is nil until init runs.
	gzr *gzip.Reader
	// r reads tar entries from gzr. It is set together with gzr by init.
	r *tar.Reader
	// pipeReader is the read end of the pipe whose write end NewUnzip
	// returns to the caller.
	pipeReader *io.PipeReader
}
// NewUnzip prepares an extractor that writes into dst. It returns the
// extractor and the write end of an io.Pipe. Compressed archive bytes written
// there are consumed by ReadChunk. The decompressors are created lazily, so
// nothing is read here.
func NewUnzip(dst string) (unzip, io.WriteCloser) {
	readEnd, writeEnd := io.Pipe()
	u := unzip{destPath: dst, pipeReader: readEnd}
	return u, writeEnd
}
// init lazily creates the gzip and tar readers on first use. Later calls are
// no-ops once u.gzr is set.
//
// gzip.NewReader consumes and validates the gzip header from the pipe, so
// this blocks until those bytes have been written. A malformed header is
// reported with a "gzip.NewReader:" prefix, and the readers stay unset.
func (u *unzip) init() error {
	if u.gzr == nil {
		gz, err := gzip.NewReader(u.pipeReader)
		if err != nil {
			return fmt.Errorf("gzip.NewReader: %w", err)
		}
		u.gzr, u.r = gz, tar.NewReader(gz)
	}
	return nil
}
// Close releases the extraction pipeline. It closes the read end of the pipe,
// so a writer on the other end gets io.ErrClosedPipe from further writes.
// If init has run, it also closes the gzip reader. Both steps are always
// attempted. An io.ErrClosedPipe from the pipe is ignored.
//
// Only failing steps are reported. The returned error joins them (nil when
// everything succeeded), and errors.Is/As still match each cause.
func (u *unzip) Close() (err error) {
	var errs []error
	if cerr := u.pipeReader.Close(); cerr != nil && cerr != io.ErrClosedPipe {
		errs = append(errs, fmt.Errorf("close pr: %w", cerr))
	}
	if u.gzr != nil {
		if cerr := u.gzr.Close(); cerr != nil {
			errs = append(errs, fmt.Errorf("gunzip: %w", cerr))
		}
	}
	// A single fmt.Errorf with a %w per step would render "%!w(<nil>)" for
	// the steps that succeeded. errors.Join avoids that and returns nil for
	// an empty list.
	return errors.Join(errs...)
}
// ReadChunk reads the gzip-compressed tar stream arriving on the pipe and
// extracts its entries under u.destPath, stopping at the end of the archive.
// It returns the number of file-content bytes written to disk.
//
// Directory entries are created with MkdirAll (mode 0o755). For regular-file
// entries, the parent directory is created on demand and the contents are
// written via copyFile. Entries of any other type (symlinks, devices, ...)
// are skipped.
//
// Entry names come from the archive and are therefore untrusted. If an entry
// name resolves to a path outside u.destPath (e.g. "../x"), extraction stops
// with an error before anything is written for that entry. This prevents a
// tampered backup from overwriting arbitrary files ("zip slip").
func (u *unzip) ReadChunk() (written int64, err error) {
	if err := u.init(); err != nil {
		return 0, err
	}
	// parentPath remembers the last directory ensured to exist, so runs of
	// files in the same directory do not repeat MkdirAll.
	parentPath := ""
	for {
		header, err := u.r.Next()
		if err != nil {
			if err == io.EOF { // end of the archive
				return written, nil
			}
			return written, fmt.Errorf("fetch next: %w", err)
		}
		if header == nil {
			continue
		}
		// target file
		target := filepath.Join(u.destPath, header.Name)
		// filepath.Join cleans ".." elements, so comparing the cleaned target
		// against destPath is enough to detect an escape.
		if rel, relErr := filepath.Rel(u.destPath, target); relErr != nil || !filepath.IsLocal(rel) {
			return written, fmt.Errorf("illegal entry %q: path escapes %s", header.Name, u.destPath)
		}
		switch header.Typeflag {
		case tar.TypeDir:
			if err := os.MkdirAll(target, 0o755); err != nil {
				return written, fmt.Errorf("crateDir %s: %w", target, err)
			}
		case tar.TypeReg:
			if pp := filepath.Dir(target); pp != parentPath {
				parentPath = pp
				if err := os.MkdirAll(parentPath, 0o755); err != nil {
					// report the directory that could not be created, not the file
					return written, fmt.Errorf("crateDir %s: %w", parentPath, err)
				}
			}
			n, err := copyFile(target, header, u.r)
			if err != nil {
				return written, fmt.Errorf("copy file %s: %w", target, err)
			}
			written += n
		}
	}
}
// copyFile writes everything from r into the file at target and returns the
// number of bytes written. If the file does not exist, it is created with
// the permission bits from h.Mode (subject to the process umask). An
// existing file is truncated first.
//
// A failure to close the file, which can surface deferred write errors, is
// reported when no earlier error occurred.
func copyFile(target string, h *tar.Header, r io.Reader) (written int64, err error) {
	// O_TRUNC: without it, restoring over a longer existing file would keep
	// that file's tail after the new contents and silently corrupt the data.
	f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.FileMode(h.Mode))
	if err != nil {
		return 0, fmt.Errorf("create: %w", err)
	}
	defer func() {
		if cerr := f.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("close: %w", cerr)
		}
	}()
	written, err = io.Copy(f, r)
	if err != nil {
		return written, fmt.Errorf("copy: %w", err)
	}
	return written, nil
}
type vFileInfo struct {
name string
size int
modTime time.Time // TODO: get it when parsing source files
}
func (v vFileInfo) Name() string { return v.name }
func (v vFileInfo) Size() int64 { return int64(v.size) }
func (v vFileInfo) Mode() os.FileMode { return 0o644 }
func (v vFileInfo) ModTime() time.Time { return v.modTime }
func (v vFileInfo) IsDir() bool { return false }
func (v vFileInfo) Sys() interface{} { return nil }
// readCloser wraps an io.ReadCloser and keeps a running count of the bytes
// read through it. The count is updated and read with sync/atomic, so
// counter() may be polled from another goroutine while reads are in flight.
type readCloser struct {
	src io.ReadCloser
	n   int64 // bytes read so far; access only via sync/atomic
}

// Read forwards to the wrapped reader and adds the bytes it returned to the
// running total, including on a short or failed read.
func (r *readCloser) Read(p []byte) (int, error) {
	got, err := r.src.Read(p)
	atomic.AddInt64(&r.n, int64(got))
	return got, err
}

// Close closes the wrapped reader.
func (r *readCloser) Close() error {
	err := r.src.Close()
	return err
}

// counter returns a function that reports the current byte total.
func (r *readCloser) counter() func() int64 {
	load := func() int64 { return atomic.LoadInt64(&r.n) }
	return load
}
// zipLevel translates a CompressionLevel (passed as a plain int) into a
// compress/gzip level. BestSpeed and BestCompression map to their gzip
// counterparts. DefaultCompression, and any value outside the supported set
// (negative or unknown), yields gzip.DefaultCompression.
func zipLevel(level int) int {
	switch CompressionLevel(level) {
	case BestSpeed:
		return gzip.BestSpeed
	case BestCompression:
		return gzip.BestCompression
	default:
		return gzip.DefaultCompression
	}
}
// zipConfig holds the archiver tuning values produced by newZipConfig.
type zipConfig struct {
	// Level is int(Compression.Level). Presumably it is later mapped to a
	// gzip level via zipLevel; confirm at the call site.
	Level int
	// GoPoolSize is routinePoolSize(Compression.CPUPercentage).
	GoPoolSize int
	// ChunkSize is in bytes: converted from MB and clamped by newZipConfig.
	ChunkSize int
}
// newZipConfig derives archiver settings from the user-facing Compression
// options.
//
// c.ChunkSize arrives in MB (callers are expected to have validated it
// against min 2 / max 512) and is converted to bytes. A zero size selects
// DefaultChunkSize. Otherwise, values above maxChunkSize become maxChunkSize
// and values below minChunkSize become minChunkSize. GoPoolSize is derived
// from c.CPUPercentage via routinePoolSize.
func newZipConfig(c Compression) zipConfig {
	chunkBytes := c.ChunkSize * 1024 * 1024 // MB -> bytes
	switch {
	case chunkBytes == 0:
		chunkBytes = DefaultChunkSize
	case chunkBytes > maxChunkSize:
		chunkBytes = maxChunkSize
	case chunkBytes < minChunkSize:
		chunkBytes = minChunkSize
	}
	return zipConfig{
		Level:      int(c.Level),
		GoPoolSize: routinePoolSize(c.CPUPercentage),
		ChunkSize:  chunkBytes,
	}
}