[feat] - buffered file writer metrics (#2395)

* use diff chan

* correctly use the buffered file writer

* use value from source

* reorder fields

* add tests and update

* Fix issue with buffer slices growing

* fix test

* correctly use the buffered file writer

* use value from source

* reorder fields

* fix

* add singleton

* use shared pool

* optimize

* rename and cleanup

* add metrics

* add print

* rebase

* remove extra inc

* add metrics for checkout time

* add comment

* use microseconds

* add metrics

* add metrics pkg

* add more metrics

* rever test

* remove fields

* fix

* resize and return

* update metric name

* remove comment

* address comments

* add comment
This commit is contained in:
ahrav 2024-02-08 07:38:40 -08:00 committed by GitHub
parent 3b40c4fa63
commit 6557b3b321
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 212 additions and 16 deletions

View file

@ -8,19 +8,51 @@ import (
"io" "io"
"os" "os"
"sync" "sync"
"time"
"github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp" "github.com/trufflesecurity/trufflehog/v3/pkg/cleantemp"
"github.com/trufflesecurity/trufflehog/v3/pkg/context" "github.com/trufflesecurity/trufflehog/v3/pkg/context"
) )
type bufferPoolMetrics struct{}
func (bufferPoolMetrics) recordGrowth(growthAmount int) {
growCount.Inc()
growAmount.Add(float64(growthAmount))
}
func (bufferPoolMetrics) recordShrink(amount int) {
shrinkCount.Inc()
shrinkAmount.Add(float64(amount))
}
func (bufferPoolMetrics) recordCheckoutDuration(duration time.Duration) {
checkoutDuration.Observe(float64(duration.Microseconds()))
checkoutCount.Inc()
checkoutDurationTotal.Add(float64(duration.Microseconds()))
}
func (bufferPoolMetrics) recordBufferRetrival() {
activeBufferCount.Inc()
bufferCount.Inc()
}
func (bufferPoolMetrics) recordBufferReturn(bufCap, bufLen int64) {
activeBufferCount.Dec()
totalBufferSize.Add(float64(bufCap))
totalBufferLength.Add(float64(bufLen))
}
type bufPoolOpt func(pool *bufferPool) type bufPoolOpt func(pool *bufferPool)
type bufferPool struct { type bufferPool struct {
bufferSize uint32 bufferSize uint32
*sync.Pool *sync.Pool
metrics bufferPoolMetrics
} }
const defaultBufferSize = 2 << 10 // 2KB const defaultBufferSize = 1 << 12 // 4KB
func newBufferPool(opts ...bufPoolOpt) *bufferPool { func newBufferPool(opts ...bufPoolOpt) *bufferPool {
pool := &bufferPool{bufferSize: defaultBufferSize} pool := &bufferPool{bufferSize: defaultBufferSize}
@ -29,8 +61,7 @@ func newBufferPool(opts ...bufPoolOpt) *bufferPool {
} }
pool.Pool = &sync.Pool{ pool.Pool = &sync.Pool{
New: func() any { New: func() any {
buf := new(bytes.Buffer) buf := &buffer{Buffer: bytes.NewBuffer(make([]byte, 0, pool.bufferSize))}
buf.Grow(int(pool.bufferSize))
return buf return buf
}, },
} }
@ -44,22 +75,40 @@ var sharedBufferPool *bufferPool
func init() { sharedBufferPool = newBufferPool() } func init() { sharedBufferPool = newBufferPool() }
func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { // buffer is a wrapper around bytes.Buffer that includes a timestamp for tracking buffer checkout duration.
buf, ok := bp.Pool.Get().(*bytes.Buffer) type buffer struct {
*bytes.Buffer
checkedOut time.Time
}
func (bp *bufferPool) get(ctx context.Context) *buffer {
buf, ok := bp.Pool.Get().(*buffer)
if !ok { if !ok {
ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer") ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer")
buf = bytes.NewBuffer(make([]byte, 0, bp.bufferSize)) buf = &buffer{Buffer: bytes.NewBuffer(make([]byte, 0, bp.bufferSize))}
} }
buf.checkedOut = time.Now()
bp.metrics.recordBufferRetrival()
return buf return buf
} }
func (bp *bufferPool) put(buf *bytes.Buffer) { func (bp *bufferPool) growBufferWithSize(buf *buffer, size int) {
// If the buffer is more than twice the default size, release it for garbage collection. // Grow the buffer to accommodate the new data.
bp.metrics.recordGrowth(size)
buf.Grow(size)
}
func (bp *bufferPool) put(buf *buffer) {
bp.metrics.recordBufferReturn(int64(buf.Cap()), int64(buf.Len()))
bp.metrics.recordCheckoutDuration(time.Since(buf.checkedOut))
// If the buffer is more than twice the default size, replace it with a new buffer.
// This prevents us from returning very large buffers to the pool. // This prevents us from returning very large buffers to the pool.
const maxAllowedCapacity = 2 * defaultBufferSize const maxAllowedCapacity = 2 * defaultBufferSize
if buf.Cap() > maxAllowedCapacity { if buf.Cap() > maxAllowedCapacity {
buf = nil // Release the large buffer for garbage collection. bp.metrics.recordShrink(buf.Cap() - defaultBufferSize)
buf = &buffer{Buffer: bytes.NewBuffer(make([]byte, 0, bp.bufferSize))}
} else { } else {
// Reset the buffer to clear any existing data. // Reset the buffer to clear any existing data.
buf.Reset() buf.Reset()
@ -68,6 +117,23 @@ func (bp *bufferPool) put(buf *bytes.Buffer) {
bp.Put(buf) bp.Put(buf)
} }
type bufferedFileWriterMetrics struct{}
func (bufferedFileWriterMetrics) recordDataProcessed(size uint64, dur time.Duration) {
totalWriteSize.Add(float64(size))
totalWriteDuration.Add(float64(dur.Microseconds()))
}
func (bufferedFileWriterMetrics) recordDiskWrite(ctx context.Context, f *os.File) {
diskWriteCount.Inc()
size, err := f.Stat()
if err != nil {
ctx.Logger().Error(err, "failed to get file size for metric")
return
}
fileSizeHistogram.Observe(float64(size.Size()))
}
// state represents the current mode of BufferedFileWriter. // state represents the current mode of BufferedFileWriter.
type state uint8 type state uint8
@ -84,12 +150,14 @@ type BufferedFileWriter struct {
threshold uint64 // Threshold for switching to file writing. threshold uint64 // Threshold for switching to file writing.
size uint64 // Total size of the data written. size uint64 // Total size of the data written.
state state // Current state of the writer. (writeOnly or readOnly)
bufPool *bufferPool // Pool for storing buffers for reuse. bufPool *bufferPool // Pool for storing buffers for reuse.
buf *bytes.Buffer // Buffer for storing data under the threshold in memory. buf *buffer // Buffer for storing data under the threshold in memory.
filename string // Name of the temporary file. filename string // Name of the temporary file.
file io.WriteCloser // File for storing data over the threshold. file io.WriteCloser // File for storing data over the threshold.
state state // Current state of the writer. (writeOnly or readOnly)
metrics bufferedFileWriterMetrics
} }
// Option is a function that modifies a BufferedFileWriter. // Option is a function that modifies a BufferedFileWriter.
@ -100,9 +168,9 @@ func WithThreshold(threshold uint64) Option {
return func(w *BufferedFileWriter) { w.threshold = threshold } return func(w *BufferedFileWriter) { w.threshold = threshold }
} }
const defaultThreshold = 10 * 1024 * 1024 // 10MB
// New creates a new BufferedFileWriter with the given options. // New creates a new BufferedFileWriter with the given options.
func New(opts ...Option) *BufferedFileWriter { func New(opts ...Option) *BufferedFileWriter {
const defaultThreshold = 10 * 1024 * 1024 // 10MB
w := &BufferedFileWriter{ w := &BufferedFileWriter{
threshold: defaultThreshold, threshold: defaultThreshold,
state: writeOnly, state: writeOnly,
@ -111,6 +179,7 @@ func New(opts ...Option) *BufferedFileWriter {
for _, opt := range opts { for _, opt := range opts {
opt(w) opt(w)
} }
return w return w
} }
@ -158,7 +227,10 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
bufferLength := w.buf.Len() bufferLength := w.buf.Len()
defer func() { start := time.Now()
defer func(start time.Time) {
w.metrics.recordDataProcessed(size, time.Since(start))
w.size += size w.size += size
ctx.Logger().V(4).Info( ctx.Logger().V(4).Info(
"write complete", "write complete",
@ -166,9 +238,9 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
"content_size", bufferLength, "content_size", bufferLength,
"total_size", w.size, "total_size", w.size,
) )
}() }(start)
totalSizeNeeded := uint64(bufferLength) + uint64(len(data)) totalSizeNeeded := uint64(bufferLength) + size
if totalSizeNeeded <= w.threshold { if totalSizeNeeded <= w.threshold {
// If the total size is within the threshold, write to the buffer. // If the total size is within the threshold, write to the buffer.
ctx.Logger().V(4).Info( ctx.Logger().V(4).Info(
@ -187,6 +259,12 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
"available_space", availableSpace, "available_space", availableSpace,
"grow_size", growSize, "grow_size", growSize,
) )
// We are manually growing the buffer so we can track the growth via metrics.
// Knowing the exact data size, we directly resize to fit it, rather than exponential growth
// which may require multiple allocations and copies if the size required is much larger
// than double the capacity. Our approach aligns with default behavior when growth sizes
// happen to match current capacity, retaining asymptotic efficiency benefits.
w.bufPool.growBufferWithSize(w.buf, growSize)
} }
return w.buf.Write(data) return w.buf.Write(data)
@ -202,6 +280,7 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error
w.filename = file.Name() w.filename = file.Name()
w.file = file w.file = file
w.metrics.recordDiskWrite(ctx, file)
// Transfer existing data in buffer to the file, then clear the buffer. // Transfer existing data in buffer to the file, then clear the buffer.
// This ensures all the data is in one place - either entirely in the buffer or the file. // This ensures all the data is in one place - either entirely in the buffer or the file.

View file

@ -0,0 +1,117 @@
package bufferedfilewriter
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/trufflesecurity/trufflehog/v3/pkg/common"
)
var (
activeBufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "active_buffer_count",
Help: "Current number of active buffers.",
})
bufferCount = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "buffer_count",
Help: "Total number of buffers managed by the pool.",
})
totalBufferLength = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_length",
Help: "Total length of all buffers combined.",
})
totalBufferSize = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_buffer_size",
Help: "Total size of all buffers combined.",
})
growCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "grow_count",
Help: "Total number of times buffers in the pool have grown.",
})
growAmount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "grow_amount",
Help: "Total amount of bytes buffers in the pool have grown by.",
})
shrinkCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_count",
Help: "Total number of times buffers in the pool have shrunk.",
})
shrinkAmount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "shrink_amount",
Help: "Total amount of bytes buffers in the pool have shrunk by.",
})
checkoutDurationTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "checkout_duration_total_us",
Help: "Total duration in microseconds of buffer checkouts.",
})
checkoutDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "checkout_duration_us",
Help: "Duration in microseconds of buffer checkouts.",
Buckets: []float64{50, 500, 5000},
})
checkoutCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "checkout_count",
Help: "Total number of buffer checkouts.",
})
totalWriteSize = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_write_size_bytes",
Help: "Total size of data written by the BufferedFileWriter in bytes.",
})
totalWriteDuration = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "total_write_duration_microseconds",
Help: "Total duration of write operations by the BufferedFileWriter in microseconds.",
})
diskWriteCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "disk_write_count",
Help: "Total number of times data was written to disk by the BufferedFileWriter.",
})
fileSizeHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "file_size_bytes",
Help: "Sizes of files created by the BufferedFileWriter.",
Buckets: prometheus.ExponentialBuckets(defaultThreshold, 2, 4),
})
)