syft/internal/task/executor.go
Alex Goodman 0a3f513f92
Slim down docker cache size (#3190)
* slim down docker cache size

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* remove old centos images

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* troubleshoot test failure

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* fix wget version ref

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* refactor caching mechanisms

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* add cache cleanup steps

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* simplify deleting cache

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* fix first clone issue

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

* add tool dep

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>

---------

Signed-off-by: Alex Goodman <wagoodman@users.noreply.github.com>
2024-09-09 11:15:13 -04:00

77 lines
1.5 KiB
Go

package task
import (
"context"
"fmt"
"runtime/debug"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/anchore/syft/internal/log"
"github.com/anchore/syft/internal/sbomsync"
"github.com/anchore/syft/syft/event/monitor"
"github.com/anchore/syft/syft/file"
)
// Executor runs a fixed set of tasks across a configurable number of concurrent workers.
// Instances are created with NewTaskExecutor, which preloads and closes the task queue.
type Executor struct {
// numWorkers is the number of worker goroutines Execute starts.
numWorkers int
// tasks is the queue workers receive from; NewTaskExecutor fills it and closes it, so
// receiving from it never blocks and ends once every queued task has been taken.
tasks chan Task
}
// NewTaskExecutor returns an Executor whose queue already holds every given task, in order.
// The queue is buffered to len(tasks) so filling it never blocks, and it is closed before
// returning so that workers stop once the queue has been drained.
func NewTaskExecutor(tasks []Task, numWorkers int) *Executor {
	queue := make(chan Task, len(tasks))
	for _, t := range tasks {
		queue <- t
	}
	close(queue)

	return &Executor{
		numWorkers: numWorkers,
		tasks:      queue,
	}
}
// Execute drains the executor's task queue with numWorkers concurrent workers and blocks until
// every worker has exited. Each task is run through runTaskSafely, so a panicking task is reported
// as an error instead of crashing the process. A failing task does not stop the remaining tasks:
// each failure is wrapped, accumulated, and recorded on prog, and prog is incremented once per
// finished task regardless of outcome.
//
// The returned error combines every task failure and is nil when all tasks succeeded.
//
// The queue is filled and closed by NewTaskExecutor, so tasks are consumed exactly once; a second
// call to Execute on the same Executor finds the queue empty and returns nil without running
// anything. Likewise, no task runs when numWorkers is less than one.
func (p *Executor) Execute(ctx context.Context, resolver file.Resolver, s sbomsync.Builder, prog *monitor.CatalogerTaskProgress) error {
	var (
		errs error
		// lock guards errs: several workers may fail at the same time, and an unsynchronized
		// read-modify-write of errs is a data race that can drop or corrupt collected errors.
		lock sync.Mutex
	)

	wg := &sync.WaitGroup{}
	for i := 0; i < p.numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// the queue is closed up front, so this loop ends as soon as it has been drained
			for tsk := range p.tasks {
				if err := runTaskSafely(ctx, tsk, resolver, s); err != nil {
					lock.Lock()
					errs = multierror.Append(errs, fmt.Errorf("failed to run task: %w", err))
					// NOTE(review): SetError is kept under the same lock so it is serialized with
					// the append; whether prog is itself safe for concurrent use is not visible here.
					prog.SetError(err)
					lock.Unlock()
				}
				prog.Increment()
			}
		}()
	}

	// wg.Wait establishes that all worker writes to errs happen before the read below
	wg.Wait()
	return errs
}
// runTaskSafely executes a single task and converts any panic raised while it runs into an error
// carrying the panic value and the stack trace captured during recovery. When the task returns
// normally, its name and elapsed time are logged and the task's own result is returned.
func runTaskSafely(ctx context.Context, t Task, resolver file.Resolver, s sbomsync.Builder) (err error) {
	// handle individual cataloger panics by surfacing them through the named result
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		err = fmt.Errorf("%v at:\n%s", recovered, string(debug.Stack()))
	}()

	began := time.Now()
	err = t.Execute(ctx, resolver, s)
	log.WithFields("task", t.Name(), "elapsed", time.Since(began)).Info("task completed")

	return err
}