Spaces:
Running
Running
// _ _ | |
// __ _____ __ ___ ___ __ _| |_ ___ | |
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ | |
// \ V V / __/ (_| |\ V /| | (_| | || __/ | |
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| | |
// | |
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. | |
// | |
// CONTACT: [email protected] | |
// | |
package commitlog | |
import ( | |
"encoding/binary" | |
"os" | |
"github.com/pkg/errors" | |
"github.com/weaviate/weaviate/adapters/repos/db/vector/compressionhelpers" | |
) | |
type Logger struct { | |
file *os.File | |
bufw *bufWriter | |
} | |
// TODO: these are duplicates with the hnsw package, unify them | |
type HnswCommitType uint8 // 256 options, plenty of room for future extensions | |
// TODO: these are duplicates with the hnsw package, unify them | |
const ( | |
AddNode HnswCommitType = iota | |
SetEntryPointMaxLevel | |
AddLinkAtLevel | |
ReplaceLinksAtLevel | |
AddTombstone | |
RemoveTombstone | |
ClearLinks | |
DeleteNode | |
ResetIndex | |
ClearLinksAtLevel // added in v1.8.0-rc.1, see https://github.com/weaviate/weaviate/issues/1701 | |
AddLinksAtLevel // added in v1.8.0-rc.1, see https://github.com/weaviate/weaviate/issues/1705 | |
AddPQ | |
) | |
func NewLogger(fileName string) *Logger { | |
file, err := os.Create(fileName) | |
if err != nil { | |
panic(err) | |
} | |
return &Logger{file: file, bufw: NewWriter(file)} | |
} | |
func NewLoggerWithFile(file *os.File) *Logger { | |
return &Logger{file: file, bufw: NewWriterSize(file, 32*1024)} | |
} | |
func (l *Logger) SetEntryPointWithMaxLayer(id uint64, level int) error { | |
toWrite := make([]byte, 11) | |
toWrite[0] = byte(SetEntryPointMaxLevel) | |
binary.LittleEndian.PutUint64(toWrite[1:9], id) | |
binary.LittleEndian.PutUint16(toWrite[9:11], uint16(level)) | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
func (l *Logger) AddNode(id uint64, level int) error { | |
toWrite := make([]byte, 11) | |
toWrite[0] = byte(AddNode) | |
binary.LittleEndian.PutUint64(toWrite[1:9], id) | |
binary.LittleEndian.PutUint16(toWrite[9:11], uint16(level)) | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
func (l *Logger) AddPQ(data compressionhelpers.PQData) error { | |
toWrite := make([]byte, 10) | |
toWrite[0] = byte(AddPQ) | |
binary.LittleEndian.PutUint16(toWrite[1:3], data.Dimensions) | |
toWrite[3] = byte(data.EncoderType) | |
binary.LittleEndian.PutUint16(toWrite[4:6], data.Ks) | |
binary.LittleEndian.PutUint16(toWrite[6:8], data.M) | |
toWrite[8] = data.EncoderDistribution | |
if data.UseBitsEncoding { | |
toWrite[9] = 1 | |
} else { | |
toWrite[9] = 0 | |
} | |
for _, encoder := range data.Encoders { | |
toWrite = append(toWrite, encoder.ExposeDataForRestore()...) | |
} | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
func (l *Logger) AddLinkAtLevel(id uint64, level int, target uint64) error { | |
toWrite := make([]byte, 19) | |
toWrite[0] = byte(AddLinkAtLevel) | |
binary.LittleEndian.PutUint64(toWrite[1:9], id) | |
binary.LittleEndian.PutUint16(toWrite[9:11], uint16(level)) | |
binary.LittleEndian.PutUint64(toWrite[11:19], target) | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
func (l *Logger) AddLinksAtLevel(id uint64, level int, targets []uint64) error { | |
toWrite := make([]byte, 13+len(targets)*8) | |
toWrite[0] = byte(AddLinksAtLevel) | |
binary.LittleEndian.PutUint64(toWrite[1:9], id) | |
binary.LittleEndian.PutUint16(toWrite[9:11], uint16(level)) | |
binary.LittleEndian.PutUint16(toWrite[11:13], uint16(len(targets))) | |
for i, target := range targets { | |
offsetStart := 13 + i*8 | |
offsetEnd := offsetStart + 8 | |
binary.LittleEndian.PutUint64(toWrite[offsetStart:offsetEnd], target) | |
} | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
// chunks links in increments of 8, so that we never have to allocate a dynamic | |
// []byte size which would be guaranteed to escape to the heap | |
func (l *Logger) ReplaceLinksAtLevel(id uint64, level int, targets []uint64) error { | |
headers := make([]byte, 13) | |
headers[0] = byte(ReplaceLinksAtLevel) | |
binary.LittleEndian.PutUint64(headers[1:9], id) | |
binary.LittleEndian.PutUint16(headers[9:11], uint16(level)) | |
binary.LittleEndian.PutUint16(headers[11:13], uint16(len(targets))) | |
_, err := l.bufw.Write(headers) | |
if err != nil { | |
return errors.Wrap(err, "write headers") | |
} | |
i := 0 | |
// chunks of 8 | |
buf := make([]byte, 64) | |
for i < len(targets) { | |
if i != 0 && i%8 == 0 { | |
if _, err := l.bufw.Write(buf); err != nil { | |
return errors.Wrap(err, "write link chunk") | |
} | |
} | |
pos := i % 8 | |
start := pos * 8 | |
end := start + 8 | |
binary.LittleEndian.PutUint64(buf[start:end], targets[i]) | |
i++ | |
} | |
// remainder | |
if i != 0 { | |
start := 0 | |
end := i % 8 * 8 | |
if end == 0 { | |
end = 64 | |
} | |
if _, err := l.bufw.Write(buf[start:end]); err != nil { | |
return errors.Wrap(err, "write link remainder") | |
} | |
} | |
return nil | |
} | |
func (l *Logger) AddTombstone(id uint64) error { | |
toWrite := make([]byte, 9) | |
toWrite[0] = byte(AddTombstone) | |
binary.LittleEndian.PutUint64(toWrite[1:9], id) | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
func (l *Logger) RemoveTombstone(id uint64) error { | |
toWrite := make([]byte, 9) | |
toWrite[0] = byte(RemoveTombstone) | |
binary.LittleEndian.PutUint64(toWrite[1:9], id) | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
func (l *Logger) ClearLinks(id uint64) error { | |
toWrite := make([]byte, 9) | |
toWrite[0] = byte(ClearLinks) | |
binary.LittleEndian.PutUint64(toWrite[1:9], id) | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
func (l *Logger) ClearLinksAtLevel(id uint64, level uint16) error { | |
toWrite := make([]byte, 11) | |
toWrite[0] = byte(ClearLinksAtLevel) | |
binary.LittleEndian.PutUint64(toWrite[1:9], id) | |
binary.LittleEndian.PutUint16(toWrite[9:11], level) | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
func (l *Logger) DeleteNode(id uint64) error { | |
toWrite := make([]byte, 9) | |
toWrite[0] = byte(DeleteNode) | |
binary.LittleEndian.PutUint64(toWrite[1:9], id) | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
func (l *Logger) Reset() error { | |
toWrite := make([]byte, 1) | |
toWrite[0] = byte(ResetIndex) | |
_, err := l.bufw.Write(toWrite) | |
return err | |
} | |
func (l *Logger) FileSize() (int64, error) { | |
i, err := l.file.Stat() | |
if err != nil { | |
return -1, err | |
} | |
return i.Size(), nil | |
} | |
func (l *Logger) FileName() (string, error) { | |
i, err := l.file.Stat() | |
if err != nil { | |
return "", err | |
} | |
return i.Name(), nil | |
} | |
func (l *Logger) Flush() error { | |
return l.bufw.Flush() | |
} | |
func (l *Logger) Close() error { | |
if err := l.bufw.Flush(); err != nil { | |
return err | |
} | |
if err := l.file.Close(); err != nil { | |
return err | |
} | |
return nil | |
} | |