SemanticSearchPOC / adapters /repos /db /lsmkv /commitlogger_parser.go
KevinStephenson
Adding in weaviate code
b110593
raw
history blame
3.63 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package lsmkv
import (
"bufio"
"encoding/binary"
"io"
"os"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/diskio"
)
// commitloggerParser reads a commit log file and applies its entries to a
// memtable. Which parse routine runs is decided by the bucket strategy (see
// Do).
type commitloggerParser struct {
	// path is the location of the commit log file that gets opened for
	// parsing.
	path string

	// strategy is the bucket strategy; Do dispatches on it.
	strategy string

	// memtable is the target the parsed entries are written into.
	memtable *Memtable

	// reader is the source the parse routines read from. It is not set by
	// the constructor; doReplace assigns a buffered reader over the opened
	// file.
	reader io.Reader

	// metrics provides the callback used to track disk reads while parsing.
	metrics *Metrics

	// replaceCache holds at most one node per primary key so that replace
	// entries are deduplicated before they are imported into the memtable.
	replaceCache map[string]segmentReplaceNode
}
// newCommitLoggerParser returns a parser for the commit log at path that
// writes into activeMemtable using the given strategy and metrics. The
// deduplication cache starts out empty; the reader is left unset until a
// parse routine opens the file.
func newCommitLoggerParser(path string, activeMemtable *Memtable,
	strategy string, metrics *Metrics,
) *commitloggerParser {
	parser := new(commitloggerParser)
	parser.path = path
	parser.strategy = strategy
	parser.memtable = activeMemtable
	parser.metrics = metrics
	parser.replaceCache = make(map[string]segmentReplaceNode)
	return parser
}
// Do parses the commit log using the routine that matches the parser's
// strategy. Map and set collections share one routine. An unrecognized
// strategy yields an error.
func (p *commitloggerParser) Do() error {
	if p.strategy == StrategyReplace {
		return p.doReplace()
	}

	if p.strategy == StrategyMapCollection || p.strategy == StrategySetCollection {
		return p.doCollection()
	}

	if p.strategy == StrategyRoaringSet {
		return p.doRoaringSet()
	}

	return errors.Errorf("unknown strategy %s on commit log parse", p.strategy)
}
// doReplace parses all entries into a cache for deduplication first and only
// imports the unique entries into the actual memtable as a final step.
//
// If the log cannot be read to its end, everything that was parsed up to that
// point is still imported and the read error is returned afterwards. A commit
// of any type other than replace aborts immediately, without importing
// anything.
func (p *commitloggerParser) doReplace() error {
	file, err := os.Open(p.path)
	if err != nil {
		return err
	}

	const bufferSize = 1 << 20 // 1 MiB
	p.reader = bufio.NewReaderSize(
		diskio.NewMeteredReader(file, p.metrics.TrackStartupReadWALDiskIO),
		bufferSize,
	)

	// incompleteErr records that the commit log could not be read to the end,
	// for example because the last element on the log was corrupt.
	var incompleteErr error

scan:
	for {
		var ct CommitType
		switch err := binary.Read(p.reader, binary.LittleEndian, &ct); {
		case err == io.EOF:
			// Clean end of the log.
			break scan
		case err != nil:
			incompleteErr = errors.Wrap(err, "read commit type")
			break scan
		}

		if !CommitTypeReplace.Is(ct) {
			file.Close()
			return errors.Errorf("found a %s commit on a replace bucket", ct.String())
		}

		if err := p.parseReplaceNode(); err != nil {
			incompleteErr = errors.Wrap(err, "read replace node")
			break scan
		}
	}

	// Second step: move the deduplicated nodes from the cache into the
	// memtable.
	// NOTE(review): whatever setTombstone/put return, if anything, is not
	// inspected here - confirm whether these calls can fail.
	for _, cached := range p.replaceCache {
		var secondary []SecondaryKeyOption
		if p.memtable.secondaryIndices > 0 {
			for pos, key := range cached.secondaryKeys {
				secondary = append(secondary, WithSecondaryKey(pos, key))
			}
		}

		if cached.tombstone {
			p.memtable.setTombstone(cached.primaryKey, secondary...)
			continue
		}
		p.memtable.put(cached.primaryKey, cached.value, secondary...)
	}

	// The file is closed in either case; a read error takes precedence over
	// whatever Close reports.
	closeErr := file.Close()
	if incompleteErr != nil {
		return incompleteErr
	}
	return closeErr
}
// parseReplaceNode reads a single replace node from the reader and records it
// in the deduplication cache under its primary key. Nothing is written to the
// memtable here; importing the cache into the memtable is a separate, later
// step.
//
// A regular node overwrites any cached node with the same key. A tombstone
// for a key that is already cached only marks the cached node as deleted and
// keeps its other fields; a tombstone for an unknown key is cached as-is.
func (p *commitloggerParser) parseReplaceNode() error {
	node, err := ParseReplaceNode(p.reader, p.memtable.secondaryIndices)
	if err != nil {
		return err
	}

	key := string(node.primaryKey)

	cached, seen := p.replaceCache[key]
	if node.tombstone && seen {
		cached.tombstone = true
		p.replaceCache[key] = cached
		return nil
	}

	p.replaceCache[key] = node
	return nil
}