mirror of
https://github.com/inspec/inspec
synced 2024-12-26 21:13:25 +00:00
242bee9ce6
Add inbound_rules_count and outbound_rules_count for total variants Signed-off-by: Martin Logan <martinloganzz@gmail.com>
335 lines
13 KiB
Ruby
335 lines
13 KiB
Ruby
require 'helper'
|
|
|
|
# MESGSB = MockEc2SecurityGroupSingleBackend
|
|
# Abbreviation not used outside this file
|
|
|
|
#=============================================================================#
|
|
# Constructor Tests
|
|
#=============================================================================#
|
|
class AwsSGSConstructor < Minitest::Test
|
|
def setup
|
|
AwsSecurityGroup::BackendFactory.select(AwsMESGSB::Empty)
|
|
end
|
|
|
|
def test_constructor_no_args_raises
|
|
assert_raises(ArgumentError) { AwsSecurityGroup.new }
|
|
end
|
|
|
|
def test_constructor_accept_scalar_param
|
|
AwsSecurityGroup.new('sg-12345678')
|
|
end
|
|
|
|
def test_constructor_expected_well_formed_args
|
|
{
|
|
id: 'sg-1234abcd',
|
|
group_id: 'sg-1234abcd',
|
|
vpc_id: 'vpc-1234abcd',
|
|
group_name: 'some-group',
|
|
}.each do |param, value|
|
|
AwsSecurityGroup.new(param => value)
|
|
end
|
|
end
|
|
|
|
def test_constructor_reject_malformed_args
|
|
{
|
|
id: 'sg-xyz-123',
|
|
group_id: '1234abcd',
|
|
vpc_id: 'vpc_1234abcd',
|
|
}.each do |param, value|
|
|
assert_raises(ArgumentError) { AwsSecurityGroup.new(param => value) }
|
|
end
|
|
end
|
|
|
|
def test_constructor_reject_unknown_resource_params
|
|
assert_raises(ArgumentError) { AwsSecurityGroup.new(beep: 'boop') }
|
|
end
|
|
end
|
|
|
|
#=============================================================================#
|
|
# Properties
|
|
#=============================================================================#
|
|
|
|
class AwsSGSProperties < Minitest::Test
|
|
def setup
|
|
AwsSecurityGroup::BackendFactory.select(AwsMESGSB::Basic)
|
|
end
|
|
|
|
def test_property_group_id
|
|
assert_equal('sg-12345678', AwsSecurityGroup.new('sg-12345678').group_id)
|
|
assert_nil(AwsSecurityGroup.new(group_name: 'my-group').group_id)
|
|
end
|
|
|
|
def test_property_group_name
|
|
assert_equal('beta', AwsSecurityGroup.new('sg-12345678').group_name)
|
|
assert_nil(AwsSecurityGroup.new('sg-87654321').group_name)
|
|
end
|
|
|
|
def test_property_vpc_id
|
|
assert_equal('vpc-aaaabbbb', AwsSecurityGroup.new('sg-aaaabbbb').vpc_id)
|
|
assert_nil(AwsSecurityGroup.new('sg-87654321').vpc_id)
|
|
end
|
|
|
|
def test_property_description
|
|
assert_equal('Awesome Group', AwsSecurityGroup.new('sg-12345678').description)
|
|
assert_nil(AwsSecurityGroup.new('sg-87654321').description)
|
|
end
|
|
|
|
def test_property_inbound_rules
|
|
assert_empty(AwsSecurityGroup.new('sg-87654321').inbound_rules)
|
|
rules = AwsSecurityGroup.new('sg-12345678').inbound_rules
|
|
assert_kind_of(Array, rules)
|
|
assert_kind_of(Hash, rules[0])
|
|
end
|
|
|
|
def test_property_outbound_rules
|
|
assert_empty(AwsSecurityGroup.new('sg-87654321').outbound_rules)
|
|
rules = AwsSecurityGroup.new('sg-12345678').outbound_rules
|
|
assert_kind_of(Array, rules)
|
|
assert_kind_of(Hash, rules[0])
|
|
end
|
|
|
|
def test_property_inbound_rules_count
|
|
assert_equal(0, AwsSecurityGroup.new('sg-aaaabbbb').inbound_rules_count)
|
|
count = AwsSecurityGroup.new('sg-12345678').inbound_rules_count
|
|
assert_equal(7, count)
|
|
assert_kind_of(Numeric, count)
|
|
end
|
|
|
|
def test_property_outbound_rules_count
|
|
assert_equal(0, AwsSecurityGroup.new('sg-aaaabbbb').outbound_rules_count)
|
|
count = AwsSecurityGroup.new('sg-12345678').outbound_rules_count
|
|
assert_equal(2, count)
|
|
assert_kind_of(Numeric, count)
|
|
end
|
|
end
|
|
|
|
#=============================================================================#
|
|
# Matchers
|
|
#=============================================================================#
|
|
|
|
class AwsSGSProperties < Minitest::Test
|
|
def setup
|
|
AwsSecurityGroup::BackendFactory.select(AwsMESGSB::Basic)
|
|
end
|
|
|
|
def test_matcher_allow_criteria_validation
|
|
sg = AwsSecurityGroup.new('sg-aaaabbbb')
|
|
rules = sg.inbound_rules
|
|
assert_raises(ArgumentError, "allow should reject unrecognized criteria") { sg.allow_in?(shoe_size: 9) }
|
|
[
|
|
:from_port,
|
|
:ipv4_range,
|
|
:port,
|
|
:position,
|
|
:protocol,
|
|
:to_port,
|
|
].each do |criterion|
|
|
# No errors here
|
|
sg.allow_in?(criterion => 'dummy')
|
|
end
|
|
end
|
|
|
|
def test_matcher_allow_inbound_empty
|
|
sg = AwsSecurityGroup.new('sg-aaaabbbb')
|
|
rules = sg.inbound_rules
|
|
assert_equal(0, rules.count)
|
|
refute(sg.allow_in?()) # Should we test this - "open" crieria?
|
|
end
|
|
|
|
def test_matcher_allow_inbound_complex
|
|
sg = AwsSecurityGroup.new('sg-12345678')
|
|
assert_equal(3, sg.inbound_rules.count, "count the number of rules for 3-rule group")
|
|
|
|
# Position pinning
|
|
assert(sg.allow_in?(ipv4_range: "10.1.4.0/24", position: 2), "use numeric position")
|
|
assert(sg.allow_in?(ipv4_range: "10.1.4.0/24", position: "2"), "use string position")
|
|
assert(sg.allow_in?(ipv4_range: "10.2.0.0/16", position: :last), "use :last position")
|
|
assert(sg.allow_in?(port: 22, position: :first), "use :first position")
|
|
|
|
# Port
|
|
assert(sg.allow_in?(port: 22), "match on a numeric port")
|
|
assert(sg.allow_in?(port: "22"), "match on a string port")
|
|
assert(sg.allow_in?(to_port: "22", from_port: "22"), "match on to/from port")
|
|
assert(sg.allow_in?(port: 9002, position: 3), "range matching on port with allow_in")
|
|
refute(sg.allow_in_only?(port: 9002, position: 3), "no range matching on port with allow_in_only")
|
|
assert(sg.allow_in_only?(from_port: 9001, to_port: 9003, position: 3), "exact range matching on port with allow_in_only")
|
|
|
|
# Protocol
|
|
assert(sg.allow_in?(protocol: 'tcp'), "match on tcp protocol, unpinned")
|
|
assert(sg.allow_in?(protocol: 'tcp', position: 1), "match on tcp protocol")
|
|
assert(sg.allow_in?(protocol: 'any', position: 2), "match on our 'any' alias protocol")
|
|
assert(sg.allow_in?(protocol: '-1', position: 2), "match on AWS spec '-1 for any' protocol")
|
|
|
|
# IPv4 range testing
|
|
assert(sg.allow_in?(ipv4_range: ["10.1.4.0/24"]), "match on 1 ipv4 range as array")
|
|
assert(sg.allow_in?(ipv4_range: ["10.1.4.33/32"]), "match on 1 ipv4 range subnet membership")
|
|
assert(sg.allow_in?(ipv4_range: ["10.1.4.33/32", "10.1.4.82/32"]), "match on 2 addrs ipv4 range subnet membership")
|
|
assert(sg.allow_in?(ipv4_range: ["10.1.4.0/25", "10.1.4.128/25"]), "match on 2 subnets ipv4 range subnet membership")
|
|
assert(sg.allow_in_only?(ipv4_range: "10.1.4.0/24", position: 2), "exact match on 1 ipv4 range with _only")
|
|
refute(sg.allow_in_only?(ipv4_range: "10.1.4.33/32", position: 2), "no range membership ipv4 range with _only")
|
|
assert(sg.allow_in?(ipv4_range: "10.1.2.0/24"), "match on a list ipv4 range when providing only one value (first)")
|
|
assert(sg.allow_in?(ipv4_range: "10.1.3.0/24"), "match on a list ipv4 range when providing only one value (last)")
|
|
assert(sg.allow_in?(ipv4_range: ["10.1.2.33/32", "10.1.3.33/32"]), "match on a list of single IPs against a list of subnets")
|
|
assert(sg.allow_in?(ipv4_range: ["10.1.2.0/24", "10.1.3.0/24"]))
|
|
refute(sg.allow_in?(ipv4_range: ["10.1.22.0/24", "10.1.33.0/24"]))
|
|
assert(sg.allow_in?(ipv4_range: ["10.1.3.0/24", "10.1.2.0/24"])) # Order is ignored
|
|
assert(sg.allow_in_only?(ipv4_range: ["10.1.2.0/24", "10.1.3.0/24"], position: 1))
|
|
refute(sg.allow_in_only?(ipv4_range: ["10.1.2.0/24"], position: 1))
|
|
refute(sg.allow_in_only?(ipv4_range: ["10.1.3.0/24"], position: 1))
|
|
|
|
# IPv6 range testing
|
|
assert(sg.allow_in?(ipv6_range: ["2001:db8::/122"]), "match on 1 ipv6 range as array")
|
|
assert(sg.allow_in?(ipv6_range: ["2001:db8::20/128"]), "match on 1 ipv6 range subnet membership")
|
|
assert(sg.allow_in?(ipv6_range: ["2001:db8::20/128", "2001:db8::3f/128"]), "match on 2 addrs ipv6 range subnet membership")
|
|
assert(sg.allow_in?(ipv6_range: ["2001:db8::/128", "1968:db8::/124"]), "match on 2 subnets ipv6 range subnet membership")
|
|
assert(sg.allow_in_only?(ipv6_range: "2018:db8::/122", position: 2), "exact match on 1 ipv6 range with _only")
|
|
refute(sg.allow_in_only?(ipv6_range: "2001:db8::20/128", position: 2), "no range membership ipv6 range with _only")
|
|
|
|
# Test _only with a 3-rule group, but omitting position
|
|
refute(sg.allow_in_only?(port: 22), "_only will fail a multi-rule SG even if it has matching criteria")
|
|
refute(sg.allow_in_only?(), "_only will fail a multi-rule SG even if it has match-any criteria")
|
|
|
|
# Test _only with a single rule group (ie, omitting position)
|
|
sg = AwsSecurityGroup.new('sg-22223333')
|
|
assert_equal(1, sg.inbound_rules.count, "count the number of rules for 1-rule group")
|
|
assert_equal(1, sg.inbound_rules_count, "Count the number of rule variants for 1-rule group")
|
|
assert(sg.allow_in_only?(ipv4_range: "0.0.0.0/0"), "Match IP range using _only on 1-rule group")
|
|
assert(sg.allow_in_only?(protocol: 'any'), "Match protocol using _only on 1-rule group")
|
|
refute(sg.allow_in_only?(port: 22), "no match port using _only on 1-rule group")
|
|
|
|
# Test _only with a single rule group for IPv6
|
|
sg = AwsSecurityGroup.new('sg-33334444')
|
|
assert_equal(1, sg.inbound_rules.count, "count the number of rules for 1-rule ipv6 group")
|
|
assert_equal(1, sg.inbound_rules_count, "Count the number of rule variants for 1-rule gipv6 roup")
|
|
assert(sg.allow_in_only?(ipv6_range: "::/0"), "Match IP range using _only on 1-rule ipv6 group")
|
|
assert(sg.allow_in_only?(protocol: 'any'), "Match protocol using _only on 1-rule ipv6 group")
|
|
refute(sg.allow_in_only?(port: 22), "no match port using _only on 1-rule ipv6 group")
|
|
end
|
|
end
|
|
|
|
|
|
#=============================================================================#
|
|
# Test Fixtures
|
|
#=============================================================================#
|
|
|
|
module AwsMESGSB
|
|
class Empty < AwsBackendBase
|
|
def describe_security_groups(_query)
|
|
OpenStruct.new({
|
|
security_groups: [],
|
|
})
|
|
end
|
|
end
|
|
|
|
class Basic < AwsBackendBase
|
|
def describe_security_groups(query)
|
|
fixtures = [
|
|
OpenStruct.new({
|
|
description: 'Some Group',
|
|
group_id: 'sg-aaaabbbb',
|
|
group_name: 'alpha',
|
|
vpc_id: 'vpc-aaaabbbb',
|
|
ip_permissions: [],
|
|
ip_permissions_egress: [],
|
|
}),
|
|
OpenStruct.new({
|
|
description: 'Awesome Group',
|
|
group_id: 'sg-12345678',
|
|
group_name: 'beta',
|
|
vpc_id: 'vpc-12345678',
|
|
ip_permissions: [
|
|
OpenStruct.new({
|
|
from_port: 22,
|
|
to_port: 22,
|
|
ip_protocol: 'tcp',
|
|
ip_ranges: [
|
|
# Apparently AWS returns these as plain hashes,
|
|
# nested in two levels of Structs.
|
|
{cidr_ip:"10.1.2.0/24"},
|
|
{cidr_ip:"10.1.3.0/24"},
|
|
],
|
|
ipv_6_ranges: [
|
|
{cidr_ipv_6:"2001:db8::/122"},
|
|
{cidr_ipv_6:"1968:db8::/124"},
|
|
],
|
|
}),
|
|
OpenStruct.new({
|
|
from_port: nil,
|
|
to_port: nil,
|
|
ip_protocol: "-1",
|
|
ip_ranges: [
|
|
{cidr_ip:"10.1.4.0/24"},
|
|
],
|
|
ipv_6_ranges: [
|
|
{cidr_ipv_6:"2018:db8::/122"}
|
|
]
|
|
}),
|
|
OpenStruct.new({
|
|
from_port: 9001,
|
|
to_port: 9003,
|
|
ip_protocol: "udp",
|
|
ip_ranges: [
|
|
{cidr_ip:"10.2.0.0/16"},
|
|
]
|
|
}),
|
|
],
|
|
ip_permissions_egress: [
|
|
OpenStruct.new({
|
|
from_port: 123,
|
|
to_port: 123,
|
|
ip_protocol: "udp",
|
|
ip_ranges: [
|
|
{cidr_ip:"128.138.140.44/32"},
|
|
],
|
|
ipv_6_ranges: [
|
|
{cidr_ipv_6:"2001:db8::/122"}
|
|
]
|
|
}),
|
|
],
|
|
}),
|
|
OpenStruct.new({
|
|
description: 'Open Group',
|
|
group_id: 'sg-22223333',
|
|
group_name: 'gamma',
|
|
vpc_id: 'vpc-12345678',
|
|
ip_permissions: [
|
|
OpenStruct.new({
|
|
from_port: nil,
|
|
to_port: nil,
|
|
ip_protocol: "-1",
|
|
ip_ranges: [
|
|
{cidr_ip:"0.0.0.0/0"},
|
|
]
|
|
}),
|
|
],
|
|
ip_permissions_egress: [],
|
|
}),
|
|
OpenStruct.new({
|
|
description: 'Open Group',
|
|
group_id: 'sg-33334444',
|
|
group_name: 'delta',
|
|
vpc_id: 'vpc-12345678',
|
|
ip_permissions: [
|
|
OpenStruct.new({
|
|
from_port: nil,
|
|
to_port: nil,
|
|
ip_protocol: "-1",
|
|
ipv_6_ranges: [
|
|
{cidr_ipv_6:"::/0"},
|
|
]
|
|
}),
|
|
],
|
|
ip_permissions_egress: [],
|
|
}), ]
|
|
|
|
selected = fixtures.select do |sg|
|
|
query[:filters].all? do |filter|
|
|
filter[:values].include?(sg[filter[:name].tr('-','_')])
|
|
end
|
|
end
|
|
|
|
OpenStruct.new({ security_groups: selected })
|
|
end
|
|
end
|
|
|
|
end
|