Spaces:
Running
Running
File size: 6,989 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package aggregator
import (
"math"
"sort"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/repos/db/inverted"
"github.com/weaviate/weaviate/entities/aggregation"
)
// addNumericalAggregations writes the aggregations requested in aggs for the
// values collected by agg into prop.NumericalAggregations, creating that map
// first if it is still nil.
//
// agg is finalized with buildPairsFromCounts() before anything is read from
// it. If agg holds no values (e.g. because a filter did not match anything)
// calculating mean etc. makes no sense: in that case only "count" is written,
// and only if the count aggregation was requested. All other keys stay absent
// and therefore evaluate to nil when read from the interface{} map.
func addNumericalAggregations(prop *aggregation.Property,
	aggs []aggregation.Aggregator, agg *numericalAggregator,
) {
	if prop.NumericalAggregations == nil {
		prop.NumericalAggregations = map[string]interface{}{}
	}
	agg.buildPairsFromCounts()

	if agg.count == 0 {
		for _, requested := range aggs {
			if requested == aggregation.CountAggregator {
				prop.NumericalAggregations["count"] = float64(agg.count)
				return
			}
		}
		return
	}

	// When the results of different shards are combined, the raw numbers are
	// needed to recompute mode, mean and median. The aggregator itself is
	// therefore attached under an internal key, which needs to be cleared out
	// again before the results are returned to a user.
	for _, requested := range aggs {
		if requested == aggregation.ModeAggregator ||
			requested == aggregation.MedianAggregator ||
			requested == aggregation.MeanAggregator {
			prop.NumericalAggregations["_numericalAggregator"] = agg
			break
		}
	}

	for _, requested := range aggs {
		var result float64
		switch requested {
		case aggregation.MeanAggregator:
			result = agg.Mean()
		case aggregation.MinimumAggregator:
			result = agg.Min()
		case aggregation.MaximumAggregator:
			result = agg.Max()
		case aggregation.MedianAggregator:
			result = agg.Median()
		case aggregation.ModeAggregator:
			result = agg.Mode()
		case aggregation.SumAggregator:
			result = agg.Sum()
		case aggregation.CountAggregator:
			result = agg.Count()
		default:
			// not a numerical aggregation handled here
			continue
		}
		prop.NumericalAggregations[requested.String()] = result
	}
}
// newNumericalAggregator returns an empty aggregator that is ready to receive
// values. min starts at the largest and max at the smallest finite float64 so
// that the comparisons in AddNumberRow replace them with real values; the
// value counter and the pair list are allocated empty (non-nil).
func newNumericalAggregator() *numericalAggregator {
	agg := new(numericalAggregator)
	agg.min, agg.max = math.MaxFloat64, -math.MaxFloat64
	agg.valueCounter = make(map[float64]uint64)
	agg.pairs = []floatCountPair{}
	return agg
}
type (
	// numericalAggregator accumulates numerical values together with how
	// often each one occurred and derives count, sum, min, max, mean, mode
	// and median from them.
	numericalAggregator struct {
		// count is the total number of values added, i.e. the sum of the
		// counts of all rows.
		count uint64
		// min and max are the smallest and largest value added so far.
		min float64
		max float64
		// sum is the sum of all values, each weighted with its count.
		sum float64
		// maxCount is the number of occurrences of mode.
		maxCount uint64
		// mode is the most frequent value as determined by
		// buildPairsFromCounts.
		mode float64
		// pairs is the list of (value, count) pairs sorted by value that
		// buildPairsFromCounts produces; the median is calculated from it.
		pairs []floatCountPair
		// valueCounter maps every distinct value to its number of
		// occurrences; it is the source pairs is built from.
		valueCounter map[float64]uint64
	}

	// floatCountPair is one distinct value and the number of times it occurs.
	floatCountPair struct {
		value float64
		count uint64
	}
)
// AddFloat64 records a single occurrence of value. It is shorthand for
// AddNumberRow with a count of one and returns that call's error.
func (a *numericalAggregator) AddFloat64(value float64) error {
	const singleOccurrence = 1
	return a.AddNumberRow(value, singleOccurrence)
}
// buildPairsFromCounts turns the value counter into a list of (value, count)
// pairs sorted ascending by value and identifies the mode along the way. It
// must be called after the last value was added and before Mode() or Median()
// are read.
//
// The function may be called more than once; the pair list is rebuilt from
// scratch on every call.
func (a *numericalAggregator) buildPairsFromCounts() {
	// Drop the pairs of a previous call. The backing array is reused when it
	// is large enough, otherwise one with exactly the required capacity is
	// allocated so that the appends below never have to grow the slice.
	if needed := len(a.valueCounter); cap(a.pairs) < needed {
		a.pairs = make([]floatCountPair, 0, needed)
	} else {
		a.pairs = a.pairs[:0]
	}

	for value, count := range a.valueCounter {
		// The mode is the value with the highest count; the lower value wins
		// if counts are equal, which keeps the result independent of the
		// random map iteration order.
		if count > a.maxCount || (count == a.maxCount && value < a.mode) {
			a.maxCount = count
			a.mode = value
		}
		a.pairs = append(a.pairs, floatCountPair{value: value, count: count})
	}

	sort.Slice(a.pairs, func(x, y int) bool {
		return a.pairs[x].value < a.pairs[y].value
	})
}
// AddFloat64Row parses number with
// inverted.ParseLexicographicallySortableFloat64 and records count
// occurrences of the resulting value. A parse error is returned wrapped with
// "read float64"; otherwise the result of AddNumberRow is returned.
func (a *numericalAggregator) AddFloat64Row(number []byte,
	count uint64,
) error {
	value, parseErr := inverted.ParseLexicographicallySortableFloat64(number)
	if parseErr != nil {
		return errors.Wrap(parseErr, "read float64")
	}

	return a.AddNumberRow(value, count)
}
// AddInt64Row parses number with
// inverted.ParseLexicographicallySortableInt64, converts the result to
// float64 and records count occurrences of it. A parse error is returned
// wrapped with "read int64"; otherwise the result of AddNumberRow is returned.
func (a *numericalAggregator) AddInt64Row(number []byte, count uint64) error {
	value, parseErr := inverted.ParseLexicographicallySortableInt64(number)
	if parseErr != nil {
		return errors.Wrap(parseErr, "read int64")
	}

	asFloat := float64(value)
	return a.AddNumberRow(asFloat, count)
}
// AddNumberRow records count occurrences of number: it updates the total
// count, the weighted sum, the minimum and maximum, and the per-value
// occurrence counter. A count of zero is ignored. The returned error is
// always nil.
func (a *numericalAggregator) AddNumberRow(number float64, count uint64) error {
	if count == 0 {
		// nothing to record
		return nil
	}

	occurrences := float64(count)
	a.count += count
	a.sum += number * occurrences

	if a.min > number {
		a.min = number
	}
	if a.max < number {
		a.max = number
	}

	a.valueCounter[number] += count
	return nil
}
// Mean returns the arithmetic mean of all added values, i.e. the weighted sum
// divided by the total count, or 0 if no value has been added.
func (a *numericalAggregator) Mean() float64 {
	if n := a.count; n > 0 {
		return a.sum / float64(n)
	}
	return 0
}
// Max returns the largest value added so far. For an aggregator created with
// newNumericalAggregator that has not received any value this is
// -math.MaxFloat64.
func (a *numericalAggregator) Max() float64 {
return a.max
}
// Min returns the smallest value added so far. For an aggregator created with
// newNumericalAggregator that has not received any value this is
// math.MaxFloat64.
func (a *numericalAggregator) Min() float64 {
return a.min
}
// Sum returns the sum of all added values, where every value is weighted with
// the count it was added with.
func (a *numericalAggregator) Sum() float64 {
return a.sum
}
// Count returns the total number of added values (the sum of all row counts),
// converted to float64.
func (a *numericalAggregator) Count() float64 {
return float64(a.count)
}
// Mode returns the most frequent value; if several values share the highest
// count, the lowest of them is returned. Within this file the mode is only
// determined by buildPairsFromCounts() - none of the Add* methods update it -
// so that function must have been called after the last value was added.
// Otherwise the result of the previous call (or 0 if there was none) is
// returned.
func (a *numericalAggregator) Mode() float64 {
return a.mode
}
// Median returns the median of all added values. It reads the sorted pair
// list, so buildPairsFromCounts() must have been called after the last value
// was added. The call panics if no element was added or if the pair list does
// not match the current count.
//
// The pairs are walked in ascending value order while the number of elements
// covered so far is summed up. With N elements (0-based positions) there are
// two cases:
// a) N is odd, then the median is the element at position N/2.
// b) N is even, then the median is the average of the elements at positions
// N/2-1 and N/2,
//
// with two sub-cases:
// b1) both elements are within the same pair, then the median is the value of
// this pair
// b2) the two elements are part of neighbouring pairs, then the average of
// the two pair values is the median and the median value is not necessarily
// part of the collection itself
func (a *numericalAggregator) Median() float64 {
	half := a.count / 2
	even := a.count%2 == 0

	var seen uint64 // number of elements covered by the pairs visited so far
	for i, pair := range a.pairs {
		seen += pair.count

		if seen > half {
			// case a) and case b1): the pair contains the element at position
			// N/2 and - for an even N - also the one right before it, because
			// case b2) would otherwise have matched on the previous pair.
			return pair.value
		}

		// The bounds check only fails if the pairs are out of date; in that
		// case fall through to the explanatory panic below.
		if even && seen == half && i+1 < len(a.pairs) {
			// case b2): this pair ends at position N/2-1, the next one starts
			// at position N/2.
			next := a.pairs[i+1].value
			mid := (pair.value + next) / 2
			if math.IsInf(mid, 0) && !math.IsInf(pair.value, 0) && !math.IsInf(next, 0) {
				// the sum of two finite values overflowed; halving them first
				// keeps the result finite
				mid = pair.value/2 + next/2
			}
			return mid
		}
	}

	panic("Couldn't determine median. This should never happen. Did you add values and call buildPairsFromCounts before?")
}
|