This test has a race condition. This change makes it less likely to cause a test failure, and is a stopgap measure to de-flake the test while we investigate the underlying issue.
* Add Display method to SourceUnit and Kind member to the CommonSourceUnit
* Make SourceUnitID return the ID and a kind
These two values together uniquely represent a unit.
* Refactor UnitHook to block the scan if finished metrics aren't handled
* Log once when back-pressure is detected
* Add hook channel size metric
* Use plural "metrics" for consistency
* Replace LRU cache with map
* [chore] Fix SourceManager flaky test
Sorting by EndTime is not deterministic, however sorting by StartTime
should be. StartTime is set in a goroutine that's limited by
WithConcurrentUnits, so it should happen in order that the units are
received.
* Sort by unit ID
* Add UnitHook and NoopHook implementations
The UnitHook tracks metrics per unit of a job, and emits them on a
channel once finished. It should work even if the Source does not
support source units.
* Refactor channel to use an LRU cache instead
An LRU cache has a more favorable failure mode than the channel. With
the channel, if the consumer stopped consuming metrics, scanning would
block. With the LRU cache, metrics will be dropped when space runs out
and a log message emitted.
The previous implementation used int64 for both, which can be mixed up
easily. Using distinct types adds a layer of type safety checked by the
compiler.
* Refactor SourceManager to remove Enrollment
Initializing the Source will be the responsibility of the caller. The
SourceManager exposes a GetIDs method for getting a source and job ID.
* Update tests
* Update engine usage
* Update apiClient interface to have one GetIDs method
* Update SourceManager usage in engine
The source manager initialization function was defined as `sourceID`
followed by `jobID`, while the source initialization function is the
reverse. This is confusing and easy to mix up since the parameters are
the same type.
This commit adds a test to make sure the source manager initializes in
the correct order, but it doesn't prevent the library user to make the
same mistake. We may want to consider using different types.
* Add SourceManager to Engine struct
* Update Engine methods to use the SourceManager
* Fix GCS test
The original was testing that `Init()` errors weren't surfaced in
`Finish()`, but the `SourceManager` changed that behavior.
* JobProgress race fixes
* Add contextual values
* Remove unused code
* Add debug logs
* Rename WithConcurrency to WithConcurrentSources
* Always forward chunks to the output chunks channel
* Support fatal errors in job reports
* WIP: JobReporter and JobInspector
* WIP: JobReportHook and JobReportRef
* Add ChunkError type and asyncRun helper method
* Rename JobReport to JobProgress
* Return a closed channel from Done when the JobProgress is nil
* Comment catchFirstFatal function
* Miscellaneous SourceManager updates
* Own the chunks channel instead of accepting it as an input
* Add Chunks and Wait methods
* Fix bug in Enroll so it actually returns the handle
* Add context.Context parameter to the SourceInitFunc type
* Add SourceManager tests for Run and Wait methods
* Rename man variables to mgr