KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.99 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package aggregator
import (
"math"
"sort"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/repos/db/inverted"
"github.com/weaviate/weaviate/entities/aggregation"
)
// addNumericalAggregations writes the results of the requested aggregators
// into prop.NumericalAggregations, creating the map if it is nil.
//
// The aggregator is first prepared via buildPairsFromCounts. If it holds no
// values, only "count" is written (and only when the count aggregator was
// requested); every other key stays absent. Otherwise each requested
// aggregator is stored under the key returned by its String() method;
// aggregator kinds not handled here are skipped.
func addNumericalAggregations(prop *aggregation.Property,
	aggs []aggregation.Aggregator, agg *numericalAggregator,
) {
	if prop.NumericalAggregations == nil {
		prop.NumericalAggregations = map[string]interface{}{}
	}
	agg.buildPairsFromCounts()

	// A filter that matches nothing leaves the aggregator empty. Mean, median
	// etc. are meaningless then, so only the count is reported; keys that are
	// missing from an interface{} map read back as nil.
	if agg.count == 0 {
		for _, requested := range aggs {
			if requested != aggregation.CountAggregator {
				continue
			}
			prop.NumericalAggregations["count"] = float64(agg.count)
			break
		}
		return
	}

	// Mode, mean and median have to be recomputed from the raw numbers when
	// results of several shards are combined, so a reference to the
	// aggregator is attached. It has to be removed again before the results
	// are returned to a user.
	for _, requested := range aggs {
		if requested == aggregation.ModeAggregator ||
			requested == aggregation.MedianAggregator ||
			requested == aggregation.MeanAggregator {
			prop.NumericalAggregations["_numericalAggregator"] = agg
			break
		}
	}

	for _, requested := range aggs {
		var result float64
		switch requested {
		case aggregation.MeanAggregator:
			result = agg.Mean()
		case aggregation.MinimumAggregator:
			result = agg.Min()
		case aggregation.MaximumAggregator:
			result = agg.Max()
		case aggregation.MedianAggregator:
			result = agg.Median()
		case aggregation.ModeAggregator:
			result = agg.Mode()
		case aggregation.SumAggregator:
			result = agg.Sum()
		case aggregation.CountAggregator:
			result = agg.Count()
		default:
			continue
		}
		prop.NumericalAggregations[requested.String()] = result
	}
}
// newNumericalAggregator returns an empty aggregator that is ready to accept
// values. min and max start at the opposite extremes of the float64 range so
// that the first added number replaces both, and the counter map and pair
// slice are allocated up front.
func newNumericalAggregator() *numericalAggregator {
	agg := new(numericalAggregator)
	agg.min = math.MaxFloat64
	agg.max = -math.MaxFloat64
	agg.valueCounter = make(map[float64]uint64)
	agg.pairs = []floatCountPair{}
	return agg
}
// numericalAggregator accumulates running statistics over numerical values.
// Use newNumericalAggregator to obtain an instance whose min/max sentinels
// are initialised and whose valueCounter map is non-nil.
type numericalAggregator struct {
	count        uint64             // total number of values added (sum of all row counts)
	min          float64            // smallest value added so far
	max          float64            // largest value added so far
	sum          float64            // sum of all values, each weighted by its count
	maxCount     uint64             // occurrence count of the current mode; set by buildPairsFromCounts
	mode         float64            // most frequent value; set by buildPairsFromCounts
	pairs        []floatCountPair   // for row-based median calculation; sorted by value in buildPairsFromCounts
	valueCounter map[float64]uint64 // for individual median calculation; maps each distinct value to its occurrence count
}
// floatCountPair couples a distinct value with the number of times it was
// added to the aggregator.
type floatCountPair struct {
	value float64 // the distinct number
	count uint64  // how often value occurred
}
// AddFloat64 records a single occurrence of value. It is a shorthand for
// AddNumberRow with a count of one and returns whatever AddNumberRow returns.
func (a *numericalAggregator) AddFloat64(value float64) error {
	const occurrences uint64 = 1
	return a.AddNumberRow(value, occurrences)
}
// buildPairsFromCounts turns the value counter into a list of value/count
// pairs sorted ascending by value and determines the mode along the way.
// It must be called before Median or Mode are used.
//
// The mode is the value with the highest count; if several values share the
// highest count, the smallest of them wins, which keeps the result
// independent of Go's randomised map iteration order.
//
// The function may be called more than once: the previous pairs are
// discarded and their backing array is reused when it is large enough.
func (a *numericalAggregator) buildPairsFromCounts() {
	// Appending an empty slice does not reserve any capacity, so the required
	// room is allocated explicitly; otherwise the old storage is truncated
	// and reused.
	if needed := len(a.valueCounter); cap(a.pairs) < needed {
		a.pairs = make([]floatCountPair, 0, needed)
	} else {
		a.pairs = a.pairs[:0]
	}

	for value, count := range a.valueCounter {
		// get one with higher count or lower value if counts are equal
		if count > a.maxCount || (count == a.maxCount && value < a.mode) {
			a.maxCount = count
			a.mode = value
		}
		a.pairs = append(a.pairs, floatCountPair{value: value, count: count})
	}

	sort.Slice(a.pairs, func(x, y int) bool {
		return a.pairs[x].value < a.pairs[y].value
	})
}
// AddFloat64Row decodes number with
// inverted.ParseLexicographicallySortableFloat64 and records the resulting
// value count times. A decoding failure is returned wrapped with the message
// "read float64"; in that case nothing is recorded.
func (a *numericalAggregator) AddFloat64Row(number []byte,
	count uint64,
) error {
	parsed, err := inverted.ParseLexicographicallySortableFloat64(number)
	if err != nil {
		return errors.Wrap(err, "read float64")
	}

	return a.AddNumberRow(parsed, count)
}
// AddInt64Row decodes number with
// inverted.ParseLexicographicallySortableInt64, converts the integer to
// float64 and records it count times. A decoding failure is returned wrapped
// with the message "read int64"; in that case nothing is recorded.
func (a *numericalAggregator) AddInt64Row(number []byte, count uint64) error {
	parsed, err := inverted.ParseLexicographicallySortableInt64(number)
	if err != nil {
		return errors.Wrap(err, "read int64")
	}

	asFloat := float64(parsed)
	return a.AddNumberRow(asFloat, count)
}
// AddNumberRow records count occurrences of number: it updates the total
// count, the weighted sum, the running minimum and maximum and the
// per-value occurrence counter. A count of zero is ignored. The returned
// error is always nil.
func (a *numericalAggregator) AddNumberRow(number float64, count uint64) error {
	if count == 0 {
		// nothing to record
		return nil
	}

	a.valueCounter[number] += count
	a.count += count
	a.sum += number * float64(count)

	if number < a.min {
		a.min = number
	}
	if number > a.max {
		a.max = number
	}

	return nil
}
// Mean returns the arithmetic mean of all recorded values, i.e. the weighted
// sum divided by the total count. An aggregator without values yields 0
// instead of dividing by zero.
func (a *numericalAggregator) Mean() float64 {
	if a.count > 0 {
		total := float64(a.count)
		return a.sum / total
	}
	return 0
}
// Max returns the largest number recorded so far. An aggregator created by
// newNumericalAggregator that has not received any value still reports its
// initial sentinel, -math.MaxFloat64.
func (a *numericalAggregator) Max() float64 {
	largest := a.max
	return largest
}
// Min returns the smallest number recorded so far. An aggregator created by
// newNumericalAggregator that has not received any value still reports its
// initial sentinel, math.MaxFloat64.
func (a *numericalAggregator) Min() float64 {
	smallest := a.min
	return smallest
}
// Sum returns the sum of all recorded numbers, where each number contributes
// once per occurrence.
func (a *numericalAggregator) Sum() float64 {
	total := a.sum
	return total
}
// Count returns the total number of recorded occurrences, converted to
// float64 so that it has the same type as the other aggregation results.
func (a *numericalAggregator) Count() float64 {
	occurrences := a.count
	return float64(occurrences)
}
// Mode returns the most frequent value as determined by the last call of
// buildPairsFromCounts(). In this file the mode is only computed there, so
// calling Mode on an aggregator that was filled but not yet prepared returns
// the zero value of the field rather than the real mode.
func (a *numericalAggregator) Mode() float64 {
	mostFrequent := a.mode
	return mostFrequent
}
// Median returns the median of all recorded values. It walks the value/count
// pairs, which must be sorted ascending by value (buildPairsFromCounts()
// establishes that order), and accumulates their counts until the middle of
// the collection is reached.
//
// With N recorded values in total there are two cases:
//
//	a) N is odd: the median is the value of the pair that contains the
//	   element at zero-based position N/2.
//	b) N is even: the median is the average of the two middle elements.
//	   b1) both middle elements lie in the same pair, so the median is that
//	       pair's value
//	   b2) the lower middle element is the last one of a pair and the upper
//	       one the first of the following pair; the median is the average of
//	       the two pair values and need not be part of the collection itself
//
// Median panics if the middle cannot be located, which is the case when no
// element was added or the pairs were never built.
func (a *numericalAggregator) Median() float64 {
	half := a.count / 2
	evenTotal := a.count%2 == 0

	var seen uint64
	for i := range a.pairs {
		current := a.pairs[i]
		seen += current.count

		switch {
		case seen > half:
			// cases a) and b1): the middle lies inside the current pair
			return current.value
		case evenTotal && seen == half:
			// case b2): the middle falls between this pair and the next one
			return (current.value + a.pairs[i+1].value) / 2
		}
	}

	panic("Couldn't determine median. This should never happen. Did you add values and call buildRows before?")
}