increase timeout to 30s (#3422)

* increase timeout to 30s

* add debug

* use context logger and include size in logs

* close reader

* address comments
ahrav 2024-10-18 16:13:03 -07:00 committed by GitHub
parent 88b8c862a6
commit 6e055ea578

@@ -131,6 +131,7 @@ func (s *Source) setMaxObjectSize(maxObjectSize int64) {
 func (s *Source) newClient(region, roleArn string) (*s3.S3, error) {
     cfg := aws.NewConfig()
     cfg.CredentialsChainVerboseErrors = aws.Bool(true)
+    cfg.LogLevel = aws.LogLevel(aws.LogDebugWithRequestErrors)
     cfg.Region = aws.String(region)
     switch cred := s.conn.GetCredential().(type) {
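
Aside: the new LogLevel line is the "add debug" item from the commit message; it makes the SDK dump every request that ends in an error. A minimal, self-contained sketch of the same setting, assuming aws-sdk-go v1 and a placeholder region:

    package main

    import (
        "log"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/s3"
    )

    func main() {
        // Sketch: verbose credential-chain errors plus per-request
        // debug output for failed requests, as enabled in this commit.
        cfg := aws.NewConfig().
            WithRegion("us-east-1"). // placeholder region
            WithCredentialsChainVerboseErrors(true).
            WithLogLevel(aws.LogDebugWithRequestErrors)

        sess, err := session.NewSession(cfg)
        if err != nil {
            log.Fatal(err)
        }
        _ = s3.New(sess) // this client now logs failed requests verbosely
    }
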
@@ -271,38 +272,54 @@ func (s *Source) getRegionalClientForBucket(ctx context.Context, defaultRegionCl
 }

 // pageChunker emits chunks onto the given channel from a page
-func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan *sources.Chunk, bucket string, page *s3.ListObjectsV2Output, errorCount *sync.Map, pageNumber int, objectCount *uint64) {
+func (s *Source) pageChunker(
+    ctx context.Context,
+    client *s3.S3,
+    chunksChan chan *sources.Chunk,
+    bucket string,
+    page *s3.ListObjectsV2Output,
+    errorCount *sync.Map,
+    pageNumber int,
+    objectCount *uint64,
+) {
     for _, obj := range page.Contents {
         obj := obj
-        if common.IsDone(ctx) {
-            return
-        }
         if obj == nil {
             continue
         }
-        // skip GLACIER and GLACIER_IR objects
+        ctx = context.WithValues(
+            ctx,
+            "key", *obj.Key,
+            "bucket", bucket,
+            "page", pageNumber,
+            "size", *obj.Size,
+        )
+        if common.IsDone(ctx) {
+            return
+        }
+        // Skip GLACIER and GLACIER_IR objects.
         if obj.StorageClass == nil || strings.Contains(*obj.StorageClass, "GLACIER") {
-            s.log.V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass, "object", *obj.Key)
+            ctx.Logger().V(5).Info("Skipping object in storage class", "storage_class", *obj.StorageClass)
             continue
         }
-        // ignore large files
+        // Ignore large files.
         if *obj.Size > s.maxObjectSize {
-            s.log.V(5).Info("Skipping %d byte file (over maxObjectSize limit)", "object", *obj.Key)
+            ctx.Logger().V(5).Info("Skipping file over maxObjectSize limit")
            continue
         }
-        // file empty file
+        // Skip empty files.
         if *obj.Size == 0 {
-            s.log.V(5).Info("Skipping 0 byte file", "object", *obj.Key)
+            ctx.Logger().V(5).Info("Skipping empty file")
             continue
         }
-        // skip incompatible extensions
+        // Skip incompatible extensions.
         if common.SkipFile(*obj.Key) {
-            s.log.V(5).Info("Skipping file with incompatible extension", "object", *obj.Key)
+            ctx.Logger().V(5).Info("Skipping file with incompatible extension")
            continue
         }
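
The WithValues/ctx.Logger() change above is the "use context logger and include size in logs" item from the commit message. A hedged sketch of the pattern, assuming trufflehog's pkg/context wrapper behaves as its usage here suggests (bucket and key values are placeholders):

    package main

    import (
        "github.com/trufflesecurity/trufflehog/v3/pkg/context"
    )

    func main() {
        // Values attached once with WithValues are inherited by every
        // later ctx.Logger() call, so individual skip messages no longer
        // need to repeat "object"/"key" arguments.
        ctx := context.WithValues(context.Background(),
            "bucket", "my-bucket",   // placeholder
            "key", "path/to/object", // placeholder
        )
        ctx.Logger().Info("Skipping directory") // emits bucket and key automatically
    }
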
@@ -310,7 +327,7 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
             defer common.RecoverWithExit(ctx)
             if strings.HasSuffix(*obj.Key, "/") {
-                s.log.V(5).Info("Skipping directory", "object", *obj.Key)
+                ctx.Logger().V(5).Info("Skipping directory")
                 return nil
             }
@@ -322,14 +339,14 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
                 nErr = 0
             }
             if nErr.(int) > 3 {
-                s.log.V(2).Info("Skipped due to excessive errors", "object", *obj.Key)
+                ctx.Logger().V(2).Info("Skipped due to excessive errors")
                 return nil
             }

             // Use an anonymous function to retrieve the S3 object with a dedicated timeout context.
             // This ensures that the timeout is isolated and does not affect any downstream operations (e.g. HandleFile).
             getObject := func() (*s3.GetObjectOutput, error) {
-                const getObjectTimeout = 5 * time.Second
+                const getObjectTimeout = 30 * time.Second

                 objCtx, cancel := context.WithTimeout(ctx, getObjectTimeout)
                 defer cancel()
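
The timeout this commit raises from 5s to 30s wraps only the GetObject call, as the comment above explains. A standalone stdlib sketch of that isolation pattern; fetchWithTimeout and the simulated fetch are illustrative stand-ins, not code from this repository:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // fetchWithTimeout runs only the network call under a derived 30s
    // deadline, so the timeout cannot leak into downstream processing
    // of the result.
    func fetchWithTimeout(ctx context.Context, fetch func(context.Context) (string, error)) (string, error) {
        callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
        defer cancel() // release the timer as soon as the call returns
        return fetch(callCtx)
    }

    func main() {
        res, err := fetchWithTimeout(context.Background(), func(ctx context.Context) (string, error) {
            select {
            case <-time.After(10 * time.Millisecond): // stand-in for GetObjectWithContext
                return "object body", nil
            case <-ctx.Done():
                return "", ctx.Err()
            }
        })
        fmt.Println(res, err)
    }
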
@@ -342,7 +359,14 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
                 res, err := getObject()
                 if err != nil {
                     if !strings.Contains(err.Error(), "AccessDenied") {
-                        s.log.Error(err, "could not get S3 object", "object", *obj.Key)
+                        ctx.Logger().Error(err, "could not get S3 object")
                     }
+                    // According to the documentation for GetObjectWithContext,
+                    // the response can be non-nil even if there was an error.
+                    // It's uncertain if the body will be nil in such cases,
+                    // but we'll close it if it's not.
+                    if res != nil && res.Body != nil {
+                        res.Body.Close()
+                    }
                     nErr, ok := errorCount.Load(prefix)
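
The close added above is the "close reader" item from the commit message: an unread, unclosed body pins its underlying HTTP connection. A plain net/http sketch of the same defensive pattern (example.com is a placeholder):

    package main

    import (
        "fmt"
        "io"
        "net/http"
    )

    func fetch(url string) error {
        resp, err := http.Get(url)
        // The AWS SDK documents that a response may be non-nil even when
        // an error is returned; plain net/http returns a nil response on
        // error, but the guard is harmless and prevents a leak either way.
        if resp != nil && resp.Body != nil {
            defer resp.Body.Close()
        }
        if err != nil {
            return err
        }
        _, err = io.Copy(io.Discard, resp.Body) // drain so the conn can be reused
        return err
    }

    func main() {
        fmt.Println(fetch("https://example.com/")) // placeholder URL
    }
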
@@ -350,14 +374,14 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
                         nErr = 0
                     }
                     if nErr.(int) > 3 {
-                        s.log.V(3).Info("Skipped due to excessive errors", "object", *obj.Key)
+                        ctx.Logger().V(3).Info("Skipped due to excessive errors")
                         return nil
                     }
                     nErr = nErr.(int) + 1
                     errorCount.Store(prefix, nErr)
                     // too many consecutive errors on this page
                     if nErr.(int) > 3 {
-                        s.log.V(2).Info("Too many consecutive errors, excluding prefix", "prefix", prefix)
+                        ctx.Logger().V(2).Info("Too many consecutive errors, excluding prefix", "prefix", prefix)
                     }
                     return nil
                 }
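
For reference, the surrounding error-budget logic, reduced to a runnable sketch: failures are counted per key prefix in a sync.Map, and once more than three accumulate the prefix is skipped. tooManyErrors is a hypothetical helper, not a function in this codebase:

    package main

    import (
        "fmt"
        "sync"
    )

    // tooManyErrors reports whether prefix has exceeded its error budget,
    // incrementing the count otherwise.
    func tooManyErrors(errorCount *sync.Map, prefix string) bool {
        n, _ := errorCount.LoadOrStore(prefix, 0)
        if n.(int) > 3 {
            return true // budget exhausted, caller should skip this prefix
        }
        errorCount.Store(prefix, n.(int)+1)
        return false
    }

    func main() {
        var errs sync.Map
        for i := 0; i < 6; i++ {
            fmt.Println(tooManyErrors(&errs, "logs/")) // false x4, then true
        }
    }
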
@@ -393,7 +417,7 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
                 }

                 atomic.AddUint64(objectCount, 1)
-                s.log.V(5).Info("S3 object scanned.", "object_count", objectCount, "page_number", pageNumber)
+                ctx.Logger().V(5).Info("S3 object scanned.", "object_count", objectCount)
                 nErr, ok = errorCount.Load(prefix)
                 if !ok {
                     nErr = 0
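
The object counter in this last hunk is shared across worker goroutines, hence the atomic update. A minimal sketch of the same counter pattern in isolation:

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    func main() {
        var objectCount uint64
        var wg sync.WaitGroup
        // Workers bump the shared counter with atomic.AddUint64;
        // readers use atomic.LoadUint64 for a race-free view.
        for i := 0; i < 8; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                atomic.AddUint64(&objectCount, 1)
            }()
        }
        wg.Wait()
        fmt.Println("scanned:", atomic.LoadUint64(&objectCount)) // 8
    }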