diff --git a/pkg/sources/s3/s3.go b/pkg/sources/s3/s3.go
index 1ef54391b..b43d25000 100644
--- a/pkg/sources/s3/s3.go
+++ b/pkg/sources/s3/s3.go
@@ -131,6 +131,7 @@ func (s *Source) setMaxObjectSize(maxObjectSize int64) {
 func (s *Source) newClient(region, roleArn string) (*s3.S3, error) {
 	cfg := aws.NewConfig()
 	cfg.CredentialsChainVerboseErrors = aws.Bool(true)
+	cfg.LogLevel = aws.LogLevel(aws.LogDebugWithRequestErrors)
 	cfg.Region = aws.String(region)
 
 	switch cred := s.conn.GetCredential().(type) {
@@ -271,38 +272,54 @@ func (s *Source) getRegionalClientForBucket(ctx context.Context, defaultRegionCl
 }
 
 // pageChunker emits chunks onto the given channel from a page
-func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan *sources.Chunk, bucket string, page *s3.ListObjectsV2Output, errorCount *sync.Map, pageNumber int, objectCount *uint64) {
+func (s *Source) pageChunker(
+	ctx context.Context,
+	client *s3.S3,
+	chunksChan chan *sources.Chunk,
+	bucket string,
+	page *s3.ListObjectsV2Output,
+	errorCount *sync.Map,
+	pageNumber int,
+	objectCount *uint64,
+) {
 	for _, obj := range page.Contents {
-		obj := obj
-		if common.IsDone(ctx) {
-			return
-		}
-
 		if obj == nil {
 			continue
 		}
 
-		// skip GLACIER and GLACIER_IR objects
+		ctx = context.WithValues(
+			ctx,
+			"key", *obj.Key,
+			"bucket", bucket,
+			"page", pageNumber,
+			"size", *obj.Size,
+		)
+
+		if common.IsDone(ctx) {
+			return
+		}
+
+		// Skip GLACIER and GLACIER_IR objects.
 		if obj.StorageClass == nil || strings.Contains(*obj.StorageClass, "GLACIER") {
-			s.log.V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass, "object", *obj.Key)
+			ctx.Logger().V(5).Info("Skipping object in storage class", "storage_class", aws.StringValue(obj.StorageClass))
 			continue
 		}
 
-		// ignore large files
+		// Ignore large files.
 		if *obj.Size > s.maxObjectSize {
-			s.log.V(5).Info("Skipping %d byte file (over maxObjectSize limit)", "object", *obj.Key)
+			ctx.Logger().V(5).Info("Skipping file over maxObjectSize limit")
 			continue
 		}
 
-		// file empty file
+		// Skip empty files.
 		if *obj.Size == 0 {
-			s.log.V(5).Info("Skipping 0 byte file", "object", *obj.Key)
+			ctx.Logger().V(5).Info("Skipping empty file")
 			continue
 		}
 
-		// skip incompatible extensions
+		// Skip incompatible extensions.
 		if common.SkipFile(*obj.Key) {
-			s.log.V(5).Info("Skipping file with incompatible extension", "object", *obj.Key)
+			ctx.Logger().V(5).Info("Skipping file with incompatible extension")
 			continue
 		}
 
@@ -310,7 +327,7 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
 			defer common.RecoverWithExit(ctx)
 
 			if strings.HasSuffix(*obj.Key, "/") {
-				s.log.V(5).Info("Skipping directory", "object", *obj.Key)
+				ctx.Logger().V(5).Info("Skipping directory")
 				return nil
 			}
 
@@ -322,14 +339,14 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
 				nErr = 0
 			}
 			if nErr.(int) > 3 {
-				s.log.V(2).Info("Skipped due to excessive errors", "object", *obj.Key)
+				ctx.Logger().V(2).Info("Skipped due to excessive errors")
 				return nil
 			}
 
 			// Use an anonymous function to retrieve the S3 object with a dedicated timeout context.
 			// This ensures that the timeout is isolated and does not affect any downstream operations. (e.g. HandleFile)
 			getObject := func() (*s3.GetObjectOutput, error) {
-				const getObjectTimeout = 5 * time.Second
+				const getObjectTimeout = 30 * time.Second
 
 				objCtx, cancel := context.WithTimeout(ctx, getObjectTimeout)
 				defer cancel()
@@ -342,7 +359,14 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
 			res, err := getObject()
 			if err != nil {
 				if !strings.Contains(err.Error(), "AccessDenied") {
-					s.log.Error(err, "could not get S3 object", "object", *obj.Key)
+					ctx.Logger().Error(err, "could not get S3 object")
+				}
+				// According to the documentation for GetObjectWithContext,
+				// the response can be non-nil even if there was an error.
+				// It's uncertain if the body will be nil in such cases,
+				// but we'll close it if it's not.
+				if res != nil && res.Body != nil {
+					res.Body.Close()
 				}
 
 				nErr, ok := errorCount.Load(prefix)
@@ -350,14 +374,14 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
 					nErr = 0
 				}
 				if nErr.(int) > 3 {
-					s.log.V(3).Info("Skipped due to excessive errors", "object", *obj.Key)
+					ctx.Logger().V(3).Info("Skipped due to excessive errors")
 					return nil
 				}
 				nErr = nErr.(int) + 1
 				errorCount.Store(prefix, nErr)
 				// too many consecutive errors on this page
 				if nErr.(int) > 3 {
-					s.log.V(2).Info("Too many consecutive errors, excluding prefix", "prefix", prefix)
+					ctx.Logger().V(2).Info("Too many consecutive errors, excluding prefix", "prefix", prefix)
 				}
 				return nil
 			}
@@ -393,7 +417,7 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
 		}
 
 		atomic.AddUint64(objectCount, 1)
-		s.log.V(5).Info("S3 object scanned.", "object_count", objectCount, "page_number", pageNumber)
+		ctx.Logger().V(5).Info("S3 object scanned.", "object_count", atomic.LoadUint64(objectCount))
 		nErr, ok = errorCount.Load(prefix)
 		if !ok {
 			nErr = 0
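
Note on the ctx.Logger() calls above: they come from TruffleHog's internal context package, which carries a logger inside the context so per-object fields ("key", "bucket", "page", "size") are attached once and inherited by every log line below. For readers outside this codebase, a minimal stand-alone analogue built on go-logr might look like the following; this is not the actual pkg/context implementation, and scanObject, process, and the field values are illustrative only.

package main

import (
	"context"
	"fmt"

	"github.com/go-logr/logr"
	"github.com/go-logr/logr/funcr"
)

func main() {
	// A tiny logr backend that prints to stdout and is verbose enough
	// for the V(5) calls used in the scanner.
	root := funcr.New(func(prefix, args string) {
		fmt.Println(prefix, args)
	}, funcr.Options{Verbosity: 5})

	scanObject(logr.NewContext(context.Background(), root), "my-bucket", "path/to/object.txt", 1024)
}

// scanObject attaches per-object fields to the context-scoped logger once;
// every log call further down the stack inherits them.
func scanObject(ctx context.Context, bucket, key string, size int64) {
	logger := logr.FromContextOrDiscard(ctx).WithValues(
		"bucket", bucket,
		"key", key,
		"size", size,
	)
	process(logr.NewContext(ctx, logger))
}

func process(ctx context.Context) {
	// Deep in the call stack: bucket/key/size ride along automatically,
	// so the call site only supplies the message.
	logr.FromContextOrDiscard(ctx).V(5).Info("processing object")
}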
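
Note on the getObject anonymous function and the new error-path Body.Close(): the pattern is a per-call timeout context plus careful cleanup, since GetObjectWithContext is documented as possibly returning a non-nil response together with an error. Below is a minimal stand-alone sketch of that pattern against aws-sdk-go v1; fetchObject and maxBytes are hypothetical names, not code from this PR, and the sketch consumes the body before the timeout context is cancelled, because cancelling the request context also interrupts in-flight body reads.

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

// fetchObject retrieves one object with an isolated timeout, so a slow S3
// call cannot stall the caller, and closes the body on every path.
func fetchObject(ctx context.Context, client *s3.S3, bucket, key string, maxBytes int64) ([]byte, error) {
	const getObjectTimeout = 30 * time.Second
	objCtx, cancel := context.WithTimeout(ctx, getObjectTimeout)
	defer cancel()

	res, err := client.GetObjectWithContext(objCtx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		// The response can be non-nil even when err is set; close any body
		// so the underlying connection is not leaked.
		if res != nil && res.Body != nil {
			res.Body.Close()
		}
		return nil, fmt.Errorf("get s3://%s/%s: %w", bucket, key, err)
	}
	defer res.Body.Close()

	// Read while objCtx is still live, capping the bytes consumed.
	return io.ReadAll(io.LimitReader(res.Body, maxBytes))
}

func main() {
	sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-east-1")))
	data, err := fetchObject(context.Background(), s3.New(sess), "my-bucket", "path/to/object.txt", 10<<20)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read %d bytes\n", len(data))
}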