Spaces:
Running
Running
File size: 5,993 Bytes
b110593 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package aggregator
import (
"fmt"
"math"
"sort"
"time"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/repos/db/inverted"
"github.com/weaviate/weaviate/entities/aggregation"
)
func addDateAggregations(prop *aggregation.Property,
aggs []aggregation.Aggregator, agg *dateAggregator,
) {
if prop.DateAggregations == nil {
prop.DateAggregations = map[string]interface{}{}
}
agg.buildPairsFromCounts()
// if there are no elements to aggregate over because a filter does not match anything, calculating median etc. makes
// no sense. Non-existent entries evaluate to nil with an interface{} map
if agg.count == 0 {
for _, entry := range aggs {
if entry == aggregation.CountAggregator {
prop.DateAggregations["count"] = int64(agg.count)
break
}
}
return
}
// when combining the results from different shards, we need the raw dates to recompute the mode and median.
// Therefore we add a reference later which needs to be cleared out before returning the results to a user
for _, aProp := range aggs {
switch aProp {
case aggregation.ModeAggregator, aggregation.MedianAggregator:
prop.DateAggregations["_dateAggregator"] = agg
}
}
for _, aProp := range aggs {
switch aProp {
case aggregation.MinimumAggregator:
prop.DateAggregations[aProp.String()] = agg.Min()
case aggregation.MaximumAggregator:
prop.DateAggregations[aProp.String()] = agg.Max()
case aggregation.ModeAggregator:
prop.DateAggregations[aProp.String()] = agg.Mode()
case aggregation.CountAggregator:
prop.DateAggregations[aProp.String()] = agg.Count()
case aggregation.MedianAggregator:
prop.DateAggregations[aProp.String()] = agg.Median()
default:
continue
}
}
}
type dateAggregator struct {
count uint64
maxCount uint64
min timestamp
max timestamp
mode timestamp
pairs []timestampCountPair // for row-based median calculation
valueCounter map[timestamp]uint64 // for individual median calculation
}
func newDateAggregator() *dateAggregator {
return &dateAggregator{
min: timestamp{epochNano: math.MaxInt64},
valueCounter: map[timestamp]uint64{},
pairs: make([]timestampCountPair, 0),
}
}
// timestamp allows us to contain multiple representations of a datetime
// the nanosecs value is needed for the numerical comparisons, and the
// string value is what the user expects to see
type timestamp struct {
epochNano int64
rfc3339 string
}
func newTimestamp(epochNano int64) timestamp {
return timestamp{
epochNano: epochNano,
rfc3339: time.Unix(0, epochNano).UTC().Format(time.RFC3339Nano),
}
}
type timestampCountPair struct {
value timestamp
count uint64
}
func (a *dateAggregator) AddTimestamp(rfc3339 string) error {
t, err := time.Parse(time.RFC3339Nano, rfc3339)
if err != nil {
return fmt.Errorf("failed to parse timestamp: %s", err)
}
ts := timestamp{
epochNano: t.UnixNano(),
rfc3339: rfc3339,
}
return a.addRow(ts, 1)
}
func (a *dateAggregator) AddTimestampRow(b []byte, count uint64) error {
nsec, err := inverted.ParseLexicographicallySortableInt64(b)
if err != nil {
return errors.Wrap(err, "read int64")
}
ts := newTimestamp(nsec)
return a.addRow(ts, count)
}
func (a *dateAggregator) addRow(ts timestamp, count uint64) error {
if count == 0 {
// skip
return nil
}
a.count += count
if ts.epochNano < a.min.epochNano {
a.min = ts
}
if ts.epochNano > a.max.epochNano {
a.max = ts
}
currentCount := a.valueCounter[ts]
currentCount += count
a.valueCounter[ts] = currentCount
return nil
}
func (a *dateAggregator) Max() string {
return a.max.rfc3339
}
func (a *dateAggregator) Min() string {
return a.min.rfc3339
}
// Mode does not require preparation if build from rows, but requires a call of
// buildPairsFromCounts() if it was built using individual objects
func (a *dateAggregator) Mode() string {
return a.mode.rfc3339
}
func (a *dateAggregator) Count() int64 {
return int64(a.count)
}
// Median does not require preparation if build from rows, but requires a call of
// buildPairsFromCounts() if it was built using individual objects
//
// Check the numericalAggregator.Median() for details about the calculation
func (a *dateAggregator) Median() string {
middleIndex := a.count / 2
count := uint64(0)
for index, pair := range a.pairs {
count += pair.count
if a.count%2 == 1 && count > middleIndex {
return pair.value.rfc3339 // case a)
} else if a.count%2 == 0 {
if count == middleIndex {
MedianEpochNano := pair.value.epochNano + (a.pairs[index+1].value.epochNano-pair.value.epochNano)/2
return time.Unix(0, MedianEpochNano).UTC().Format(time.RFC3339Nano) // case b2)
} else if count > middleIndex {
return pair.value.rfc3339 // case b1)
}
}
}
panic("Couldn't determine median. This should never happen. Did you add values and call buildRows before?")
}
// turns the value counter into a sorted list, as well as identifying the mode
func (a *dateAggregator) buildPairsFromCounts() {
a.pairs = a.pairs[:0] // clear out old values in case this function called more than once
for value, count := range a.valueCounter {
if count > a.maxCount {
a.maxCount = count
a.mode = value
}
a.pairs = append(a.pairs, timestampCountPair{value: value, count: count})
}
sort.Slice(a.pairs, func(x, y int) bool {
return a.pairs[x].value.epochNano < a.pairs[y].value.epochNano
})
}
|