KevinStephenson
Adding in weaviate code
b110593
raw
history blame
9.93 kB
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//
package aggregator
import (
"sort"
"time"
"github.com/weaviate/weaviate/entities/aggregation"
)
// ShardCombiner merges the aggregation results returned by individual shards
// into one result. The type has no fields and therefore holds no state.
type ShardCombiner struct{}

// NewShardCombiner returns a pointer to a fresh ShardCombiner.
func NewShardCombiner() *ShardCombiner {
	return new(ShardCombiner)
}
// Do merges the given per-shard results into a single aggregation result.
//
// Entries that are nil or have no groups are disregarded when choosing the
// merge strategy. If no entry has any groups, an empty result is returned.
// Otherwise the first group of the last usable entry decides: a nil GroupedBy
// selects the ungrouped merge, anything else the grouped merge.
func (sc *ShardCombiner) Do(results []*aggregation.Result) *aggregation.Result {
	// Index of the last entry that is non-nil and carries at least one group;
	// stays negative when there is none.
	lastUsable := -1
	for i := range results {
		if results[i] != nil && len(results[i].Groups) > 0 {
			lastUsable = i
		}
	}

	if lastUsable < 0 {
		return &aggregation.Result{}
	}

	if results[lastUsable].Groups[0].GroupedBy != nil {
		return sc.combineGrouped(results)
	}

	return sc.combineUngrouped(results)
}
// combineUngrouped merges the first group of every shard result into a single
// group and returns a result containing exactly that one group.
//
// Shard results that are nil or have no groups are skipped. Do explicitly
// tolerates nil entries in results, so they must not be dereferenced here.
func (sc *ShardCombiner) combineUngrouped(results []*aggregation.Result) *aggregation.Result {
	combined := aggregation.Result{
		Groups: make([]aggregation.Group, 1),
	}

	for _, shard := range results {
		// not every shard has results; a nil entry would otherwise panic on
		// the field access below
		if shard == nil || len(shard.Groups) == 0 {
			continue
		}

		sc.mergeIntoCombinedGroupAtPos(combined.Groups, 0, shard.Groups[0])
	}

	sc.finalizeGroup(&combined.Groups[0])

	return &combined
}
// combineGrouped merges grouped shard results. Groups are matched on their
// GroupedBy.Value: the first occurrence of a value is appended to the combined
// result as-is, later occurrences are merged into it. After all shards are
// processed every group is finalized and the groups are sorted by Count in
// descending order.
//
// Nil shard results are skipped. Do explicitly tolerates nil entries in
// results, so they must not be dereferenced here.
func (sc *ShardCombiner) combineGrouped(results []*aggregation.Result) *aggregation.Result {
	combined := aggregation.Result{}

	for _, shard := range results {
		if shard == nil {
			// ranging over shard.Groups would dereference the nil pointer
			continue
		}

		for _, shardGroup := range shard.Groups {
			pos := getPosOfGroup(combined.Groups, shardGroup.GroupedBy.Value)
			if pos < 0 {
				combined.Groups = append(combined.Groups, shardGroup)
				continue
			}

			sc.mergeIntoCombinedGroupAtPos(combined.Groups, pos, shardGroup)
		}
	}

	for i := range combined.Groups {
		sc.finalizeGroup(&combined.Groups[i])
	}

	sort.Slice(combined.Groups, func(a, b int) bool {
		return combined.Groups[a].Count > combined.Groups[b].Count
	})

	return &combined
}
// mergeIntoCombinedGroupAtPos folds shardGroup into combinedGroups[pos].
//
// The group count is added up, then every property of shardGroup is merged
// into the property of the same name on the combined group, using the merge
// routine that matches the property's type. The combined group's Properties
// map is created lazily on the first property encountered. An unrecognized
// property type causes a panic.
func (sc *ShardCombiner) mergeIntoCombinedGroupAtPos(combinedGroups []aggregation.Group,
	pos int, shardGroup aggregation.Group,
) {
	target := &combinedGroups[pos]
	target.Count += shardGroup.Count

	for name, incoming := range shardGroup.Properties {
		if target.Properties == nil {
			target.Properties = map[string]aggregation.Property{}
		}

		// Zero value when the property has not been seen yet; it is written
		// back to the map once the merge below is done.
		merged := target.Properties[name]
		merged.Type = incoming.Type

		switch incoming.Type {
		case aggregation.PropertyTypeNumerical:
			if merged.NumericalAggregations == nil {
				merged.NumericalAggregations = map[string]interface{}{}
			}
			sc.mergeNumericalProp(
				merged.NumericalAggregations, incoming.NumericalAggregations)

		case aggregation.PropertyTypeDate:
			if merged.DateAggregations == nil {
				merged.DateAggregations = map[string]interface{}{}
			}
			sc.mergeDateProp(
				merged.DateAggregations, incoming.DateAggregations)

		case aggregation.PropertyTypeBoolean:
			sc.mergeBooleanProp(
				&merged.BooleanAggregation, &incoming.BooleanAggregation)

		case aggregation.PropertyTypeText:
			sc.mergeTextProp(
				&merged.TextAggregation, &incoming.TextAggregation)

		case aggregation.PropertyTypeReference:
			sc.mergeRefProp(
				&merged.ReferenceAggregation, &incoming.ReferenceAggregation)

		default:
			panic("unknown prop type: " + incoming.Type)
		}

		target.Properties[name] = merged
	}
}
// mergeDateProp merges the date aggregations in second into first. It is a
// no-op when second is empty.
//
// Recognized keys are "count", "mode", "median", "minimum", "maximum" and the
// internal "_dateAggregator"; any other key in second causes a panic. Values
// are type-asserted without checks, so an unexpected dynamic type panics too.
func (sc *ShardCombiner) mergeDateProp(first, second map[string]interface{}) {
	if len(second) == 0 {
		return
	}

	const aggKey = "_dateAggregator"

	// Bring all values of the second aggregator into the first one before
	// anything else. This is needed to compute median and mode correctly.
	if incoming, ok := second[aggKey]; ok {
		src := incoming.(*dateAggregator)

		if existing, found := first[aggKey]; found {
			dst := existing.(*dateAggregator)
			for _, p := range src.pairs {
				// replay every value as often as it was counted
				for n := uint64(0); n < p.count; n++ {
					dst.AddTimestamp(p.value.rfc3339)
				}
			}
			dst.buildPairsFromCounts()
			first[aggKey] = dst
		} else {
			first[aggKey] = incoming
		}
	}

	// Looked up lazily: only "mode" and "median" need the aggregator, and the
	// assertion panics if it is missing.
	combinedAgg := func() *dateAggregator {
		return first[aggKey].(*dateAggregator)
	}

	for key, value := range second {
		switch key {
		case aggKey:
			// already handled above
			continue

		case "count":
			prev, seen := first["count"]
			if !seen {
				first["count"] = value
				continue
			}
			first["count"] = prev.(int64) + value.(int64)

		case "mode":
			first["mode"] = combinedAgg().Mode()

		case "median":
			first["median"] = combinedAgg().Median()

		case "minimum":
			current, seen := first["minimum"]
			if !seen {
				first["minimum"] = value
				continue
			}
			// Parse errors are discarded; a failed parse yields the zero
			// time.Time, which then takes part in the comparison.
			currentTime, _ := time.Parse(time.RFC3339, current.(string))
			candidateTime, _ := time.Parse(time.RFC3339, value.(string))
			if candidateTime.Before(currentTime) {
				first["minimum"] = value
			}

		case "maximum":
			current, seen := first["maximum"]
			if !seen {
				first["maximum"] = value
				continue
			}
			// Parse errors are discarded, see "minimum".
			currentTime, _ := time.Parse(time.RFC3339, current.(string))
			candidateTime, _ := time.Parse(time.RFC3339, value.(string))
			if candidateTime.After(currentTime) {
				first["maximum"] = value
			}

		default:
			panic("unknown map entry: " + key)
		}
	}
}
// mergeNumericalProp merges the numerical aggregations in second into first.
// It is a no-op when second is empty.
//
// Recognized keys are "count", "sum", "mode", "mean", "median", "minimum",
// "maximum" and the internal "_numericalAggregator"; any other key in second
// causes a panic. Values are type-asserted without checks, so an unexpected
// dynamic type panics too.
func (sc *ShardCombiner) mergeNumericalProp(first, second map[string]interface{}) {
	if len(second) == 0 {
		return
	}

	const aggKey = "_numericalAggregator"

	// Bring all values of the second aggregator into the first one before
	// anything else. This is needed to compute median, mean and mode correctly.
	if incoming, ok := second[aggKey]; ok {
		src := incoming.(*numericalAggregator)

		if existing, found := first[aggKey]; found {
			dst := existing.(*numericalAggregator)
			for _, p := range src.pairs {
				// replay every value as often as it was counted
				for n := uint64(0); n < p.count; n++ {
					dst.AddFloat64(p.value)
				}
			}
			dst.buildPairsFromCounts()
			first[aggKey] = dst
		} else {
			first[aggKey] = incoming
		}
	}

	// Looked up lazily: only "mode", "mean" and "median" need the aggregator,
	// and the assertion panics if it is missing.
	combinedAgg := func() *numericalAggregator {
		return first[aggKey].(*numericalAggregator)
	}

	for key, value := range second {
		switch key {
		case aggKey:
			// already handled above
			continue

		case "count", "sum":
			prev, seen := first[key]
			if !seen {
				first[key] = value
				continue
			}
			first[key] = prev.(float64) + value.(float64)

		case "mode":
			first["mode"] = combinedAgg().Mode()

		case "mean":
			first["mean"] = combinedAgg().Mean()

		case "median":
			first["median"] = combinedAgg().Median()

		case "minimum":
			current, seen := first["minimum"]
			if !seen || value.(float64) < current.(float64) {
				first["minimum"] = value
			}

		case "maximum":
			current, seen := first["maximum"]
			if !seen || value.(float64) > current.(float64) {
				first["maximum"] = value
			}

		default:
			panic("unknown map entry: " + key)
		}
	}
}
// finalizeDateProp removes the internal "_dateAggregator" entry from the
// combined date aggregations. All other entries are left untouched.
func (sc *ShardCombiner) finalizeDateProp(combined map[string]interface{}) {
	const internalKey = "_dateAggregator"
	delete(combined, internalKey)
}
// finalizeNumerical removes the internal "_numericalAggregator" entry from the
// combined numerical aggregations. All other entries are left untouched.
func (sc *ShardCombiner) finalizeNumerical(combined map[string]interface{}) {
	const internalKey = "_numericalAggregator"
	delete(combined, internalKey)
}
// mergeBooleanProp adds the counters of source (Count, TotalTrue, TotalFalse)
// onto combined. The percentage fields are not touched here.
func (sc *ShardCombiner) mergeBooleanProp(combined, source *aggregation.Boolean) {
	combined.TotalTrue = combined.TotalTrue + source.TotalTrue
	combined.TotalFalse = combined.TotalFalse + source.TotalFalse
	combined.Count = combined.Count + source.Count
}
// finalizeBoolean derives PercentageTrue and PercentageFalse from the combined
// totals.
//
// When Count is zero both percentages are set to 0. Dividing by zero would
// otherwise produce NaN, which encoding/json cannot serialize.
func (sc *ShardCombiner) finalizeBoolean(combined *aggregation.Boolean) {
	if combined.Count == 0 {
		combined.PercentageFalse = 0
		combined.PercentageTrue = 0
		return
	}

	total := float64(combined.Count)
	combined.PercentageFalse = float64(combined.TotalFalse) / total
	combined.PercentageTrue = float64(combined.TotalTrue) / total
}
// mergeTextProp merges the text aggregation second into first: the counts are
// added, and each occurrence from second either increases the Occurs of the
// item with the same Value in first or, if there is none, is appended.
func (sc *ShardCombiner) mergeTextProp(first, second *aggregation.Text) {
	first.Count += second.Count

	for _, incoming := range second.Items {
		if idx := getPosOfTextOcc(first.Items, incoming.Value); idx >= 0 {
			first.Items[idx].Occurs += incoming.Occurs
			continue
		}

		// value not seen so far
		first.Items = append(first.Items, incoming)
	}
}
// mergeRefProp appends all PointingTo entries of second to those of first.
// No de-duplication takes place.
func (sc *ShardCombiner) mergeRefProp(first, second *aggregation.Reference) {
	targets := append(first.PointingTo, second.PointingTo...)
	first.PointingTo = targets
}
// finalizeText sorts the combined text occurrences in place so that items with
// a higher Occurs value come first.
func (sc *ShardCombiner) finalizeText(combined *aggregation.Text) {
	items := combined.Items
	byOccursDesc := func(a, b int) bool {
		return items[a].Occurs > items[b].Occurs
	}

	sort.Slice(items, byOccursDesc)
}
// getPosOfTextOcc returns the index of the first element in haystack whose
// Value equals needle, or -1 if there is no such element.
func getPosOfTextOcc(haystack []aggregation.TextOccurrence, needle string) int {
	for idx := 0; idx < len(haystack); idx++ {
		if haystack[idx].Value == needle {
			return idx
		}
	}

	return -1
}
// finalizeGroup runs the type-specific finalize step for every property of the
// group and stores the result back under the same name. Reference properties
// have no finalize step and are left as they are. An unrecognized property
// type causes a panic.
func (sc *ShardCombiner) finalizeGroup(group *aggregation.Group) {
	for name, p := range group.Properties {
		switch p.Type {
		case aggregation.PropertyTypeReference:
			// nothing to finalize and nothing to write back
			continue
		case aggregation.PropertyTypeNumerical:
			sc.finalizeNumerical(p.NumericalAggregations)
		case aggregation.PropertyTypeDate:
			sc.finalizeDateProp(p.DateAggregations)
		case aggregation.PropertyTypeBoolean:
			sc.finalizeBoolean(&p.BooleanAggregation)
		case aggregation.PropertyTypeText:
			sc.finalizeText(&p.TextAggregation)
		default:
			panic("Unknown prop type: " + p.Type)
		}

		// p is a copy of the map value, so changes made through &p above only
		// become visible once it is written back.
		group.Properties[name] = p
	}
}
// getPosOfGroup returns the index of the first group in haystack whose
// GroupedBy.Value equals needle, or -1 if there is no such group. The values
// are compared with Go's interface equality (==). Every group in haystack is
// expected to have a non-nil GroupedBy.
func getPosOfGroup(haystack []aggregation.Group, needle interface{}) int {
	for idx := 0; idx < len(haystack); idx++ {
		if haystack[idx].GroupedBy.Value == needle {
			return idx
		}
	}

	return -1
}