KevinStephenson
Adding in weaviate code
b110593
raw
history blame
2.95 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package modulestorage
import (
"fmt"
"os"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/weaviate/weaviate/entities/moduletools"
bolt "go.etcd.io/bbolt"
)
// Repo is the module storage repository. It holds the base directory, the
// logger and the bolt database handle that the per-module storage buckets
// operate on.
type Repo struct {
// logger is the logger handed to NewRepo.
logger logrus.FieldLogger
// baseDir is the directory that contains the bolt database file.
baseDir string
// db is the open bolt database; it is assigned in init.
db *bolt.DB
}
// NewRepo creates a module storage repository rooted at baseDir and
// initializes it, which creates the directory if necessary and opens the bolt
// database file inside it.
//
// If initialization fails, NewRepo returns a nil *Repo together with the
// error, so a caller can never end up holding a repository whose database
// handle was not set.
func NewRepo(baseDir string, logger logrus.FieldLogger) (*Repo, error) {
	r := &Repo{
		baseDir: baseDir,
		logger:  logger,
	}

	if err := r.init(); err != nil {
		return nil, err
	}

	return r, nil
}
// DBPath returns the location of the bolt database file, which is the file
// "modules.db" directly below the repository's base directory.
func (r *Repo) DBPath() string {
	dbFile := fmt.Sprintf("%s/modules.db", r.baseDir)
	return dbFile
}
// DataPath returns the base directory this repository was created with.
func (r *Repo) DataPath() string {
return r.baseDir
}
// init makes sure the base directory exists and opens the bolt database
// located at DBPath, storing the handle on the repository. Any failure is
// returned wrapped with the path that was being worked on.
func (r *Repo) init() error {
	mkdirErr := os.MkdirAll(r.baseDir, 0o777)
	if mkdirErr != nil {
		return errors.Wrapf(mkdirErr, "create root path directory at %s", r.baseDir)
	}

	dbPath := r.DBPath()
	handle, openErr := bolt.Open(dbPath, 0o600, nil)
	if openErr != nil {
		return errors.Wrapf(openErr, "open bolt at %s", dbPath)
	}

	r.db = handle
	return nil
}
// storageBucket is the storage handle returned by Repo.Storage. It addresses
// a single bolt bucket inside the repository's database.
type storageBucket struct {
// bucketKey is the bucket name as bytes, used for every bucket lookup.
bucketKey []byte
// repo is the repository whose bolt database holds the bucket.
repo *Repo
}
// Storage returns a moduletools.Storage backed by the bolt bucket named
// bucketName. The bucket is created if it does not exist yet.
//
// If the bucket cannot be set up, Storage returns a nil Storage together with
// the error instead of handing out a handle to a bucket that may not exist.
func (r *Repo) Storage(bucketName string) (moduletools.Storage, error) {
	storage := &storageBucket{
		bucketKey: []byte(bucketName),
		repo:      r,
	}

	if err := storage.init(); err != nil {
		return nil, err
	}

	return storage, nil
}
// init creates the bucket identified by bucketKey inside a read-write
// transaction, unless it is already present.
func (s *storageBucket) init() error {
	ensureBucket := func(tx *bolt.Tx) error {
		_, createErr := tx.CreateBucketIfNotExists(s.bucketKey)
		if createErr == nil {
			return nil
		}

		return errors.Wrapf(createErr, "create module storage bucket '%s'",
			string(s.bucketKey))
	}

	return s.repo.db.Update(ensureBucket)
}
// Put stores value under key in this bucket. The write is submitted through
// the database's Batch method. An error is returned if the bucket is missing
// or if bolt rejects the write.
func (s *storageBucket) Put(key, value []byte) error {
	write := func(tx *bolt.Tx) error {
		bucket := tx.Bucket(s.bucketKey)
		if bucket == nil {
			return errors.Errorf("no bucket for key %s found", string(s.bucketKey))
		}

		putErr := bucket.Put(key, value)
		if putErr != nil {
			return errors.Wrapf(putErr, "put value for key %s", string(key))
		}

		return nil
	}

	return s.repo.db.Batch(write)
}
// Get returns the value stored under key in this bucket, or nil if the key
// does not exist. An error is returned if the bucket itself is missing.
//
// The returned slice is a private copy owned by the caller: bolt only
// guarantees the memory returned by Bucket.Get for the lifetime of the
// transaction, and the read transaction has ended by the time Get returns.
func (s *storageBucket) Get(key []byte) ([]byte, error) {
	var out []byte
	err := s.repo.db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(s.bucketKey)
		if b == nil {
			return errors.Errorf("no bucket for key %s found", string(s.bucketKey))
		}

		// Copy while the transaction is still open; keep nil for a missing
		// key so callers can still tell "absent" from "present but empty".
		if v := b.Get(key); v != nil {
			out = make([]byte, len(v))
			copy(out, v)
		}
		return nil
	})

	return out, err
}
// Scan walks all key/value pairs of this bucket in cursor order inside a
// single read transaction and passes each pair to scan. Iteration stops early
// when scan returns false, and is aborted with a wrapped error when scan
// returns an error. An error is returned if the bucket is missing.
//
// The key and value slices handed to scan come straight from bolt, which only
// guarantees them for the lifetime of the transaction; scan must copy
// anything it wants to keep after it returns.
func (s *storageBucket) Scan(scan moduletools.ScanFn) error {
	return s.repo.db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(s.bucketKey)
		if b == nil {
			// Without this guard b.Cursor() would dereference a nil bucket.
			return errors.Errorf("no bucket for key %s found", string(s.bucketKey))
		}

		c := b.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			ok, err := scan(k, v)
			if err != nil {
				return errors.Wrapf(err, "read item %q", string(k))
			}

			if !ok {
				break
			}
		}

		return nil
	})
}