[perf] - Leverage pgzip for Parallel decompression (#3149)

ahrav 2024-08-02 04:11:10 -07:00 committed by GitHub
parent 7d606e2480
commit fba1a8b410
3 changed files with 18 additions and 16 deletions
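
For context: klauspost/pgzip exposes the same reader interface as the standard compress/gzip package, but it pipelines decompression and read-ahead in background goroutines, so a consumer of a gzipped tar stream can overlap decompression with its own processing. Below is a minimal sketch of the pattern this commit adopts; the file name layer.tar.gz and the main wrapper are hypothetical stand-ins, not part of the change.

package main

import (
	"archive/tar"
	"fmt"
	"io"
	"log"
	"os"

	// Drop-in replacement for compress/gzip; decompression is done
	// asynchronously with configurable read-ahead.
	gzip "github.com/klauspost/pgzip"
)

func main() {
	// Hypothetical gzipped tarball standing in for a Docker image layer stream.
	f, err := os.Open("layer.tar.gz")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Same tuning as the commit: 16MB blocks, 8 blocks of read-ahead.
	gz, err := gzip.NewReaderN(f, 1<<24, 8)
	if err != nil {
		log.Fatal(err)
	}
	defer gz.Close()

	// Walk the tar entries exactly as with compress/gzip.
	tr := tar.NewReader(gz)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println(hdr.Name, hdr.Size)
	}
}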

go.mod

@@ -225,7 +225,7 @@ require (
 	github.com/jpillora/s3 v1.1.4 // indirect
 	github.com/kevinburke/ssh_config v1.2.0 // indirect
 	github.com/kjk/lzma v0.0.0-20161016003348-3fd93898850d // indirect
-	github.com/klauspost/compress v1.17.8 // indirect
+	github.com/klauspost/compress v1.17.9 // indirect
 	github.com/klauspost/cpuid/v2 v2.2.5 // indirect
 	github.com/klauspost/pgzip v1.2.6 // indirect
 	github.com/lucasb-eyer/go-colorful v1.2.0 // indirect

go.sum

@@ -500,6 +500,8 @@ github.com/kjk/lzma v0.0.0-20161016003348-3fd93898850d/go.mod h1:phT/jsRPBAEqjAi
 github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
 github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
 github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
+github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
+github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
 github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
 github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
 github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=


@@ -2,7 +2,6 @@ package docker
 
 import (
 	"archive/tar"
-	"compress/gzip"
 	"errors"
 	"fmt"
 	"io"
@@ -13,6 +12,7 @@ import (
 	v1 "github.com/google/go-containerregistry/pkg/v1"
 	"github.com/google/go-containerregistry/pkg/v1/remote"
 	"github.com/google/go-containerregistry/pkg/v1/tarball"
+	gzip "github.com/klauspost/pgzip"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"
@@ -116,7 +116,13 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ .
 		ctx.Logger().V(2).Info("scanning image history")
 
-		historyEntries, err := getHistoryEntries(ctx, imgInfo)
+		layers, err := imgInfo.image.Layers()
+		if err != nil {
+			ctx.Logger().Error(err, "error getting image layers")
+			return nil
+		}
+
+		historyEntries, err := getHistoryEntries(ctx, imgInfo, layers)
 		if err != nil {
 			ctx.Logger().Error(err, "error getting image history entries")
 			return nil
@@ -132,12 +138,6 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ .
 		ctx.Logger().V(2).Info("scanning image layers")
 
-		layers, err := imgInfo.image.Layers()
-		if err != nil {
-			ctx.Logger().Error(err, "error getting image layers")
-			return nil
-		}
-
 		for _, layer := range layers {
 			workers.Go(func() error {
 				if err := s.processLayer(ctx, layer, imgInfo, chunksChan); err != nil {
@@ -207,17 +207,12 @@ func (s *Source) processImage(ctx context.Context, image string) (imageInfo, err
 // getHistoryEntries collates an image's configuration history together with the
 // corresponding layer digests for any non-empty layers.
-func getHistoryEntries(ctx context.Context, imgInfo imageInfo) ([]historyEntryInfo, error) {
+func getHistoryEntries(ctx context.Context, imgInfo imageInfo, layers []v1.Layer) ([]historyEntryInfo, error) {
 	config, err := imgInfo.image.ConfigFile()
 	if err != nil {
 		return nil, err
 	}
 
-	layers, err := imgInfo.image.Layers()
-	if err != nil {
-		return nil, err
-	}
-
 	history := config.History
 	entries := make([]historyEntryInfo, len(history))
@@ -306,7 +301,12 @@ func (s *Source) processLayer(ctx context.Context, layer v1.Layer, imgInfo image
 	}
 	defer rc.Close()
 
-	gzipReader, err := gzip.NewReader(rc)
+	const (
+		defaultBlockSize = 1 << 24 // 16MB
+		defaultBlocks    = 8
+	)
+
+	gzipReader, err := gzip.NewReaderN(rc, defaultBlockSize, defaultBlocks)
 	if err != nil {
 		return err
 	}
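
A note on the tuning above: pgzip's NewReaderN behaves like NewReader but lets the caller choose the size of the decompressed blocks that are read ahead and how many of them may be buffered at once. With 16MB blocks and 8 blocks of read-ahead, each layer reader can buffer on the order of 128MB of decompressed data; that estimate comes from pgzip's documented blockSize times blocks read-ahead behavior rather than from anything stated in this commit, and it trades memory for throughput on large image layers.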