KevinStephenson
Adding in weaviate code
b110593
raw
history blame
6.59 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: [email protected]
//
package commitlog
import (
"encoding/binary"
"os"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/repos/db/vector/compressionhelpers"
)
type Logger struct {
file *os.File
bufw *bufWriter
}
// TODO: these are duplicates with the hnsw package, unify them
type HnswCommitType uint8 // 256 options, plenty of room for future extensions
// TODO: these are duplicates with the hnsw package, unify them
const (
AddNode HnswCommitType = iota
SetEntryPointMaxLevel
AddLinkAtLevel
ReplaceLinksAtLevel
AddTombstone
RemoveTombstone
ClearLinks
DeleteNode
ResetIndex
ClearLinksAtLevel // added in v1.8.0-rc.1, see https://github.com/weaviate/weaviate/issues/1701
AddLinksAtLevel // added in v1.8.0-rc.1, see https://github.com/weaviate/weaviate/issues/1705
AddPQ
)
func NewLogger(fileName string) *Logger {
file, err := os.Create(fileName)
if err != nil {
panic(err)
}
return &Logger{file: file, bufw: NewWriter(file)}
}
func NewLoggerWithFile(file *os.File) *Logger {
return &Logger{file: file, bufw: NewWriterSize(file, 32*1024)}
}
func (l *Logger) SetEntryPointWithMaxLayer(id uint64, level int) error {
toWrite := make([]byte, 11)
toWrite[0] = byte(SetEntryPointMaxLevel)
binary.LittleEndian.PutUint64(toWrite[1:9], id)
binary.LittleEndian.PutUint16(toWrite[9:11], uint16(level))
_, err := l.bufw.Write(toWrite)
return err
}
func (l *Logger) AddNode(id uint64, level int) error {
toWrite := make([]byte, 11)
toWrite[0] = byte(AddNode)
binary.LittleEndian.PutUint64(toWrite[1:9], id)
binary.LittleEndian.PutUint16(toWrite[9:11], uint16(level))
_, err := l.bufw.Write(toWrite)
return err
}
func (l *Logger) AddPQ(data compressionhelpers.PQData) error {
toWrite := make([]byte, 10)
toWrite[0] = byte(AddPQ)
binary.LittleEndian.PutUint16(toWrite[1:3], data.Dimensions)
toWrite[3] = byte(data.EncoderType)
binary.LittleEndian.PutUint16(toWrite[4:6], data.Ks)
binary.LittleEndian.PutUint16(toWrite[6:8], data.M)
toWrite[8] = data.EncoderDistribution
if data.UseBitsEncoding {
toWrite[9] = 1
} else {
toWrite[9] = 0
}
for _, encoder := range data.Encoders {
toWrite = append(toWrite, encoder.ExposeDataForRestore()...)
}
_, err := l.bufw.Write(toWrite)
return err
}
func (l *Logger) AddLinkAtLevel(id uint64, level int, target uint64) error {
toWrite := make([]byte, 19)
toWrite[0] = byte(AddLinkAtLevel)
binary.LittleEndian.PutUint64(toWrite[1:9], id)
binary.LittleEndian.PutUint16(toWrite[9:11], uint16(level))
binary.LittleEndian.PutUint64(toWrite[11:19], target)
_, err := l.bufw.Write(toWrite)
return err
}
func (l *Logger) AddLinksAtLevel(id uint64, level int, targets []uint64) error {
toWrite := make([]byte, 13+len(targets)*8)
toWrite[0] = byte(AddLinksAtLevel)
binary.LittleEndian.PutUint64(toWrite[1:9], id)
binary.LittleEndian.PutUint16(toWrite[9:11], uint16(level))
binary.LittleEndian.PutUint16(toWrite[11:13], uint16(len(targets)))
for i, target := range targets {
offsetStart := 13 + i*8
offsetEnd := offsetStart + 8
binary.LittleEndian.PutUint64(toWrite[offsetStart:offsetEnd], target)
}
_, err := l.bufw.Write(toWrite)
return err
}
// chunks links in increments of 8, so that we never have to allocate a dynamic
// []byte size which would be guaranteed to escape to the heap
func (l *Logger) ReplaceLinksAtLevel(id uint64, level int, targets []uint64) error {
headers := make([]byte, 13)
headers[0] = byte(ReplaceLinksAtLevel)
binary.LittleEndian.PutUint64(headers[1:9], id)
binary.LittleEndian.PutUint16(headers[9:11], uint16(level))
binary.LittleEndian.PutUint16(headers[11:13], uint16(len(targets)))
_, err := l.bufw.Write(headers)
if err != nil {
return errors.Wrap(err, "write headers")
}
i := 0
// chunks of 8
buf := make([]byte, 64)
for i < len(targets) {
if i != 0 && i%8 == 0 {
if _, err := l.bufw.Write(buf); err != nil {
return errors.Wrap(err, "write link chunk")
}
}
pos := i % 8
start := pos * 8
end := start + 8
binary.LittleEndian.PutUint64(buf[start:end], targets[i])
i++
}
// remainder
if i != 0 {
start := 0
end := i % 8 * 8
if end == 0 {
end = 64
}
if _, err := l.bufw.Write(buf[start:end]); err != nil {
return errors.Wrap(err, "write link remainder")
}
}
return nil
}
func (l *Logger) AddTombstone(id uint64) error {
toWrite := make([]byte, 9)
toWrite[0] = byte(AddTombstone)
binary.LittleEndian.PutUint64(toWrite[1:9], id)
_, err := l.bufw.Write(toWrite)
return err
}
func (l *Logger) RemoveTombstone(id uint64) error {
toWrite := make([]byte, 9)
toWrite[0] = byte(RemoveTombstone)
binary.LittleEndian.PutUint64(toWrite[1:9], id)
_, err := l.bufw.Write(toWrite)
return err
}
func (l *Logger) ClearLinks(id uint64) error {
toWrite := make([]byte, 9)
toWrite[0] = byte(ClearLinks)
binary.LittleEndian.PutUint64(toWrite[1:9], id)
_, err := l.bufw.Write(toWrite)
return err
}
func (l *Logger) ClearLinksAtLevel(id uint64, level uint16) error {
toWrite := make([]byte, 11)
toWrite[0] = byte(ClearLinksAtLevel)
binary.LittleEndian.PutUint64(toWrite[1:9], id)
binary.LittleEndian.PutUint16(toWrite[9:11], level)
_, err := l.bufw.Write(toWrite)
return err
}
func (l *Logger) DeleteNode(id uint64) error {
toWrite := make([]byte, 9)
toWrite[0] = byte(DeleteNode)
binary.LittleEndian.PutUint64(toWrite[1:9], id)
_, err := l.bufw.Write(toWrite)
return err
}
func (l *Logger) Reset() error {
toWrite := make([]byte, 1)
toWrite[0] = byte(ResetIndex)
_, err := l.bufw.Write(toWrite)
return err
}
func (l *Logger) FileSize() (int64, error) {
i, err := l.file.Stat()
if err != nil {
return -1, err
}
return i.Size(), nil
}
func (l *Logger) FileName() (string, error) {
i, err := l.file.Stat()
if err != nil {
return "", err
}
return i.Name(), nil
}
func (l *Logger) Flush() error {
return l.bufw.Flush()
}
func (l *Logger) Close() error {
if err := l.bufw.Flush(); err != nil {
return err
}
if err := l.file.Close(); err != nil {
return err
}
return nil
}