File size: 3,628 Bytes
b110593
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: [email protected]
//

package lsmkv

import (
	"bufio"
	"encoding/binary"
	"io"
	"os"

	"github.com/pkg/errors"
	"github.com/weaviate/weaviate/entities/diskio"
)

// commitloggerParser parses a commit log file into a memtable. It is built by
// newCommitLoggerParser and run via Do, which picks the parsing routine
// according to strategy.
type commitloggerParser struct {
	// path is the location of the commit log file to open and read.
	path string
	// strategy is the bucket strategy; Do uses it to select the parse routine.
	strategy string
	// memtable is the destination the parsed entries are written into.
	memtable *Memtable
	// reader is the input stream for the log; doReplace sets it to a buffered,
	// metered reader over the opened file before parsing.
	reader io.Reader
	// metrics supplies the disk-IO tracking hook used while reading the log.
	metrics *Metrics
	// replaceCache holds, for the replace strategy, one deduplicated node per
	// primary key; it is flushed into memtable only after the whole log is read.
	replaceCache map[string]segmentReplaceNode
}

// newCommitLoggerParser returns a parser for the commit log stored at path.
// Parsed entries are written into activeMemtable, strategy decides which
// parse routine Do runs, and metrics provides the disk-read tracking used
// while the log is consumed. The replace deduplication cache starts empty.
func newCommitLoggerParser(path string, activeMemtable *Memtable,
	strategy string, metrics *Metrics,
) *commitloggerParser {
	parser := &commitloggerParser{
		path:     path,
		strategy: strategy,
		memtable: activeMemtable,
		metrics:  metrics,
	}
	parser.replaceCache = make(map[string]segmentReplaceNode)
	return parser
}

// Do runs the parse routine that matches p.strategy and returns its result.
// An unrecognized strategy is rejected with an error before any routine runs.
func (p *commitloggerParser) Do() error {
	var parse func() error

	switch p.strategy {
	case StrategyReplace:
		parse = p.doReplace
	case StrategyMapCollection, StrategySetCollection:
		parse = p.doCollection
	case StrategyRoaringSet:
		parse = p.doRoaringSet
	default:
		return errors.Errorf("unknown strategy %s on commit log parse", p.strategy)
	}

	return parse()
}

// doReplace parses all entries into a cache for deduplication first and only
// imports unique entries into the actual memtable as a final step.
//
// If the log cannot be read to the end (for example because its last entry is
// truncated or corrupt), every entry cached before that point is still
// imported and the read error is returned afterwards. Finding a commit type
// other than replace is a hard error: it is returned immediately and nothing
// is imported. An error from closing the file is reported only when no other
// error occurred.
func (p *commitloggerParser) doReplace() (err error) {
	f, err := os.Open(p.path)
	if err != nil {
		return err
	}
	// A single deferred close covers every return path (and panics raised by
	// the parser); a close failure must not mask an earlier, more useful error.
	defer func() {
		if closeErr := f.Close(); err == nil {
			err = closeErr
		}
	}()

	metered := diskio.NewMeteredReader(f, p.metrics.TrackStartupReadWALDiskIO)
	p.reader = bufio.NewReaderSize(metered, 1*1024*1024)

	// errUnexpectedLength indicates that we could not read the commit log to the
	// end, for example because the last element on the log was corrupt.
	var errUnexpectedLength error

	for {
		var commitType CommitType

		// binary.Read reports io.EOF only when no bytes were read, so a plain
		// EOF here means the log ended cleanly on an entry boundary.
		readErr := binary.Read(p.reader, binary.LittleEndian, &commitType)
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			errUnexpectedLength = errors.Wrap(readErr, "read commit type")
			break
		}

		if !CommitTypeReplace.Is(commitType) {
			return errors.Errorf("found a %s commit on a replace bucket", commitType.String())
		}

		if parseErr := p.parseReplaceNode(); parseErr != nil {
			errUnexpectedLength = errors.Wrap(parseErr, "read replace node")
			break
		}
	}

	p.importReplaceCache()

	return errUnexpectedLength
}

// importReplaceCache writes every deduplicated node in p.replaceCache into the
// memtable, as a tombstone or a put depending on the node. Secondary keys are
// forwarded only when the memtable is configured with secondary indices.
func (p *commitloggerParser) importReplaceCache() {
	for _, node := range p.replaceCache {
		var opts []SecondaryKeyOption
		if p.memtable.secondaryIndices > 0 {
			for i, secKey := range node.secondaryKeys {
				opts = append(opts, WithSecondaryKey(i, secKey))
			}
		}
		// NOTE(review): any return values of setTombstone/put are discarded
		// here, exactly as before — confirm whether they can fail.
		if node.tombstone {
			p.memtable.setTombstone(node.primaryKey, opts...)
		} else {
			p.memtable.put(node.primaryKey, node.value, opts...)
		}
	}
}

// parseReplaceNode reads one replace node from p.reader and merges it into
// the deduplication cache; the memtable itself is not touched here; a later
// step copies the cache into it.
//
// Merge rules per primary key: a non-tombstone node always replaces the
// cached entry. A tombstone marks an already-cached node as deleted (keeping
// that node's other fields), or is cached as-is when the key is not yet known.
func (p *commitloggerParser) parseReplaceNode() error {
	node, err := ParseReplaceNode(p.reader, p.memtable.secondaryIndices)
	if err != nil {
		return err
	}

	key := string(node.primaryKey)
	if node.tombstone {
		if prev, found := p.replaceCache[key]; found {
			prev.tombstone = true
			node = prev
		}
	}
	p.replaceCache[key] = node

	return nil
}