// _ _ // __ _____ __ ___ ___ __ _| |_ ___ // \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \ // \ V V / __/ (_| |\ V /| | (_| | || __/ // \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___| // // Copyright © 2016 - 2024 Weaviate B.V. All rights reserved. // // CONTACT: hello@weaviate.io // package commitlog import ( "io" "os" "unicode/utf8" ) const ( defaultBufSize = 4096 ) // bufWriter implements buffering for an *os.File object. // If an error occurs writing to a bufWriter, no more data will be // accepted and all subsequent writes, and Flush, will return the error. // After all data has been written, the client should call the // Flush method to guarantee all data has been forwarded to // the underlying *os.File. type bufWriter struct { err error buf []byte n int wr *os.File } // NewWriterSize returns a new Writer whose buffer has at least the specified // size. If the argument *os.File is already a Writer with large enough // size, it returns the underlying Writer. func NewWriterSize(w *os.File, size int) *bufWriter { if size <= 0 { size = defaultBufSize } return &bufWriter{ buf: make([]byte, size), wr: w, } } // NewWriter returns a new Writer whose buffer has the default size. func NewWriter(w *os.File) *bufWriter { return NewWriterSize(w, defaultBufSize) } // Size returns the size of the underlying buffer in bytes. func (b *bufWriter) Size() int { return len(b.buf) } // Reset discards any unflushed buffered data, clears any error, and // resets b to write its output to w. func (b *bufWriter) Reset(w *os.File) { b.err = nil b.n = 0 b.wr = w } // Flush writes any buffered data to the underlying *os.File. func (b *bufWriter) Flush() error { if b.err != nil { return b.err } if b.n == 0 { return nil } n, err := b.wr.Write(b.buf[0:b.n]) if n < b.n && err == nil { err = io.ErrShortWrite } if err != nil { if n > 0 && n < b.n { copy(b.buf[0:b.n-n], b.buf[n:b.n]) } b.n -= n b.err = err return err } b.n = 0 return nil } // Available returns how many bytes are unused in the buffer. func (b *bufWriter) Available() int { return len(b.buf) - b.n } // Buffered returns the number of bytes that have been written into the current buffer. func (b *bufWriter) Buffered() int { return b.n } // Write writes the contents of p into the buffer. // It returns the number of bytes written. // If nn < len(p), it also returns an error explaining // why the write is short. func (b *bufWriter) Write(p []byte) (nn int, err error) { for len(p) > b.Available() && b.err == nil { var n int if b.Buffered() == 0 { // Large write, empty buffer. // Write directly from p to avoid copy. n, b.err = b.wr.Write(p) } else { n = copy(b.buf[b.n:], p) b.n += n b.Flush() } nn += n p = p[n:] } if b.err != nil { return nn, b.err } n := copy(b.buf[b.n:], p) b.n += n nn += n return nn, nil } // WriteByte writes a single byte. func (b *bufWriter) WriteByte(c byte) error { if b.err != nil { return b.err } if b.Available() <= 0 && b.Flush() != nil { return b.err } b.buf[b.n] = c b.n++ return nil } // WriteRune writes a single Unicode code point, returning // the number of bytes written and any error. func (b *bufWriter) WriteRune(r rune) (size int, err error) { if r < utf8.RuneSelf { err = b.WriteByte(byte(r)) if err != nil { return 0, err } return 1, nil } if b.err != nil { return 0, b.err } n := b.Available() if n < utf8.UTFMax { if b.Flush(); b.err != nil { return 0, b.err } n = b.Available() if n < utf8.UTFMax { // Can only happen if buffer is silly small. return b.WriteString(string(r)) } } size = utf8.EncodeRune(b.buf[b.n:], r) b.n += size return size, nil } // WriteString writes a string. // It returns the number of bytes written. // If the count is less than len(s), it also returns an error explaining // why the write is short. func (b *bufWriter) WriteString(s string) (int, error) { nn := 0 for len(s) > b.Available() && b.err == nil { n := copy(b.buf[b.n:], s) b.n += n nn += n s = s[n:] b.Flush() } if b.err != nil { return nn, b.err } n := copy(b.buf[b.n:], s) b.n += n nn += n return nn, nil }