mirror of
https://github.com/trufflesecurity/trufflehog.git
synced 2024-11-10 07:04:24 +00:00
Filter unique detectors by keywords in chunk (#1711)
* pre filter detectors that include the keywords in the chunk. * Optimize the engine to prevent iterating overing all detectors. * use sync.Map for concurrent access. * lint. * use correct verify. * allow versioned detectors. * Break apart Start. * cleanup. * Update benchmark. * add comment. * remove Engine prefix. * update comments. * use regular map. * delete the pool. * remove old code. * refactor ahocorasickcore into own file. * update comments * move structs to ahocorasickcore * update comments * fix * address comments * exported some methods and constructor since it will need to be be used by the enterprise pipeline as well * remove extra log
This commit is contained in:
parent
6c75e45958
commit
68f28a0e34
7 changed files with 236 additions and 119 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"encoding/base64"
|
||||
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/pb/detectorspb"
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/sources"
|
||||
)
|
||||
|
||||
|
@ -25,7 +26,8 @@ func init() {
|
|||
}
|
||||
}
|
||||
|
||||
func (d *Base64) FromChunk(chunk *sources.Chunk) *sources.Chunk {
|
||||
func (d *Base64) FromChunk(chunk *sources.Chunk) *DecodableChunk {
|
||||
decodableChunk := &DecodableChunk{Chunk: chunk, DecoderType: detectorspb.DecoderType_BASE64}
|
||||
encodedSubstrings := getSubstringsOfCharacterSet(chunk.Data, 20, b64CharsetMapping, b64EndChars)
|
||||
decodedSubstrings := make(map[string][]byte)
|
||||
|
||||
|
@ -61,7 +63,7 @@ func (d *Base64) FromChunk(chunk *sources.Chunk) *sources.Chunk {
|
|||
}
|
||||
result.Write(chunk.Data[start:])
|
||||
chunk.Data = result.Bytes()
|
||||
return chunk
|
||||
return decodableChunk
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package decoders
|
||||
|
||||
import (
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/pb/detectorspb"
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/sources"
|
||||
)
|
||||
|
||||
|
@ -13,8 +14,15 @@ func DefaultDecoders() []Decoder {
|
|||
}
|
||||
}
|
||||
|
||||
// DecodableChunk is a chunk that includes the type of decoder used.
|
||||
// This allows us to avoid a type assertion on each decoder.
|
||||
type DecodableChunk struct {
|
||||
*sources.Chunk
|
||||
DecoderType detectorspb.DecoderType
|
||||
}
|
||||
|
||||
type Decoder interface {
|
||||
FromChunk(chunk *sources.Chunk) *sources.Chunk
|
||||
FromChunk(chunk *sources.Chunk) *DecodableChunk
|
||||
}
|
||||
|
||||
// Fuzz is an entrypoint for go-fuzz, which is an AFL-style fuzzing tool.
|
||||
|
|
|
@ -5,22 +5,24 @@ import (
|
|||
"encoding/binary"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/pb/detectorspb"
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/sources"
|
||||
)
|
||||
|
||||
type UTF16 struct{}
|
||||
|
||||
func (d *UTF16) FromChunk(chunk *sources.Chunk) *sources.Chunk {
|
||||
func (d *UTF16) FromChunk(chunk *sources.Chunk) *DecodableChunk {
|
||||
if chunk == nil || len(chunk.Data) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
decodableChunk := &DecodableChunk{Chunk: chunk, DecoderType: detectorspb.DecoderType_UTF16}
|
||||
if utf16Data, err := utf16ToUTF8(chunk.Data); err == nil {
|
||||
if len(utf16Data) == 0 {
|
||||
return nil
|
||||
}
|
||||
chunk.Data = utf16Data
|
||||
return chunk
|
||||
return decodableChunk
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -4,22 +4,25 @@ import (
|
|||
"bytes"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/pb/detectorspb"
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/sources"
|
||||
)
|
||||
|
||||
type UTF8 struct{}
|
||||
|
||||
func (d *UTF8) FromChunk(chunk *sources.Chunk) *sources.Chunk {
|
||||
func (d *UTF8) FromChunk(chunk *sources.Chunk) *DecodableChunk {
|
||||
if chunk == nil || len(chunk.Data) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
decodableChunk := &DecodableChunk{Chunk: chunk, DecoderType: detectorspb.DecoderType_PLAIN}
|
||||
|
||||
if !utf8.Valid(chunk.Data) {
|
||||
chunk.Data = extractSubstrings(chunk.Data)
|
||||
return chunk
|
||||
return decodableChunk
|
||||
}
|
||||
|
||||
return chunk
|
||||
return decodableChunk
|
||||
}
|
||||
|
||||
// extractSubstrings performs similarly to the strings binutil,
|
||||
|
|
111
pkg/engine/ahocorasickcore.go
Normal file
111
pkg/engine/ahocorasickcore.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
ahocorasick "github.com/BobuSumisu/aho-corasick"
|
||||
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/detectors"
|
||||
"github.com/trufflesecurity/trufflehog/v3/pkg/pb/detectorspb"
|
||||
)
|
||||
|
||||
// detectorKey is used to identify a detector in the keywordsToDetectors map.
|
||||
// Multiple detectors can have the same detector type but different versions.
|
||||
// This allows us to identify a detector by its type and version.
|
||||
type detectorKey struct {
|
||||
detectorType detectorspb.DetectorType
|
||||
version int
|
||||
}
|
||||
|
||||
// detectorInfo is used to store a detector and whether it should be verified.
|
||||
type detectorInfo struct {
|
||||
detectors.Detector
|
||||
shouldVerify bool
|
||||
}
|
||||
|
||||
// AhoCorasickCore encapsulates the operations and data structures used for keyword matching via the
|
||||
// Aho-Corasick algorithm. It is responsible for constructing and managing the trie for efficient
|
||||
// substring searches, as well as mapping keywords to their associated detectors for rapid lookups.
|
||||
type AhoCorasickCore struct {
|
||||
// prefilter is a ahocorasick struct used for doing efficient string
|
||||
// matching given a set of words. (keywords from the rules in the config)
|
||||
prefilter ahocorasick.Trie
|
||||
// Maps for efficient lookups during detection.
|
||||
detectorTypeToDetectorInfo map[detectorKey]detectorInfo
|
||||
detectors map[bool][]detectors.Detector
|
||||
keywordsToDetectors map[string][]detectorKey
|
||||
}
|
||||
|
||||
// NewAhoCorasickCore allocates and initializes a new instance of AhoCorasickCore.
|
||||
// It creates an empty keyword-to-detectors map for future string matching operations.
|
||||
// The map detectorTypeToDetectorInfo is pre-allocated based on the size of detectors
|
||||
// provided, for efficient storage and lookup of detector information.
|
||||
func NewAhoCorasickCore(detectors map[bool][]detectors.Detector) *AhoCorasickCore {
|
||||
return &AhoCorasickCore{
|
||||
keywordsToDetectors: make(map[string][]detectorKey),
|
||||
detectors: detectors,
|
||||
detectorTypeToDetectorInfo: make(map[detectorKey]detectorInfo, len(detectors[true])+len(detectors[false])),
|
||||
}
|
||||
}
|
||||
|
||||
// Setup initializes the internal state of AhoCorasickCore to prepare it for keyword matching.
|
||||
// This involves pre-filtering setup and lookup optimization, critical for the engine's performance.
|
||||
func (ac *AhoCorasickCore) Setup(ctx context.Context) {
|
||||
// Prepare maps for fast detector lookups, instead of scanning through an array of detectors for every chunk.
|
||||
var keywords []string
|
||||
for verify, detectorsSet := range ac.detectors {
|
||||
for _, d := range detectorsSet {
|
||||
key := createDetectorKey(d)
|
||||
ac.detectorTypeToDetectorInfo[key] = detectorInfo{Detector: d, shouldVerify: verify}
|
||||
keywords = ac.extractAndMapKeywords(d, key, keywords)
|
||||
}
|
||||
}
|
||||
|
||||
// Use the Ahocorasick algorithm to create a trie structure for efficient keyword matching.
|
||||
// This ensures that we can rapidly match against a vast set of keywords without individually comparing each one.
|
||||
ac.prefilter = *ahocorasick.NewTrieBuilder().AddStrings(keywords).Build()
|
||||
ctx.Logger().V(4).Info("AhoCorasickCore Setup complete")
|
||||
}
|
||||
|
||||
// createDetectorKey creates a unique key for each detector. This key based on type and version,
|
||||
// it ensures faster lookups and reduces redundancy in our main detector store.
|
||||
func createDetectorKey(d detectors.Detector) detectorKey {
|
||||
detectorType := d.Type()
|
||||
var version int
|
||||
if v, ok := d.(detectors.Versioner); ok {
|
||||
version = v.Version()
|
||||
}
|
||||
return detectorKey{detectorType: detectorType, version: version}
|
||||
}
|
||||
|
||||
// extractAndMapKeywords captures keywords associated with each detector and maps them.
|
||||
// This allows us to quickly determine which detectors are relevant based on the presence of certain keywords.
|
||||
func (ac *AhoCorasickCore) extractAndMapKeywords(d detectors.Detector, key detectorKey, keywords []string) []string {
|
||||
for _, kw := range d.Keywords() {
|
||||
kwLower := strings.ToLower(kw)
|
||||
keywords = append(keywords, kwLower)
|
||||
ac.keywordsToDetectors[kwLower] = append(ac.keywordsToDetectors[kwLower], key)
|
||||
}
|
||||
return keywords
|
||||
}
|
||||
|
||||
// MatchString performs a string match using the Aho-Corasick algorithm, returning an array of matches.
|
||||
// Designed for internal use within the AhoCorasickCore component.
|
||||
func (ac *AhoCorasickCore) MatchString(input string) []*ahocorasick.Match {
|
||||
return ac.prefilter.MatchString(strings.ToLower(input))
|
||||
}
|
||||
|
||||
// PopulateDetectorsByMatch populates the given detectorMap based on the Aho-Corasick match results.
|
||||
// This method is designed to reuse the same map for performance optimization,
|
||||
// reducing the need for repeated allocations within each detector worker in the engine.
|
||||
func (ac *AhoCorasickCore) PopulateDetectorsByMatch(match *ahocorasick.Match, detectors map[detectorspb.DetectorType]detectorInfo) bool {
|
||||
matchedKeys, ok := ac.keywordsToDetectors[match.MatchString()]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
for _, key := range matchedKeys {
|
||||
detectors[key.detectorType] = ac.detectorTypeToDetectorInfo[key]
|
||||
}
|
||||
return true
|
||||
}
|
|
@ -3,14 +3,11 @@ package engine
|
|||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
ahocorasick "github.com/BobuSumisu/aho-corasick"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
|
@ -66,9 +63,8 @@ type Engine struct {
|
|||
onlyVerified bool
|
||||
printAvgDetectorTime bool
|
||||
|
||||
// prefilter is a ahocorasick struct used for doing efficient string
|
||||
// matching given a set of words (keywords from the rules in the config)
|
||||
prefilter ahocorasick.Trie
|
||||
// ahoCorasickHandler manages the Aho-Corasick trie and related keyword lookups.
|
||||
ahoCorasickCore *AhoCorasickCore
|
||||
|
||||
// Engine synchronization primitives.
|
||||
sourceManager *sources.SourceManager
|
||||
|
@ -93,9 +89,10 @@ type Engine struct {
|
|||
dedupeCache *lru.Cache
|
||||
}
|
||||
|
||||
type EngineOption func(*Engine)
|
||||
// Option is used to configure the engine during initialization using functional options.
|
||||
type Option func(*Engine)
|
||||
|
||||
func WithConcurrency(concurrency uint8) EngineOption {
|
||||
func WithConcurrency(concurrency uint8) Option {
|
||||
return func(e *Engine) {
|
||||
e.concurrency = concurrency
|
||||
}
|
||||
|
@ -103,7 +100,7 @@ func WithConcurrency(concurrency uint8) EngineOption {
|
|||
|
||||
const ignoreTag = "trufflehog:ignore"
|
||||
|
||||
func WithDetectors(verify bool, d ...detectors.Detector) EngineOption {
|
||||
func WithDetectors(verify bool, d ...detectors.Detector) Option {
|
||||
return func(e *Engine) {
|
||||
if e.detectors == nil {
|
||||
e.detectors = make(map[bool][]detectors.Detector)
|
||||
|
@ -116,7 +113,7 @@ func WithDetectors(verify bool, d ...detectors.Detector) EngineOption {
|
|||
}
|
||||
}
|
||||
|
||||
func WithDecoders(decoders ...decoders.Decoder) EngineOption {
|
||||
func WithDecoders(decoders ...decoders.Decoder) Option {
|
||||
return func(e *Engine) {
|
||||
e.decoders = decoders
|
||||
}
|
||||
|
@ -124,14 +121,14 @@ func WithDecoders(decoders ...decoders.Decoder) EngineOption {
|
|||
|
||||
// WithFilterUnverified sets the filterUnverified flag on the engine. If set to
|
||||
// true, the engine will only return the first unverified result for a chunk for a detector.
|
||||
func WithFilterUnverified(filter bool) EngineOption {
|
||||
func WithFilterUnverified(filter bool) Option {
|
||||
return func(e *Engine) {
|
||||
e.filterUnverified = filter
|
||||
}
|
||||
}
|
||||
|
||||
// WithFilterEntropy filters out unverified results using Shannon entropy.
|
||||
func WithFilterEntropy(entropy float64) EngineOption {
|
||||
func WithFilterEntropy(entropy float64) Option {
|
||||
return func(e *Engine) {
|
||||
if entropy > 0 {
|
||||
e.filterEntropy = &entropy
|
||||
|
@ -141,7 +138,7 @@ func WithFilterEntropy(entropy float64) EngineOption {
|
|||
|
||||
// WithOnlyVerified sets the onlyVerified flag on the engine. If set to true,
|
||||
// the engine will only print verified results.
|
||||
func WithOnlyVerified(onlyVerified bool) EngineOption {
|
||||
func WithOnlyVerified(onlyVerified bool) Option {
|
||||
return func(e *Engine) {
|
||||
e.onlyVerified = onlyVerified
|
||||
}
|
||||
|
@ -153,7 +150,7 @@ func WithOnlyVerified(onlyVerified bool) EngineOption {
|
|||
// the engine is configured to print the results.
|
||||
// Calculating the average time taken by each detector is an expensive operation
|
||||
// and should be avoided unless specified by the user.
|
||||
func WithPrintAvgDetectorTime(printAvgDetectorTime bool) EngineOption {
|
||||
func WithPrintAvgDetectorTime(printAvgDetectorTime bool) Option {
|
||||
return func(e *Engine) {
|
||||
e.printAvgDetectorTime = printAvgDetectorTime
|
||||
}
|
||||
|
@ -163,7 +160,7 @@ func WithPrintAvgDetectorTime(printAvgDetectorTime bool) EngineOption {
|
|||
// the filterFunc returns true, the detector will be included for scanning.
|
||||
// This option applies to the existing list of detectors configured, so the
|
||||
// order this option appears matters. All filtering happens before scanning.
|
||||
func WithFilterDetectors(filterFunc func(detectors.Detector) bool) EngineOption {
|
||||
func WithFilterDetectors(filterFunc func(detectors.Detector) bool) Option {
|
||||
return func(e *Engine) {
|
||||
// If no detectors are configured, do nothing.
|
||||
if e.detectors == nil {
|
||||
|
@ -175,7 +172,7 @@ func WithFilterDetectors(filterFunc func(detectors.Detector) bool) EngineOption
|
|||
}
|
||||
|
||||
// WithPrinter sets the Printer on the engine.
|
||||
func WithPrinter(printer Printer) EngineOption {
|
||||
func WithPrinter(printer Printer) Option {
|
||||
return func(e *Engine) {
|
||||
e.printer = printer
|
||||
}
|
||||
|
@ -269,32 +266,60 @@ func (e *Engine) DetectorAvgTime() map[string][]time.Duration {
|
|||
return avgTime
|
||||
}
|
||||
|
||||
// Start the engine with options.
|
||||
func Start(ctx context.Context, options ...EngineOption) (*Engine, error) {
|
||||
const (
|
||||
defaultChannelBuffer = 1
|
||||
// TODO (ahrav): Determine the optimal cache size.
|
||||
cacheSize = 512 // number of entries in the LRU cache
|
||||
)
|
||||
// Start initializes and activates the engine's processing pipeline.
|
||||
// It sets up various default configurations, prepares lookup structures for
|
||||
// detectors, conducts basic sanity checks, and kickstarts all necessary workers.
|
||||
// Once started, the engine begins processing input data to identify secrets.
|
||||
func Start(ctx context.Context, options ...Option) (*Engine, error) {
|
||||
e := &Engine{}
|
||||
|
||||
if err := e.initialize(ctx, options...); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.setDefaults(ctx)
|
||||
ctx.Logger().V(4).Info("setting up aho-corasick core")
|
||||
e.ahoCorasickCore.Setup(ctx)
|
||||
e.sanityChecks(ctx)
|
||||
e.startWorkers(ctx)
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
const defaultChannelBuffer = 1
|
||||
|
||||
// initialize prepares the engine's internal structures. The LRU cache optimizes
|
||||
// deduplication efforts, allowing the engine to quickly check if a chunk has
|
||||
// been processed before, thereby saving computational overhead.
|
||||
func (e *Engine) initialize(ctx context.Context, options ...Option) error {
|
||||
// TODO (ahrav): Determine the optimal cache size.
|
||||
const cacheSize = 512 // number of entries in the LRU cache
|
||||
|
||||
cache, err := lru.New(cacheSize)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize LRU cache: %w", err)
|
||||
return fmt.Errorf("failed to initialize LRU cache: %w", err)
|
||||
}
|
||||
|
||||
e := &Engine{
|
||||
detectableChunksChan: make(chan detectableChunk, defaultChannelBuffer),
|
||||
results: make(chan detectors.ResultWithMetadata, defaultChannelBuffer),
|
||||
dedupeCache: cache,
|
||||
printer: new(output.PlainPrinter), // default printer
|
||||
metrics: runtimeMetrics{Metrics: Metrics{scanStartTime: time.Now()}},
|
||||
}
|
||||
// Channels are used for communication between different parts of the engine,
|
||||
// ensuring that data flows smoothly without race conditions.
|
||||
e.detectableChunksChan = make(chan detectableChunk, defaultChannelBuffer)
|
||||
e.results = make(chan detectors.ResultWithMetadata, defaultChannelBuffer)
|
||||
e.dedupeCache = cache
|
||||
e.printer = new(output.PlainPrinter)
|
||||
e.metrics = runtimeMetrics{Metrics: Metrics{scanStartTime: time.Now()}}
|
||||
|
||||
for _, option := range options {
|
||||
option(e)
|
||||
}
|
||||
ctx.Logger().V(4).Info("engine initialized")
|
||||
e.ahoCorasickCore = NewAhoCorasickCore(e.detectors)
|
||||
|
||||
// Set defaults.
|
||||
return nil
|
||||
}
|
||||
|
||||
// setDefaults ensures that if specific engine properties aren't provided,
|
||||
// they're set to reasonable default values. It makes the engine robust to
|
||||
// incomplete configuration.
|
||||
func (e *Engine) setDefaults(ctx context.Context) {
|
||||
if e.concurrency == 0 {
|
||||
numCPU := runtime.NumCPU()
|
||||
ctx.Logger().Info("No concurrency specified, defaulting to max", "cpu", numCPU)
|
||||
|
@ -302,12 +327,13 @@ func Start(ctx context.Context, options ...EngineOption) (*Engine, error) {
|
|||
}
|
||||
ctx.Logger().V(3).Info("engine started", "workers", e.concurrency)
|
||||
|
||||
// Create SourceManager.
|
||||
e.sourceManager = sources.NewManager(
|
||||
sources.WithConcurrentSources(int(e.concurrency)),
|
||||
sources.WithConcurrentUnits(int(e.concurrency)),
|
||||
sources.WithBufferedOutput(defaultChannelBuffer),
|
||||
)
|
||||
|
||||
// Default decoders handle common encoding formats.
|
||||
if len(e.decoders) == 0 {
|
||||
e.decoders = decoders.DefaultDecoders()
|
||||
}
|
||||
|
@ -317,46 +343,30 @@ func Start(ctx context.Context, options ...EngineOption) (*Engine, error) {
|
|||
e.detectors[true] = DefaultDetectors()
|
||||
e.detectors[false] = []detectors.Detector{}
|
||||
}
|
||||
ctx.Logger().V(4).Info("default engine options set")
|
||||
}
|
||||
|
||||
// build ahocorasick prefilter for efficient string matching
|
||||
// on keywords
|
||||
keywords := []string{}
|
||||
for _, d := range e.detectors[false] {
|
||||
for _, kw := range d.Keywords() {
|
||||
keywords = append(keywords, strings.ToLower(kw))
|
||||
}
|
||||
}
|
||||
for _, d := range e.detectors[true] {
|
||||
for _, kw := range d.Keywords() {
|
||||
keywords = append(keywords, strings.ToLower(kw))
|
||||
}
|
||||
}
|
||||
e.prefilter = *ahocorasick.NewTrieBuilder().AddStrings(keywords).Build()
|
||||
|
||||
ctx.Logger().V(3).Info("loaded decoders", "count", len(e.decoders))
|
||||
ctx.Logger().V(3).Info("loaded detectors",
|
||||
"total", len(e.detectors[true])+len(e.detectors[false]),
|
||||
"verification_enabled", len(e.detectors[true]),
|
||||
"verification_disabled", len(e.detectors[false]),
|
||||
)
|
||||
|
||||
// Sanity check detectors for duplicate configuration. Only log in case
|
||||
// a detector has been configured in a way that isn't represented by
|
||||
// the DetectorID (type and version).
|
||||
{
|
||||
dets := append(e.detectors[true], e.detectors[false]...)
|
||||
seenDetectors := make(map[config.DetectorID]struct{}, len(dets))
|
||||
for _, det := range dets {
|
||||
id := config.GetDetectorID(det)
|
||||
if _, ok := seenDetectors[id]; ok && id.ID != detectorspb.DetectorType_CustomRegex {
|
||||
ctx.Logger().Info("possible duplicate detector configured", "detector", id)
|
||||
}
|
||||
seenDetectors[id] = struct{}{}
|
||||
// Sanity check detectors for duplicate configuration. Only log in case
|
||||
// a detector has been configured in a way that isn't represented by
|
||||
// the DetectorID (type and version).
|
||||
func (e *Engine) sanityChecks(ctx context.Context) {
|
||||
dets := append(e.detectors[true], e.detectors[false]...)
|
||||
seenDetectors := make(map[config.DetectorID]struct{}, len(dets))
|
||||
for _, det := range dets {
|
||||
id := config.GetDetectorID(det)
|
||||
if _, ok := seenDetectors[id]; ok && id.ID != detectorspb.DetectorType_CustomRegex {
|
||||
ctx.Logger().Info("possible duplicate detector configured", "detector", id)
|
||||
}
|
||||
seenDetectors[id] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// startWorkers initiates all necessary workers. Workers handle processing of
|
||||
// chunks concurrently. Separating the initialization of different types of
|
||||
// workers helps in scalability and makes it easier to diagnose issues.
|
||||
func (e *Engine) startWorkers(ctx context.Context) {
|
||||
// Scanner workers process input data and extract chunks for detectors.
|
||||
ctx.Logger().V(2).Info("starting scanner workers", "count", e.concurrency)
|
||||
// Run the Secret scanner workers and Notifier pipelines.
|
||||
for worker := uint64(0); worker < uint64(e.concurrency); worker++ {
|
||||
e.workersWg.Add(1)
|
||||
go func() {
|
||||
|
@ -367,6 +377,7 @@ func Start(ctx context.Context, options ...EngineOption) (*Engine, error) {
|
|||
}()
|
||||
}
|
||||
|
||||
// Detector workers apply keyword matching, regexes and API calls to detect secrets in chunks.
|
||||
const detectorWorkerMultiplier = 50
|
||||
ctx.Logger().V(2).Info("starting detector workers", "count", e.concurrency*detectorWorkerMultiplier)
|
||||
for worker := uint64(0); worker < uint64(e.concurrency*detectorWorkerMultiplier); worker++ {
|
||||
|
@ -379,6 +390,7 @@ func Start(ctx context.Context, options ...EngineOption) (*Engine, error) {
|
|||
}()
|
||||
}
|
||||
|
||||
// Notifier workers communicate detected issues to the user or any downstream systems.
|
||||
// We want 1/4th of the notifier workers as the number of scanner workers.
|
||||
const notifierWorkerRatio = 4
|
||||
maxNotifierWorkers := 1
|
||||
|
@ -395,8 +407,6 @@ func Start(ctx context.Context, options ...EngineOption) (*Engine, error) {
|
|||
e.notifyResults(ctx)
|
||||
}()
|
||||
}
|
||||
|
||||
return e, nil
|
||||
}
|
||||
|
||||
// Finish waits for running sources to complete and workers to finish scanning
|
||||
|
@ -445,64 +455,42 @@ type detectableChunk struct {
|
|||
func (e *Engine) detectorWorker(ctx context.Context) {
|
||||
var wgDetect sync.WaitGroup
|
||||
|
||||
// Reuse the same map to avoid allocations.
|
||||
const avgDetectorsPerChunk = 2
|
||||
chunkSpecificDetectors := make(map[detectorspb.DetectorType]detectorInfo, avgDetectorsPerChunk)
|
||||
for originalChunk := range e.ChunksChan() {
|
||||
for chunk := range sources.Chunker(originalChunk) {
|
||||
matchedKeywords := make(map[string]struct{})
|
||||
atomic.AddUint64(&e.metrics.BytesScanned, uint64(len(chunk.Data)))
|
||||
for _, decoder := range e.decoders {
|
||||
var decoderType detectorspb.DecoderType
|
||||
switch decoder.(type) {
|
||||
case *decoders.UTF8:
|
||||
decoderType = detectorspb.DecoderType_PLAIN
|
||||
case *decoders.Base64:
|
||||
decoderType = detectorspb.DecoderType_BASE64
|
||||
case *decoders.UTF16:
|
||||
decoderType = detectorspb.DecoderType_UTF16
|
||||
default:
|
||||
ctx.Logger().Info("unknown decoder type", "type", reflect.TypeOf(decoder).String())
|
||||
decoderType = detectorspb.DecoderType_UNKNOWN
|
||||
}
|
||||
|
||||
decoded := decoder.FromChunk(chunk)
|
||||
|
||||
if decoded == nil {
|
||||
ctx.Logger().V(4).Info("no decoder found for chunk", "chunk", chunk)
|
||||
continue
|
||||
}
|
||||
|
||||
// build a map of all keywords that were matched in the chunk
|
||||
for _, m := range e.prefilter.MatchString(strings.ToLower(string(decoded.Data))) {
|
||||
matchedKeywords[strings.ToLower(m.MatchString())] = struct{}{}
|
||||
for _, match := range e.ahoCorasickCore.MatchString(string(decoded.Chunk.Data)) {
|
||||
if !e.ahoCorasickCore.PopulateDetectorsByMatch(match, chunkSpecificDetectors) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
for verify, detectorsSet := range e.detectors {
|
||||
for _, detector := range detectorsSet {
|
||||
chunkContainsKeyword := false
|
||||
for _, kw := range detector.Keywords() {
|
||||
if _, ok := matchedKeywords[strings.ToLower(kw)]; ok {
|
||||
chunkContainsKeyword = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !chunkContainsKeyword {
|
||||
continue
|
||||
}
|
||||
|
||||
decoded.Verify = verify
|
||||
wgDetect.Add(1)
|
||||
e.detectableChunksChan <- detectableChunk{
|
||||
chunk: *decoded,
|
||||
detector: detector,
|
||||
decoder: decoderType,
|
||||
wgDoneFn: wgDetect.Done,
|
||||
}
|
||||
for k, detector := range chunkSpecificDetectors {
|
||||
decoded.Chunk.Verify = detector.shouldVerify
|
||||
wgDetect.Add(1)
|
||||
e.detectableChunksChan <- detectableChunk{
|
||||
chunk: *decoded.Chunk,
|
||||
detector: detector,
|
||||
decoder: decoded.DecoderType,
|
||||
wgDoneFn: wgDetect.Done,
|
||||
}
|
||||
delete(chunkSpecificDetectors, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
atomic.AddUint64(&e.metrics.ChunksScanned, 1)
|
||||
}
|
||||
wgDetect.Wait()
|
||||
ctx.Logger().V(4).Info("finished scanning chunks")
|
||||
}
|
||||
|
||||
func (e *Engine) detectChunks(ctx context.Context) {
|
||||
|
|
|
@ -2,6 +2,7 @@ package engine
|
|||
|
||||
import (
|
||||
"os"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -120,9 +121,10 @@ func BenchmarkGitEngine(b *testing.B) {
|
|||
defer cancel()
|
||||
|
||||
e, err := Start(ctx,
|
||||
WithConcurrency(1),
|
||||
WithConcurrency(uint8(runtime.NumCPU())),
|
||||
WithDecoders(decoders.DefaultDecoders()...),
|
||||
WithDetectors(false, DefaultDetectors()...),
|
||||
WithPrinter(new(discardPrinter)),
|
||||
)
|
||||
assert.Nil(b, err)
|
||||
|
||||
|
@ -133,6 +135,7 @@ func BenchmarkGitEngine(b *testing.B) {
|
|||
}
|
||||
}()
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
// TODO: this is measuring the time it takes to initialize the source
|
||||
// and not to do the full scan
|
||||
|
|
Loading…
Reference in a new issue