add metrics to the pipeline (#2968)

ahrav 2024-06-14 07:57:52 -07:00 committed by GitHub
parent 7bf3a9b5e2
commit 523a915143
2 changed files with 145 additions and 4 deletions

pkg/engine/engine.go

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -726,8 +727,8 @@ func (e *Engine) scannerWorker(ctx context.Context) {
var wgVerificationOverlap sync.WaitGroup
for chunk := range e.ChunksChan() {
startTime := time.Now()
sourceVerify := chunk.Verify
atomic.AddUint64(&e.metrics.BytesScanned, uint64(len(chunk.Data)))
for _, decoder := range e.decoders {
decoded := decoder.FromChunk(chunk)
if decoded == nil {
@@ -760,7 +761,23 @@ func (e *Engine) scannerWorker(ctx context.Context) {
continue
}
dataSize := float64(len(chunk.Data))
scanBytesPerChunk.Observe(dataSize)
jobBytesScanned.WithLabelValues(
strconv.Itoa(int(chunk.JobID)),
chunk.SourceType.String(),
chunk.SourceName,
).Add(dataSize)
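// Scan latency for this chunk is recorded in microseconds (see chunk_scanned_latency in metrics.go).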
chunksScannedLatency.Observe(float64(time.Since(startTime).Microseconds()))
jobChunksScanned.WithLabelValues(
strconv.Itoa(int(chunk.JobID)),
chunk.SourceType.String(),
chunk.SourceName,
).Inc()
atomic.AddUint64(&e.metrics.ChunksScanned, 1)
atomic.AddUint64(&e.metrics.BytesScanned, uint64(dataSize))
}
wgVerificationOverlap.Wait()
@@ -950,7 +967,9 @@ func (e *Engine) verificationOverlapWorker(ctx context.Context) {
func (e *Engine) detectorWorker(ctx context.Context) {
for data := range e.detectableChunksChan {
start := time.Now()
e.detectChunk(ctx, data)
chunksDetectedLatency.Observe(float64(time.Since(start).Milliseconds()))
}
}
@@ -965,19 +984,31 @@ func (e *Engine) detectChunk(ctx context.Context, data detectableChunk) {
isFalsePositive := detectors.GetFalsePositiveCheck(data.detector)
var matchCount int
// To reduce the overhead of regex calls in the detector,
// we limit the amount of data passed to each detector.
// The matches field of the DetectorMatch struct contains the
// relevant portions of the chunk data that were matched.
// This avoids the need for additional regex processing on the entire chunk data.
matches := data.detector.Matches()
for _, matchBytes := range matches {
matchCount++
detectBytesPerMatch.Observe(float64(len(matchBytes)))
results, err := data.detector.Detector.FromData(ctx, data.chunk.Verify, matchBytes)
if err != nil {
ctx.Logger().Error(err, "error scanning chunk")
continue
}
detectorExecutionCount.WithLabelValues(
data.detector.Type().String(),
strconv.Itoa(int(data.chunk.JobID)),
data.chunk.SourceName,
).Inc()
detectorExecutionDuration.WithLabelValues(
data.detector.Type().String(),
).Observe(float64(time.Since(start).Milliseconds()))
if e.printAvgDetectorTime && len(results) > 0 {
elapsed := time.Since(start)
detectorName := results[0].DetectorType.String()
@@ -999,6 +1030,9 @@ func (e *Engine) detectChunk(ctx context.Context, data detectableChunk) {
e.processResult(ctx, data, res, isFalsePositive)
}
}
matchesPerChunk.Observe(float64(matchCount))
data.wgDoneFn()
}
@@ -1055,6 +1089,7 @@ func (e *Engine) processResult(
func (e *Engine) notifierWorker(ctx context.Context) {
for result := range e.ResultsChan() {
startTime := time.Now()
// Filter unwanted results, based on `--results`.
if !result.Verified {
if result.VerificationError() != nil {
@@ -1095,6 +1130,8 @@ func (e *Engine) notifierWorker(ctx context.Context) {
if err := e.dispatcher.Dispatch(ctx, result); err != nil {
ctx.Logger().Error(err, "error notifying result")
}
chunksNotifiedLatency.Observe(float64(time.Since(startTime).Milliseconds()))
}
}
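Because the counters above are package-level and registered through promauto, their values can be read back directly, which is handy for sanity checks. The sketch below uses client_golang's testutil helper; the test name, job ID, and label values are illustrative assumptions, not values the engine is guaranteed to emit.

package engine

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

// Minimal sketch: add to the per-job byte counter for one hypothetical label
// combination and read the value back. Assumes nothing else in the process has
// touched this exact (job_id, source_type, source_name) combination.
func TestJobBytesScannedIsRecorded(t *testing.T) {
	counter := jobBytesScanned.WithLabelValues("1", "SOURCE_TYPE_GIT", "example-source")
	counter.Add(1024) // stand-in for a scanned 1 KiB chunk

	if got := testutil.ToFloat64(counter); got != 1024 {
		t.Fatalf("expected 1024 bytes recorded, got %v", got)
	}
}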

pkg/engine/metrics.go (new file, 104 lines)

@@ -0,0 +1,104 @@
package engine

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/trufflesecurity/trufflehog/v3/pkg/common"
)

var (
// Detector metrics.
detectorExecutionCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "detector_execution_count",
Help: "Total number of times a detector has been executed.",
},
[]string{"detector_name", "job_id", "source_name"},
)
// Note: this is the time taken to call FromData on each detector, not necessarily the time
// taken to verify a credential via an API call. If the regex within FromData does not match,
// the detector returns early. For now this is a good proxy for the time taken to verify a credential.
// TODO (ahrav): we can work on a more fine-grained metric later.
detectorExecutionDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "detector_execution_duration",
Help: "Duration of detector execution in milliseconds.",
Buckets: prometheus.ExponentialBuckets(1, 5, 6),
},
[]string{"detector_name"},
)
jobBytesScanned = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "job_bytes_scanned",
Help: "Total number of bytes scanned for a job.",
},
[]string{"job_id", "source_type", "source_name"},
)
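// ExponentialBuckets(1, 2, 18) gives upper bounds of 1, 2, 4, ... up to
// 2^17 (131072) bytes, plus the implicit +Inf bucket.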
scanBytesPerChunk = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "scan_bytes_per_chunk",
Help: "Total number of bytes in a chunk.",
Buckets: prometheus.ExponentialBuckets(1, 2, 18),
})
jobChunksScanned = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "job_chunks_scanned",
Help: "Total number of chunks scanned for a job.",
},
[]string{"job_id", "source_type", "source_name"},
)
detectBytesPerMatch = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "detect_bytes_per_match",
Help: "Total number of bytes used to detect a credential in a match per chunk.",
Buckets: prometheus.ExponentialBuckets(1, 2, 18),
})
matchesPerChunk = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "matches_per_chunk",
Help: "Total number of matches found in a chunk.",
Buckets: prometheus.ExponentialBuckets(1, 2, 10),
})
// Metrics around latency for the different stages of the pipeline.
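// Note that the units differ by stage: chunk scanning is observed in microseconds,
// while detection and notification are observed in milliseconds.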
chunksScannedLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "chunk_scanned_latency",
Help: "Time taken to scan a chunk in microseconds.",
Buckets: prometheus.ExponentialBuckets(1, 2, 22),
})
chunksDetectedLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "chunk_detected_latency",
Help: "Time taken to detect a chunk in microseconds.",
Buckets: prometheus.ExponentialBuckets(50, 2, 20),
})
chunksNotifiedLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "chunk_notified_latency",
Help: "Time taken to notify a chunk in milliseconds.",
Buckets: prometheus.ExponentialBuckets(5, 2, 12),
})
)
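All of these collectors are created with promauto, so they are registered with the default Prometheus registry when the package initializes. A minimal sketch of exposing them over HTTP follows; the standalone main package, the blank import, and the :2112 listen address are illustrative assumptions rather than how trufflehog itself wires up its metrics endpoint.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	// Importing the engine package runs its package-level var block, which
	// registers the metrics above with the default registry via promauto.
	_ "github.com/trufflesecurity/trufflehog/v3/pkg/engine"
)

func main() {
	// promhttp.Handler serves whatever is in the default registry.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil)) // example port, not trufflehog's default
}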