Fix bug in chunker that surfaces with a flaky passed in io.Reader (#1838)

* Fix bug in chunker that surfaces with a flaky passed in io.Reader

The chunker previously expected the passed-in io.Reader to always
successfully read a full buffer of data; however, it is valid for a Reader
to return less data than requested. When this happened, the chunker would
peek the same data that it then read in the next iteration of the loop,
causing the same data to be scanned twice.

Co-authored-by: ahrav <ahravdutta02@gmail.com>

* Fix EOF error check

* Use io.ReadFull in Chunker

---------

Co-authored-by: ahrav <ahravdutta02@gmail.com>
This commit is contained in:
Miccah 2023-10-02 09:38:23 -07:00 committed by GitHub
parent a750b8ef2b
commit 0d451aa806
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 7 deletions

View file

@ -34,7 +34,7 @@ func Chunker(originalChunk *Chunk) chan *Chunk {
chunkBytes := make([]byte, TotalChunkSize)
chunk := *originalChunk
chunkBytes = chunkBytes[:ChunkSize]
n, err := reader.Read(chunkBytes)
n, err := io.ReadFull(reader, chunkBytes)
if n > 0 {
peekData, _ := reader.Peek(TotalChunkSize - n)
chunkBytes = append(chunkBytes[:n], peekData...)
@ -135,7 +135,7 @@ func readInChunks(ctx context.Context, reader io.Reader, config *chunkReaderConf
chunkRes := ChunkResult{}
chunkBytes := make([]byte, config.totalSize)
chunkBytes = chunkBytes[:config.chunkSize]
n, err := chunkReader.Read(chunkBytes)
n, err := io.ReadFull(chunkReader, chunkBytes)
if n > 0 {
peekData, _ := chunkReader.Peek(config.totalSize - n)
chunkBytes = append(chunkBytes[:n], peekData...)
@ -143,14 +143,16 @@ func readInChunks(ctx context.Context, reader io.Reader, config *chunkReaderConf
}
// If there is an error other than EOF, or if we have read some bytes, send the chunk.
if err != nil && !errors.Is(err, io.EOF) || n > 0 {
if err != nil && !errors.Is(err, io.EOF) {
ctx.Logger().Error(err, "error reading chunk")
chunkRes.err = err
}
// io.ReadFull will only return io.EOF when n == 0.
if isErrAndNotEOF(err) {
ctx.Logger().Error(err, "error reading chunk")
chunkRes.err = err
chunkResultChan <- chunkRes
} else if n > 0 {
chunkResultChan <- chunkRes
}
// Return on any type of error.
if err != nil {
return
}
@ -158,3 +160,14 @@ func readInChunks(ctx context.Context, reader io.Reader, config *chunkReaderConf
}()
return chunkResultChan
}
// isErrAndNotEOF reports whether err is non-nil and is neither io.EOF nor
// io.ErrUnexpectedEOF. Both sentinels are matched with errors.Is, so wrapped
// occurrences are treated the same as the bare values.
func isErrAndNotEOF(err error) bool {
	switch {
	case err == nil:
		// No error at all.
		return false
	case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
		// Either end-of-input sentinel is excluded.
		return false
	default:
		return true
	}
}

View file

@ -5,6 +5,7 @@ import (
"io"
"strings"
"testing"
"testing/iotest"
diskbufferreader "github.com/bill-rich/disk-buffer-reader"
"github.com/stretchr/testify/assert"
@ -196,3 +197,23 @@ func BenchmarkChunkReader(b *testing.B) {
assert.Nil(b, err)
}
}
// TestFlakyChunkReader drives the chunk reader from a source that hands back
// only one byte per Read call and checks that the input is still delivered as
// exactly one chunk, without an error, whose bytes equal the original input.
func TestFlakyChunkReader(t *testing.T) {
	a := "aaaa"
	b := "bbbb"

	// OneByteReader makes every Read return a single byte, which simulates a
	// reader that returns less data than requested.
	reader := iotest.OneByteReader(strings.NewReader(a + b))

	chunkReader := NewChunkReader()
	chunkResChan := chunkReader(context.TODO(), reader)

	var chunks []ChunkResult
	for chunk := range chunkResChan {
		chunks = append(chunks, chunk)
	}

	// assert.Equal does not abort the test, so bail out explicitly on a count
	// mismatch; otherwise chunks[0] would panic when no chunk was produced.
	if !assert.Equal(t, 1, len(chunks)) {
		return
	}

	chunk := chunks[0]
	assert.NoError(t, chunk.Error())
	assert.Equal(t, a+b, string(chunk.Bytes()))
}