inspec/test/unit/resources/aws_security_group_test.rb

507 lines
20 KiB
Ruby
Raw Normal View History

require "helper"
require "inspec/resource"
require "resources/aws/aws_security_group"
require "resource_support/aws"
# MESGSB = MockEc2SecurityGroupSingleBackend
# Abbreviation not used outside this file
#=============================================================================#
# Constructor Tests
#=============================================================================#
class AwsSGSConstructor < Minitest::Test
def setup
AwsSecurityGroup::BackendFactory.select(AwsMESGSB::Empty)
end
def test_constructor_no_args_raises
assert_raises(ArgumentError) { AwsSecurityGroup.new }
end
def test_constructor_accept_scalar_param
AwsSecurityGroup.new("sg-12345678")
end
def test_constructor_expected_well_formed_args
{
id: "sg-1234abcd",
group_id: "sg-1234abcd",
vpc_id: "vpc-1234abcd",
group_name: "some-group",
}.each do |param, value|
AwsSecurityGroup.new(param => value)
end
end
def test_constructor_reject_malformed_args
{
id: "sg-xyz-123",
group_id: "1234abcd",
vpc_id: "vpc_1234abcd",
}.each do |param, value|
assert_raises(ArgumentError) { AwsSecurityGroup.new(param => value) }
end
end
def test_constructor_reject_unknown_resource_params
assert_raises(ArgumentError) { AwsSecurityGroup.new(beep: "boop") }
end
end
#=============================================================================#
# Properties
#=============================================================================#
class AwsSGSProperties < Minitest::Test
def setup
AwsSecurityGroup::BackendFactory.select(AwsMESGSB::Basic)
end
def test_property_group_id
assert_equal("sg-12345678", AwsSecurityGroup.new("sg-12345678").group_id)
assert_nil(AwsSecurityGroup.new(group_name: "my-group").group_id)
end
def test_property_group_name
assert_equal("beta", AwsSecurityGroup.new("sg-12345678").group_name)
assert_nil(AwsSecurityGroup.new("sg-87654321").group_name)
end
def test_property_vpc_id
assert_equal("vpc-aaaabbbb", AwsSecurityGroup.new("sg-aaaabbbb").vpc_id)
assert_nil(AwsSecurityGroup.new("sg-87654321").vpc_id)
end
def test_property_description
assert_equal("Awesome Group", AwsSecurityGroup.new("sg-12345678").description)
assert_nil(AwsSecurityGroup.new("sg-87654321").description)
end
def test_property_inbound_rules
assert_empty(AwsSecurityGroup.new("sg-87654321").inbound_rules)
rules = AwsSecurityGroup.new("sg-12345678").inbound_rules
assert_kind_of(Array, rules)
assert_kind_of(Hash, rules[0])
end
def test_property_outbound_rules
assert_empty(AwsSecurityGroup.new("sg-87654321").outbound_rules)
rules = AwsSecurityGroup.new("sg-12345678").outbound_rules
assert_kind_of(Array, rules)
assert_kind_of(Hash, rules[0])
end
def test_property_inbound_rules_count
assert_equal(0, AwsSecurityGroup.new("sg-aaaabbbb").inbound_rules_count)
count = AwsSecurityGroup.new("sg-12345678").inbound_rules_count
assert_equal(7, count)
assert_kind_of(Numeric, count)
end
def test_property_outbound_rules_count
assert_equal(0, AwsSecurityGroup.new("sg-aaaabbbb").outbound_rules_count)
count = AwsSecurityGroup.new("sg-12345678").outbound_rules_count
assert_equal(2, count)
assert_kind_of(Numeric, count)
end
end
#=============================================================================#
# Matchers
#=============================================================================#
class AwsSGSMatchers < Minitest::Test
def setup
AwsSecurityGroup::BackendFactory.select(AwsMESGSB::Basic)
end
def test_matcher_allow_criteria_validation
sg = AwsSecurityGroup.new("sg-aaaabbbb")
assert_raises(ArgumentError, "allow should reject unrecognized criteria") { sg.allow_in?(shoe_size: 9) }
%i{
from_port
ipv4_range
port
position
protocol
to_port
security_group
}.each do |criterion|
# No errors here
sg.allow_in?(criterion => "dummy")
end
end
def test_matcher_allow_inbound_empty
sg = AwsSecurityGroup.new("sg-aaaabbbb")
rules = sg.inbound_rules
assert_equal(0, rules.count)
refute(sg.allow_in?) # Should we test this - "open" criteria?
end
def test_matcher_allow_inbound_complex
sg = AwsSecurityGroup.new("sg-12345678")
assert_equal(3, sg.inbound_rules.count, "count the number of rules for 3-rule group")
# Position pinning
assert(sg.allow_in?(ipv4_range: "10.1.4.0/24", position: 2), "use numeric position")
assert(sg.allow_in?(ipv4_range: "10.1.4.0/24", position: "2"), "use string position")
assert(sg.allow_in?(ipv4_range: "10.2.0.0/16", position: :last), "use :last position")
assert(sg.allow_in?(port: 22, position: :first), "use :first position")
# Port
assert(sg.allow_in?(port: 22), "match on a numeric port")
assert(sg.allow_in?(port: "22"), "match on a string port")
assert(sg.allow_in?(to_port: "22", from_port: "22"), "match on to/from port")
assert(sg.allow_in?(port: 9002, position: 3), "range matching on port with allow_in")
refute(sg.allow_in_only?(port: 9002, position: 3), "no range matching on port with allow_in_only")
assert(sg.allow_in_only?(from_port: 9001, to_port: 9003, position: 3), "exact range matching on port with allow_in_only")
# Protocol
assert(sg.allow_in?(protocol: "tcp"), "match on tcp protocol, unpinned")
assert(sg.allow_in?(protocol: "tcp", position: 1), "match on tcp protocol")
assert(sg.allow_in?(protocol: "any", position: 2), "match on our 'any' alias protocol")
assert(sg.allow_in?(protocol: "-1", position: 2), "match on AWS spec '-1 for any' protocol")
# IPv4 range testing
assert(sg.allow_in?(ipv4_range: ["10.1.4.0/24"]), "match on 1 ipv4 range as array")
assert(sg.allow_in?(ipv4_range: ["10.1.4.33/32"]), "match on 1 ipv4 range subnet membership")
assert(sg.allow_in?(ipv4_range: ["10.1.4.33/32", "10.1.4.82/32"]), "match on 2 addrs ipv4 range subnet membership")
assert(sg.allow_in?(ipv4_range: ["10.1.4.0/25", "10.1.4.128/25"]), "match on 2 subnets ipv4 range subnet membership")
assert(sg.allow_in_only?(ipv4_range: "10.1.4.0/24", position: 2), "exact match on 1 ipv4 range with _only")
refute(sg.allow_in_only?(ipv4_range: "10.1.4.33/32", position: 2), "no range membership ipv4 range with _only")
assert(sg.allow_in?(ipv4_range: "10.1.2.0/24"), "match on a list ipv4 range when providing only one value (first)")
assert(sg.allow_in?(ipv4_range: "10.1.3.0/24"), "match on a list ipv4 range when providing only one value (last)")
assert(sg.allow_in?(ipv4_range: ["10.1.2.33/32", "10.1.3.33/32"]), "match on a list of single IPs against a list of subnets")
assert(sg.allow_in?(ipv4_range: ["10.1.2.0/24", "10.1.3.0/24"]))
refute(sg.allow_in?(ipv4_range: ["10.1.22.0/24", "10.1.33.0/24"]))
assert(sg.allow_in?(ipv4_range: ["10.1.3.0/24", "10.1.2.0/24"])) # Order is ignored
assert(sg.allow_in_only?(ipv4_range: ["10.1.2.0/24", "10.1.3.0/24"], position: 1))
refute(sg.allow_in_only?(ipv4_range: ["10.1.2.0/24"], position: 1))
refute(sg.allow_in_only?(ipv4_range: ["10.1.3.0/24"], position: 1))
# IPv6 range testing
assert(sg.allow_in?(ipv6_range: ["2001:db8::/122"]), "match on 1 ipv6 range as array")
assert(sg.allow_in?(ipv6_range: ["2001:db8::20/128"]), "match on 1 ipv6 range subnet membership")
assert(sg.allow_in?(ipv6_range: ["2001:db8::20/128", "2001:db8::3f/128"]), "match on 2 addrs ipv6 range subnet membership")
assert(sg.allow_in?(ipv6_range: ["2001:db8::/128", "1968:db8::/124"]), "match on 2 subnets ipv6 range subnet membership")
assert(sg.allow_in_only?(ipv6_range: "2018:db8::/122", position: 2), "exact match on 1 ipv6 range with _only")
refute(sg.allow_in_only?(ipv6_range: "2001:db8::20/128", position: 2), "no range membership ipv6 range with _only")
# Test _only with a 3-rule group, but omitting position
refute(sg.allow_in_only?(port: 22), "_only will fail a multi-rule SG even if it has matching criteria")
refute(sg.allow_in_only?, "_only will fail a multi-rule SG even if it has match-any criteria")
# Test _only with a single rule group (ie, omitting position)
sg = AwsSecurityGroup.new("sg-22223333")
assert_equal(1, sg.inbound_rules.count, "count the number of rules for 1-rule group")
assert_equal(1, sg.inbound_rules_count, "Count the number of rule variants for 1-rule group")
assert(sg.allow_in_only?(ipv4_range: "0.0.0.0/0"), "Match IP range using _only on 1-rule group")
assert(sg.allow_in_only?(protocol: "any"), "Match protocol using _only on 1-rule group")
refute(sg.allow_in_only?(port: 22), "no match port using _only on 1-rule group")
# Test _only with a single rule group for IPv6
sg = AwsSecurityGroup.new("sg-33334444")
assert_equal(1, sg.inbound_rules.count, "count the number of rules for 1-rule ipv6 group")
assert_equal(1, sg.inbound_rules_count, "Count the number of rule variants for 1-rule ipv6 group")
assert(sg.allow_in_only?(ipv6_range: "::/0"), "Match IP range using _only on 1-rule ipv6 group")
assert(sg.allow_in_only?(protocol: "any"), "Match protocol using _only on 1-rule ipv6 group")
refute(sg.allow_in_only?(port: 22), "no match port using _only on 1-rule ipv6 group")
# security-group
sg = AwsSecurityGroup.new("sg-55556666")
assert(sg.allow_in?(security_group: "sg-33334441"), "match on group-id")
assert(sg.allow_in?(security_group: "sg-33334441", port: 22), "match on group-id, numeric port")
assert(sg.allow_in?(security_group: "sg-33334441", port: "22"), "match on group-id, string port")
assert(sg.allow_in?(security_group: "sg-33334441", to_port: "22", from_port: "22"), "match on group-id, to/from port")
assert(sg.allow_in?(port: 9002, position: 3), "range matching on port with allow_in")
refute(sg.allow_in_only?(port: 9002, position: 3), "no range matching on port with allow_in_only")
refute(sg.allow_in_only?(security_group: "sg-33334441"), "no matching on group with allow_in_only when multiple group rules")
assert(sg.allow_in_only?(from_port: 9001, to_port: 9003, position: 3), "exact range matching on port with allow_in_only")
# Test _only with a single rule group for security-group
sg = AwsSecurityGroup.new("sg-33334441")
assert_equal(1, sg.inbound_rules.count, "count the number of rules for 1-rule security-group")
assert_equal(1, sg.inbound_rules_count, "Count the number of rule variants for 1-rule security-group")
assert(sg.allow_in_only?(security_group: "sg-33334444"), "Match security-group using _only on 1-rule security-group")
assert(sg.allow_in_only?(protocol: "any", security_group: "sg-33334444"), "Match protocol using _only on 1-rule security-group")
refute(sg.allow_in_only?(port: 22, security_group: "sg-33334444"), "no match port using _only on 1-rule security-group")
# Test _only with a single rule group for security-group with position pinning
sg = AwsSecurityGroup.new("sg-33334442")
assert(sg.allow_in_only?(security_group: "sg-33334444", position: 2), "Match security-group using _only with numerical position")
assert(sg.allow_in_only?(protocol: "any", security_group: "sg-33334444", position: 2), "Match protocol using _only on 1-rule security-group with numerical position")
refute(sg.allow_in_only?(port: 22, security_group: "sg-33334444", position: 2), "no match port using _only on 1-rule security-group with numerical position")
assert(sg.allow_in_only?(security_group: "sg-33334444", position: "2"), "Match security-group using _only with string position")
assert(sg.allow_in_only?(security_group: "sg-33334444", position: :last), "Match security-group using _only with last position")
end
end
#=============================================================================#
# Test Fixtures
#=============================================================================#
module AwsMESGSB
class Empty < AwsBackendBase
def describe_security_groups(_query)
OpenStruct.new({
security_groups: [],
})
end
end
class Basic < AwsBackendBase
def describe_security_groups(query)
fixtures = [
OpenStruct.new({
description: "Some Group",
group_id: "sg-aaaabbbb",
group_name: "alpha",
vpc_id: "vpc-aaaabbbb",
ip_permissions: [],
ip_permissions_egress: [],
}),
OpenStruct.new({
description: "Awesome Group",
group_id: "sg-12345678",
group_name: "beta",
vpc_id: "vpc-12345678",
ip_permissions: [
OpenStruct.new({
from_port: 22,
to_port: 22,
ip_protocol: "tcp",
ip_ranges: [
# Apparently AWS returns these as plain hashes,
# nested in two levels of Structs.
{ cidr_ip: "10.1.2.0/24" },
{ cidr_ip: "10.1.3.0/24" },
],
ipv_6_ranges: [
{ cidr_ipv_6: "2001:db8::/122" },
{ cidr_ipv_6: "1968:db8::/124" },
],
}),
OpenStruct.new({
from_port: nil,
to_port: nil,
ip_protocol: "-1",
ip_ranges: [
{ cidr_ip: "10.1.4.0/24" },
],
ipv_6_ranges: [
{ cidr_ipv_6: "2018:db8::/122" },
],
}),
OpenStruct.new({
from_port: 9001,
to_port: 9003,
ip_protocol: "udp",
ip_ranges: [
{ cidr_ip: "10.2.0.0/16" },
],
}),
],
ip_permissions_egress: [
OpenStruct.new({
from_port: 123,
to_port: 123,
ip_protocol: "udp",
ip_ranges: [
{ cidr_ip: "128.138.140.44/32" },
],
ipv_6_ranges: [
{ cidr_ipv_6: "2001:db8::/122" },
],
}),
],
}),
OpenStruct.new({
description: "Open Group",
group_id: "sg-22223333",
group_name: "gamma",
vpc_id: "vpc-12345678",
ip_permissions: [
OpenStruct.new({
from_port: nil,
to_port: nil,
ip_protocol: "-1",
ip_ranges: [
{ cidr_ip: "0.0.0.0/0" },
],
}),
],
ip_permissions_egress: [],
}),
OpenStruct.new({
description: "Open Group",
group_id: "sg-33334444",
group_name: "delta",
vpc_id: "vpc-12345678",
ip_permissions: [
OpenStruct.new({
from_port: nil,
to_port: nil,
ip_protocol: "-1",
ipv_6_ranges: [
{ cidr_ipv_6: "::/0" },
],
}),
],
ip_permissions_egress: [],
}),
OpenStruct.new({
description: "Open for group one group rule second position",
group_id: "sg-33334442",
group_name: "etha",
vpc_id: "vpc-12345678",
ip_permissions: [
OpenStruct.new({
from_port: nil,
to_port: nil,
ip_protocol: "-1",
ipv_6_ranges: [
{ cidr_ipv_6: "::/0" },
],
}),
OpenStruct.new({
from_port: nil,
to_port: nil,
ip_protocol: "-1",
user_id_group_pairs: [
OpenStruct.new({
description: "Open for group one rule second position",
group_id: "sg-33334444",
group_name: "delta",
peering_status: "",
user_id: "123456789012",
vpc_id: "",
vpc_peering_connection_id: "",
}),
],
}),
],
ip_permissions_egress: [],
}),
OpenStruct.new({
description: "Open for group one rule",
group_id: "sg-33334441",
group_name: "zeta",
vpc_id: "vpc-12345678",
ip_permissions: [
OpenStruct.new({
from_port: nil,
to_port: nil,
ip_protocol: "-1",
user_id_group_pairs: [
OpenStruct.new({
description: "Open for group one rule",
group_id: "sg-33334444",
group_name: "delta",
peering_status: "",
user_id: "123456789012",
vpc_id: "",
vpc_peering_connection_id: "",
}),
],
}),
],
ip_permissions_egress: [],
}),
OpenStruct.new({
description: "Open for group",
group_id: "sg-55556666",
group_name: "epsilon",
vpc_id: "vpc-12345678",
ip_permissions: [
OpenStruct.new({
from_port: 80,
to_port: 443,
ip_protocol: "-1",
ip_ranges: [
{ cidr_ip: "0.0.0.0/0" },
],
}),
OpenStruct.new({
from_port: 22,
to_port: 22,
ip_protocol: "-1",
user_id_group_pairs: [
OpenStruct.new({
description: "Open for group rule 2",
group_id: "sg-33334441",
group_name: "zeta",
peering_status: "",
user_id: "123456789012",
vpc_id: "",
vpc_peering_connection_id: "",
}),
],
}),
OpenStruct.new({
from_port: 9001,
to_port: 9003,
ip_protocol: "-1",
user_id_group_pairs: [
OpenStruct.new({
description: "Open for group rule 3",
group_id: "sg-33334441",
group_name: "zeta",
peering_status: "",
user_id: "123456789012",
vpc_id: "",
vpc_peering_connection_id: "",
}),
],
}),
OpenStruct.new({
from_port: nil,
to_port: nil,
ip_protocol: "-1",
user_id_group_pairs: [
OpenStruct.new({
description: "allow all from multiple sg",
group_id: "sg-33334441",
group_name: "zeta",
peering_status: "",
user_id: "123456789012",
vpc_id: "",
vpc_peering_connection_id: "",
}),
OpenStruct.new({
description: "allow all from multiple sg[2]",
group_id: "sg-33334442",
group_name: "etha",
peering_status: "",
user_id: "123456789012",
vpc_id: "",
vpc_peering_connection_id: "",
}),
OpenStruct.new({
description: "allow all from multiple sg[3]",
group_id: "sg-11112222",
group_name: "theta",
peering_status: "",
user_id: "123456789012",
vpc_id: "",
vpc_peering_connection_id: "",
}),
],
}),
],
ip_permissions_egress: [],
}) ]
selected = fixtures.select do |sg|
query[:filters].all? do |filter|
filter[:values].include?(sg[filter[:name].tr("-", "_")])
end
end
OpenStruct.new({ security_groups: selected })
end
end
end