address comments

This commit is contained in:
Ahrav Dutta 2024-10-19 19:41:34 -07:00
parent 62561fbf50
commit 9411abd12d
6 changed files with 89 additions and 89 deletions

View file

@ -33,13 +33,7 @@ func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) chan Da
go func() {
defer close(dataOrErrChan)
// Update the metrics for the file processing.
start := time.Now()
var err error
defer func() {
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
h.metrics.incFilesProcessed()
}()
// Defer a panic recovery to handle any panics that occur during the AR processing.
defer func() {
@ -51,21 +45,26 @@ func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) chan Da
panicErr = fmt.Errorf("panic occurred: %v", r)
}
dataOrErrChan <- DataOrErr{
Data: nil,
Err: fmt.Errorf("%w: panic error: %w", ErrCriticalProcessing, panicErr),
Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
}
}
}()
var arReader *deb.Ar
arReader, err = deb.LoadAr(input)
arReader, err := deb.LoadAr(input)
if err != nil {
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: loading AR error: %v", ErrProcessingFatal, err),
}
return
}
if err = h.processARFiles(ctx, arReader, dataOrErrChan); err != nil {
ctx.Logger().Error(err, "error processing AR files")
err = h.processARFiles(ctx, arReader, dataOrErrChan)
if err == nil {
h.metrics.incFilesProcessed()
}
// Update the metrics for the file processing and handle any errors.
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
}()
return dataOrErrChan
@ -91,15 +90,19 @@ func (h *arHandler) processARFiles(ctx logContext.Context, reader *deb.Ar, dataO
rdr, err := newMimeTypeReader(arEntry.Data)
if err != nil {
return fmt.Errorf("error creating mime-type reader: %w", err)
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: error creating AR mime-type reader: %v", ErrProcessingWarning, err),
}
h.metrics.incErrors()
continue
}
if err := h.handleNonArchiveContent(fileCtx, rdr, dataOrErrChan); err != nil {
dataOrErrChan <- DataOrErr{
Data: nil,
Err: fmt.Errorf("%w: error handling archive content in AR: %w", ErrNonCriticalProcessing, err),
Err: fmt.Errorf("%w: error handling archive content in AR: %v", ErrProcessingWarning, err),
}
h.metrics.incErrors()
continue
}
h.metrics.incFilesProcessed()

View file

@ -53,7 +53,6 @@ func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) ch
}
go func() {
var err error
defer close(dataOrErrChan)
// The underlying 7zip library may panic when attempting to open an archive.
@ -67,24 +66,21 @@ func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) ch
} else {
panicErr = fmt.Errorf("panic occurred: %v", r)
}
ctx.Logger().Error(panicErr, "Panic occurred when attempting to open archive")
dataOrErrChan <- DataOrErr{
Data: nil,
Err: fmt.Errorf("%w: panic error: %w", ErrCriticalProcessing, panicErr),
Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
}
}
}()
// Update the metrics for the file processing.
start := time.Now()
defer func() {
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
h.metrics.incFilesProcessed()
}()
if err = h.openArchive(ctx, 0, input, dataOrErrChan); err != nil {
ctx.Logger().Error(err, "error unarchiving chunk.")
err := h.openArchive(ctx, 0, input, dataOrErrChan)
if err == nil {
h.metrics.incFilesProcessed()
}
// Update the metrics for the file processing and handle any errors.
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
}()
return dataOrErrChan

View file

@ -38,17 +38,15 @@ func (h *defaultHandler) HandleFile(ctx logContext.Context, input fileReader) ch
go func() {
defer close(dataOrErrChan)
// Update the metrics for the file processing.
start := time.Now()
var err error
defer func() {
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
h.metrics.incFilesProcessed()
}()
if err = h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(input), dataOrErrChan); err != nil {
ctx.Logger().Error(err, "error handling non-archive content.")
err := h.handleNonArchiveContent(ctx, newMimeTypeReaderFromFileReader(input), dataOrErrChan)
if err == nil {
h.metrics.incFilesProcessed()
}
// Update the metrics for the file processing and handle errors.
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
}()
return dataOrErrChan
@ -66,17 +64,21 @@ func (h *defaultHandler) measureLatencyAndHandleErrors(
h.metrics.observeHandleFileLatency(time.Since(start).Milliseconds())
return
}
dataOrErr := DataOrErr{}
h.metrics.incErrors()
if errors.Is(err, context.DeadlineExceeded) {
h.metrics.incFileProcessingTimeouts()
de := DataOrErr{
Data: nil,
Err: fmt.Errorf("%w: error processing chunk: %w", ErrCriticalProcessing, err),
}
if err := common.CancellableWrite(ctx, dataErrChan, de); err != nil {
dataOrErr.Err = fmt.Errorf("%w: error processing chunk: %v", ErrProcessingFatal, err)
if err := common.CancellableWrite(ctx, dataErrChan, dataOrErr); err != nil {
ctx.Logger().Error(err, "error writing to data channel")
}
return
}
dataOrErr.Err = err
if err := common.CancellableWrite(ctx, dataErrChan, dataOrErr); err != nil {
ctx.Logger().Error(err, "error writing to data channel")
}
}
@ -102,20 +104,18 @@ func (h *defaultHandler) handleNonArchiveContent(
chunkReader := sources.NewChunkReader()
for data := range chunkReader(ctx, reader) {
dataOrErr := DataOrErr{}
if err := data.Error(); err != nil {
h.metrics.incErrors()
de := DataOrErr{
Data: nil,
Err: fmt.Errorf("%w: error reading chunk: %w", ErrNonCriticalProcessing, err),
}
if writeErr := common.CancellableWrite(ctx, dataOrErrChan, de); writeErr != nil {
return writeErr
dataOrErr.Err = fmt.Errorf("%w: error reading chunk: %v", ErrProcessingWarning, err)
if writeErr := common.CancellableWrite(ctx, dataOrErrChan, dataOrErr); writeErr != nil {
return fmt.Errorf("%w: error writing to data channel: %v", ErrProcessingFatal, writeErr)
}
continue
}
de := DataOrErr{Data: data.Bytes(), Err: nil}
if err := common.CancellableWrite(ctx, dataOrErrChan, de); err != nil {
dataOrErr.Data = data.Bytes()
if err := common.CancellableWrite(ctx, dataOrErrChan, dataOrErr); err != nil {
return err
}
h.metrics.incBytesProcessed(len(data.Bytes()))

View file

@ -42,11 +42,13 @@ type fileReader struct {
var (
ErrEmptyReader = errors.New("reader is empty")
// ErrCriticalProcessing indicates a critical error that should halt processing.
ErrCriticalProcessing = errors.New("critical processing error")
// ErrProcessingFatal indicates a severe error that requires stopping the file processing.
// ErrNonCriticalProcessing indicates a non-critical error that can be logged and processing can continue.
ErrNonCriticalProcessing = errors.New("non-critical processing error")
ErrProcessingFatal = errors.New("fatal error processing file")
// ErrProcessingWarning indicates a recoverable error that can be logged,
// allowing processing to continue.
ErrProcessingWarning = errors.New("error processing file")
)
// mimeTypeReader wraps an io.Reader with MIME type information.
@ -90,9 +92,7 @@ func newMimeTypeReader(r io.Reader) (mimeTypeReader, error) {
// newFileReader creates a fileReader from an io.Reader, optionally using BufferedFileWriter for certain formats.
// The caller is responsible for closing the reader when it is no longer needed.
func newFileReader(r io.Reader) (fileReader, error) {
var fReader fileReader
func newFileReader(r io.Reader) (fReader fileReader, err error) {
// To detect the MIME type of the input data, we need a reader that supports seeking.
// This allows us to read the data multiple times if necessary without losing the original position.
// We use a BufferedReaderSeeker to wrap the original reader, enabling this functionality.
@ -100,7 +100,6 @@ func newFileReader(r io.Reader) (fileReader, error) {
// If an error occurs during MIME type detection, it is important we close the BufferedReaderSeeker
// to release any resources it holds (checked out buffers or temp file).
var err error
defer func() {
if err != nil {
if closeErr := fReader.Close(); closeErr != nil {
@ -109,14 +108,15 @@ func newFileReader(r io.Reader) (fileReader, error) {
}
}()
mime, err := mimetype.DetectReader(fReader)
var mime *mimetype.MIME
mime, err = mimetype.DetectReader(fReader)
if err != nil {
return fReader, fmt.Errorf("unable to detect MIME type: %w", err)
}
fReader.mime = mime
// Reset the reader to the beginning because DetectReader consumes the reader.
if _, err := fReader.Seek(0, io.SeekStart); err != nil {
if _, err = fReader.Seek(0, io.SeekStart); err != nil {
return fReader, fmt.Errorf("error resetting reader after MIME detection: %w", err)
}
@ -126,7 +126,8 @@ func newFileReader(r io.Reader) (fileReader, error) {
return fReader, nil
}
format, _, err := archiver.Identify("", fReader)
var format archiver.Format
format, _, err = archiver.Identify("", fReader)
switch {
case err == nil:
fReader.isGenericArchive = true
@ -141,7 +142,7 @@ func newFileReader(r io.Reader) (fileReader, error) {
// Reset the reader to the beginning again to allow the handler to read from the start.
// This is necessary because Identify consumes the reader.
if _, err := fReader.Seek(0, io.SeekStart); err != nil {
if _, err = fReader.Seek(0, io.SeekStart); err != nil {
return fReader, fmt.Errorf("error resetting reader after archive identification: %w", err)
}
@ -305,7 +306,7 @@ func SetArchiveMaxTimeout(timeout time.Duration) { maxTimeout = timeout }
// - If the reader is nil
// - If there's an error creating the file reader
// - If there's an error closing the reader
// - If a critical error occurs during chunk processing (context cancellation, deadline exceeded, or ErrCriticalProcessing)
// - If a critical error occurs during chunk processing (context cancellation, deadline exceeded, or ErrProcessingFatal)
// - If there's an error reporting a chunk
//
// Non-critical errors during chunk processing are logged
@ -371,22 +372,22 @@ func handleChunksWithError(
) error {
for {
select {
case de, ok := <-dataErrChan:
case dataOrErr, ok := <-dataErrChan:
if !ok {
// Channel closed, processing complete.
ctx.Logger().V(5).Info("dataErrChan closed, all chunks processed")
return nil
}
if de.Err != nil {
if isCriticalError(de.Err) {
return de.Err
if dataOrErr.Err != nil {
if isCriticalError(dataOrErr.Err) {
return dataOrErr.Err
}
ctx.Logger().Error(de.Err, "non-critical error processing chunk")
ctx.Logger().Error(dataOrErr.Err, "non-critical error processing chunk")
continue
}
if len(de.Data) > 0 {
if len(dataOrErr.Data) > 0 {
chunk := *chunkSkel
chunk.Data = de.Data
chunk.Data = dataOrErr.Data
if err := reporter.ChunkOk(ctx, chunk); err != nil {
return fmt.Errorf("error reporting chunk: %w", err)
}
@ -400,15 +401,15 @@ func handleChunksWithError(
// isCriticalError determines whether the given error is a critical error that should
// terminate processing, or a non-critical error that can be logged and ignored.
// Critical errors include context cancellation, deadline exceeded, and the
// ErrCriticalProcessing error. Non-critical errors include the ErrNonCriticalProcessing
// ErrProcessingFatal error. Non-critical errors include the ErrProcessingWarning
// error. All other errors are treated as non-critical.
func isCriticalError(err error) bool {
switch {
case errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, ErrCriticalProcessing):
errors.Is(err, ErrProcessingFatal):
return true
case errors.Is(err, ErrNonCriticalProcessing):
case errors.Is(err, ErrProcessingWarning):
return false
default:
return false

View file

@ -799,12 +799,12 @@ func TestHandleChunksWithError(t *testing.T) {
}{
{
name: "Non-Critical Error",
input: []DataOrErr{{Err: ErrNonCriticalProcessing}},
input: []DataOrErr{{Err: ErrProcessingWarning}},
},
{
name: "Critical Error",
input: []DataOrErr{{Err: ErrCriticalProcessing}},
expectedErr: ErrCriticalProcessing,
input: []DataOrErr{{Err: ErrProcessingFatal}},
expectedErr: ErrProcessingFatal,
},
{
name: "No Error",

View file

@ -33,13 +33,7 @@ func (h *rpmHandler) HandleFile(ctx logContext.Context, input fileReader) chan D
go func() {
defer close(dataOrErrChan)
// Update the metrics for the file processing.
start := time.Now()
var err error
defer func() {
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
h.metrics.incFilesProcessed()
}()
// Defer a panic recovery to handle any panics that occur during the RPM processing.
defer func() {
@ -51,27 +45,34 @@ func (h *rpmHandler) HandleFile(ctx logContext.Context, input fileReader) chan D
panicErr = fmt.Errorf("panic occurred: %v", r)
}
dataOrErrChan <- DataOrErr{
Data: nil,
Err: fmt.Errorf("%w: panic error: %w", ErrCriticalProcessing, panicErr),
Err: fmt.Errorf("%w: panic error: %v", ErrProcessingFatal, panicErr),
}
}
}()
var rpm *rpmutils.Rpm
rpm, err = rpmutils.ReadRpm(input)
rpm, err := rpmutils.ReadRpm(input)
if err != nil {
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: reading rpm error: %v", ErrProcessingFatal, err),
}
return
}
var reader rpmutils.PayloadReader
reader, err = rpm.PayloadReaderExtended()
reader, err := rpm.PayloadReaderExtended()
if err != nil {
dataOrErrChan <- DataOrErr{
Err: fmt.Errorf("%w: uncompressing rpm error: %v", ErrProcessingFatal, err),
}
return
}
if err = h.processRPMFiles(ctx, reader, dataOrErrChan); err != nil {
ctx.Logger().Error(err, "error processing RPM files")
err = h.processRPMFiles(ctx, reader, dataOrErrChan)
if err == nil {
h.metrics.incFilesProcessed()
}
// Update the metrics for the file processing and handle any errors.
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
}()
return dataOrErrChan
@ -106,8 +107,7 @@ func (h *rpmHandler) processRPMFiles(
if err := h.handleNonArchiveContent(fileCtx, rdr, dataOrErrChan); err != nil {
dataOrErrChan <- DataOrErr{
Data: nil,
Err: fmt.Errorf("%w: error processing RPM archive: %w", ErrNonCriticalProcessing, err),
Err: fmt.Errorf("%w: error processing RPM archive: %v", ErrProcessingWarning, err),
}
h.metrics.incErrors()
}