mirror of
https://github.com/anchore/syft
synced 2024-11-10 06:14:16 +00:00
Add java/npm to inline comparison (#235)
* Adds java and npm package comparison * Adds probable matching of extra packages syft found and missing packages that syft did not find (but inline did). This way there is a section of output that fuzzy-matches the package names to get a better sense of "real" problems (actual missing packages) vs slightly mismatched metadata during troubleshooting. * Adds a set of probable missing packages to the report based on the probable matches (again, to aid in troubleshooting) * Fixes image reference clean function to support references with registries * Only shows metadata differences when the package was found by both inline and syft * Splits the inline-compare code into more manageable pieces Signed-off-by: Alex Goodman <alex.goodman@anchore.com>
This commit is contained in:
parent
f19cb03aa0
commit
f9407d0ce4
9 changed files with 563 additions and 241 deletions
1
test/inline-compare/.gitignore
vendored
1
test/inline-compare/.gitignore
vendored
|
@ -1,2 +1,3 @@
|
|||
*.json
|
||||
*.pyc
|
||||
inline-reports
|
|
@ -2,7 +2,7 @@ ifndef SYFT_CMD
|
|||
SYFT_CMD = go run ../../main.go
|
||||
endif
|
||||
|
||||
IMAGE_CLEAN = $(shell echo $(COMPARE_IMAGE) | tr ":" "_")
|
||||
IMAGE_CLEAN = $(shell basename $(COMPARE_IMAGE) | tr ":" "_" )
|
||||
SYFT_DIR = syft-reports
|
||||
SYFT_REPORT = $(SYFT_DIR)/$(IMAGE_CLEAN).json
|
||||
INLINE_DIR = inline-reports
|
||||
|
|
|
@ -1,14 +1,16 @@
|
|||
#!/usr/bin/env python3
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import collections
|
||||
|
||||
import utils.package
|
||||
from utils.format import Colors, print_rows
|
||||
from utils.inline import InlineScan
|
||||
from utils.syft import Syft
|
||||
|
||||
QUALITY_GATE_THRESHOLD = 0.95
|
||||
INDENT = " "
|
||||
IMAGE_QUALITY_GATE = collections.defaultdict(lambda: QUALITY_GATE_THRESHOLD, **{
|
||||
|
||||
})
|
||||
IMAGE_QUALITY_GATE = collections.defaultdict(lambda: QUALITY_GATE_THRESHOLD, **{})
|
||||
|
||||
# We additionally fail if an image is above a particular threshold. Why? We expect the lower threshold to be 90%,
|
||||
# however additional functionality in grype is still being implemented, so this threshold may not be able to be met.
|
||||
|
@ -16,271 +18,198 @@ IMAGE_QUALITY_GATE = collections.defaultdict(lambda: QUALITY_GATE_THRESHOLD, **{
|
|||
# issues/enhancements are done we want to ensure that the lower threshold is bumped up to catch regression. The only way
|
||||
# to do this is to select an upper threshold for images with known threshold values, so we have a failure that
|
||||
# loudly indicates the lower threshold should be bumped.
|
||||
IMAGE_UPPER_THRESHOLD = collections.defaultdict(lambda: 1, **{
|
||||
|
||||
})
|
||||
Metadata = collections.namedtuple("Metadata", "version")
|
||||
Package = collections.namedtuple("Package", "name type")
|
||||
IMAGE_UPPER_THRESHOLD = collections.defaultdict(lambda: 1, **{})
|
||||
|
||||
|
||||
class InlineScan:
|
||||
|
||||
report_tmpl = "{image}-{report}.json"
|
||||
|
||||
def __init__(self, image, report_dir="./"):
|
||||
self.report_dir = report_dir
|
||||
self.image = image
|
||||
|
||||
def packages(self):
|
||||
python_packages, python_metadata = self._python_packages()
|
||||
gem_packages, gem_metadata = self._gem_packages()
|
||||
os_packages, os_metadata = self._os_packages()
|
||||
return python_packages | os_packages | gem_packages , {**python_metadata, **os_metadata, **gem_metadata}
|
||||
|
||||
def _report_path(self, report):
|
||||
return os.path.join(
|
||||
self.report_dir,
|
||||
self.report_tmpl.format(image=self.image.replace(":", "_"), report=report),
|
||||
def report(analysis):
|
||||
if analysis.extra_packages:
|
||||
rows = []
|
||||
print(
|
||||
Colors.bold + "Syft found extra packages:",
|
||||
Colors.reset,
|
||||
"Syft discovered packages that Inline did not",
|
||||
)
|
||||
for package in sorted(list(analysis.extra_packages)):
|
||||
rows.append([INDENT, repr(package)])
|
||||
print_rows(rows)
|
||||
print()
|
||||
|
||||
def _enumerate_section(self, report, section):
|
||||
report_path = self._report_path(report=report)
|
||||
os_report_path = self._report_path(report="content-os")
|
||||
|
||||
if os.path.exists(os_report_path) and not os.path.exists(report_path):
|
||||
# if the OS report is there but the target report is not, that is engine's way of saying "no findings"
|
||||
return
|
||||
|
||||
with open(report_path) as json_file:
|
||||
data = json.load(json_file)
|
||||
for entry in data[section]:
|
||||
yield entry
|
||||
|
||||
def _python_packages(self):
|
||||
packages = set()
|
||||
metadata = collections.defaultdict(dict)
|
||||
for entry in self._enumerate_section(
|
||||
report="content-python", section="content"
|
||||
):
|
||||
package = Package(name=entry["package"], type=entry["type"].lower(),)
|
||||
packages.add(package)
|
||||
metadata[package.type][package] = Metadata(version=entry["version"])
|
||||
|
||||
return packages, metadata
|
||||
|
||||
def _gem_packages(self):
|
||||
packages = set()
|
||||
metadata = collections.defaultdict(dict)
|
||||
for entry in self._enumerate_section(
|
||||
report="content-gem", section="content"
|
||||
):
|
||||
package = Package(name=entry["package"], type=entry["type"].lower(),)
|
||||
packages.add(package)
|
||||
metadata[package.type][package] = Metadata(version=entry["version"])
|
||||
|
||||
return packages, metadata
|
||||
|
||||
def _os_packages(self):
|
||||
packages = set()
|
||||
metadata = collections.defaultdict(dict)
|
||||
for entry in self._enumerate_section(report="content-os", section="content"):
|
||||
package = Package(name=entry["package"], type=entry["type"].lower())
|
||||
packages.add(package)
|
||||
metadata[package.type][package] = Metadata(version=entry["version"])
|
||||
|
||||
return packages, metadata
|
||||
|
||||
|
||||
class Syft:
|
||||
|
||||
report_tmpl = "{image}.json"
|
||||
|
||||
def __init__(self, image, report_dir="./"):
|
||||
self.report_path = os.path.join(
|
||||
report_dir, self.report_tmpl.format(image=image.replace(":", "_"))
|
||||
if analysis.missing_packages:
|
||||
rows = []
|
||||
print(
|
||||
Colors.bold + "Syft missed packages:",
|
||||
Colors.reset,
|
||||
"Inline discovered packages that Syft did not",
|
||||
)
|
||||
for package in sorted(list(analysis.missing_packages)):
|
||||
rows.append([INDENT, repr(package)])
|
||||
print_rows(rows)
|
||||
print()
|
||||
|
||||
def _enumerate_section(self, section):
|
||||
with open(self.report_path) as json_file:
|
||||
data = json.load(json_file)
|
||||
for entry in data[section]:
|
||||
yield entry
|
||||
if analysis.missing_metadata:
|
||||
rows = []
|
||||
print(
|
||||
Colors.bold + "Syft mismatched metadata:",
|
||||
Colors.reset,
|
||||
"the packages between Syft and Inline are the same, the metadata is not",
|
||||
)
|
||||
for inline_metadata_pair in sorted(list(analysis.missing_metadata)):
|
||||
pkg, metadata = inline_metadata_pair
|
||||
if pkg not in analysis.syft_data.metadata[pkg.type]:
|
||||
continue
|
||||
syft_metadata_item = analysis.syft_data.metadata[pkg.type][pkg]
|
||||
rows.append(
|
||||
[
|
||||
INDENT,
|
||||
"for:",
|
||||
repr(pkg),
|
||||
":",
|
||||
repr(syft_metadata_item),
|
||||
"!=",
|
||||
repr(metadata),
|
||||
]
|
||||
)
|
||||
if rows:
|
||||
print_rows(rows)
|
||||
else:
|
||||
print(
|
||||
INDENT,
|
||||
"There are mismatches, but only due to packages Syft did not find (but inline did).",
|
||||
)
|
||||
print()
|
||||
|
||||
def packages(self):
|
||||
packages = set()
|
||||
metadata = collections.defaultdict(dict)
|
||||
for entry in self._enumerate_section(section="artifacts"):
|
||||
if analysis.similar_missing_packages:
|
||||
rows = []
|
||||
print(
|
||||
Colors.bold + "Probably pairings of missing/extra packages:",
|
||||
Colors.reset,
|
||||
"to aid in troubleshooting missed/extra packages",
|
||||
)
|
||||
for similar_packages in analysis.similar_missing_packages:
|
||||
rows.append(
|
||||
[
|
||||
INDENT,
|
||||
repr(similar_packages.pkg),
|
||||
"--->",
|
||||
repr(similar_packages.missed),
|
||||
]
|
||||
)
|
||||
print_rows(rows)
|
||||
print()
|
||||
|
||||
# normalize to inline
|
||||
pkg_type = entry["type"].lower()
|
||||
if pkg_type in ("wheel", "egg"):
|
||||
pkg_type = "python"
|
||||
elif pkg_type in ("deb",):
|
||||
pkg_type = "dpkg"
|
||||
elif pkg_type in ("java-archive",):
|
||||
pkg_type = "java"
|
||||
elif pkg_type in ("apk",):
|
||||
pkg_type = "apkg"
|
||||
if analysis.unmatched_missing_packages and analysis.extra_packages:
|
||||
rows = []
|
||||
print(
|
||||
Colors.bold + "Probably missed packages:",
|
||||
Colors.reset,
|
||||
"a probable pair was not found",
|
||||
)
|
||||
for p in analysis.unmatched_missing_packages:
|
||||
rows.append([INDENT, repr(p)])
|
||||
print_rows(rows)
|
||||
print()
|
||||
|
||||
package = Package(name=entry["name"], type=pkg_type,)
|
||||
print(Colors.bold + "Summary:", Colors.reset)
|
||||
print(" Inline Packages : %d" % len(analysis.inline_data.packages))
|
||||
print(" Syft Packages : %d" % len(analysis.syft_data.packages))
|
||||
print(
|
||||
" (extra) : %d (note: this is ignored in the analysis!)"
|
||||
% len(analysis.extra_packages)
|
||||
)
|
||||
print(" (missing) : %d" % len(analysis.missing_packages))
|
||||
print()
|
||||
|
||||
packages.add(package)
|
||||
metadata[package.type][package] = Metadata(version=entry["version"])
|
||||
return packages, metadata
|
||||
if analysis.unmatched_missing_packages and analysis.extra_packages:
|
||||
print(
|
||||
" Probable Package Matches : %d (matches not made, but were probably found by both Inline and Syft)"
|
||||
% len(analysis.similar_missing_packages)
|
||||
)
|
||||
print(
|
||||
" Probable Packages Matched : %2.3f %% (%d/%d packages)"
|
||||
% (
|
||||
analysis.percent_probable_overlapping_packages,
|
||||
len(analysis.overlapping_packages)
|
||||
+ len(analysis.similar_missing_packages),
|
||||
len(analysis.inline_data.packages),
|
||||
)
|
||||
)
|
||||
print(
|
||||
" Probable Packages Missing : %d "
|
||||
% len(analysis.unmatched_missing_packages)
|
||||
)
|
||||
print()
|
||||
print(
|
||||
" Baseline Packages Matched : %2.3f %% (%d/%d packages)"
|
||||
% (
|
||||
analysis.percent_overlapping_packages,
|
||||
len(analysis.overlapping_packages),
|
||||
len(analysis.inline_data.packages),
|
||||
)
|
||||
)
|
||||
print(
|
||||
" Baseline Metadata Matched : %2.3f %% (%d/%d metadata)"
|
||||
% (
|
||||
analysis.percent_overlapping_metadata,
|
||||
len(analysis.overlapping_metadata),
|
||||
len(analysis.inline_metadata),
|
||||
)
|
||||
)
|
||||
|
||||
overall_score = (
|
||||
analysis.percent_overlapping_packages + analysis.percent_overlapping_metadata
|
||||
) / 2.0
|
||||
|
||||
def print_rows(rows):
    """
    Print rows of cells as left-aligned, space-padded columns.

    Each column is as wide as its longest cell plus two characters of padding. Nothing is printed
    when ``rows`` is empty. Cells are converted with ``str()`` and rows may have differing lengths
    (a column's width is taken from whichever rows actually have that column).
    """
    if not rows:
        return

    # stringify once so that len() / ljust() work for any cell type
    table = [[str(cell) for cell in row] for row in rows]

    widths = []
    for row in table:
        for col_idx, cell in enumerate(row):
            cell_width = len(cell) + 2  # padding
            if col_idx == len(widths):
                # first time this column is seen (rows may be ragged)
                widths.append(cell_width)
            elif cell_width > widths[col_idx]:
                widths[col_idx] = cell_width

    for row in table:
        print("".join(cell.ljust(widths[col_idx]) for col_idx, cell in enumerate(row)))
|
||||
print(Colors.bold + " Overall Score: %2.1f %%" % overall_score, Colors.reset)
|
||||
|
||||
|
||||
def main(image):
|
||||
print(colors.bold+"Image:", image, colors.reset)
|
||||
cwd = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
inline = InlineScan(image=image, report_dir="inline-reports")
|
||||
inline_packages, inline_metadata = inline.packages()
|
||||
# parse the inline-scan and syft reports on disk
|
||||
inline = InlineScan(image=image, report_dir=os.path.join(cwd, "inline-reports"))
|
||||
syft = Syft(image=image, report_dir=os.path.join(cwd, "syft-reports"))
|
||||
|
||||
syft = Syft(image=image, report_dir="syft-reports")
|
||||
syft_packages, syft_metadata = syft.packages()
|
||||
|
||||
if len(inline_packages) == 0:
|
||||
# we are purposefully selecting test images that are guaranteed to have packages, so this should never happen
|
||||
print(colors.bold + colors.fg.red + "inline found no packages!", colors.reset)
|
||||
return 1
|
||||
|
||||
if len(syft_packages) == 0 and len(inline_packages) == 0:
|
||||
print("nobody found any packages")
|
||||
return 0
|
||||
|
||||
same_packages = syft_packages & inline_packages
|
||||
percent_overlap_packages = (
|
||||
float(len(same_packages)) / float(len(inline_packages))
|
||||
) * 100.0
|
||||
|
||||
bonus_packages = syft_packages - inline_packages
|
||||
missing_packages = inline_packages - syft_packages
|
||||
|
||||
inline_metadata_set = set()
|
||||
for package in inline_packages:
|
||||
metadata = inline_metadata[package.type][package]
|
||||
inline_metadata_set.add((package, metadata))
|
||||
|
||||
syft_overlap_metadata_set = set()
|
||||
for package in syft_packages:
|
||||
metadata = syft_metadata[package.type][package]
|
||||
# we only want to really count mismatched metadata for packages that are at least found by inline
|
||||
if package in inline_metadata.get(package.type, []):
|
||||
syft_overlap_metadata_set.add((package, metadata))
|
||||
|
||||
same_metadata = syft_overlap_metadata_set & inline_metadata_set
|
||||
percent_overlap_metadata = (
|
||||
float(len(same_metadata)) / float(len(inline_metadata_set))
|
||||
) * 100.0
|
||||
missing_metadata = inline_metadata_set - same_metadata
|
||||
|
||||
if bonus_packages:
|
||||
rows = []
|
||||
print(colors.bold + "Syft found extra packages:", colors.reset)
|
||||
for package in sorted(list(bonus_packages)):
|
||||
rows.append([INDENT, repr(package)])
|
||||
print_rows(rows)
|
||||
print()
|
||||
|
||||
if missing_packages:
|
||||
rows = []
|
||||
print(colors.bold + "Syft missed packages:", colors.reset)
|
||||
for package in sorted(list(missing_packages)):
|
||||
rows.append([INDENT, repr(package)])
|
||||
print_rows(rows)
|
||||
print()
|
||||
|
||||
if missing_metadata:
|
||||
rows = []
|
||||
print(colors.bold + "Syft mismatched metadata:", colors.reset)
|
||||
for inline_metadata_pair in sorted(list(missing_metadata)):
|
||||
pkg, metadata = inline_metadata_pair
|
||||
if pkg in syft_metadata[pkg.type]:
|
||||
syft_metadata_item = syft_metadata[pkg.type][pkg]
|
||||
else:
|
||||
syft_metadata_item = "--- MISSING ---"
|
||||
rows.append([INDENT, "for:", repr(pkg), ":", repr(syft_metadata_item), "!=", repr(metadata)])
|
||||
print_rows(rows)
|
||||
print()
|
||||
|
||||
print(colors.bold+"Summary:", colors.reset)
|
||||
print(" Image: %s" % image)
|
||||
print(" Inline Packages : %d" % len(inline_packages))
|
||||
print(" Syft Packages : %d" % len(syft_packages))
|
||||
print(" (extra) : %d" % len(bonus_packages))
|
||||
print(" (missing) : %d" % len(missing_packages))
|
||||
print(
|
||||
" Baseline Packages Matched: %2.3f %% (%d/%d packages)"
|
||||
% (percent_overlap_packages, len(same_packages), len(inline_packages))
|
||||
)
|
||||
print(
|
||||
" Baseline Metadata Matched: %2.3f %% (%d/%d metadata)"
|
||||
% (percent_overlap_metadata, len(same_metadata), len(inline_metadata_set))
|
||||
# analyze the raw data to generate all derivative data for the report and quality gate
|
||||
analysis = utils.package.Analysis(
|
||||
syft_data=syft.packages(), inline_data=inline.packages()
|
||||
)
|
||||
|
||||
overall_score = (percent_overlap_packages + percent_overlap_metadata) / 2.0
|
||||
|
||||
print(colors.bold + " Overall Score: %2.1f %%" % overall_score, colors.reset)
|
||||
# show some useful report data for debugging / warm fuzzies
|
||||
report(analysis)
|
||||
|
||||
# enforce a quality gate based on the comparison of package values and metadata values
|
||||
upper_gate_value = IMAGE_UPPER_THRESHOLD[image] * 100
|
||||
lower_gate_value = IMAGE_QUALITY_GATE[image] * 100
|
||||
if overall_score < lower_gate_value:
|
||||
print(colors.bold + " Quality Gate: " + colors.fg.red + "FAILED (is not >= %d %%)\n" % lower_gate_value, colors.reset)
|
||||
if analysis.quality_gate_score < lower_gate_value:
|
||||
print(
|
||||
Colors.bold
|
||||
+ " Quality Gate: "
|
||||
+ Colors.FG.red
|
||||
+ "FAILED (is not >= %d %%)\n" % lower_gate_value,
|
||||
Colors.reset,
|
||||
)
|
||||
return 1
|
||||
elif overall_score > upper_gate_value:
|
||||
print(colors.bold + " Quality Gate: " + colors.fg.orange + "FAILED (lower threshold is artificially low and should be updated)\n", colors.reset)
|
||||
elif analysis.quality_gate_score > upper_gate_value:
|
||||
print(
|
||||
Colors.bold
|
||||
+ " Quality Gate: "
|
||||
+ Colors.FG.orange
|
||||
+ "FAILED (lower threshold is artificially low and should be updated)\n",
|
||||
Colors.reset,
|
||||
)
|
||||
return 1
|
||||
else:
|
||||
print(colors.bold + " Quality Gate: " + colors.fg.green + "pass (>= %d %%)\n" % lower_gate_value, colors.reset)
|
||||
print(
|
||||
Colors.bold
|
||||
+ " Quality Gate: "
|
||||
+ Colors.FG.green
|
||||
+ "pass (>= %d %%)\n" % lower_gate_value,
|
||||
Colors.reset,
|
||||
)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
class colors:
    """ANSI escape sequences used to style terminal output."""

    # text attributes
    reset = '\033[0m'
    bold = '\033[01m'
    disable = '\033[02m'
    underline = '\033[04m'
    reverse = '\033[07m'
    strikethrough = '\033[09m'
    invisible = '\033[08m'

    class fg:
        """Foreground (text) color sequences."""

        black = '\033[30m'
        red = '\033[31m'
        green = '\033[32m'
        orange = '\033[33m'
        blue = '\033[34m'
        purple = '\033[35m'
        cyan = '\033[36m'
        lightgrey = '\033[37m'
        darkgrey = '\033[90m'
        lightred = '\033[91m'
        lightgreen = '\033[92m'
        yellow = '\033[93m'
        lightblue = '\033[94m'
        pink = '\033[95m'
        lightcyan = '\033[96m'

    class bg:
        """Background color sequences."""

        black = '\033[40m'
        red = '\033[41m'
        green = '\033[42m'
        orange = '\033[43m'
        blue = '\033[44m'
        purple = '\033[45m'
        cyan = '\033[46m'
        lightgrey = '\033[47m'
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) != 2:
|
||||
sys.exit("provide an image")
|
||||
|
|
0
test/inline-compare/utils/__init__.py
Normal file
0
test/inline-compare/utils/__init__.py
Normal file
46
test/inline-compare/utils/format.py
Normal file
46
test/inline-compare/utils/format.py
Normal file
|
@ -0,0 +1,46 @@
|
|||
class Colors:
    """ANSI escape sequences used to style terminal output."""

    # text attributes
    reset = "\033[0m"
    bold = "\033[01m"
    disable = "\033[02m"
    underline = "\033[04m"
    reverse = "\033[07m"
    strikethrough = "\033[09m"
    invisible = "\033[08m"

    class FG:
        """Foreground (text) color sequences."""

        black = "\033[30m"
        red = "\033[31m"
        green = "\033[32m"
        orange = "\033[33m"
        blue = "\033[34m"
        purple = "\033[35m"
        cyan = "\033[36m"
        lightgrey = "\033[37m"
        darkgrey = "\033[90m"
        lightred = "\033[91m"
        lightgreen = "\033[92m"
        yellow = "\033[93m"
        lightblue = "\033[94m"
        pink = "\033[95m"
        lightcyan = "\033[96m"

    class BG:
        """Background color sequences."""

        black = "\033[40m"
        red = "\033[41m"
        green = "\033[42m"
        orange = "\033[43m"
        blue = "\033[44m"
        purple = "\033[45m"
        cyan = "\033[46m"
        lightgrey = "\033[47m"
|
||||
|
||||
|
||||
def print_rows(rows):
    """
    Print rows of cells as left-aligned, space-padded columns.

    Each column is as wide as its longest cell plus two characters of padding. Nothing is printed
    when ``rows`` is empty. Cells are converted with ``str()`` and rows may have differing lengths
    (a column's width is taken from whichever rows actually have that column).
    """
    if not rows:
        return

    # stringify once so that len() / ljust() work for any cell type
    table = [[str(cell) for cell in row] for row in rows]

    widths = []
    for row in table:
        for col_idx, cell in enumerate(row):
            cell_width = len(cell) + 2  # padding
            if col_idx == len(widths):
                # first time this column is seen (rows may be ragged)
                widths.append(cell_width)
            elif cell_width > widths[col_idx]:
                widths[col_idx] = cell_width

    for row in table:
        print("".join(cell.ljust(widths[col_idx]) for col_idx, cell in enumerate(row)))
|
5
test/inline-compare/utils/image.py
Normal file
5
test/inline-compare/utils/image.py
Normal file
|
@ -0,0 +1,5 @@
|
|||
import os
|
||||
|
||||
|
||||
def clean(image: str) -> str:
    """
    Turn an image reference into a name usable as a report file name.

    Every ":" is replaced with "_" and only the final path component is kept, so any
    registry / repository prefix in front of the last "/" is dropped.
    """
    sanitized = image.replace(":", "_")
    return os.path.basename(sanitized)
|
131
test/inline-compare/utils/inline.py
Normal file
131
test/inline-compare/utils/inline.py
Normal file
|
@ -0,0 +1,131 @@
|
|||
import os
|
||||
import json
|
||||
import collections
|
||||
|
||||
import utils.package
|
||||
import utils.image
|
||||
|
||||
|
||||
class InlineScan:
    """
    Class for parsing inline-scan output files into a set of packages and package metadata.
    """

    report_tmpl = "{image}-{report}.json"

    def __init__(self, image, report_dir):
        self.report_dir = report_dir
        self.image = image

    def packages(self):
        """
        Parse every supported content report for the image.

        Returns a utils.package.Info whose ``packages`` is a frozenset of all packages found and whose
        ``metadata`` is a plain dict of ``{package type: {package: metadata}}``.
        """
        python_packages, python_metadata = self._python_packages()
        gem_packages, gem_metadata = self._gem_packages()
        java_packages, java_metadata = self._java_packages()
        npm_packages, npm_metadata = self._npm_packages()
        os_packages, os_metadata = self._os_packages()

        packages = (
            python_packages | os_packages | gem_packages | java_packages | npm_packages
        )
        metadata = self._merge_metadata(
            python_metadata,
            os_metadata,
            gem_metadata,
            java_metadata,
            npm_metadata,
        )

        return utils.package.Info(packages=frozenset(packages), metadata=metadata)

    @staticmethod
    def _merge_metadata(*metadata_by_type):
        """
        Merge several ``{package type: {package: metadata}}`` mappings into one plain dict.

        Entries are merged per package type, so two reports that share a package type do not
        overwrite each other's packages (a shallow ``{**a, **b}`` merge would drop the earlier ones).
        """
        merged = {}
        for metadata in metadata_by_type:
            for pkg_type, entries in metadata.items():
                merged.setdefault(pkg_type, {}).update(entries)
        return merged

    def _report_path(self, report):
        """Return the path of the named report file for this image within the report directory."""
        return os.path.join(
            self.report_dir,
            self.report_tmpl.format(image=utils.image.clean(self.image), report=report),
        )

    def _enumerate_section(self, report, section):
        """
        Yield each entry of the given section from the named report.

        Yields nothing when the OS report exists but the requested report does not.
        """
        report_path = self._report_path(report=report)
        os_report_path = self._report_path(report="content-os")

        if os.path.exists(os_report_path) and not os.path.exists(report_path):
            # if the OS report is there but the target report is not, that is engine's way of saying "no findings"
            return

        # JSON interchange is UTF-8; do not depend on the platform default encoding. The file is
        # fully loaded and closed before anything is yielded, so no handle is held by the caller.
        with open(report_path, encoding="utf-8") as json_file:
            data = json.load(json_file)

        yield from data[section]

    @staticmethod
    def _normalize_java_type(pkg_type):
        """Collapse the inline-scan java archive / plugin types into the pseudo types used for comparison."""
        if pkg_type in ("java-jar", "java-war", "java-ear"):
            return "java-?ar"
        if pkg_type in ("java-jpi", "java-hpi"):
            return "java-?pi"
        return pkg_type

    def _collect_packages(self, report, version_key="version", normalize_type=None):
        """
        Build the package set and ``{type: {package: metadata}}`` mapping from a report's "content" section.

        ``version_key`` names the entry field holding the version; ``normalize_type`` (optional) maps the
        lower-cased entry type to the type recorded on the package.
        """
        packages = set()
        metadata = collections.defaultdict(dict)
        for entry in self._enumerate_section(report=report, section="content"):
            pkg_type = entry["type"].lower()
            if normalize_type is not None:
                pkg_type = normalize_type(pkg_type)

            pkg = utils.package.Package(name=entry["package"], type=pkg_type)
            packages.add(pkg)
            metadata[pkg.type][pkg] = utils.package.Metadata(version=entry[version_key])

        return packages, metadata

    def _java_packages(self):
        # normalize to pseudo-inline types; the java report carries its version under "maven-version"
        return self._collect_packages(
            report="content-java",
            version_key="maven-version",
            normalize_type=self._normalize_java_type,
        )

    def _npm_packages(self):
        return self._collect_packages(report="content-npm")

    def _python_packages(self):
        return self._collect_packages(report="content-python")

    def _gem_packages(self):
        return self._collect_packages(report="content-gem")

    def _os_packages(self):
        return self._collect_packages(report="content-os")
|
156
test/inline-compare/utils/package.py
Normal file
156
test/inline-compare/utils/package.py
Normal file
|
@ -0,0 +1,156 @@
|
|||
import difflib
|
||||
import collections
|
||||
import dataclasses
|
||||
from typing import Set, FrozenSet, Tuple, Any, List
|
||||
|
||||
Metadata = collections.namedtuple("Metadata", "version")
Package = collections.namedtuple("Package", "name type")
Info = collections.namedtuple("Info", "packages metadata")

SimilarPackages = collections.namedtuple("SimilarPackages", "pkg missed")
ProbableMatch = collections.namedtuple("ProbableMatch", "pkg ratio")


@dataclasses.dataclass()
class Analysis:
    """
    A package metadata analysis class. When given the raw syft and inline data, all necessary derivative information
    needed to do a comparison of package and metadata is performed, allowing callers to interpret the results
    """

    # all raw data from the inline scan and syft reports
    syft_data: Info
    inline_data: Info

    # all derivative information (derived from the raw data above)
    overlapping_packages: FrozenSet[Package] = dataclasses.field(init=False)
    extra_packages: FrozenSet[Package] = dataclasses.field(init=False)
    missing_packages: FrozenSet[Package] = dataclasses.field(init=False)

    inline_metadata: Set[Tuple[Any, Any]] = dataclasses.field(init=False)
    missing_metadata: Set[Tuple[Any, Any]] = dataclasses.field(init=False)
    overlapping_metadata: Set[Tuple[Any, Any]] = dataclasses.field(init=False)

    # one SimilarPackages entry per syft-only package that was fuzzy-paired with inline-only packages
    similar_missing_packages: List[SimilarPackages] = dataclasses.field(init=False)
    # inline-only packages for which no fuzzy pairing was found
    unmatched_missing_packages: List[Package] = dataclasses.field(init=False)

    def __post_init__(self):
        if not self.valid():
            raise RuntimeError("invalid data given")

        # basic sets derived from package information
        self.overlapping_packages = self.syft_data.packages & self.inline_data.packages
        self.extra_packages = self.syft_data.packages - self.inline_data.packages
        self.missing_packages = self.inline_data.packages - self.syft_data.packages

        # basic sets derived from metadata information
        self.inline_metadata = self._inline_metadata()
        self.overlapping_metadata = self._overlapping_metadata()
        self.missing_metadata = self.inline_metadata - self.overlapping_metadata

        # try to account for potential false negatives by pairing extra packages discovered only by syft with missing
        # packages discovered only by inline scan.
        (
            self.similar_missing_packages,
            self.unmatched_missing_packages,
        ) = self._pair_similar_packages(self.extra_packages, self.missing_packages)

    def valid(self) -> bool:
        # we are purposefully selecting test images that are guaranteed to have packages (this should never happen).
        # ... if it does, then this analysis is not valid!
        return bool(self.inline_data.packages)

    def _inline_metadata(self):
        """
        Returns the set of inline scan metadata paired with the corresponding package info.
        """
        return {
            (package, self.inline_data.metadata[package.type][package])
            for package in self.inline_data.packages
        }

    def _overlapping_metadata(self):
        """
        Returns the (package, metadata) pairs that are identical in both syft and inline scan.
        """
        syft_overlap_metadata_set = set()
        for package in self.syft_data.packages:
            metadata = self.syft_data.metadata[package.type][package]
            # we only want to really count mismatched metadata for packages that are at least found by inline
            if package in self.inline_data.metadata.get(package.type, []):
                syft_overlap_metadata_set.add((package, metadata))

        return syft_overlap_metadata_set & self.inline_metadata

    @staticmethod
    def _pair_similar_packages(extra_packages, missing_packages, similar_threshold=0.7):
        """
        Try to account for potential false negatives by pairing extra packages discovered only by syft with missing
        packages discovered only by inline scan.

        Names are compared with difflib.SequenceMatcher; a pair is a candidate when its ratio is at least
        ``similar_threshold``. Each inline package is paired with exactly one syft package: the candidate with
        the highest ratio (the first one in sorted order on a tie).

        Returns a tuple of (list of SimilarPackages sorted by syft package, list of unpaired inline packages
        sorted by name).
        """
        ordered_missing = sorted(missing_packages)

        best_ratio = {}  # inline package -> ratio of its best pairing so far
        best_extra = {}  # inline package -> syft package of its best pairing so far
        for s in sorted(extra_packages):
            for i in ordered_missing:
                ratio = difflib.SequenceMatcher(None, s.name, i.name).ratio()
                if ratio < similar_threshold:
                    continue
                if i in best_ratio and ratio <= best_ratio[i]:
                    # only allow for an inline package to be paired once (with its best candidate)
                    continue
                # persist the result; this replaces any weaker pairing previously recorded for i
                best_ratio[i] = ratio
                best_extra[i] = s

        # group the paired inline packages under the syft package that holds them
        matches = collections.defaultdict(list)
        for i in ordered_missing:
            if i in best_extra:
                matches[best_extra[i]].append(i)

        results = [
            SimilarPackages(
                pkg=s,
                missed=tuple(ProbableMatch(pkg=i, ratio=best_ratio[i]) for i in paired),
            )
            for s, paired in matches.items()
        ]

        not_found = [i for i in ordered_missing if i not in best_ratio]

        return sorted(results, key=lambda x: x.pkg), sorted(
            not_found, key=lambda x: x.name
        )

    @property
    def percent_overlapping_packages(self):
        """Returns a percentage representing how many packages that were found relative to the number of expected"""
        return (
            float(len(self.overlapping_packages))
            / float(len(self.inline_data.packages))
        ) * 100.0

    @property
    def percent_overlapping_metadata(self):
        """Returns a percentage representing how many matching metadata that were found relative to the number of expected"""
        return (
            float(len(self.overlapping_metadata)) / float(len(self.inline_metadata))
        ) * 100.0

    @property
    def percent_probable_overlapping_packages(self):
        """
        Returns a percentage representing how many packages that were found relative to the number of expected after
        considering pairing of missing packages with extra packages in a fuzzy match.

        The fuzzy matches are counted as the number of entries in ``similar_missing_packages`` (one per paired
        syft package).
        """
        return (
            float(len(self.overlapping_packages) + len(self.similar_missing_packages))
            / float(len(self.inline_data.packages))
        ) * 100.0

    @property
    def quality_gate_score(self):
        """
        The result of the analysis in the form of an aggregated percentage; it is up to the caller to use this value
        and enforce a quality gate.
        """
        return (
            self.percent_overlapping_packages + self.percent_overlapping_metadata
        ) / 2.0
|
54
test/inline-compare/utils/syft.py
Normal file
54
test/inline-compare/utils/syft.py
Normal file
|
@ -0,0 +1,54 @@
|
|||
import os
|
||||
import json
|
||||
import collections
|
||||
|
||||
import utils.package
|
||||
import utils.image
|
||||
|
||||
|
||||
class Syft:
    """
    Class for parsing syft output into a set of packages and package metadata.
    """

    report_tmpl = "{image}.json"

    # syft package type -> (pseudo-)inline-scan package type; types not listed here are used as-is
    _inline_types = {
        "wheel": "python",
        "egg": "python",
        "python": "python",
        "deb": "dpkg",
        "java-archive": "java-?ar",
        "jenkins-plugin": "java-?pi",
        "apk": "apkg",
    }

    def __init__(self, image, report_dir):
        self.report_path = os.path.join(
            report_dir, self.report_tmpl.format(image=utils.image.clean(image))
        )

    def _enumerate_section(self, section):
        """Yield each entry of the given top-level section of the syft JSON report."""
        # JSON interchange is UTF-8; do not depend on the platform default encoding. The file is
        # fully loaded and closed before anything is yielded, so no handle is held by the caller.
        with open(self.report_path, encoding="utf-8") as json_file:
            data = json.load(json_file)

        yield from data[section]

    def packages(self):
        """
        Parse the "artifacts" section of the syft report.

        Returns a utils.package.Info whose ``packages`` is a frozenset of all packages found (with the
        package type normalized to the inline-scan naming) and whose ``metadata`` maps
        ``{package type: {package: metadata}}``.
        """
        packages = set()
        metadata = collections.defaultdict(dict)
        for entry in self._enumerate_section(section="artifacts"):

            # normalize to inline
            pkg_type = entry["type"].lower()
            pkg_type = self._inline_types.get(pkg_type, pkg_type)

            pkg = utils.package.Package(
                name=entry["name"],
                type=pkg_type,
            )

            packages.add(pkg)
            metadata[pkg.type][pkg] = utils.package.Metadata(version=entry["version"])

        return utils.package.Info(packages=frozenset(packages), metadata=metadata)
|
Loading…
Reference in a new issue