Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package aggregator | |
import ( | |
"context" | |
"github.com/pkg/errors" | |
"github.com/weaviate/weaviate/entities/aggregation" | |
) | |
// groupedAggregator performs aggregation in groups. This is a two-step | |
// process. First a whole-db scan is performed to identify the groups, then | |
// the top-n groups are selected (the rest is discarded). Only for those top | |
// groups an actual aggregation is performed | |
type groupedAggregator struct { | |
*Aggregator | |
} | |
func newGroupedAggregator(agg *Aggregator) *groupedAggregator { | |
return &groupedAggregator{Aggregator: agg} | |
} | |
func (ga *groupedAggregator) Do(ctx context.Context) (*aggregation.Result, error) { | |
out := aggregation.Result{} | |
groups, err := ga.identifyGroups(ctx) | |
if err != nil { | |
return nil, errors.Wrap(err, "identify groups") | |
} | |
out.Groups = make([]aggregation.Group, len(groups)) | |
for i, g := range groups { | |
res, err := ga.aggregateGroup(ctx, g.res, g.docIDs) | |
if err != nil { | |
return nil, errors.Wrapf(err, "aggregate group %d (%v)", i, | |
g.res.GroupedBy.Value) | |
} | |
out.Groups[i] = res | |
} | |
return &out, nil | |
} | |
// group is a helper construct that contains the final aggregation.Group which | |
// will eventually be served to the user. But it also contains the list of | |
// docIDs in that group, so we can use those to perform the actual aggregation | |
// (for each group) in a second step | |
type group struct { | |
res aggregation.Group | |
docIDs []uint64 | |
} | |
func (ga *groupedAggregator) identifyGroups(ctx context.Context) ([]group, error) { | |
limit := 100 // reasonable default in case we get none | |
if ga.params.Limit != nil { | |
limit = *ga.params.Limit | |
} | |
return newGrouper(ga.Aggregator, limit).Do(ctx) | |
} | |
func (ga *groupedAggregator) aggregateGroup(ctx context.Context, | |
in aggregation.Group, ids []uint64, | |
) (aggregation.Group, error) { | |
out := in | |
fa := newFilteredAggregator(ga.Aggregator) | |
props, err := fa.properties(ctx, ids) | |
if err != nil { | |
return out, errors.Wrap(err, "aggregate properties") | |
} | |
out.Properties = props | |
return out, nil | |
} | |