add metrics to s3 scan

Ahrav Dutta 2024-11-08 14:19:39 -08:00
parent c5c48e04c1
commit 9722ad7a41
2 changed files with 116 additions and 1 deletion

pkg/sources/s3/metrics.go (new file)

@@ -0,0 +1,97 @@
package s3
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/trufflesecurity/trufflehog/v3/pkg/common"
)
// metricsCollector defines the interface for recording S3 scan metrics.
type metricsCollector interface {
// Object metrics.
RecordObjectScanned(bucket string)
RecordObjectSkipped(bucket, reason string)
RecordObjectError(bucket string)
// Role metrics.
RecordRoleScanned(roleArn string)
RecordBucketForRole(roleArn string)
}
type collector struct {
objectsScanned *prometheus.CounterVec
objectsSkipped *prometheus.CounterVec
objectsErrors *prometheus.CounterVec
rolesScanned *prometheus.GaugeVec
bucketsPerRole *prometheus.GaugeVec
}
func newS3MetricsCollector() metricsCollector {
return &collector{
objectsScanned: promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "objects_scanned_total",
Help: "Total number of S3 objects successfully scanned",
}, []string{"bucket"}),
objectsSkipped: promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "objects_skipped_total",
Help: "Total number of S3 objects skipped during scan",
}, []string{"bucket", "reason"}),
objectsErrors: promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "objects_errors_total",
Help: "Total number of errors encountered during S3 scan",
}, []string{"bucket"}),
rolesScanned: promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "roles_scanned",
Help: "Number of AWS roles being scanned",
}, []string{"role_arn"}),
bucketsPerRole: promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "buckets_per_role",
Help: "Number of buckets accessible per AWS role",
}, []string{"role_arn"}),
}
}
func (c *collector) RecordObjectScanned(bucket string) {
c.objectsScanned.WithLabelValues(bucket).Inc()
}
func (c *collector) RecordObjectSkipped(bucket, reason string) {
c.objectsSkipped.WithLabelValues(bucket, reason).Inc()
}
func (c *collector) RecordObjectError(bucket string) {
c.objectsErrors.WithLabelValues(bucket).Inc()
}
const defaultRoleARN = "default"
func (c *collector) RecordRoleScanned(roleArn string) {
if roleArn == "" {
roleArn = defaultRoleARN
}
c.rolesScanned.WithLabelValues(roleArn).Set(1)
}
func (c *collector) RecordBucketForRole(roleArn string) {
if roleArn == "" {
roleArn = defaultRoleARN
}
c.bucketsPerRole.WithLabelValues(roleArn).Inc()
}
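
A minimal sketch, not part of this commit, of how the collector could be exercised from a package-level test using the client_golang test helpers; the test name and the bucket/reason values are illustrative. Since newS3MetricsCollector registers its vectors with the default Prometheus registry via promauto, it should be constructed only once per process.

package s3

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestCollectorRecordsObjectMetrics(t *testing.T) {
	// Construct the collector once; promauto panics on duplicate registration.
	c := newS3MetricsCollector().(*collector)

	c.RecordObjectScanned("example-bucket")
	c.RecordObjectSkipped("example-bucket", "size_limit")

	if got := testutil.ToFloat64(c.objectsScanned.WithLabelValues("example-bucket")); got != 1 {
		t.Fatalf("objects_scanned_total = %v, want 1", got)
	}
	if got := testutil.ToFloat64(c.objectsSkipped.WithLabelValues("example-bucket", "size_limit")); got != 1 {
		t.Fatalf("objects_skipped_total = %v, want 1", got)
	}
}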

pkg/sources/s3/s3.go

@@ -48,6 +48,7 @@ type Source struct {
progressTracker *ProgressTracker
sources.Progress
metricsCollector metricsCollector
errorCount *sync.Map
jobPool *errgroup.Group
@@ -98,6 +99,7 @@ func (s *Source) Init(
if err != nil {
return err
}
s.metricsCollector = newS3MetricsCollector()
s.setMaxObjectSize(conn.GetMaxObjectSize())
@@ -282,6 +284,8 @@ func (s *Source) scanBuckets(
bucketsToScanCount := len(bucketsToScan)
for i := startIdx; i < bucketsToScanCount; i++ {
s.metricsCollector.RecordBucketForRole(role)
bucket := bucketsToScan[i]
ctx := context.WithValue(ctx, "bucket", bucket)
@@ -413,6 +417,7 @@ func (s *Source) pageChunker(
for objIdx, obj := range metadata.page.Contents {
if obj == nil {
s.metricsCollector.RecordObjectSkipped(metadata.bucket, "nil_object")
if err := s.progressTracker.UpdateObjectProgress(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
ctx.Logger().Error(err, "could not update progress for nil object")
}
@@ -429,6 +434,7 @@ func (s *Source) pageChunker(
// Skip GLACIER and GLACIER_IR objects.
if obj.StorageClass == nil || strings.Contains(*obj.StorageClass, "GLACIER") {
ctx.Logger().V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass)
s.metricsCollector.RecordObjectSkipped(metadata.bucket, "storage_class")
if err := s.progressTracker.UpdateObjectProgress(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
ctx.Logger().Error(err, "could not update progress for glacier object")
}
@@ -438,6 +444,7 @@ func (s *Source) pageChunker(
// Ignore large files.
if *obj.Size > s.maxObjectSize {
ctx.Logger().V(5).Info("Skipping %d byte file (over maxObjectSize limit)")
s.metricsCollector.RecordObjectSkipped(metadata.bucket, "size_limit")
if err := s.progressTracker.UpdateObjectProgress(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
ctx.Logger().Error(err, "could not update progress for large file")
}
@@ -447,6 +454,7 @@ func (s *Source) pageChunker(
// Skip empty files.
if *obj.Size == 0 {
ctx.Logger().V(5).Info("Skipping empty file")
s.metricsCollector.RecordObjectSkipped(metadata.bucket, "empty_file")
if err := s.progressTracker.UpdateObjectProgress(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
ctx.Logger().Error(err, "could not update progress for empty file")
}
@@ -456,6 +464,7 @@ func (s *Source) pageChunker(
// Skip incompatible extensions.
if common.SkipFile(*obj.Key) {
ctx.Logger().V(5).Info("Skipping file with incompatible extension")
s.metricsCollector.RecordObjectSkipped(metadata.bucket, "incompatible_extension")
if err := s.progressTracker.UpdateObjectProgress(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
ctx.Logger().Error(err, "could not update progress for incompatible file")
}
@@ -471,6 +480,7 @@ func (s *Source) pageChunker(
if strings.HasSuffix(*obj.Key, "/") {
ctx.Logger().V(5).Info("Skipping directory")
s.metricsCollector.RecordObjectSkipped(metadata.bucket, "directory")
return nil
}
@@ -496,8 +506,12 @@ func (s *Source) pageChunker(
Key: obj.Key,
})
if err != nil {
-if !strings.Contains(err.Error(), "AccessDenied") {
+if strings.Contains(err.Error(), "AccessDenied") {
ctx.Logger().Error(err, "could not get S3 object; access denied")
s.metricsCollector.RecordObjectSkipped(metadata.bucket, "access_denied")
} else {
ctx.Logger().Error(err, "could not get S3 object")
s.metricsCollector.RecordObjectError(metadata.bucket)
}
// According to the documentation for GetObjectWithContext,
// the response can be non-nil even if there was an error.
@@ -551,6 +565,7 @@ func (s *Source) pageChunker(
if err := handlers.HandleFile(ctx, res.Body, chunkSkel, sources.ChanReporter{Ch: chunksChan}); err != nil {
ctx.Logger().Error(err, "error handling file")
s.metricsCollector.RecordObjectError(metadata.bucket)
return nil
}
@@ -568,6 +583,7 @@ func (s *Source) pageChunker(
if err := s.progressTracker.UpdateObjectProgress(ctx, objIdx, metadata.bucket, metadata.page.Contents); err != nil {
ctx.Logger().Error(err, "could not update progress for scanned object")
}
s.metricsCollector.RecordObjectScanned(metadata.bucket)
return nil
})
@@ -629,6 +645,8 @@ func (s *Source) visitRoles(
}
for _, role := range roles {
s.metricsCollector.RecordRoleScanned(role)
client, err := s.newClient(defaultAWSRegion, role)
if err != nil {
return fmt.Errorf("could not create s3 client: %w", err)
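
For reference, a minimal sketch, not part of this commit, of the gauge semantics introduced above, written as it would appear inside package s3; the function name and role ARN are hypothetical.

package s3

// exampleRoleMetrics is illustrative only; it shows how the role gauges behave.
func exampleRoleMetrics() {
	c := newS3MetricsCollector() // registers with the default registry; construct once per process
	role := "arn:aws:iam::123456789012:role/example-scanner" // hypothetical ARN

	c.RecordRoleScanned(role)   // roles_scanned{role_arn=...} is set to 1
	c.RecordBucketForRole(role) // buckets_per_role{role_arn=...} becomes 1
	c.RecordBucketForRole(role) // buckets_per_role{role_arn=...} becomes 2
	c.RecordRoleScanned("")     // an empty role ARN is recorded under the "default" label
}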