KevinStephenson
Adding in weaviate code
b110593
raw
history blame
9.7 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
//go:build integrationTest
// +build integrationTest
package clusterintegrationtest
import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"sort"
"testing"
"time"
"github.com/go-openapi/strfmt"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/adapters/repos/db"
"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw/distancer"
"github.com/weaviate/weaviate/entities/additional"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/entities/schema/crossref"
enthnsw "github.com/weaviate/weaviate/entities/vectorindex/hnsw"
"github.com/weaviate/weaviate/usecases/objects"
"github.com/weaviate/weaviate/usecases/sharding"
)
// getRandomSeed builds a fresh *rand.Rand whose source is seeded from the
// current wall-clock time in nanoseconds, so each call yields an independent
// (non-reproducible) pseudo-random stream.
func getRandomSeed() *rand.Rand {
	seed := time.Now().UnixNano()
	src := rand.NewSource(seed)
	return rand.New(src)
}
// setupDirectory returns a fresh temporary directory for the test. The
// directory is obtained from t.TempDir, so the testing framework owns its
// lifetime.
func setupDirectory(t *testing.T) string {
	return t.TempDir()
}
// dataAsBatch wraps every object in data into an objects.BatchObject.
// OriginalIndex records the object's position in data, and UUID and Vector
// mirror the object's own ID and Vector. The objects are referenced, not
// copied. The result is always a non-nil slice with len(data) entries.
func dataAsBatch(data []*models.Object) objects.BatchObjects {
	batch := make(objects.BatchObjects, 0, len(data))
	for idx, obj := range data {
		batch = append(batch, objects.BatchObject{
			OriginalIndex: idx,
			Object:        obj,
			UUID:          obj.ID,
			Vector:        obj.Vector,
			Err:           nil,
		})
	}
	return batch
}
// dataAsBatchWithProps is like dataAsBatch, except that each batch entry
// carries a copy of the object (via copyObjectWithProp) restricted to the
// property names listed in props. UUID and Vector are still taken from the
// original object, and OriginalIndex is its position in data.
func dataAsBatchWithProps(data []*models.Object, props []string) objects.BatchObjects {
	batch := make(objects.BatchObjects, 0, len(data))
	for idx, obj := range data {
		trimmed := copyObjectWithProp(obj, props)
		batch = append(batch, objects.BatchObject{
			OriginalIndex: idx,
			Object:        trimmed,
			UUID:          obj.ID,
			Vector:        obj.Vector,
			Err:           nil,
		})
	}
	return batch
}
// copyObjectWithProp returns a new *models.Object that shares Additional,
// Class, Vector, CreationTimeUnix, LastUpdateTimeUnix and ID with in. Its
// Properties is a brand-new map containing only the keys named in
// propsToCopy.
//
// This is a shallow copy: property values, Vector and Additional still point
// at the same underlying data as in. The input is not modified by this call,
// but mutating those shared parts of the output will also change the input.
// A name in propsToCopy that is absent from in is stored with a nil value.
// in.Properties is expected to be a map[string]interface{}; the type
// assertion panics otherwise (only when propsToCopy is non-empty).
func copyObjectWithProp(in *models.Object, propsToCopy []string) *models.Object {
	selected := make(map[string]interface{}, len(propsToCopy))
	for _, name := range propsToCopy {
		source := in.Properties.(map[string]interface{})
		selected[name] = source[name]
	}

	return &models.Object{
		Additional:         in.Additional,
		Class:              in.Class,
		Vector:             in.Vector,
		CreationTimeUnix:   in.CreationTimeUnix,
		LastUpdateTimeUnix: in.LastUpdateTimeUnix,
		ID:                 in.ID,
		Properties:         selected,
	}
}
// multiShardState builds a sharding.State for the index
// "multi-shard-test-index". The requested shard count ("desiredCount") is
// nodeCount, and nodes are named "node-0" … "node-<nodeCount-1>". Any error
// from parsing the config or initialising the state panics, since this is
// test setup.
func multiShardState(nodeCount int) *sharding.State {
	desired := json.Number(fmt.Sprint(nodeCount))
	config, err := sharding.ParseConfig(map[string]interface{}{
		"desiredCount": desired,
	}, 1)
	if err != nil {
		panic(err)
	}

	names := make([]string, 0, nodeCount)
	for n := 0; n < nodeCount; n++ {
		names = append(names, fmt.Sprintf("node-%d", n))
	}

	state, err := sharding.InitState("multi-shard-test-index", config,
		fakeNodes{names}, 1, false)
	if err != nil {
		panic(err)
	}
	return state
}
// class returns the schema for the distributedClass test class. It has:
//   - an HNSW vector index with EF raised to 500;
//   - the shared inverted-index config;
//   - two word-tokenized text properties ("description" and "other_property");
//   - date, date-array, int and phone-number properties.
func class() *models.Class {
	wordText := func(name string) *models.Property {
		return &models.Property{
			Name:         name,
			DataType:     schema.DataTypeText.PropString(),
			Tokenization: models.PropertyTokenizationWord,
		}
	}
	typed := func(name string, dt schema.DataType) *models.Property {
		return &models.Property{
			Name:     name,
			DataType: dt.PropString(),
		}
	}

	hnswCfg := enthnsw.NewDefaultUserConfig()
	hnswCfg.EF = 500

	return &models.Class{
		Class:               distributedClass,
		VectorIndexConfig:   hnswCfg,
		InvertedIndexConfig: invertedConfig(),
		Properties: []*models.Property{
			wordText("description"),
			wordText("other_property"),
			typed("date_property", schema.DataTypeDate),
			typed("date_array_property", schema.DataTypeDateArray),
			typed("int_property", schema.DataTypeInt),
			typed("phone_property", schema.DataTypePhoneNumber),
		},
	}
}
// secondClassWithRef returns the schema for the "SecondDistributed" test
// class. It has the same vector-index and inverted-index settings as class().
// It has a text "description" property and a "toFirst" cross-reference
// property whose data type points at distributedClass.
func secondClassWithRef() *models.Class {
	hnswCfg := enthnsw.NewDefaultUserConfig()
	hnswCfg.EF = 500

	props := []*models.Property{
		{Name: "description", DataType: []string{string(schema.DataTypeText)}},
		{Name: "toFirst", DataType: []string{distributedClass}},
	}

	return &models.Class{
		Class:               "SecondDistributed",
		VectorIndexConfig:   hnswCfg,
		InvertedIndexConfig: invertedConfig(),
		Properties:          props,
	}
}
// invertedConfig returns a new inverted-index config with
// CleanupIntervalSeconds set to 60. All other fields are left at their zero
// values.
func invertedConfig() *models.InvertedIndexConfig {
	cfg := new(models.InvertedIndexConfig)
	cfg.CleanupIntervalSeconds = 60
	return cfg
}
// exampleData generates size objects of distributedClass. Each object gets:
//   - a random vector of length vectorDims;
//   - a fresh random UUID;
//   - description "object-<n>";
//   - a date (and one-element date array) at n hours after the Unix epoch;
//   - a random int in [0, 1000);
//   - a German mobile-style phone number.
//
// Randomness comes from the global math/rand source.
func exampleData(size int) []*models.Object {
	out := make([]*models.Object, 0, size)
	for objIdx := 0; objIdx < size; objIdx++ {
		vector := make([]float32, vectorDims)
		for dim := range vector {
			vector[dim] = rand.Float32()
		}

		timestamp := time.Unix(0, 0).Add(time.Duration(objIdx) * time.Hour)
		// Keep the rand draws in the same order: phone number first, then int.
		phoneNumber := uint64(1000000 + rand.Intn(10000))
		id := strfmt.UUID(uuid.New().String())
		intValue := rand.Intn(1000)

		phone := &models.PhoneNumber{
			CountryCode:            49,
			DefaultCountry:         "DE",
			Input:                  fmt.Sprintf("0171 %d", phoneNumber),
			Valid:                  true,
			InternationalFormatted: fmt.Sprintf("+49 171 %d", phoneNumber),
			National:               phoneNumber,
			NationalFormatted:      fmt.Sprintf("0171 %d", phoneNumber),
		}

		out = append(out, &models.Object{
			Class: distributedClass,
			ID:    id,
			Properties: map[string]interface{}{
				"description":         fmt.Sprintf("object-%d", objIdx),
				"date_property":       timestamp,
				"date_array_property": []interface{}{timestamp},
				"int_property":        intValue,
				"phone_property":      phone,
			},
			Vector: vector,
		})
	}
	return out
}
// exampleDataWithRefs generates size "SecondDistributed" objects. Each has a
// random vector of length vectorDims, a fresh random UUID, description
// "second-object-<n>", and a "toFirst" property holding refCount
// cross-references. Each reference points at an object drawn uniformly at
// random (with replacement) from targetObjs.
// targetObjs must be non-empty whenever refCount > 0 (rand.Intn panics on 0).
func exampleDataWithRefs(size int, refCount int, targetObjs []*models.Object) []*models.Object {
	out := make([]*models.Object, 0, size)
	for objIdx := 0; objIdx < size; objIdx++ {
		vector := make([]float32, vectorDims)
		for dim := range vector {
			vector[dim] = rand.Float32()
		}

		refs := make(models.MultipleRef, refCount)
		for refIdx := range refs {
			target := targetObjs[rand.Intn(len(targetObjs))]
			refs[refIdx] = crossref.New("localhost", distributedClass, target.ID).SingleRef()
		}

		out = append(out, &models.Object{
			Class: "SecondDistributed",
			ID:    strfmt.UUID(uuid.New().String()),
			Properties: map[string]interface{}{
				"description": fmt.Sprintf("second-object-%d", objIdx),
				"toFirst":     refs,
			},
			Vector: vector,
		})
	}
	return out
}
// bruteForceObjectsByQuery returns all of objs ordered by ascending cosine
// distance to query. It compares every object exhaustively and is intended as
// the ground truth for vector-search results. Objects at equal distance keep
// their relative input order, so the result is deterministic.
//
// Side effect: normalize works in place, so query and every objs[i].Vector
// are rescaled to unit length. This is kept deliberately because callers may
// rely on the normalized vectors afterwards. When objs is empty, nothing is
// touched.
func bruteForceObjectsByQuery(objs []*models.Object,
	query []float32,
) []*models.Object {
	type distanceAndObj struct {
		distance float32
		obj      *models.Object
	}

	out := make([]*models.Object, len(objs))
	if len(objs) == 0 {
		return out
	}

	// The query does not change per object: normalize it once instead of
	// re-normalizing (and re-scaling) it on every iteration.
	normQuery := normalize(query)

	distProv := distancer.NewCosineDistanceProvider()
	distances := make([]distanceAndObj, len(objs))
	for i, obj := range objs {
		// The extra return values are ignored, as before. NOTE(review): this
		// assumes all vectors share query's dimensionality — confirm at call sites.
		dist, _, _ := distProv.SingleDist(normQuery, normalize(obj.Vector))
		distances[i] = distanceAndObj{
			distance: dist,
			obj:      obj,
		}
	}

	// A stable sort keeps the ground truth reproducible when distances tie.
	sort.SliceStable(distances, func(a, b int) bool {
		return distances[a].distance < distances[b].distance
	})

	for i := range out {
		out[i] = distances[i].obj
	}
	return out
}
// normalize scales v in place to unit Euclidean (L2) length and returns v
// itself rather than a copy, so the caller's slice is modified.
//
// An empty or all-zero vector has no direction and is returned unchanged.
// Without this guard, dividing by a zero norm would fill it with NaN, and
// NaN would poison any distance computed from it.
func normalize(v []float32) []float32 {
	var sumSquares float32
	for i := range v {
		sumSquares += v[i] * v[i]
	}

	norm := float32(math.Sqrt(float64(sumSquares)))
	if norm == 0 {
		return v
	}

	for i := range v {
		v[i] = v[i] / norm
	}
	return v
}
// manuallyResolveRef resolves every cross-reference stored in obj's
// localPropName property (expected to be a models.MultipleRef). It returns one
// single-entry map per reference, keyed by referencedPropName. Each reference
// target must be present in possibleTargets; the test fails otherwise.
//
// For referencedPropName == "vector", the value is the target's vector as
// stored in repo, fetched via repo.Object with additional Vector requested.
// repo must then be non-nil. Otherwise, the value is the target's property of
// that name, taken from the in-memory possibleTargets entry.
func manuallyResolveRef(t *testing.T, obj *models.Object,
	possibleTargets []*models.Object, localPropName,
	referencedPropName string,
	repo *db.DB,
) []map[string]interface{} {
	refs := obj.Properties.(map[string]interface{})[localPropName].(models.MultipleRef)
	resolved := make([]map[string]interface{}, len(refs))

	for idx, ref := range refs {
		parsed, err := crossref.Parse(ref.Beacon.String())
		require.Nil(t, err)
		target := findId(possibleTargets, parsed.TargetID)
		require.NotNil(t, target, "target not found")

		if referencedPropName != "vector" {
			resolved[idx] = map[string]interface{}{
				referencedPropName: target.Properties.(map[string]interface{})[referencedPropName],
			}
			continue
		}

		// Read the vector back from the DB rather than from the in-memory
		// target, so the comparison uses what the repo actually stored.
		require.NotNil(t, repo)
		stored, err := repo.Object(context.Background(), parsed.Class, parsed.TargetID,
			nil, additional.Properties{Vector: true}, nil, "")
		require.Nil(t, err)
		require.NotNil(t, stored)
		resolved[idx] = map[string]interface{}{
			referencedPropName: stored.Vector,
		}
	}
	return resolved
}
// findId performs a linear scan of list and returns the first object whose ID
// equals id, or nil if no object matches.
func findId(list []*models.Object, id strfmt.UUID) *models.Object {
	for i := range list {
		if list[i].ID != id {
			continue
		}
		return list[i]
	}
	return nil
}
// refsAsBatch flattens the propName references of every object in `in` into
// one objects.BatchReferences. Each object's propName property is expected to
// hold a models.MultipleRef.
//
// Each entry's From source is (object class, propName, object ID), and its To
// is the parsed beacon. OriginalIndex numbers references consecutively across
// all objects, starting at 0. A beacon that fails to parse panics, since this
// is test setup. The result is never nil.
func refsAsBatch(in []*models.Object, propName string) objects.BatchReferences {
	out := objects.BatchReferences{}
	for _, obj := range in {
		beacons := obj.Properties.(map[string]interface{})[propName].(models.MultipleRef)
		for _, beacon := range beacons {
			to, err := crossref.Parse(beacon.Beacon.String())
			if err != nil {
				panic(err)
			}
			// len(out) equals the number of references emitted so far, which is
			// exactly the running OriginalIndex.
			out = append(out, objects.BatchReference{
				OriginalIndex: len(out),
				To:            to,
				From: crossref.NewSource(schema.ClassName(obj.Class),
					schema.PropertyName(propName), obj.ID),
			})
		}
	}
	return out
}