Add UnitHook and NoopHook implementations (#1930)

* Add UnitHook and NoopHook implementations

The UnitHook tracks metrics per unit of a job, and emits them on a
channel once finished. It should work even if the Source does not
support source units.

* Refactor channel to use an LRU cache instead

An LRU cache has a more favorable failure mode than the channel. With
the channel, if the consumer stopped consuming metrics, scanning would
block. With the LRU cache, metrics will be dropped when space runs out
and a log message emitted.
This commit is contained in:
Miccah 2023-10-23 14:27:01 -07:00 committed by GitHub
parent b4753a60be
commit 0b16142d4f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 336 additions and 0 deletions

1
go.mod
View file

@ -47,6 +47,7 @@ require (
github.com/h2non/filetype v1.1.3
github.com/hashicorp/go-retryablehttp v0.7.4
github.com/hashicorp/golang-lru v0.5.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jlaffaye/ftp v0.2.0
github.com/joho/godotenv v1.5.1
github.com/jpillora/overseer v1.1.6

2
go.sum
View file

@ -418,6 +418,8 @@ github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=

View file

@ -0,0 +1,218 @@
package sources
import (
"errors"
"fmt"
"strings"
"sync"
"time"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
)
// UnitHook implements JobProgressHook for tracking the progress of each
// individual unit.
type UnitHook struct {
metrics *lru.Cache[string, *UnitMetrics]
mu sync.Mutex
NoopHook
}
// UnitHookOpt is a functional option that configures a UnitHook.
type UnitHookOpt func(*UnitHook)
// WithUnitHookCache returns an option that makes the UnitHook keep its
// metrics in the supplied cache.
func WithUnitHookCache(cache *lru.Cache[string, *UnitMetrics]) UnitHookOpt {
	useCache := func(h *UnitHook) {
		h.metrics = cache
	}
	return useCache
}
// NewUnitHook creates a UnitHook and applies the given options to it. The
// hook starts out with an LRU cache of 1024 entries; when an entry leaves
// that cache without having been marked as handled, the drop is reported
// through the logger of ctx.
func NewUnitHook(ctx context.Context, opts ...UnitHookOpt) *UnitHook {
	const defaultCacheSize = 1024

	// Entries marked as handled were removed on purpose, so only the
	// remaining ones are reported as dropped.
	logDropped := func(id string, dropped *UnitMetrics) {
		if !dropped.handled {
			ctx.Logger().Error(fmt.Errorf("eviction"), "dropping unit metric",
				"id", id,
				"metric", dropped,
			)
		}
	}
	// The error is ignored because lru.NewWithEvict only rejects a
	// non-positive size, and the size used here is a positive constant.
	defaultCache, _ := lru.NewWithEvict(defaultCacheSize, logDropped)

	hook := &UnitHook{metrics: defaultCache}
	for _, apply := range opts {
		apply(hook)
	}
	return hook
}
// id builds the cache key for a unit of a job: the source ID, the job ID and
// the unit's ID joined by slashes. A nil unit contributes an empty unit ID,
// so its key is exactly the "source/job/" prefix.
func (u *UnitHook) id(ref JobProgressRef, unit SourceUnit) string {
	jobPrefix := fmt.Sprintf("%d/%d/", ref.SourceID, ref.JobID)
	if unit == nil {
		return jobPrefix
	}
	return jobPrefix + unit.SourceUnitID()
}
// StartUnitChunking stores a fresh metrics entry for the unit, recording the
// unit, its parent job and the time chunking started.
func (u *UnitHook) StartUnitChunking(ref JobProgressRef, unit SourceUnit, start time.Time) {
	key := u.id(ref, unit)
	entry := &UnitMetrics{
		Unit:      unit,
		Parent:    ref,
		StartTime: start,
	}

	u.mu.Lock()
	defer u.mu.Unlock()
	u.metrics.Add(key, entry)
}
// EndUnitChunking records the time chunking finished for the unit. It does
// nothing when the cache holds no entry for the unit.
func (u *UnitHook) EndUnitChunking(ref JobProgressRef, unit SourceUnit, end time.Time) {
	key := u.id(ref, unit)

	u.mu.Lock()
	defer u.mu.Unlock()
	if entry, found := u.metrics.Get(key); found {
		entry.EndTime = end
	}
}
// ReportChunk adds one chunk and the length of its data to the unit's
// counters. A chunk reported with a nil unit creates the nil-unit entry on
// first use, starting from the job's start time. A chunk for a non-nil unit
// that has no cached entry is ignored.
func (u *UnitHook) ReportChunk(ref JobProgressRef, unit SourceUnit, chunk *Chunk) {
	key := u.id(ref, unit)

	u.mu.Lock()
	defer u.mu.Unlock()

	entry, found := u.metrics.Get(key)
	if !found {
		if unit != nil {
			// The unit has been evicted.
			return
		}
		// This is a chunk from a non-unit source.
		entry = &UnitMetrics{
			Unit:      nil,
			Parent:    ref,
			StartTime: ref.Snapshot().StartTime,
		}
		u.metrics.Add(key, entry)
	}
	entry.TotalChunks++
	entry.TotalBytes += uint64(len(chunk.Data))
}
// ReportError records err against the cached metrics of the job. The error
// is always appended to the job's nil-unit entry when that entry exists. If
// err wraps a ChunkError, it is also appended to the entry of the unit named
// by that ChunkError. Entries that are not in the cache are skipped.
func (u *UnitHook) ReportError(ref JobProgressRef, err error) {
	u.mu.Lock()
	defer u.mu.Unlock()

	// Always add the error to the nil unit if it exists.
	nilUnitID := u.id(ref, nil)
	if metrics, ok := u.metrics.Get(nilUnitID); ok {
		metrics.Errors = append(metrics.Errors, err)
	}

	// Check if it's a ChunkError for a specific unit.
	var chunkErr ChunkError
	if !errors.As(err, &chunkErr) {
		return
	}
	id := u.id(ref, chunkErr.Unit)
	if id == nilUnitID {
		// The ChunkError carries no unit (or one with an empty ID), so it
		// resolves to the nil-unit entry handled above. Returning here
		// keeps the same error from being appended to that entry twice.
		return
	}
	metrics, ok := u.metrics.Get(id)
	if !ok {
		return
	}
	metrics.Errors = append(metrics.Errors, err)
}
// Finish finalizes the metrics of a job whose source ran without unit
// support. Every cached entry of the job that has a nil unit takes its start
// time, end time and errors from the job's snapshot.
func (u *UnitHook) Finish(ref JobProgressRef) {
	u.mu.Lock()
	defer u.mu.Unlock()

	// The ID of the nil unit is a prefix of every ID belonging to this job.
	jobPrefix := u.id(ref, nil)
	for _, key := range u.metrics.Keys() {
		if !strings.HasPrefix(key, jobPrefix) {
			continue
		}
		entry, found := u.metrics.Get(key)
		if !found || entry.Unit != nil {
			continue
		}
		// A nil unit means the source does not support units, so the
		// overall job metrics stand in for the unit's.
		snapshot := ref.Snapshot()
		entry.StartTime = snapshot.StartTime
		entry.EndTime = snapshot.EndTime
		entry.Errors = snapshot.Errors
	}
}
// UnitMetrics returns a copy of every metrics entry currently in the cache,
// whether the unit is still active or has finished. Finished entries are
// removed from the cache as they are returned, so each one is reported by
// exactly one call.
func (u *UnitHook) UnitMetrics() []UnitMetrics {
	u.mu.Lock()
	defer u.mu.Unlock()

	collected := make([]UnitMetrics, 0, u.metrics.Len())
	for _, key := range u.metrics.Keys() {
		entry, found := u.metrics.Get(key)
		if !found {
			continue
		}
		collected = append(collected, *entry)
		if !entry.IsFinished() {
			continue
		}
		// Flag the entry before removing it so the removal is recognised
		// as intentional.
		entry.handled = true
		u.metrics.Remove(key)
	}
	return collected
}
// UnitMetrics holds the progress metrics recorded for a single unit of a job.
type UnitMetrics struct {
// Unit is the unit these metrics describe. It is nil for metrics that
// cover a source without unit support.
Unit SourceUnit
// Parent is the reference to the job the unit belongs to.
Parent JobProgressRef
// Start and end time for chunking this unit.
StartTime time.Time
EndTime time.Time
// Total number of chunks produced from this unit.
TotalChunks uint64
// Total number of bytes produced from this unit.
TotalBytes uint64
// All errors encountered by this unit.
Errors []error
// Flag to mark that these metrics were intentionally evicted from
// the cache.
handled bool
}
// IsFinished reports whether an end time has been recorded for the unit.
func (u UnitMetrics) IsFinished() bool {
	hasEnded := !u.EndTime.IsZero()
	return hasEnded
}
// ElapsedTime returns how long the unit has been running. It is 0 when no
// start time is set, the time since the start while no end time is set, and
// the span between start and end once both are set.
func (u UnitMetrics) ElapsedTime() time.Duration {
	switch {
	case u.StartTime.IsZero():
		return 0
	case u.EndTime.IsZero():
		return time.Since(u.StartTime)
	default:
		return u.EndTime.Sub(u.StartTime)
	}
}
// NoopHook is a JobProgressHook whose every method does nothing. Embed it in
// another hook type to get empty defaults and define only the methods that
// hook actually needs.
type NoopHook struct{}

func (NoopHook) Start(_ JobProgressRef, _ time.Time) {}

func (NoopHook) End(_ JobProgressRef, _ time.Time) {}

func (NoopHook) StartEnumerating(_ JobProgressRef, _ time.Time) {}

func (NoopHook) EndEnumerating(_ JobProgressRef, _ time.Time) {}

func (NoopHook) StartUnitChunking(_ JobProgressRef, _ SourceUnit, _ time.Time) {}

func (NoopHook) EndUnitChunking(_ JobProgressRef, _ SourceUnit, _ time.Time) {}

func (NoopHook) ReportError(_ JobProgressRef, _ error) {}

func (NoopHook) ReportUnit(_ JobProgressRef, _ SourceUnit) {}

func (NoopHook) ReportChunk(_ JobProgressRef, _ SourceUnit, _ *Chunk) {}

func (NoopHook) Finish(_ JobProgressRef) {}

View file

@ -3,8 +3,10 @@ package sources
import (
"errors"
"fmt"
"sort"
"testing"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/anypb"
@ -200,6 +202,8 @@ func (c *unitChunker) ChunkUnit(ctx context.Context, unit SourceUnit, rep ChunkR
if err := rep.ChunkErr(ctx, fmt.Errorf(step.err)); err != nil {
return err
}
}
if step.output == "" {
continue
}
if err := rep.ChunkOk(ctx, Chunk{Data: []byte(step.output)}); err != nil {
@ -310,3 +314,114 @@ func TestSourceManagerAvailableCapacity(t *testing.T) {
<-ref.Done() // Wait for the job to finish.
assert.Equal(t, 1337, mgr.AvailableCapacity())
}
// TestSourceManagerUnitHook runs a three-unit source through the manager and
// checks the metrics the UnitHook collected for each unit.
func TestSourceManagerUnitHook(t *testing.T) {
	hook := NewUnitHook(context.TODO())

	input := []unitChunk{
		{unit: "one", output: "bar"},
		{unit: "two", err: "oh no"},
		{unit: "three", err: "not again"},
	}
	mgr := NewManager(
		WithBufferedOutput(8),
		WithSourceUnits(), WithConcurrentUnits(1),
		WithReportHook(hook),
	)
	// assert does not abort the test, so stop explicitly when setup fails
	// instead of running the job with a broken source or reference.
	source, err := buildDummy(&unitChunker{input})
	if !assert.NoError(t, err) {
		return
	}
	ref, err := mgr.Run(context.Background(), "dummy", source)
	if !assert.NoError(t, err) {
		return
	}
	<-ref.Done()

	metrics := hook.UnitMetrics()
	// Stop on a length mismatch: the indexing below would otherwise panic
	// and hide the assertion failure.
	if !assert.Equal(t, 3, len(metrics)) {
		return
	}
	// Order the metrics by the time each unit finished.
	sort.Slice(metrics, func(i, j int) bool {
		return metrics[i].EndTime.Before(metrics[j].EndTime)
	})
	m0, m1, m2 := metrics[0], metrics[1], metrics[2]

	assert.Equal(t, "one", m0.Unit.SourceUnitID())
	assert.Equal(t, uint64(1), m0.TotalChunks)
	assert.Equal(t, uint64(3), m0.TotalBytes)
	assert.NotZero(t, m0.StartTime)
	assert.NotZero(t, m0.EndTime)
	assert.NotZero(t, m0.ElapsedTime())
	assert.Equal(t, 0, len(m0.Errors))

	assert.Equal(t, "two", m1.Unit.SourceUnitID())
	assert.Equal(t, uint64(0), m1.TotalChunks)
	assert.Equal(t, uint64(0), m1.TotalBytes)
	assert.NotZero(t, m1.StartTime)
	assert.NotZero(t, m1.EndTime)
	assert.NotZero(t, m1.ElapsedTime())
	assert.Equal(t, 1, len(m1.Errors))

	assert.Equal(t, "three", m2.Unit.SourceUnitID())
	assert.Equal(t, uint64(0), m2.TotalChunks)
	assert.Equal(t, uint64(0), m2.TotalBytes)
	assert.NotZero(t, m2.StartTime)
	assert.NotZero(t, m2.EndTime)
	assert.NotZero(t, m2.ElapsedTime())
	assert.Equal(t, 1, len(m2.Errors))
}
// TestSourceManagerUnitHookNoBlock tests that the UnitHook drops metrics if
// they aren't handled fast enough. It gives the hook a cache with room for a
// single entry and runs three units, so two entries must be evicted and only
// the last unit's metrics remain.
func TestSourceManagerUnitHookNoBlock(t *testing.T) {
	var evictedKeys []string
	cache, err := lru.NewWithEvict(1, func(key string, _ *UnitMetrics) {
		evictedKeys = append(evictedKeys, key)
	})
	// assert does not abort the test, so stop explicitly when setup fails.
	if !assert.NoError(t, err) {
		return
	}
	hook := NewUnitHook(context.TODO(), WithUnitHookCache(cache))

	input := []unitChunk{
		{unit: "one", output: "bar"},
		{unit: "two", err: "oh no"},
		{unit: "three", err: "not again"},
	}
	mgr := NewManager(
		WithBufferedOutput(8),
		WithSourceUnits(), WithConcurrentUnits(1),
		WithReportHook(hook),
	)
	source, err := buildDummy(&unitChunker{input})
	if !assert.NoError(t, err) {
		return
	}
	ref, err := mgr.Run(context.Background(), "dummy", source)
	if !assert.NoError(t, err) {
		return
	}
	<-ref.Done()

	assert.Equal(t, 2, len(evictedKeys))
	metrics := hook.UnitMetrics()
	// Stop on a length mismatch: indexing an empty slice below would panic
	// and hide the assertion failure.
	if !assert.Equal(t, 1, len(metrics)) {
		return
	}
	assert.Equal(t, "three", metrics[0].Unit.SourceUnitID())
}
// TestSourceManagerUnitHookNoUnits tests whether the UnitHook works for
// sources that don't support units. The manager is built without unit
// support, and the hook is expected to report one nil-unit entry that covers
// the whole job.
func TestSourceManagerUnitHookNoUnits(t *testing.T) {
	hook := NewUnitHook(context.TODO())

	mgr := NewManager(
		WithBufferedOutput(8),
		WithReportHook(hook),
	)
	// assert does not abort the test, so stop explicitly when setup fails.
	source, err := buildDummy(&counterChunker{count: 5})
	if !assert.NoError(t, err) {
		return
	}
	ref, err := mgr.Run(context.Background(), "dummy", source)
	if !assert.NoError(t, err) {
		return
	}
	<-ref.Done()

	metrics := hook.UnitMetrics()
	// Stop on a length mismatch: indexing an empty slice below would panic
	// and hide the assertion failure.
	if !assert.Equal(t, 1, len(metrics)) {
		return
	}

	m := metrics[0]
	assert.Equal(t, nil, m.Unit)
	assert.Equal(t, uint64(5), m.TotalChunks)
	assert.Equal(t, uint64(5), m.TotalBytes)
	assert.NotZero(t, m.StartTime)
	assert.NotZero(t, m.EndTime)
	assert.NotZero(t, m.ElapsedTime())
	assert.Equal(t, 0, len(m.Errors))
}