move concurrency (#3135)

This commit is contained in:
ahrav 2024-07-31 18:58:18 -07:00 committed by GitHub
parent fd257350dd
commit 048ec26c92
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -101,62 +101,61 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ .
workers := new(errgroup.Group)
workers.SetLimit(s.concurrency)
scanErrs := sources.NewScanErrors()
for _, image := range s.conn.GetImages() {
image := image
workers.Go(func() error {
if common.IsDone(ctx) {
if common.IsDone(ctx) {
return nil
}
imgInfo, err := s.processImage(ctx, image)
if err != nil {
ctx.Logger().Error(err, "error processing image", "image", image)
return nil
}
ctx = context.WithValues(ctx, "image", imgInfo.base, "tag", imgInfo.tag)
ctx.Logger().V(2).Info("scanning image history")
historyEntries, err := getHistoryEntries(ctx, imgInfo)
if err != nil {
ctx.Logger().Error(err, "error getting image history entries")
return nil
}
for _, historyEntry := range historyEntries {
if err := s.processHistoryEntry(ctx, historyEntry, chunksChan); err != nil {
ctx.Logger().Error(err, "error processing history entry")
return nil
}
dockerHistoryEntriesScanned.WithLabelValues(s.name).Inc()
}
imgInfo, err := s.processImage(ctx, image)
if err != nil {
scanErrs.Add(err)
return nil
}
ctx.Logger().V(2).Info("scanning image layers")
ctx = context.WithValues(ctx, "image", imgInfo.base, "tag", imgInfo.tag)
layers, err := imgInfo.image.Layers()
if err != nil {
ctx.Logger().Error(err, "error getting image layers")
return nil
}
ctx.Logger().V(2).Info("scanning image history")
historyEntries, err := getHistoryEntries(ctx, imgInfo)
if err != nil {
scanErrs.Add(err)
return nil
}
for _, historyEntry := range historyEntries {
if err := s.processHistoryEntry(ctx, historyEntry, chunksChan); err != nil {
scanErrs.Add(err)
return nil
}
dockerHistoryEntriesScanned.WithLabelValues(s.name).Inc()
}
ctx.Logger().V(2).Info("scanning image layers")
layers, err := imgInfo.image.Layers()
if err != nil {
scanErrs.Add(err)
return nil
}
for _, layer := range layers {
for _, layer := range layers {
workers.Go(func() error {
if err := s.processLayer(ctx, layer, imgInfo, chunksChan); err != nil {
scanErrs.Add(err)
ctx.Logger().Error(err, "error processing layer")
return nil
}
dockerLayersScanned.WithLabelValues(s.name).Inc()
}
dockerImagesScanned.WithLabelValues(s.name).Inc()
return nil
})
}
if err := workers.Wait(); err != nil {
ctx.Logger().Error(err, "error processing layers")
return nil
})
}
_ = workers.Wait()
if scanErrs.Count() > 0 {
ctx.Logger().V(2).Info("scan errors", "errors", scanErrs.String())
}
dockerImagesScanned.WithLabelValues(s.name).Inc()
}
return nil