Rewrite SourceUnitEnumerator to use UnitReporter instead of a channel (#1485)

This commit is contained in:
Miccah 2023-07-13 13:48:33 -05:00 committed by GitHub
parent a9213a1103
commit 4b7f94dea1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 29 deletions

View file

@ -220,10 +220,10 @@ func (s *Source) scanFile(ctx context.Context, path string, chunksChan chan *sou
// Enumerate implements SourceUnitEnumerator interface. This implementation simply // Enumerate implements SourceUnitEnumerator interface. This implementation simply
// passes the configured paths as the source unit, whether it be a single // passes the configured paths as the source unit, whether it be a single
// filepath or a directory. // filepath or a directory.
func (s *Source) Enumerate(ctx context.Context, units chan<- sources.EnumerationResult) error { func (s *Source) Enumerate(ctx context.Context, reporter sources.UnitReporter) error {
for _, path := range s.paths { for _, path := range s.paths {
item := sources.CommonEnumerationOk(path) item := sources.CommonSourceUnit{ID: path}
if err := common.CancellableWrite(ctx, units, item); err != nil { if err := reporter.UnitOk(ctx, item); err != nil {
return err return err
} }
} }

View file

@ -53,21 +53,22 @@ type SourceUnitUnmarshaller interface {
// SourceUnitEnumerator defines an optional interface a Source can implement to // SourceUnitEnumerator defines an optional interface a Source can implement to
// support enumerating an initialized Source into SourceUnits. // support enumerating an initialized Source into SourceUnits.
type SourceUnitEnumerator interface { type SourceUnitEnumerator interface {
// Enumerate enumerates the initialized Source, outputting units. This // Enumerate creates 0 or more units from an initialized source,
// method is synchronous but can be called in a goroutine to support // reporting them or any errors to the UnitReporter. This method is
// concurrent enumeration and chunking. An error should only be // synchronous but can be called in a goroutine to support concurrent
// returned from this method in the case of context cancellation. All // enumeration and chunking. An error should only be returned from this
// other errors related to unit enumeration are tracked in the // method in the case of context cancellation, fatal source errors, or
// EnumerationResult. // errors returned by the reporter All other errors related to unit
Enumerate(ctx context.Context, units chan<- EnumerationResult) error // enumeration are tracked by the UnitReporter.
Enumerate(ctx context.Context, reporter UnitReporter) error
} }
// EnumerationResult is the result of an enumeration, containing the unit and // UnitReporter defines the interface a source will use to report whether a
// error if any. Unit and Error are mutually exclusive (only one will be // unit was found during enumeration. Either method may be called any number of
// non-nil). // times. Implementors of this interface should allow for concurrent calls.
type EnumerationResult struct { type UnitReporter interface {
Unit SourceUnit UnitOk(ctx context.Context, unit SourceUnit) error
Error error UnitErr(ctx context.Context, err error) error
} }
// SourceUnit is an object that represents a Source's unit of work. This is // SourceUnit is an object that represents a Source's unit of work. This is
@ -243,16 +244,3 @@ func (p *Progress) GetProgress() *Progress {
defer p.mut.Unlock() defer p.mut.Unlock()
return p return p
} }
// CommonEnumerationOk is a helper function to construct an EnumerationResult
// using a CommonSourceUnit.
func CommonEnumerationOk(id string) EnumerationResult {
unit := CommonSourceUnit{ID: id}
return EnumerationResult{Unit: unit}
}
// EnumerationErr is a helper function to construct an EnumerationResult from
// an error.
func EnumerationErr(err error) EnumerationResult {
return EnumerationResult{Error: err}
}