[not-fixup] - Reduce memory consumption for Buffered File Writer (#2377)

* correctly use the buffered file writer

* use value from source

* reorder fields

* use only the DetectorKey as a map field

* correctly use the buffered file writer

* use value from source

* reorder fields

* add tests and update

* Fix issue with buffer slices growing

* fix test

* fix

* add singleton

* use shared pool

* optimize

* rename and cleanup

* use correct calculation to grow buffer

* only grow if needed

* address comments

* remove unused

* remove

* rip out Grow

* address coment

* use 2k default buffer

* update comment allow large buffers to be garbage collected
This commit is contained in:
ahrav 2024-02-06 09:22:25 -08:00 committed by GitHub
parent 8104611d6e
commit 843334222c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 295 additions and 36 deletions

View file

@ -449,7 +449,6 @@ func (c *Parser) FromReader(ctx context.Context, stdOut io.Reader, commitChan ch
} }
// Create a new currentDiff and currentCommit // Create a new currentDiff and currentCommit
currentDiff = diff() currentDiff = diff()
// currentDiff = NewDiff(withCustomContentWriter(c.contentWriter()))
currentCommit = &Commit{Message: strings.Builder{}} currentCommit = &Commit{Message: strings.Builder{}}
// Check that the commit line contains a hash and set it. // Check that the commit line contains a hash and set it.
if len(line) >= 47 { if len(line) >= 47 {

View file

@ -13,12 +13,59 @@ import (
"github.com/trufflesecurity/trufflehog/v3/pkg/context" "github.com/trufflesecurity/trufflehog/v3/pkg/context"
) )
// bufferPool is used to store buffers for reuse. type bufPoolOpt func(pool *bufferPool)
var bufferPool = sync.Pool{
// TODO: Consider growing the buffer before returning it if we can find an optimal size. type bufferPool struct {
// Ideally the size would cover the majority of cases without being too large. bufferSize uint32
// This would avoid the need to grow the buffer when writing to it, reducing allocations. *sync.Pool
New: func() any { return new(bytes.Buffer) }, }
const defaultBufferSize = 2 << 10 // 2KB
func newBufferPool(opts ...bufPoolOpt) *bufferPool {
pool := &bufferPool{bufferSize: defaultBufferSize}
for _, opt := range opts {
opt(pool)
}
pool.Pool = &sync.Pool{
New: func() any {
buf := new(bytes.Buffer)
buf.Grow(int(pool.bufferSize))
return buf
},
}
return pool
}
// sharedBufferPool is the shared buffer pool used by all BufferedFileWriters.
// This allows for efficient reuse of buffers across multiple writers.
var sharedBufferPool *bufferPool
func init() { sharedBufferPool = newBufferPool() }
func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer {
buf, ok := bp.Pool.Get().(*bytes.Buffer)
if !ok {
ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer")
buf = bytes.NewBuffer(make([]byte, 0, bp.bufferSize))
}
return buf
}
func (bp *bufferPool) put(buf *bytes.Buffer) {
// If the buffer is more than twice the default size, release it for garbage collection.
// This prevents us from returning very large buffers to the pool.
const maxAllowedCapacity = 2 * defaultBufferSize
if buf.Cap() > maxAllowedCapacity {
buf = nil // Release the large buffer for garbage collection.
} else {
// Reset the buffer to clear any existing data.
buf.Reset()
}
bp.Put(buf)
} }
// state represents the current mode of BufferedFileWriter. // state represents the current mode of BufferedFileWriter.
@ -39,7 +86,8 @@ type BufferedFileWriter struct {
state state // Current state of the writer. (writeOnly or readOnly) state state // Current state of the writer. (writeOnly or readOnly)
buf bytes.Buffer // Buffer for storing data under the threshold in memory. bufPool *bufferPool // Pool for storing buffers for reuse.
buf *bytes.Buffer // Buffer for storing data under the threshold in memory.
filename string // Name of the temporary file. filename string // Name of the temporary file.
file io.WriteCloser // File for storing data over the threshold. file io.WriteCloser // File for storing data over the threshold.
} }
@ -55,7 +103,11 @@ func WithThreshold(threshold uint64) Option {
// New creates a new BufferedFileWriter with the given options. // New creates a new BufferedFileWriter with the given options.
func New(opts ...Option) *BufferedFileWriter { func New(opts ...Option) *BufferedFileWriter {
const defaultThreshold = 10 * 1024 * 1024 // 10MB const defaultThreshold = 10 * 1024 * 1024 // 10MB
w := &BufferedFileWriter{threshold: defaultThreshold, state: writeOnly} w := &BufferedFileWriter{
threshold: defaultThreshold,
state: writeOnly,
bufPool: sharedBufferPool,
}
for _, opt := range opts { for _, opt := range opts {
opt(w) opt(w)
} }
@ -78,17 +130,16 @@ func (w *BufferedFileWriter) String() (string, error) {
} }
defer file.Close() defer file.Close()
// Create a buffer large enough to hold file data and additional buffer data, if any. var buf bytes.Buffer
fileSize := w.size
buf := bytes.NewBuffer(make([]byte, 0, fileSize))
// Read the file contents into the buffer. // Read the file contents into the buffer.
if _, err := io.Copy(buf, file); err != nil { if _, err := io.CopyBuffer(&buf, file, nil); err != nil {
return "", fmt.Errorf("failed to read file contents: %w", err) return "", fmt.Errorf("failed to read file contents: %w", err)
} }
// Append buffer data, if any, to the end of the file contents. // Append buffer data, if any, to the end of the file contents.
buf.Write(w.buf.Bytes()) if _, err := w.buf.WriteTo(&buf); err != nil {
return "", err
}
return buf.String(), nil return buf.String(), nil
} }
@ -100,33 +151,44 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
} }
size := uint64(len(data)) size := uint64(len(data))
if w.buf == nil || w.buf.Len() == 0 {
w.buf = w.bufPool.get(ctx)
}
bufferLength := w.buf.Len()
defer func() { defer func() {
w.size += size w.size += size
ctx.Logger().V(4).Info( ctx.Logger().V(4).Info(
"write complete", "write complete",
"data_size", size, "data_size", size,
"content_size", w.buf.Len(), "content_size", bufferLength,
"total_size", w.size, "total_size", w.size,
) )
}() }()
if w.buf.Len() == 0 { totalSizeNeeded := uint64(bufferLength) + uint64(len(data))
bufPtr, ok := bufferPool.Get().(*bytes.Buffer) if totalSizeNeeded <= w.threshold {
if !ok {
ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer")
bufPtr = new(bytes.Buffer)
}
bufPtr.Reset() // Reset the buffer to clear any existing data
w.buf = *bufPtr
}
if uint64(w.buf.Len())+size <= w.threshold {
// If the total size is within the threshold, write to the buffer. // If the total size is within the threshold, write to the buffer.
ctx.Logger().V(4).Info( ctx.Logger().V(4).Info(
"writing to buffer", "writing to buffer",
"data_size", size, "data_size", size,
"content_size", w.buf.Len(), "content_size", bufferLength,
) )
availableSpace := w.buf.Cap() - bufferLength
growSize := int(totalSizeNeeded) - bufferLength
if growSize > availableSpace {
ctx.Logger().V(4).Info(
"buffer size exceeded, growing buffer",
"current_size", bufferLength,
"new_size", totalSizeNeeded,
"available_space", availableSpace,
"grow_size", growSize,
)
}
return w.buf.Write(data) return w.buf.Write(data)
} }
@ -143,14 +205,12 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
// Transfer existing data in buffer to the file, then clear the buffer. // Transfer existing data in buffer to the file, then clear the buffer.
// This ensures all the data is in one place - either entirely in the buffer or the file. // This ensures all the data is in one place - either entirely in the buffer or the file.
if w.buf.Len() > 0 { if bufferLength > 0 {
ctx.Logger().V(4).Info("writing buffer to file", "content_size", w.buf.Len()) ctx.Logger().V(4).Info("writing buffer to file", "content_size", bufferLength)
if _, err := w.file.Write(w.buf.Bytes()); err != nil { if _, err := w.buf.WriteTo(w.file); err != nil {
return 0, err return 0, err
} }
// Reset the buffer to clear any existing data and return it to the pool. w.bufPool.put(w.buf)
w.buf.Reset()
bufferPool.Put(&w.buf)
} }
} }
ctx.Logger().V(4).Info("writing to file", "data_size", size) ctx.Logger().V(4).Info("writing to file", "data_size", size)
@ -167,7 +227,7 @@ func (w *BufferedFileWriter) CloseForWriting() error {
} }
if w.buf.Len() > 0 { if w.buf.Len() > 0 {
_, err := w.file.Write(w.buf.Bytes()) _, err := w.buf.WriteTo(w.file)
if err != nil { if err != nil {
return err return err
} }
@ -199,7 +259,7 @@ func (w *BufferedFileWriter) ReadCloser() (io.ReadCloser, error) {
// Data is in memory. // Data is in memory.
return &bufferReadCloser{ return &bufferReadCloser{
Reader: bytes.NewReader(w.buf.Bytes()), Reader: bytes.NewReader(w.buf.Bytes()),
onClose: func() { bufferPool.Put(&w.buf) }, onClose: func() { w.bufPool.put(w.buf) },
}, nil }, nil
} }

View file

@ -1,6 +1,7 @@
package bufferedfilewriter package bufferedfilewriter
import ( import (
"bytes"
"os" "os"
"testing" "testing"
"time" "time"
@ -89,12 +90,151 @@ func TestBufferedFileWriterString(t *testing.T) {
got, err := writer.String() got, err := writer.String()
assert.NoError(t, err) assert.NoError(t, err)
err = writer.CloseForWriting()
assert.NoError(t, err)
assert.Equal(t, tc.expectedStr, got, "String content mismatch") assert.Equal(t, tc.expectedStr, got, "String content mismatch")
}) })
} }
} }
const (
smallBuffer = 2 << 5 // 64B
mediumBuffer = 2 << 10 // 2KB
smallFile = 2 << 25 // 32MB
mediumFile = 2 << 28 // 256MB
)
func BenchmarkBufferedFileWriterString_BufferOnly_Small(b *testing.B) {
data := bytes.Repeat([]byte("a"), smallBuffer)
ctx := context.Background()
writer := New()
_, err := writer.Write(ctx, data)
assert.NoError(b, err)
benchmarkBufferedFileWriterString(b, writer)
err = writer.CloseForWriting()
assert.NoError(b, err)
rc, err := writer.ReadCloser()
assert.NoError(b, err)
rc.Close()
}
func BenchmarkBufferedFileWriterString_BufferOnly_Medium(b *testing.B) {
data := bytes.Repeat([]byte("a"), mediumBuffer)
ctx := context.Background()
writer := New()
_, err := writer.Write(ctx, data)
assert.NoError(b, err)
benchmarkBufferedFileWriterString(b, writer)
err = writer.CloseForWriting()
assert.NoError(b, err)
rc, err := writer.ReadCloser()
assert.NoError(b, err)
rc.Close()
}
func BenchmarkBufferedFileWriterString_OnlyFile_Small(b *testing.B) {
data := bytes.Repeat([]byte("a"), smallFile)
ctx := context.Background()
writer := New()
_, err := writer.Write(ctx, data)
assert.NoError(b, err)
benchmarkBufferedFileWriterString(b, writer)
err = writer.CloseForWriting()
assert.NoError(b, err)
rc, err := writer.ReadCloser()
assert.NoError(b, err)
rc.Close()
}
func BenchmarkBufferedFileWriterString_OnlyFile_Medium(b *testing.B) {
data := bytes.Repeat([]byte("a"), mediumFile)
ctx := context.Background()
writer := New()
_, err := writer.Write(ctx, data)
assert.NoError(b, err)
benchmarkBufferedFileWriterString(b, writer)
err = writer.CloseForWriting()
assert.NoError(b, err)
rc, err := writer.ReadCloser()
assert.NoError(b, err)
rc.Close()
}
func BenchmarkBufferedFileWriterString_BufferWithFile_Small(b *testing.B) {
data := bytes.Repeat([]byte("a"), smallFile)
ctx := context.Background()
writer := New()
_, err := writer.Write(ctx, data)
assert.NoError(b, err)
// Write again so we also fill up the buffer.
_, err = writer.Write(ctx, data)
assert.NoError(b, err)
benchmarkBufferedFileWriterString(b, writer)
err = writer.CloseForWriting()
assert.NoError(b, err)
rc, err := writer.ReadCloser()
assert.NoError(b, err)
rc.Close()
}
func BenchmarkBufferedFileWriterString_BufferWithFile_Medium(b *testing.B) {
data := bytes.Repeat([]byte("a"), mediumFile)
ctx := context.Background()
writer := New()
_, err := writer.Write(ctx, data)
assert.NoError(b, err)
// Write again so we also fill up the buffer.
_, err = writer.Write(ctx, data)
assert.NoError(b, err)
benchmarkBufferedFileWriterString(b, writer)
err = writer.CloseForWriting()
assert.NoError(b, err)
rc, err := writer.ReadCloser()
assert.NoError(b, err)
rc.Close()
}
func benchmarkBufferedFileWriterString(b *testing.B, w *BufferedFileWriter) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := w.String()
assert.NoError(b, err)
}
b.StopTimer()
}
func TestBufferedFileWriterLen(t *testing.T) { func TestBufferedFileWriterLen(t *testing.T) {
t.Parallel() t.Parallel()
tests := []struct { tests := []struct {
@ -306,3 +446,63 @@ func TestBufferedFileWriterWriteInReadOnlyState(t *testing.T) {
_, err := writer.Write(context.Background(), []byte("should fail")) _, err := writer.Write(context.Background(), []byte("should fail"))
assert.Error(t, err) assert.Error(t, err)
} }
func BenchmarkBufferedFileWriterWriteLarge(b *testing.B) {
ctx := context.Background()
data := make([]byte, 1024*1024*10) // 10MB
for i := range data {
data[i] = byte(i % 256) // Simple pattern to avoid uniform zero data
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Threshold is smaller than the data size, data should get flushed to the file.
writer := New(WithThreshold(1024))
b.StartTimer()
{
_, err := writer.Write(ctx, data)
assert.NoError(b, err)
}
b.StopTimer()
// Ensure proper cleanup after each write operation, including closing the file
err := writer.CloseForWriting()
assert.NoError(b, err)
rc, err := writer.ReadCloser()
assert.NoError(b, err)
rc.Close()
}
}
func BenchmarkBufferedFileWriterWriteSmall(b *testing.B) {
ctx := context.Background()
data := make([]byte, 1024*1024) // 1MB
for i := range data {
data[i] = byte(i % 256) // Simple pattern to avoid uniform zero data
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Threshold is the same as the buffer size, data should always be written to the buffer.
writer := New(WithThreshold(1024 * 1024))
b.StartTimer()
{
_, err := writer.Write(ctx, data)
assert.NoError(b, err)
}
b.StopTimer()
// Ensure proper cleanup after each write operation, including closing the file.
err := writer.CloseForWriting()
assert.NoError(b, err)
rc, err := writer.ReadCloser()
assert.NoError(b, err)
rc.Close()
}
}