2024-06-05 19:00:48 +00:00
|
|
|
// Package channelmetrics provides a flexible way to wrap Go channels with
|
|
|
|
// additional metrics collection capabilities. This allows for monitoring
|
|
|
|
// and tracking of channel usage and performance using different metrics backends.
|
|
|
|
package channelmetrics
|
|
|
|
|
|
|
|
import (
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/trufflesecurity/trufflehog/v3/pkg/common"
|
|
|
|
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
|
|
|
|
)
|
|
|
|
|
|
|
|
// MetricsCollector is the sink that channel measurements are reported to.
// Any metrics backend can be plugged in by implementing these four methods.
type MetricsCollector interface {
	// RecordProduceDuration reports how long a send operation took.
	RecordProduceDuration(duration time.Duration)
	// RecordConsumeDuration reports how long a receive operation took.
	RecordConsumeDuration(duration time.Duration)
	// RecordChannelLen reports the number of items currently in the channel.
	RecordChannelLen(size int)
	// RecordChannelCap reports the capacity of the channel buffer.
	RecordChannelCap(capacity int)
}
|
|
|
|
|
|
|
|
// ObservableChan wraps a Go channel and collects metrics about its usage.
|
|
|
|
// It supports any type of channel and records metrics using a provided
|
|
|
|
// MetricsCollector implementation.
|
|
|
|
type ObservableChan[T any] struct {
|
2024-06-06 14:58:08 +00:00
|
|
|
ch chan T
|
|
|
|
metrics MetricsCollector
|
2024-06-05 19:00:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewObservableChan creates a new ObservableChan wrapping the provided channel.
|
|
|
|
// It records the channel's capacity immediately and sets up metrics collection
|
|
|
|
// using the provided MetricsCollector and channel name. The chanName is used to
|
|
|
|
// distinguish between metrics for different channels by incorporating it into
|
|
|
|
// the metric names.
|
|
|
|
func NewObservableChan[T any](ch chan T, metrics MetricsCollector) *ObservableChan[T] {
|
2024-06-07 00:29:23 +00:00
|
|
|
if metrics == nil {
|
|
|
|
metrics = noopCollector{}
|
|
|
|
}
|
2024-06-05 19:00:48 +00:00
|
|
|
oChan := &ObservableChan[T]{
|
2024-06-06 14:58:08 +00:00
|
|
|
ch: ch,
|
|
|
|
metrics: metrics,
|
2024-06-05 19:00:48 +00:00
|
|
|
}
|
2024-06-06 14:58:08 +00:00
|
|
|
oChan.RecordChannelCapacity()
|
|
|
|
// Record the current length of the channel.
|
2024-06-07 00:29:23 +00:00
|
|
|
// Note: The channel is likely empty, but it may contain items if it
|
|
|
|
// was pre-existing.
|
2024-06-06 14:58:08 +00:00
|
|
|
oChan.RecordChannelLen()
|
2024-06-05 19:00:48 +00:00
|
|
|
return oChan
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the channel and records the current size of the channel buffer.
|
|
|
|
func (oc *ObservableChan[T]) Close() {
|
|
|
|
close(oc.ch)
|
|
|
|
oc.RecordChannelLen()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Send sends an item into the channel and records the duration taken to do so.
|
2024-06-07 00:29:23 +00:00
|
|
|
// It also updates the current size of the channel buffer. This method blocks
|
|
|
|
// until the item is sent.
|
2024-06-06 14:58:08 +00:00
|
|
|
func (oc *ObservableChan[T]) Send(item T) { _ = oc.SendCtx(context.Background(), item) }
|
|
|
|
|
2024-06-07 00:29:23 +00:00
|
|
|
// SendCtx sends an item into the channel with context and records the duration
|
|
|
|
// taken to do so. It also updates the current size of the channel buffer and
|
|
|
|
// supports context cancellation.
|
2024-06-06 14:58:08 +00:00
|
|
|
func (oc *ObservableChan[T]) SendCtx(ctx context.Context, item T) error {
|
|
|
|
defer func(start time.Time) {
|
|
|
|
oc.metrics.RecordProduceDuration(time.Since(start))
|
2024-06-05 19:00:48 +00:00
|
|
|
oc.RecordChannelLen()
|
2024-06-06 14:58:08 +00:00
|
|
|
}(time.Now())
|
|
|
|
|
|
|
|
return common.CancellableWrite(ctx, oc.ch, item)
|
2024-06-05 19:00:48 +00:00
|
|
|
}
|
|
|
|
|
2024-06-07 00:29:23 +00:00
|
|
|
// Recv receives an item from the channel and records the duration taken to do
|
|
|
|
// so. It also updates the current size of the channel buffer. This method
|
|
|
|
// blocks until an item is available.
|
2024-06-06 14:58:08 +00:00
|
|
|
func (oc *ObservableChan[T]) Recv() T {
|
|
|
|
v, _ := oc.RecvCtx(context.Background())
|
|
|
|
return v
|
|
|
|
}
|
|
|
|
|
2024-06-07 00:29:23 +00:00
|
|
|
// RecvCtx receives an item from the channel with context and records the
|
|
|
|
// duration taken to do so. It also updates the current size of the channel
|
|
|
|
// buffer and supports context cancellation. If an error occurs, it logs the
|
|
|
|
// error.
|
2024-06-06 14:58:08 +00:00
|
|
|
func (oc *ObservableChan[T]) RecvCtx(ctx context.Context) (T, error) {
|
|
|
|
defer func(start time.Time) {
|
|
|
|
oc.metrics.RecordConsumeDuration(time.Since(start))
|
2024-06-05 19:00:48 +00:00
|
|
|
oc.RecordChannelLen()
|
2024-06-06 14:58:08 +00:00
|
|
|
}(time.Now())
|
|
|
|
|
2024-06-07 00:29:23 +00:00
|
|
|
return common.CancellableRead(ctx, oc.ch)
|
2024-06-05 19:00:48 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// RecordChannelCapacity records the capacity of the channel buffer.
|
2024-06-06 14:58:08 +00:00
|
|
|
func (oc *ObservableChan[T]) RecordChannelCapacity() { oc.metrics.RecordChannelCap(cap(oc.ch)) }
|
2024-06-05 19:00:48 +00:00
|
|
|
|
|
|
|
// RecordChannelLen records the current size of the channel buffer.
|
2024-06-06 14:58:08 +00:00
|
|
|
func (oc *ObservableChan[T]) RecordChannelLen() { oc.metrics.RecordChannelLen(len(oc.ch)) }
|