Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package aggregator | |
import ( | |
"fmt" | |
"math" | |
"sort" | |
"time" | |
"github.com/pkg/errors" | |
"github.com/weaviate/weaviate/adapters/repos/db/inverted" | |
"github.com/weaviate/weaviate/entities/aggregation" | |
) | |
func addDateAggregations(prop *aggregation.Property, | |
aggs []aggregation.Aggregator, agg *dateAggregator, | |
) { | |
if prop.DateAggregations == nil { | |
prop.DateAggregations = map[string]interface{}{} | |
} | |
agg.buildPairsFromCounts() | |
// if there are no elements to aggregate over because a filter does not match anything, calculating median etc. makes | |
// no sense. Non-existent entries evaluate to nil with an interface{} map | |
if agg.count == 0 { | |
for _, entry := range aggs { | |
if entry == aggregation.CountAggregator { | |
prop.DateAggregations["count"] = int64(agg.count) | |
break | |
} | |
} | |
return | |
} | |
// when combining the results from different shards, we need the raw dates to recompute the mode and median. | |
// Therefore we add a reference later which needs to be cleared out before returning the results to a user | |
for _, aProp := range aggs { | |
switch aProp { | |
case aggregation.ModeAggregator, aggregation.MedianAggregator: | |
prop.DateAggregations["_dateAggregator"] = agg | |
} | |
} | |
for _, aProp := range aggs { | |
switch aProp { | |
case aggregation.MinimumAggregator: | |
prop.DateAggregations[aProp.String()] = agg.Min() | |
case aggregation.MaximumAggregator: | |
prop.DateAggregations[aProp.String()] = agg.Max() | |
case aggregation.ModeAggregator: | |
prop.DateAggregations[aProp.String()] = agg.Mode() | |
case aggregation.CountAggregator: | |
prop.DateAggregations[aProp.String()] = agg.Count() | |
case aggregation.MedianAggregator: | |
prop.DateAggregations[aProp.String()] = agg.Median() | |
default: | |
continue | |
} | |
} | |
} | |
type dateAggregator struct { | |
count uint64 | |
maxCount uint64 | |
min timestamp | |
max timestamp | |
mode timestamp | |
pairs []timestampCountPair // for row-based median calculation | |
valueCounter map[timestamp]uint64 // for individual median calculation | |
} | |
func newDateAggregator() *dateAggregator { | |
return &dateAggregator{ | |
min: timestamp{epochNano: math.MaxInt64}, | |
valueCounter: map[timestamp]uint64{}, | |
pairs: make([]timestampCountPair, 0), | |
} | |
} | |
// timestamp allows us to contain multiple representations of a datetime | |
// the nanosecs value is needed for the numerical comparisons, and the | |
// string value is what the user expects to see | |
type timestamp struct { | |
epochNano int64 | |
rfc3339 string | |
} | |
func newTimestamp(epochNano int64) timestamp { | |
return timestamp{ | |
epochNano: epochNano, | |
rfc3339: time.Unix(0, epochNano).UTC().Format(time.RFC3339Nano), | |
} | |
} | |
type timestampCountPair struct { | |
value timestamp | |
count uint64 | |
} | |
func (a *dateAggregator) AddTimestamp(rfc3339 string) error { | |
t, err := time.Parse(time.RFC3339Nano, rfc3339) | |
if err != nil { | |
return fmt.Errorf("failed to parse timestamp: %s", err) | |
} | |
ts := timestamp{ | |
epochNano: t.UnixNano(), | |
rfc3339: rfc3339, | |
} | |
return a.addRow(ts, 1) | |
} | |
func (a *dateAggregator) AddTimestampRow(b []byte, count uint64) error { | |
nsec, err := inverted.ParseLexicographicallySortableInt64(b) | |
if err != nil { | |
return errors.Wrap(err, "read int64") | |
} | |
ts := newTimestamp(nsec) | |
return a.addRow(ts, count) | |
} | |
func (a *dateAggregator) addRow(ts timestamp, count uint64) error { | |
if count == 0 { | |
// skip | |
return nil | |
} | |
a.count += count | |
if ts.epochNano < a.min.epochNano { | |
a.min = ts | |
} | |
if ts.epochNano > a.max.epochNano { | |
a.max = ts | |
} | |
currentCount := a.valueCounter[ts] | |
currentCount += count | |
a.valueCounter[ts] = currentCount | |
return nil | |
} | |
func (a *dateAggregator) Max() string { | |
return a.max.rfc3339 | |
} | |
func (a *dateAggregator) Min() string { | |
return a.min.rfc3339 | |
} | |
// Mode does not require preparation if build from rows, but requires a call of | |
// buildPairsFromCounts() if it was built using individual objects | |
func (a *dateAggregator) Mode() string { | |
return a.mode.rfc3339 | |
} | |
func (a *dateAggregator) Count() int64 { | |
return int64(a.count) | |
} | |
// Median does not require preparation if build from rows, but requires a call of | |
// buildPairsFromCounts() if it was built using individual objects | |
// | |
// Check the numericalAggregator.Median() for details about the calculation | |
func (a *dateAggregator) Median() string { | |
middleIndex := a.count / 2 | |
count := uint64(0) | |
for index, pair := range a.pairs { | |
count += pair.count | |
if a.count%2 == 1 && count > middleIndex { | |
return pair.value.rfc3339 // case a) | |
} else if a.count%2 == 0 { | |
if count == middleIndex { | |
MedianEpochNano := pair.value.epochNano + (a.pairs[index+1].value.epochNano-pair.value.epochNano)/2 | |
return time.Unix(0, MedianEpochNano).UTC().Format(time.RFC3339Nano) // case b2) | |
} else if count > middleIndex { | |
return pair.value.rfc3339 // case b1) | |
} | |
} | |
} | |
panic("Couldn't determine median. This should never happen. Did you add values and call buildRows before?") | |
} | |
// turns the value counter into a sorted list, as well as identifying the mode | |
func (a *dateAggregator) buildPairsFromCounts() { | |
a.pairs = a.pairs[:0] // clear out old values in case this function called more than once | |
for value, count := range a.valueCounter { | |
if count > a.maxCount { | |
a.maxCount = count | |
a.mode = value | |
} | |
a.pairs = append(a.pairs, timestampCountPair{value: value, count: count}) | |
} | |
sort.Slice(a.pairs, func(x, y int) bool { | |
return a.pairs[x].value.epochNano < a.pairs[y].value.epochNano | |
}) | |
} | |