[chore] Polish channelmetrics package (#2938)

This commit is contained in:
Miccah 2024-06-06 17:29:23 -07:00 committed by GitHub
parent 467c4232c9
commit a330aa6f53
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 66 additions and 18 deletions

View file

@ -0,0 +1,12 @@
package channelmetrics
import "time"
// noopCollector is a default implementation of the MetricsCollector interface
// for internal package use only.
type noopCollector struct{}
func (noopCollector) RecordProduceDuration(duration time.Duration) {}
func (noopCollector) RecordConsumeDuration(duration time.Duration) {}
func (noopCollector) RecordChannelLen(size int) {}
func (noopCollector) RecordChannelCap(capacity int) {}

View file

@ -33,13 +33,17 @@ type ObservableChan[T any] struct {
// distinguish between metrics for different channels by incorporating it into // distinguish between metrics for different channels by incorporating it into
// the metric names. // the metric names.
func NewObservableChan[T any](ch chan T, metrics MetricsCollector) *ObservableChan[T] { func NewObservableChan[T any](ch chan T, metrics MetricsCollector) *ObservableChan[T] {
if metrics == nil {
metrics = noopCollector{}
}
oChan := &ObservableChan[T]{ oChan := &ObservableChan[T]{
ch: ch, ch: ch,
metrics: metrics, metrics: metrics,
} }
oChan.RecordChannelCapacity() oChan.RecordChannelCapacity()
// Record the current length of the channel. // Record the current length of the channel.
// Note: The channel is likely empty, but it may contain items if it was pre-existing. // Note: The channel is likely empty, but it may contain items if it
// was pre-existing.
oChan.RecordChannelLen() oChan.RecordChannelLen()
return oChan return oChan
} }
@ -51,12 +55,13 @@ func (oc *ObservableChan[T]) Close() {
} }
// Send sends an item into the channel and records the duration taken to do so. // Send sends an item into the channel and records the duration taken to do so.
// It also updates the current size of the channel buffer. // It also updates the current size of the channel buffer. This method blocks
// This method blocks until the item is sent. // until the item is sent.
func (oc *ObservableChan[T]) Send(item T) { _ = oc.SendCtx(context.Background(), item) } func (oc *ObservableChan[T]) Send(item T) { _ = oc.SendCtx(context.Background(), item) }
// SendCtx sends an item into the channel with context and records the duration taken to do so. // SendCtx sends an item into the channel with context and records the duration
// It also updates the current size of the channel buffer and supports context cancellation. // taken to do so. It also updates the current size of the channel buffer and
// supports context cancellation.
func (oc *ObservableChan[T]) SendCtx(ctx context.Context, item T) error { func (oc *ObservableChan[T]) SendCtx(ctx context.Context, item T) error {
defer func(start time.Time) { defer func(start time.Time) {
oc.metrics.RecordProduceDuration(time.Since(start)) oc.metrics.RecordProduceDuration(time.Since(start))
@ -66,24 +71,25 @@ func (oc *ObservableChan[T]) SendCtx(ctx context.Context, item T) error {
return common.CancellableWrite(ctx, oc.ch, item) return common.CancellableWrite(ctx, oc.ch, item)
} }
// Recv receives an item from the channel and records the duration taken to do so. // Recv receives an item from the channel and records the duration taken to do
// It also updates the current size of the channel buffer. // so. It also updates the current size of the channel buffer. This method
// This method blocks until an item is available. // blocks until an item is available.
func (oc *ObservableChan[T]) Recv() T { func (oc *ObservableChan[T]) Recv() T {
v, _ := oc.RecvCtx(context.Background()) v, _ := oc.RecvCtx(context.Background())
return v return v
} }
// RecvCtx receives an item from the channel with context and records the duration taken to do so. // RecvCtx receives an item from the channel with context and records the
// It also updates the current size of the channel buffer and supports context cancellation. // duration taken to do so. It also updates the current size of the channel
// If an error occurs, it logs the error. // buffer and supports context cancellation. If an error occurs, it logs the
// error.
func (oc *ObservableChan[T]) RecvCtx(ctx context.Context) (T, error) { func (oc *ObservableChan[T]) RecvCtx(ctx context.Context) (T, error) {
defer func(start time.Time) { defer func(start time.Time) {
oc.metrics.RecordConsumeDuration(time.Since(start)) oc.metrics.RecordConsumeDuration(time.Since(start))
oc.RecordChannelLen() oc.RecordChannelLen()
}(time.Now()) }(time.Now())
return common.CancellableRecv(ctx, oc.ch) return common.CancellableRead(ctx, oc.ch)
} }
// RecordChannelCapacity records the capacity of the channel buffer. // RecordChannelCapacity records the capacity of the channel buffer.

View file

@ -118,3 +118,22 @@ func TestObservableChan_Close(t *testing.T) {
mockMetrics.AssertExpectations(t) mockMetrics.AssertExpectations(t)
} }
func TestObservableChanClosed(t *testing.T) {
t.Parallel()
ch := make(chan int)
close(ch)
oc := NewObservableChan(ch, nil)
ctx, cancel := context.WithCancel(context.Background())
// Closed channel should return with an error.
v, err := oc.RecvCtx(ctx)
assert.Error(t, err)
assert.Equal(t, 0, v)
// Cancelled context should also return with an error.
cancel()
_, err = oc.RecvCtx(ctx)
assert.Error(t, err)
}

View file

@ -2,6 +2,11 @@ package common
import "context" import "context"
// ChannelClosedErr indicates that a read was performed from a closed channel.
type ChannelClosedErr struct{}
func (ChannelClosedErr) Error() string { return "channel is closed" }
func IsDone(ctx context.Context) bool { func IsDone(ctx context.Context) bool {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -13,7 +18,8 @@ func IsDone(ctx context.Context) bool {
// CancellableWrite blocks on writing the item to the channel but can be // CancellableWrite blocks on writing the item to the channel but can be
// cancelled by the context. If both the context is cancelled and the channel // cancelled by the context. If both the context is cancelled and the channel
// write would succeed, either operation will be performed randomly. // write would succeed, either operation will be performed randomly, however
// priority is given to context cancellation.
func CancellableWrite[T any](ctx context.Context, ch chan<- T, item T) error { func CancellableWrite[T any](ctx context.Context, ch chan<- T, item T) error {
select { select {
case <-ctx.Done(): // priority to context cancellation case <-ctx.Done(): // priority to context cancellation
@ -28,10 +34,12 @@ func CancellableWrite[T any](ctx context.Context, ch chan<- T, item T) error {
} }
} }
// CancellableRecv blocks on receiving an item from the channel but can be // CancellableRead blocks on receiving an item from the channel but can be
// cancelled by the context. If both the context is cancelled and the channel // cancelled by the context. If the channel is closed, a ChannelClosedErr is
// read would succeed, either operation will be performed randomly. // returned. If both the context is cancelled and the channel read would
func CancellableRecv[T any](ctx context.Context, ch <-chan T) (T, error) { // succeed, either operation will be performed randomly, however priority is
// given to context cancellation.
func CancellableRead[T any](ctx context.Context, ch <-chan T) (T, error) {
var zero T // zero value of type T var zero T // zero value of type T
select { select {
@ -41,7 +49,10 @@ func CancellableRecv[T any](ctx context.Context, ch <-chan T) (T, error) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return zero, ctx.Err() return zero, ctx.Err()
case item := <-ch: case item, ok := <-ch:
if !ok {
return item, ChannelClosedErr{}
}
return item, nil return item, nil
} }
} }