Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package aggregator | |
import ( | |
"math" | |
"sort" | |
"github.com/pkg/errors" | |
"github.com/weaviate/weaviate/adapters/repos/db/inverted" | |
"github.com/weaviate/weaviate/entities/aggregation" | |
) | |
func addNumericalAggregations(prop *aggregation.Property, | |
aggs []aggregation.Aggregator, agg *numericalAggregator, | |
) { | |
if prop.NumericalAggregations == nil { | |
prop.NumericalAggregations = map[string]interface{}{} | |
} | |
agg.buildPairsFromCounts() | |
// if there are no elements to aggregate over because a filter does not match anything, calculating mean etc. makes | |
// no sense. Non-existent entries evaluate to nil with an interface{} map | |
if agg.count == 0 { | |
for _, entry := range aggs { | |
if entry == aggregation.CountAggregator { | |
prop.NumericalAggregations["count"] = float64(agg.count) | |
break | |
} | |
} | |
return | |
} | |
// when combining the results from different shards, we need the raw numbers to recompute the mode, mean and median. | |
// Therefore we add a reference later which needs to be cleared out before returning the results to a user | |
loop: | |
for _, aProp := range aggs { | |
switch aProp { | |
case aggregation.ModeAggregator, aggregation.MedianAggregator, aggregation.MeanAggregator: | |
prop.NumericalAggregations["_numericalAggregator"] = agg | |
break loop | |
} | |
} | |
for _, aProp := range aggs { | |
switch aProp { | |
case aggregation.MeanAggregator: | |
prop.NumericalAggregations[aProp.String()] = agg.Mean() | |
case aggregation.MinimumAggregator: | |
prop.NumericalAggregations[aProp.String()] = agg.Min() | |
case aggregation.MaximumAggregator: | |
prop.NumericalAggregations[aProp.String()] = agg.Max() | |
case aggregation.MedianAggregator: | |
prop.NumericalAggregations[aProp.String()] = agg.Median() | |
case aggregation.ModeAggregator: | |
prop.NumericalAggregations[aProp.String()] = agg.Mode() | |
case aggregation.SumAggregator: | |
prop.NumericalAggregations[aProp.String()] = agg.Sum() | |
case aggregation.CountAggregator: | |
prop.NumericalAggregations[aProp.String()] = agg.Count() | |
default: | |
continue | |
} | |
} | |
} | |
func newNumericalAggregator() *numericalAggregator { | |
return &numericalAggregator{ | |
min: math.MaxFloat64, | |
max: -math.MaxFloat64, | |
valueCounter: map[float64]uint64{}, | |
pairs: make([]floatCountPair, 0), | |
} | |
} | |
type numericalAggregator struct { | |
count uint64 | |
min float64 | |
max float64 | |
sum float64 | |
maxCount uint64 | |
mode float64 | |
pairs []floatCountPair // for row-based median calculation | |
valueCounter map[float64]uint64 // for individual median calculation | |
} | |
type floatCountPair struct { | |
value float64 | |
count uint64 | |
} | |
func (a *numericalAggregator) AddFloat64(value float64) error { | |
return a.AddNumberRow(value, 1) | |
} | |
// turns the value counter into a sorted list, as well as identifying the mode. Must be called before calling median etc | |
func (a *numericalAggregator) buildPairsFromCounts() { | |
a.pairs = a.pairs[:0] // clear out old values in case this function called more than once | |
a.pairs = append(a.pairs, make([]floatCountPair, 0, len(a.valueCounter))...) | |
for value, count := range a.valueCounter { | |
// get one with higher count or lower value if counts are equal | |
if count > a.maxCount || (count == a.maxCount && value < a.mode) { | |
a.maxCount = count | |
a.mode = value | |
} | |
a.pairs = append(a.pairs, floatCountPair{value: value, count: count}) | |
} | |
sort.Slice(a.pairs, func(x, y int) bool { | |
return a.pairs[x].value < a.pairs[y].value | |
}) | |
} | |
func (a *numericalAggregator) AddFloat64Row(number []byte, | |
count uint64, | |
) error { | |
numberParsed, err := inverted.ParseLexicographicallySortableFloat64(number) | |
if err != nil { | |
return errors.Wrap(err, "read float64") | |
} | |
return a.AddNumberRow(numberParsed, count) | |
} | |
func (a *numericalAggregator) AddInt64Row(number []byte, count uint64) error { | |
numberParsed, err := inverted.ParseLexicographicallySortableInt64(number) | |
if err != nil { | |
return errors.Wrap(err, "read int64") | |
} | |
return a.AddNumberRow(float64(numberParsed), count) | |
} | |
func (a *numericalAggregator) AddNumberRow(number float64, count uint64) error { | |
if count == 0 { | |
// skip | |
return nil | |
} | |
a.count += count | |
a.sum += number * float64(count) | |
if number < a.min { | |
a.min = number | |
} | |
if number > a.max { | |
a.max = number | |
} | |
currentCount := a.valueCounter[number] | |
currentCount += count | |
a.valueCounter[number] = currentCount | |
return nil | |
} | |
func (a *numericalAggregator) Mean() float64 { | |
if a.count == 0 { | |
return 0 | |
} | |
return a.sum / float64(a.count) | |
} | |
func (a *numericalAggregator) Max() float64 { | |
return a.max | |
} | |
func (a *numericalAggregator) Min() float64 { | |
return a.min | |
} | |
func (a *numericalAggregator) Sum() float64 { | |
return a.sum | |
} | |
func (a *numericalAggregator) Count() float64 { | |
return float64(a.count) | |
} | |
// Mode does not require preparation if build from rows, but requires a call of | |
// buildPairsFromCounts() if it was built using individual objects | |
func (a *numericalAggregator) Mode() float64 { | |
return a.mode | |
} | |
// Median does not require preparation if build from rows, but requires a call of | |
// buildPairsFromCounts() if it was built using individual objects. The call will panic | |
// if called without adding at least one element or without calling buildPairsFromCounts() | |
// | |
// since the pairs are read from an inverted index, which is in turn | |
// lexicographically sorted, we know that our pairs must also be sorted | |
// | |
// There are two cases: | |
// a) There is an uneven number of elements, then the median element is at index N/2 | |
// b) There is an even number of elements, then the median element is (elem_(N/2) + elem_(N/2+1))/2. | |
// | |
// with two sub-cases: | |
// b1) element N/2 and N/2 + 1 are within the same pair, then the median is the value of this pair | |
// b2) element N/2 and N/2 are part of different pairs, then the average of these pairs is the median and the | |
// median value is not part of the collection itself | |
func (a *numericalAggregator) Median() float64 { | |
middleIndex := a.count / 2 | |
count := uint64(0) | |
for index, pair := range a.pairs { | |
count += pair.count | |
if a.count%2 == 1 && count > middleIndex { | |
return pair.value // case a) | |
} else if a.count%2 == 0 { | |
if count == middleIndex { | |
return (pair.value + a.pairs[index+1].value) / 2 // case b2) | |
} else if count > middleIndex { | |
return pair.value // case b1) | |
} | |
} | |
} | |
panic("Couldn't determine median. This should never happen. Did you add values and call buildRows before?") | |
} | |