This commit is contained in:
Ahrav Dutta 2024-10-28 11:50:47 -07:00
parent fea6f187d9
commit 9a446be55b
4 changed files with 21 additions and 5 deletions

View file

@ -59,7 +59,7 @@ func (h *arHandler) HandleFile(ctx logContext.Context, input fileReader) chan Da
}
// Update the metrics for the file processing and handle any errors.
h.measureLatencyAndHandleErrors(start, err)
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
}()
return dataOrErrChan

View file

@ -77,7 +77,7 @@ func (h *archiveHandler) HandleFile(ctx logContext.Context, input fileReader) ch
}
// Update the metrics for the file processing and handle any errors.
h.measureLatencyAndHandleErrors(start, err)
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
}()
return dataOrErrChan

View file

@ -45,7 +45,7 @@ func (h *defaultHandler) HandleFile(ctx logContext.Context, input fileReader) ch
}
// Update the metrics for the file processing and handle errors.
h.measureLatencyAndHandleErrors(start, err)
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
}()
return dataOrErrChan
@ -53,15 +53,31 @@ func (h *defaultHandler) HandleFile(ctx logContext.Context, input fileReader) ch
// measureLatencyAndHandleErrors measures the latency of the file processing and updates the metrics accordingly.
// It also records errors and timeouts in the metrics.
func (h *defaultHandler) measureLatencyAndHandleErrors(start time.Time, err error) {
func (h *defaultHandler) measureLatencyAndHandleErrors(
ctx logContext.Context,
start time.Time,
err error,
dataErrChan chan DataOrErr,
) {
if err == nil {
h.metrics.observeHandleFileLatency(time.Since(start).Milliseconds())
return
}
dataOrErr := DataOrErr{}
h.metrics.incErrors()
if errors.Is(err, context.DeadlineExceeded) {
h.metrics.incFileProcessingTimeouts()
dataOrErr.Err = fmt.Errorf("%w: error processing chunk", err)
if err := common.CancellableWrite(ctx, dataErrChan, dataOrErr); err != nil {
ctx.Logger().Error(err, "error writing to data channel")
}
return
}
dataOrErr.Err = err
if err := common.CancellableWrite(ctx, dataErrChan, dataOrErr); err != nil {
ctx.Logger().Error(err, "error writing to data channel")
}
}

View file

@ -65,7 +65,7 @@ func (h *rpmHandler) HandleFile(ctx logContext.Context, input fileReader) chan D
}
// Update the metrics for the file processing and handle any errors.
h.measureLatencyAndHandleErrors(start, err)
h.measureLatencyAndHandleErrors(ctx, start, err, dataOrErrChan)
}()
return dataOrErrChan