File size: 2,412 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//

package aggregator

import (
	"context"

	"github.com/pkg/errors"
	"github.com/weaviate/weaviate/entities/aggregation"
)

// groupedAggregator runs an aggregation split into groups, in two phases:
//  1. a scan over the whole db determines the groups, keeping only the top-n
//     of them and discarding the remainder;
//  2. the real property aggregation is then executed once per kept group.
//
// It embeds *Aggregator so the shared params and helpers are reachable
// directly on the receiver.
type groupedAggregator struct {
	*Aggregator
}

// newGroupedAggregator wraps agg in a groupedAggregator. agg is embedded
// as-is (not copied), so both share the same underlying Aggregator.
func newGroupedAggregator(agg *Aggregator) *groupedAggregator {
	return &groupedAggregator{agg}
}

// Do executes the grouped aggregation.
//
// Phase one calls identifyGroups to obtain the groups together with the
// docIDs in each of them. Phase two aggregates the properties of every group
// via aggregateGroup. Groups appear in the result in the same order
// identifyGroups returned them. The first error from either phase aborts the
// whole call and a nil result is returned.
func (ga *groupedAggregator) Do(ctx context.Context) (*aggregation.Result, error) {
	identified, err := ga.identifyGroups(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "identify groups")
	}

	result := &aggregation.Result{
		Groups: make([]aggregation.Group, len(identified)),
	}
	for idx := range identified {
		current := identified[idx]
		aggregated, aggErr := ga.aggregateGroup(ctx, current.res, current.docIDs)
		if aggErr != nil {
			// include the position and grouping value to make failures traceable
			return nil, errors.Wrapf(aggErr, "aggregate group %d (%v)", idx,
				current.res.GroupedBy.Value)
		}
		result.Groups[idx] = aggregated
	}

	return result, nil
}

// group pairs the user-facing aggregation.Group with the docIDs that belong to
// it. The docIDs are needed in the second phase, where the actual per-group
// property aggregation is performed (see aggregateGroup); only res is
// ultimately returned to the user.
type group struct {
	res    aggregation.Group // result served to the user; Properties set by aggregateGroup
	docIDs []uint64          // docIDs that fall into this group
}

func (ga *groupedAggregator) identifyGroups(ctx context.Context) ([]group, error) {
	limit := 100 // reasonable default in case we get none
	if ga.params.Limit != nil {
		limit = *ga.params.Limit
	}
	return newGrouper(ga.Aggregator, limit).Do(ctx)
}

// aggregateGroup computes the property aggregations for a single group.
//
// A filtered aggregator is created from the embedded Aggregator and asked for
// the properties of exactly the given ids. On success the returned Group is a
// copy of in with Properties replaced. On failure the unmodified copy of in is
// returned alongside the wrapped error.
func (ga *groupedAggregator) aggregateGroup(ctx context.Context,
	in aggregation.Group, ids []uint64,
) (aggregation.Group, error) {
	filtered := newFilteredAggregator(ga.Aggregator)

	props, err := filtered.properties(ctx, ids)
	if err != nil {
		return in, errors.Wrap(err, "aggregate properties")
	}

	// in is received by value, so updating it does not affect the caller's copy
	in.Properties = props
	return in, nil
}