From 048ec26c9261f70732d4403df2f45561c41c8161 Mon Sep 17 00:00:00 2001 From: ahrav Date: Wed, 31 Jul 2024 18:58:18 -0700 Subject: [PATCH] move concurrency (#3135) --- pkg/sources/docker/docker.go | 85 ++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 43 deletions(-) diff --git a/pkg/sources/docker/docker.go b/pkg/sources/docker/docker.go index d5a01c909..fbf0baac1 100644 --- a/pkg/sources/docker/docker.go +++ b/pkg/sources/docker/docker.go @@ -101,62 +101,61 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ . workers := new(errgroup.Group) workers.SetLimit(s.concurrency) - scanErrs := sources.NewScanErrors() for _, image := range s.conn.GetImages() { - image := image - workers.Go(func() error { - if common.IsDone(ctx) { + if common.IsDone(ctx) { + return nil + } + + imgInfo, err := s.processImage(ctx, image) + if err != nil { + ctx.Logger().Error(err, "error processing image", "image", image) + return nil + } + + ctx = context.WithValues(ctx, "image", imgInfo.base, "tag", imgInfo.tag) + + ctx.Logger().V(2).Info("scanning image history") + + historyEntries, err := getHistoryEntries(ctx, imgInfo) + if err != nil { + ctx.Logger().Error(err, "error getting image history entries") + return nil + } + + for _, historyEntry := range historyEntries { + if err := s.processHistoryEntry(ctx, historyEntry, chunksChan); err != nil { + ctx.Logger().Error(err, "error processing history entry") return nil } + dockerHistoryEntriesScanned.WithLabelValues(s.name).Inc() + } - imgInfo, err := s.processImage(ctx, image) - if err != nil { - scanErrs.Add(err) - return nil - } + ctx.Logger().V(2).Info("scanning image layers") - ctx = context.WithValues(ctx, "image", imgInfo.base, "tag", imgInfo.tag) + layers, err := imgInfo.image.Layers() + if err != nil { + ctx.Logger().Error(err, "error getting image layers") + return nil + } - ctx.Logger().V(2).Info("scanning image history") - - historyEntries, err := getHistoryEntries(ctx, imgInfo) - if err != nil { - scanErrs.Add(err) - return nil - } - - for _, historyEntry := range historyEntries { - if err := s.processHistoryEntry(ctx, historyEntry, chunksChan); err != nil { - scanErrs.Add(err) - return nil - } - dockerHistoryEntriesScanned.WithLabelValues(s.name).Inc() - } - - ctx.Logger().V(2).Info("scanning image layers") - - layers, err := imgInfo.image.Layers() - if err != nil { - scanErrs.Add(err) - return nil - } - - for _, layer := range layers { + for _, layer := range layers { + workers.Go(func() error { if err := s.processLayer(ctx, layer, imgInfo, chunksChan); err != nil { - scanErrs.Add(err) + ctx.Logger().Error(err, "error processing layer") return nil } dockerLayersScanned.WithLabelValues(s.name).Inc() - } - dockerImagesScanned.WithLabelValues(s.name).Inc() + return nil + }) + } + if err := workers.Wait(); err != nil { + ctx.Logger().Error(err, "error processing layers") return nil - }) - } - _ = workers.Wait() - if scanErrs.Count() > 0 { - ctx.Logger().V(2).Info("scan errors", "errors", scanErrs.String()) + } + + dockerImagesScanned.WithLabelValues(s.name).Inc() } return nil