File size: 12,520 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//

package refcache

import (
	"context"
	"fmt"
	"sync"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"github.com/weaviate/weaviate/entities/additional"
	"github.com/weaviate/weaviate/entities/models"
	"github.com/weaviate/weaviate/entities/multi"
	"github.com/weaviate/weaviate/entities/schema/crossref"
	"github.com/weaviate/weaviate/entities/search"
)

// repo abstracts the storage layer the cacher resolves references against:
// it fetches multiple objects by identifier in a single round trip, scoped
// to a tenant.
type repo interface {
	MultiGet(ctx context.Context, query []multi.Identifier,
		additional additional.Properties, tenant string) ([]search.Result, error)
}

// NewCacher constructs a Cacher that resolves references through the given
// repo for the specified tenant. Group-by support is disabled; use
// NewCacherWithGroup when grouped hits must be traversed as well.
func NewCacher(repo repo, logger logrus.FieldLogger, tenant string) *Cacher {
	c := &Cacher{
		logger:    logger,
		repo:      repo,
		store:     make(map[multi.Identifier]search.Result),
		withGroup: false,
		tenant:    tenant,
	}
	return c
}

// NewCacherWithGroup constructs a Cacher like NewCacher, but additionally
// traverses grouped hits (the groupBy feature) when searching for
// references to resolve.
func NewCacherWithGroup(repo repo, logger logrus.FieldLogger, tenant string) *Cacher {
	c := &Cacher{
		logger: logger,
		repo:   repo,
		store:  make(map[multi.Identifier]search.Result),
		tenant: tenant,
	}
	// enable the groupBy feature
	c.withGroup = true
	c.getGroupSelectProperties = getGroupSelectProperties
	return c
}

// cacherJob is one unit of work for the Cacher: a single object (si) whose
// references should be resolved using the stored select properties.
// complete marks jobs that have already been fetched; Build's recursion
// exits once every job is complete.
type cacherJob struct {
	si       multi.Identifier
	props    search.SelectProperties
	complete bool
}

// Cacher resolves and caches referenced objects for a single request so
// that deeply nested reference lookups require only one MultiGet call per
// nesting level (see Build).
//
// NOTE(review): the embedded Mutex is not taken by any method in this
// file (Get, addJob, etc. access jobs/store directly) — confirm callers
// either use the Cacher from a single goroutine or lock externally.
type Cacher struct {
	sync.Mutex
	jobs       []cacherJob
	logger     logrus.FieldLogger
	repo       repo
	store      map[multi.Identifier]search.Result
	additional additional.Properties // meta is immutable for the lifetime of the request cacher, so we can safely store it
	// for groupBy feature
	withGroup                bool
	getGroupSelectProperties func(properties search.SelectProperties) search.SelectProperties
	tenant                   string
}

// Get returns the cached result for the given identifier and whether it
// was present in the store.
// NOTE(review): reads the store without taking the embedded Mutex —
// confirm concurrent access is impossible or serialized by callers.
func (c *Cacher) Get(si multi.Identifier) (search.Result, bool) {
	sr, ok := c.store[si]
	return sr, ok
}

// Build builds the lookup cache recursively and tries to be smart about it. This
// means that it aims to use only a single (multiget) transaction per layer.
// The recursion exit condition is jobs marked as done. At some point
// the cacher will realise that for every nested prop there is already a
// complete job, so it will stop the recursion.
//
// build is called on a "level" i.e. the search result. After working
// on the job list for the first time if the resolved items still contain
// references and the user set the SelectProperty to indicate they want to
// resolve them, build is called again on all the results (plural!) from the
// previous run. We thus end up with one request to the backend per level
// regardless of the amount of lookups per level.
//
// This keeps request times to a minimum even on deeply nested requests.
func (c *Cacher) Build(ctx context.Context, objects []search.Result,
	properties search.SelectProperties, additional additional.Properties,
) error {
	c.additional = additional

	// wrap with %w (not %v) so callers can still unwrap the underlying error
	if err := c.findJobsFromResponse(objects, properties); err != nil {
		return fmt.Errorf("build request cache: %w", err)
	}

	c.dedupJobList()
	if err := c.fetchJobs(ctx); err != nil {
		return fmt.Errorf("build request cache: %w", err)
	}

	return nil
}

// A response is a []search.Result which has all primitive props parsed (and
// even ref-beacons parsed into their respective types, but not resolved!)
// findJobsFromResponse will traverse through it and check if there are
// references. In a recursive lookup this can both be done on the rootlevel to
// start the first lookup as well as recursively on the results of a lookup to
// further look if a next-level call is required.
func (c *Cacher) findJobsFromResponse(objects []search.Result, properties search.SelectProperties) error {
	for _, obj := range objects {
		// we can only set SelectProperties on the rootlevel since this is the only
		// place where we have a single root class. In nested lookups we need to
		// first identify the correct path in the SelectProperties graph which
		// corresponds with the path we're currently traversing through. Thus we
		// always cache the original SelectProps with the job. This call goes
		// through the job history and looks up the correct SelectProperties
		// subpath to use in this place.
		// tl;dr: On root level (root=base) take props from the outside, on a
		// nested level lookup the SelectProps matching the current base element
		propertiesReplaced, err := c.ReplaceInitialPropertiesWithSpecific(obj, properties)
		if err != nil {
			return err
		}

		if obj.Schema == nil {
			// previously this returned nil, which silently skipped every
			// remaining object in the batch; only skip this one object
			continue
		}

		schemaMap, ok := obj.Schema.(map[string]interface{})
		if !ok {
			return fmt.Errorf("object schema is present, but not a map: %T", obj)
		}

		if err := c.parseSchemaMap(schemaMap, propertiesReplaced); err != nil {
			return err
		}

		if c.withGroup {
			if err := c.parseAdditionalGroup(obj, properties); err != nil {
				return err
			}
		}
	}

	return nil
}

// parseAdditionalGroup scans the grouped hits attached to a result (the
// groupBy feature) for unresolved references and queues jobs for them.
func (c *Cacher) parseAdditionalGroup(obj search.Result, properties search.SelectProperties) error {
	add := obj.AdditionalProperties
	if add == nil || add["group"] == nil {
		return nil
	}

	group, ok := add["group"].(*additional.Group)
	if !ok {
		return nil
	}

	groupProps := c.getGroupSelectProperties(properties)
	for _, hit := range group.Hits {
		if err := c.parseSchemaMap(hit, groupProps); err != nil {
			return err
		}
	}
	return nil
}

// parseSchemaMap walks a parsed object schema, finds reference properties
// the caller selected, and registers one job per (beacon, target class)
// pair so the referenced objects can be fetched in the next batch.
func (c *Cacher) parseSchemaMap(schemaMap map[string]interface{}, propertiesReplaced search.SelectProperties) error {
	for propName, propValue := range schemaMap {
		sel := propertiesReplaced.FindProperty(propName)
		skip, refs := c.skipProperty(propName, propValue, sel)
		if skip {
			continue
		}

		for _, refTarget := range sel.Refs {
			for _, beaconItem := range refs {
				parsed, err := c.extractAndParseBeacon(beaconItem)
				if err != nil {
					return err
				}
				id := multi.Identifier{
					ID:        parsed.TargetID.String(),
					ClassName: refTarget.ClassName,
				}
				c.addJob(id, refTarget.RefProperties)
			}
		}
	}
	return nil
}

// skipProperty decides whether a schema entry can be ignored. It returns
// (true, nil) when the value is not a parsed-but-unresolved reference, or
// when the user did not select this ref prop for resolution; otherwise it
// returns (false, refs) with the unresolved references.
func (c *Cacher) skipProperty(key string, value interface{}, selectProp *search.SelectProperty) (bool, models.MultipleRef) {
	// the cacher runs at a point where primitive props have already been
	// parsed, so we can simply look for parsed, but not yet resolved,
	// references
	refs, isRef := value.(models.MultipleRef)
	switch {
	case !isRef:
		// some other kind of prop, not interesting for us
		return true, nil
	case selectProp == nil:
		// a ref prop, but the user is not interested in resolving it
		return true, nil
	default:
		return false, refs
	}
}

// extractAndParseBeacon parses a single ref's beacon URI into its
// structured cross-reference form.
func (c *Cacher) extractAndParseBeacon(item *models.SingleRef) (*crossref.Ref, error) {
	beacon := item.Beacon.String()
	return crossref.Parse(beacon)
}

// ReplaceInitialPropertiesWithSpecific returns the SelectProperties to use
// for an object. At the root level the caller's properties are passed
// through unchanged; on nested levels (properties == nil) the props cached
// with this object's job are looked up instead.
func (c *Cacher) ReplaceInitialPropertiesWithSpecific(obj search.Result,
	properties search.SelectProperties,
) (search.SelectProperties, error) {
	// an explicit set of properties can only come from the root level —
	// never overwrite it
	if properties != nil {
		return properties, nil
	}

	// nested level: the global initial SelectProperties no longer apply, so
	// find the SelectProperties recorded for exactly this object's job
	id := multi.Identifier{
		ID:        obj.ID.String(),
		ClassName: obj.ClassName,
	}
	if job, ok := c.findJob(id); ok {
		return job.props, nil
	}

	return properties, nil
}

// addJob appends a new, incomplete job for the given identifier and
// select properties.
func (c *Cacher) addJob(si multi.Identifier, props search.SelectProperties) {
	job := cacherJob{si: si, props: props, complete: false}
	c.jobs = append(c.jobs, job)
}

// findJob returns the first job registered for the given identifier, and
// whether one exists.
func (c *Cacher) findJob(si multi.Identifier) (cacherJob, bool) {
	for i := range c.jobs {
		if c.jobs[i].si == si {
			return c.jobs[i], true
		}
	}
	return cacherJob{}, false
}

// incompleteJobs returns the jobs not yet fetched, without altering the
// original job list.
func (c *Cacher) incompleteJobs() []cacherJob {
	pending := make([]cacherJob, 0, len(c.jobs))
	for _, job := range c.jobs {
		if job.complete {
			continue
		}
		pending = append(pending, job)
	}
	return pending
}

// completeJobs returns the jobs already fetched, without altering the
// original job list.
func (c *Cacher) completeJobs() []cacherJob {
	done := make([]cacherJob, 0, len(c.jobs))
	for _, job := range c.jobs {
		if !job.complete {
			continue
		}
		done = append(done, job)
	}
	return done
}

// dedupJobList alters the list and removes duplicates. It ignores complete
// jobs, as a job could already be marked as complete, but not yet stored,
// since completion is the exit condition for the recursion while storage
// can only happen once the schema was parsed. If the schema contains more
// refs to an item that is already in the job list we would be in a
// catch-22; to resolve that, duplicates of already-complete jobs are
// allowed, since retrieving the required item again (with different
// SelectProperties) comes at minimal cost and is the only way out of that
// deadlock situation.
func (c *Cacher) dedupJobList() {
	pending := c.incompleteJobs()
	before := len(pending)
	if before == 0 {
		// nothing to do
		return
	}

	c.logger.WithFields(logrus.Fields{
		"action": "request_cacher_dedup_joblist_start",
		"jobs":   before,
	}).Debug("starting job list deduplication")

	seen := make(map[multi.Identifier]struct{}, before)
	unique := make([]cacherJob, 0, before)
	for _, job := range pending {
		if _, dup := seen[job.si]; dup {
			continue
		}
		seen[job.si] = struct{}{}
		unique = append(unique, job)
	}

	c.jobs = append(c.completeJobs(), unique...)

	c.logger.WithFields(logrus.Fields{
		"action":      "request_cacher_dedup_joblist_complete",
		"jobs":        len(unique),
		"removedJobs": before - len(unique),
	}).Debug("completed job list deduplication")
}

// fetchJobs retrieves all incomplete jobs from the repo in a single
// MultiGet call and hands the results to parseAndStore. Does nothing when
// there is no pending work.
func (c *Cacher) fetchJobs(ctx context.Context) error {
	pending := c.incompleteJobs()
	if len(pending) == 0 {
		c.logSkipFetchJobs()
		return nil
	}

	res, err := c.repo.MultiGet(ctx, jobListToMultiGetQuery(pending), c.additional, c.tenant)
	if err != nil {
		return errors.Wrap(err, "fetch job list")
	}

	return c.parseAndStore(ctx, res)
}

// logSkipFetchJobs records (at trace level) that a fetch round was skipped
// because no incomplete jobs remain.
func (c *Cacher) logSkipFetchJobs() {
	fields := logrus.Fields{
		"action": "request_cacher_fetch_jobs_skip",
	}
	c.logger.WithFields(fields).
		Trace("skip fetch jobs, have no incomplete jobs")
}

// parseAndStore parses the results for nested refs. Since it is already a
// []search.Result no other parsing is required, as we can expect this type to
// have all primitive props parsed correctly
//
// If nested refs are found, the recursion is started.
//
// Once no more nested refs can be found, the recursion triggers its exit
// condition and all jobs are stored.
func (c *Cacher) parseAndStore(ctx context.Context, res []search.Result) error {
	// mark all current jobs as done, as we use the amount of incomplete jobs as
	// the exit condition for the recursion. Next up, we will start a nested
	// Build() call. If the Build call returns no new jobs, we are done and the
	// recursion stops. If it does return more jobs, we will enter a nested
	// iteration which will eventually come to this place again
	c.markAllJobsAsDone()

	if err := c.Build(ctx, removeEmptyResults(res), nil, c.additional); err != nil {
		return errors.Wrap(err, "build nested cache")
	}

	return c.storeResults(res)
}

// removeEmptyResults returns a copy of the input with results lacking an
// ID filtered out; the input slice is left untouched.
func removeEmptyResults(in []search.Result) []search.Result {
	kept := make([]search.Result, 0, len(in))
	for _, res := range in {
		if res.ID == "" {
			continue
		}
		kept = append(kept, res)
	}
	return kept
}

// storeResults places each fetched result into the cache, keyed by its
// (ID, class) identifier. Always returns nil.
func (c *Cacher) storeResults(res search.Results) error {
	for _, item := range res {
		key := multi.Identifier{
			ID:        item.ID.String(),
			ClassName: item.ClassName,
		}
		c.store[key] = item
	}
	return nil
}

// markAllJobsAsDone flags every registered job as complete; this is the
// recursion's exit condition (see parseAndStore).
func (c *Cacher) markAllJobsAsDone() {
	for i := 0; i < len(c.jobs); i++ {
		c.jobs[i].complete = true
	}
}

// jobListToMultiGetQuery extracts the identifiers of the given jobs, in
// order, for use as a MultiGet query.
func jobListToMultiGetQuery(jobs []cacherJob) []multi.Identifier {
	ids := make([]multi.Identifier, 0, len(jobs))
	for _, job := range jobs {
		ids = append(ids, job.si)
	}
	return ids
}