From a330aa6f53dc661ce239c4e1b867ae5fa193bb20 Mon Sep 17 00:00:00 2001 From: Miccah Date: Thu, 6 Jun 2024 17:29:23 -0700 Subject: [PATCH] [chore] Polish channelmetrics package (#2938) --- pkg/channelmetrics/noopcollector.go | 12 +++++++++ pkg/channelmetrics/observablechan.go | 30 ++++++++++++++--------- pkg/channelmetrics/observablechan_test.go | 19 ++++++++++++++ pkg/common/context.go | 23 ++++++++++++----- 4 files changed, 66 insertions(+), 18 deletions(-) create mode 100644 pkg/channelmetrics/noopcollector.go diff --git a/pkg/channelmetrics/noopcollector.go b/pkg/channelmetrics/noopcollector.go new file mode 100644 index 000000000..7bdd7f119 --- /dev/null +++ b/pkg/channelmetrics/noopcollector.go @@ -0,0 +1,12 @@ +package channelmetrics + +import "time" + +// noopCollector is a default implementation of the MetricsCollector interface +// for internal package use only. +type noopCollector struct{} + +func (noopCollector) RecordProduceDuration(duration time.Duration) {} +func (noopCollector) RecordConsumeDuration(duration time.Duration) {} +func (noopCollector) RecordChannelLen(size int) {} +func (noopCollector) RecordChannelCap(capacity int) {} diff --git a/pkg/channelmetrics/observablechan.go b/pkg/channelmetrics/observablechan.go index 07197844c..09683faa5 100644 --- a/pkg/channelmetrics/observablechan.go +++ b/pkg/channelmetrics/observablechan.go @@ -33,13 +33,17 @@ type ObservableChan[T any] struct { // distinguish between metrics for different channels by incorporating it into // the metric names. func NewObservableChan[T any](ch chan T, metrics MetricsCollector) *ObservableChan[T] { + if metrics == nil { + metrics = noopCollector{} + } oChan := &ObservableChan[T]{ ch: ch, metrics: metrics, } oChan.RecordChannelCapacity() // Record the current length of the channel. - // Note: The channel is likely empty, but it may contain items if it was pre-existing. + // Note: The channel is likely empty, but it may contain items if it + // was pre-existing. oChan.RecordChannelLen() return oChan } @@ -51,12 +55,13 @@ func (oc *ObservableChan[T]) Close() { } // Send sends an item into the channel and records the duration taken to do so. -// It also updates the current size of the channel buffer. -// This method blocks until the item is sent. +// It also updates the current size of the channel buffer. This method blocks +// until the item is sent. func (oc *ObservableChan[T]) Send(item T) { _ = oc.SendCtx(context.Background(), item) } -// SendCtx sends an item into the channel with context and records the duration taken to do so. -// It also updates the current size of the channel buffer and supports context cancellation. +// SendCtx sends an item into the channel with context and records the duration +// taken to do so. It also updates the current size of the channel buffer and +// supports context cancellation. func (oc *ObservableChan[T]) SendCtx(ctx context.Context, item T) error { defer func(start time.Time) { oc.metrics.RecordProduceDuration(time.Since(start)) @@ -66,24 +71,25 @@ func (oc *ObservableChan[T]) SendCtx(ctx context.Context, item T) error { return common.CancellableWrite(ctx, oc.ch, item) } -// Recv receives an item from the channel and records the duration taken to do so. -// It also updates the current size of the channel buffer. -// This method blocks until an item is available. +// Recv receives an item from the channel and records the duration taken to do +// so. It also updates the current size of the channel buffer. This method +// blocks until an item is available. func (oc *ObservableChan[T]) Recv() T { v, _ := oc.RecvCtx(context.Background()) return v } -// RecvCtx receives an item from the channel with context and records the duration taken to do so. -// It also updates the current size of the channel buffer and supports context cancellation. -// If an error occurs, it logs the error. +// RecvCtx receives an item from the channel with context and records the +// duration taken to do so. It also updates the current size of the channel +// buffer and supports context cancellation. If an error occurs, it logs the +// error. func (oc *ObservableChan[T]) RecvCtx(ctx context.Context) (T, error) { defer func(start time.Time) { oc.metrics.RecordConsumeDuration(time.Since(start)) oc.RecordChannelLen() }(time.Now()) - return common.CancellableRecv(ctx, oc.ch) + return common.CancellableRead(ctx, oc.ch) } // RecordChannelCapacity records the capacity of the channel buffer. diff --git a/pkg/channelmetrics/observablechan_test.go b/pkg/channelmetrics/observablechan_test.go index e8fa7aac1..8e3973891 100644 --- a/pkg/channelmetrics/observablechan_test.go +++ b/pkg/channelmetrics/observablechan_test.go @@ -118,3 +118,22 @@ func TestObservableChan_Close(t *testing.T) { mockMetrics.AssertExpectations(t) } + +func TestObservableChanClosed(t *testing.T) { + t.Parallel() + + ch := make(chan int) + close(ch) + oc := NewObservableChan(ch, nil) + + ctx, cancel := context.WithCancel(context.Background()) + // Closed channel should return with an error. + v, err := oc.RecvCtx(ctx) + assert.Error(t, err) + assert.Equal(t, 0, v) + + // Cancelled context should also return with an error. + cancel() + _, err = oc.RecvCtx(ctx) + assert.Error(t, err) +} diff --git a/pkg/common/context.go b/pkg/common/context.go index 7c8a0aa20..1602f1b7a 100644 --- a/pkg/common/context.go +++ b/pkg/common/context.go @@ -2,6 +2,11 @@ package common import "context" +// ChannelClosedErr indicates that a read was performed from a closed channel. +type ChannelClosedErr struct{} + +func (ChannelClosedErr) Error() string { return "channel is closed" } + func IsDone(ctx context.Context) bool { select { case <-ctx.Done(): @@ -13,7 +18,8 @@ func IsDone(ctx context.Context) bool { // CancellableWrite blocks on writing the item to the channel but can be // cancelled by the context. If both the context is cancelled and the channel -// write would succeed, either operation will be performed randomly. +// write would succeed, either operation will be performed randomly, however +// priority is given to context cancellation. func CancellableWrite[T any](ctx context.Context, ch chan<- T, item T) error { select { case <-ctx.Done(): // priority to context cancellation @@ -28,10 +34,12 @@ func CancellableWrite[T any](ctx context.Context, ch chan<- T, item T) error { } } -// CancellableRecv blocks on receiving an item from the channel but can be -// cancelled by the context. If both the context is cancelled and the channel -// read would succeed, either operation will be performed randomly. -func CancellableRecv[T any](ctx context.Context, ch <-chan T) (T, error) { +// CancellableRead blocks on receiving an item from the channel but can be +// cancelled by the context. If the channel is closed, a ChannelClosedErr is +// returned. If both the context is cancelled and the channel read would +// succeed, either operation will be performed randomly, however priority is +// given to context cancellation. +func CancellableRead[T any](ctx context.Context, ch <-chan T) (T, error) { var zero T // zero value of type T select { @@ -41,7 +49,10 @@ func CancellableRecv[T any](ctx context.Context, ch <-chan T) (T, error) { select { case <-ctx.Done(): return zero, ctx.Err() - case item := <-ch: + case item, ok := <-ch: + if !ok { + return item, ChannelClosedErr{} + } return item, nil } }