SemanticSearchPOC / usecases /sharding /remote_index_incoming.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
11.5 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package sharding
import (
"context"
"fmt"
"io"
"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/aggregation"
"github.com/weaviate/weaviate/entities/filters"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/entities/search"
"github.com/weaviate/weaviate/entities/searchparams"
"github.com/weaviate/weaviate/entities/storobj"
"github.com/weaviate/weaviate/usecases/objects"
"github.com/weaviate/weaviate/usecases/replica"
)
type RemoteIncomingRepo interface {
GetIndexForIncoming(className schema.ClassName) RemoteIndexIncomingRepo
}
type RemoteIndexIncomingRepo interface {
IncomingPutObject(ctx context.Context, shardName string,
obj *storobj.Object) error
IncomingBatchPutObjects(ctx context.Context, shardName string,
objs []*storobj.Object) []error
IncomingBatchAddReferences(ctx context.Context, shardName string,
refs objects.BatchReferences) []error
IncomingGetObject(ctx context.Context, shardName string, id strfmt.UUID,
selectProperties search.SelectProperties,
additional additional.Properties) (*storobj.Object, error)
IncomingExists(ctx context.Context, shardName string,
id strfmt.UUID) (bool, error)
IncomingDeleteObject(ctx context.Context, shardName string,
id strfmt.UUID) error
IncomingMergeObject(ctx context.Context, shardName string,
mergeDoc objects.MergeDocument) error
IncomingMultiGetObjects(ctx context.Context, shardName string,
ids []strfmt.UUID) ([]*storobj.Object, error)
IncomingSearch(ctx context.Context, shardName string,
vector []float32, distance float32, limit int, filters *filters.LocalFilter,
keywordRanking *searchparams.KeywordRanking, sort []filters.Sort,
cursor *filters.Cursor, groupBy *searchparams.GroupBy,
additional additional.Properties,
) ([]*storobj.Object, []float32, error)
IncomingAggregate(ctx context.Context, shardName string,
params aggregation.Params) (*aggregation.Result, error)
IncomingFindUUIDs(ctx context.Context, shardName string,
filters *filters.LocalFilter) ([]strfmt.UUID, error)
IncomingDeleteObjectBatch(ctx context.Context, shardName string,
uuids []strfmt.UUID, dryRun bool) objects.BatchSimpleObjects
IncomingGetShardQueueSize(ctx context.Context, shardName string) (int64, error)
IncomingGetShardStatus(ctx context.Context, shardName string) (string, error)
IncomingUpdateShardStatus(ctx context.Context, shardName, targetStatus string) error
IncomingOverwriteObjects(ctx context.Context, shard string,
vobjects []*objects.VObject) ([]replica.RepairResponse, error)
IncomingDigestObjects(ctx context.Context, shardName string,
ids []strfmt.UUID) (result []replica.RepairResponse, err error)
// Scale-Out Replication POC
IncomingFilePutter(ctx context.Context, shardName,
filePath string) (io.WriteCloser, error)
IncomingCreateShard(ctx context.Context, className string, shardName string) error
IncomingReinitShard(ctx context.Context, shardName string) error
}
type RemoteIndexIncoming struct {
repo RemoteIncomingRepo
}
func NewRemoteIndexIncoming(repo RemoteIncomingRepo) *RemoteIndexIncoming {
return &RemoteIndexIncoming{
repo: repo,
}
}
func (rii *RemoteIndexIncoming) PutObject(ctx context.Context, indexName,
shardName string, obj *storobj.Object,
) error {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return errors.Errorf("local index %q not found", indexName)
}
return index.IncomingPutObject(ctx, shardName, obj)
}
func (rii *RemoteIndexIncoming) BatchPutObjects(ctx context.Context, indexName,
shardName string, objs []*storobj.Object,
) []error {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return duplicateErr(errors.Errorf("local index %q not found", indexName),
len(objs))
}
return index.IncomingBatchPutObjects(ctx, shardName, objs)
}
func (rii *RemoteIndexIncoming) BatchAddReferences(ctx context.Context, indexName,
shardName string, refs objects.BatchReferences,
) []error {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return duplicateErr(errors.Errorf("local index %q not found", indexName),
len(refs))
}
return index.IncomingBatchAddReferences(ctx, shardName, refs)
}
func (rii *RemoteIndexIncoming) GetObject(ctx context.Context, indexName,
shardName string, id strfmt.UUID, selectProperties search.SelectProperties,
additional additional.Properties,
) (*storobj.Object, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return nil, errors.Errorf("local index %q not found", indexName)
}
return index.IncomingGetObject(ctx, shardName, id, selectProperties, additional)
}
func (rii *RemoteIndexIncoming) Exists(ctx context.Context, indexName,
shardName string, id strfmt.UUID,
) (bool, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return false, errors.Errorf("local index %q not found", indexName)
}
return index.IncomingExists(ctx, shardName, id)
}
func (rii *RemoteIndexIncoming) DeleteObject(ctx context.Context, indexName,
shardName string, id strfmt.UUID,
) error {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return errors.Errorf("local index %q not found", indexName)
}
return index.IncomingDeleteObject(ctx, shardName, id)
}
func (rii *RemoteIndexIncoming) MergeObject(ctx context.Context, indexName,
shardName string, mergeDoc objects.MergeDocument,
) error {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return errors.Errorf("local index %q not found", indexName)
}
return index.IncomingMergeObject(ctx, shardName, mergeDoc)
}
func (rii *RemoteIndexIncoming) MultiGetObjects(ctx context.Context, indexName,
shardName string, ids []strfmt.UUID,
) ([]*storobj.Object, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return nil, errors.Errorf("local index %q not found", indexName)
}
return index.IncomingMultiGetObjects(ctx, shardName, ids)
}
func (rii *RemoteIndexIncoming) Search(ctx context.Context, indexName, shardName string,
vector []float32, distance float32, limit int, filters *filters.LocalFilter,
keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor,
groupBy *searchparams.GroupBy, additional additional.Properties,
) ([]*storobj.Object, []float32, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return nil, nil, errors.Errorf("local index %q not found", indexName)
}
return index.IncomingSearch(
ctx, shardName, vector, distance, limit, filters, keywordRanking, sort, cursor, groupBy, additional)
}
func (rii *RemoteIndexIncoming) Aggregate(ctx context.Context, indexName, shardName string,
params aggregation.Params,
) (*aggregation.Result, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return nil, errors.Errorf("local index %q not found", indexName)
}
return index.IncomingAggregate(ctx, shardName, params)
}
func (rii *RemoteIndexIncoming) FindUUIDs(ctx context.Context, indexName, shardName string,
filters *filters.LocalFilter,
) ([]strfmt.UUID, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return nil, errors.Errorf("local index %q not found", indexName)
}
return index.IncomingFindUUIDs(ctx, shardName, filters)
}
func (rii *RemoteIndexIncoming) DeleteObjectBatch(ctx context.Context, indexName, shardName string,
uuids []strfmt.UUID, dryRun bool,
) objects.BatchSimpleObjects {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
err := errors.Errorf("local index %q not found", indexName)
return objects.BatchSimpleObjects{objects.BatchSimpleObject{Err: err}}
}
return index.IncomingDeleteObjectBatch(ctx, shardName, uuids, dryRun)
}
func (rii *RemoteIndexIncoming) GetShardQueueSize(ctx context.Context,
indexName, shardName string,
) (int64, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return 0, errors.Errorf("local index %q not found", indexName)
}
return index.IncomingGetShardQueueSize(ctx, shardName)
}
func (rii *RemoteIndexIncoming) GetShardStatus(ctx context.Context,
indexName, shardName string,
) (string, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return "", errors.Errorf("local index %q not found", indexName)
}
return index.IncomingGetShardStatus(ctx, shardName)
}
func (rii *RemoteIndexIncoming) UpdateShardStatus(ctx context.Context,
indexName, shardName, targetStatus string,
) error {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return errors.Errorf("local index %q not found", indexName)
}
return index.IncomingUpdateShardStatus(ctx, shardName, targetStatus)
}
func (rii *RemoteIndexIncoming) FilePutter(ctx context.Context,
indexName, shardName, filePath string,
) (io.WriteCloser, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return nil, errors.Errorf("local index %q not found", indexName)
}
return index.IncomingFilePutter(ctx, shardName, filePath)
}
func (rii *RemoteIndexIncoming) CreateShard(ctx context.Context,
indexName, shardName string,
) error {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return errors.Errorf("local index %q not found", indexName)
}
return index.IncomingCreateShard(ctx, indexName, shardName)
}
func (rii *RemoteIndexIncoming) ReInitShard(ctx context.Context,
indexName, shardName string,
) error {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return errors.Errorf("local index %q not found", indexName)
}
return index.IncomingReinitShard(ctx, shardName)
}
func (rii *RemoteIndexIncoming) OverwriteObjects(ctx context.Context,
indexName, shardName string, vobjects []*objects.VObject,
) ([]replica.RepairResponse, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return nil, fmt.Errorf("local index %q not found", indexName)
}
return index.IncomingOverwriteObjects(ctx, shardName, vobjects)
}
func (rii *RemoteIndexIncoming) DigestObjects(ctx context.Context,
indexName, shardName string, ids []strfmt.UUID,
) ([]replica.RepairResponse, error) {
index := rii.repo.GetIndexForIncoming(schema.ClassName(indexName))
if index == nil {
return nil, fmt.Errorf("local index %q not found", indexName)
}
return index.IncomingDigestObjects(ctx, shardName, ids)
}