require 'helper' # MESGSB = MockEc2SecurityGroupSingleBackend # Abbreviation not used outside this file #=============================================================================# # Constructor Tests #=============================================================================# class AwsSGSConstructor < Minitest::Test def setup AwsSecurityGroup::BackendFactory.select(AwsMESGSB::Empty) end def test_constructor_no_args_raises assert_raises(ArgumentError) { AwsSecurityGroup.new } end def test_constructor_accept_scalar_param AwsSecurityGroup.new('sg-12345678') end def test_constructor_expected_well_formed_args { id: 'sg-1234abcd', group_id: 'sg-1234abcd', vpc_id: 'vpc-1234abcd', group_name: 'some-group', }.each do |param, value| AwsSecurityGroup.new(param => value) end end def test_constructor_reject_malformed_args { id: 'sg-xyz-123', group_id: '1234abcd', vpc_id: 'vpc_1234abcd', }.each do |param, value| assert_raises(ArgumentError) { AwsSecurityGroup.new(param => value) } end end def test_constructor_reject_unknown_resource_params assert_raises(ArgumentError) { AwsSecurityGroup.new(beep: 'boop') } end end #=============================================================================# # Properties #=============================================================================# class AwsSGSProperties < Minitest::Test def setup AwsSecurityGroup::BackendFactory.select(AwsMESGSB::Basic) end def test_property_group_id assert_equal('sg-12345678', AwsSecurityGroup.new('sg-12345678').group_id) assert_nil(AwsSecurityGroup.new(group_name: 'my-group').group_id) end def test_property_group_name assert_equal('beta', AwsSecurityGroup.new('sg-12345678').group_name) assert_nil(AwsSecurityGroup.new('sg-87654321').group_name) end def test_property_vpc_id assert_equal('vpc-aaaabbbb', AwsSecurityGroup.new('sg-aaaabbbb').vpc_id) assert_nil(AwsSecurityGroup.new('sg-87654321').vpc_id) end def test_property_description assert_equal('Awesome Group', AwsSecurityGroup.new('sg-12345678').description) assert_nil(AwsSecurityGroup.new('sg-87654321').description) end def test_property_inbound_rules assert_empty(AwsSecurityGroup.new('sg-87654321').inbound_rules) rules = AwsSecurityGroup.new('sg-12345678').inbound_rules assert_kind_of(Array, rules) assert_kind_of(Hash, rules[0]) end def test_property_outbound_rules assert_empty(AwsSecurityGroup.new('sg-87654321').outbound_rules) rules = AwsSecurityGroup.new('sg-12345678').outbound_rules assert_kind_of(Array, rules) assert_kind_of(Hash, rules[0]) end def test_property_inbound_rules_count assert_equal(0, AwsSecurityGroup.new('sg-aaaabbbb').inbound_rules_count) count = AwsSecurityGroup.new('sg-12345678').inbound_rules_count assert_equal(7, count) assert_kind_of(Numeric, count) end def test_property_outbound_rules_count assert_equal(0, AwsSecurityGroup.new('sg-aaaabbbb').outbound_rules_count) count = AwsSecurityGroup.new('sg-12345678').outbound_rules_count assert_equal(2, count) assert_kind_of(Numeric, count) end end #=============================================================================# # Matchers #=============================================================================# class AwsSGSProperties < Minitest::Test def setup AwsSecurityGroup::BackendFactory.select(AwsMESGSB::Basic) end def test_matcher_allow_criteria_validation sg = AwsSecurityGroup.new('sg-aaaabbbb') rules = sg.inbound_rules assert_raises(ArgumentError, "allow should reject unrecognized criteria") { sg.allow_in?(shoe_size: 9) } [ :from_port, :ipv4_range, :port, :position, :protocol, :to_port, ].each do |criterion| # No errors here sg.allow_in?(criterion => 'dummy') end end def test_matcher_allow_inbound_empty sg = AwsSecurityGroup.new('sg-aaaabbbb') rules = sg.inbound_rules assert_equal(0, rules.count) refute(sg.allow_in?()) # Should we test this - "open" crieria? end def test_matcher_allow_inbound_complex sg = AwsSecurityGroup.new('sg-12345678') assert_equal(3, sg.inbound_rules.count, "count the number of rules for 3-rule group") # Position pinning assert(sg.allow_in?(ipv4_range: "10.1.4.0/24", position: 2), "use numeric position") assert(sg.allow_in?(ipv4_range: "10.1.4.0/24", position: "2"), "use string position") assert(sg.allow_in?(ipv4_range: "10.2.0.0/16", position: :last), "use :last position") assert(sg.allow_in?(port: 22, position: :first), "use :first position") # Port assert(sg.allow_in?(port: 22), "match on a numeric port") assert(sg.allow_in?(port: "22"), "match on a string port") assert(sg.allow_in?(to_port: "22", from_port: "22"), "match on to/from port") assert(sg.allow_in?(port: 9002, position: 3), "range matching on port with allow_in") refute(sg.allow_in_only?(port: 9002, position: 3), "no range matching on port with allow_in_only") assert(sg.allow_in_only?(from_port: 9001, to_port: 9003, position: 3), "exact range matching on port with allow_in_only") # Protocol assert(sg.allow_in?(protocol: 'tcp'), "match on tcp protocol, unpinned") assert(sg.allow_in?(protocol: 'tcp', position: 1), "match on tcp protocol") assert(sg.allow_in?(protocol: 'any', position: 2), "match on our 'any' alias protocol") assert(sg.allow_in?(protocol: '-1', position: 2), "match on AWS spec '-1 for any' protocol") # IPv4 range testing assert(sg.allow_in?(ipv4_range: ["10.1.4.0/24"]), "match on 1 ipv4 range as array") assert(sg.allow_in?(ipv4_range: ["10.1.4.33/32"]), "match on 1 ipv4 range subnet membership") assert(sg.allow_in?(ipv4_range: ["10.1.4.33/32", "10.1.4.82/32"]), "match on 2 addrs ipv4 range subnet membership") assert(sg.allow_in?(ipv4_range: ["10.1.4.0/25", "10.1.4.128/25"]), "match on 2 subnets ipv4 range subnet membership") assert(sg.allow_in_only?(ipv4_range: "10.1.4.0/24", position: 2), "exact match on 1 ipv4 range with _only") refute(sg.allow_in_only?(ipv4_range: "10.1.4.33/32", position: 2), "no range membership ipv4 range with _only") assert(sg.allow_in?(ipv4_range: "10.1.2.0/24"), "match on a list ipv4 range when providing only one value (first)") assert(sg.allow_in?(ipv4_range: "10.1.3.0/24"), "match on a list ipv4 range when providing only one value (last)") assert(sg.allow_in?(ipv4_range: ["10.1.2.33/32", "10.1.3.33/32"]), "match on a list of single IPs against a list of subnets") assert(sg.allow_in?(ipv4_range: ["10.1.2.0/24", "10.1.3.0/24"])) refute(sg.allow_in?(ipv4_range: ["10.1.22.0/24", "10.1.33.0/24"])) assert(sg.allow_in?(ipv4_range: ["10.1.3.0/24", "10.1.2.0/24"])) # Order is ignored assert(sg.allow_in_only?(ipv4_range: ["10.1.2.0/24", "10.1.3.0/24"], position: 1)) refute(sg.allow_in_only?(ipv4_range: ["10.1.2.0/24"], position: 1)) refute(sg.allow_in_only?(ipv4_range: ["10.1.3.0/24"], position: 1)) # IPv6 range testing assert(sg.allow_in?(ipv6_range: ["2001:db8::/122"]), "match on 1 ipv6 range as array") assert(sg.allow_in?(ipv6_range: ["2001:db8::20/128"]), "match on 1 ipv6 range subnet membership") assert(sg.allow_in?(ipv6_range: ["2001:db8::20/128", "2001:db8::3f/128"]), "match on 2 addrs ipv6 range subnet membership") assert(sg.allow_in?(ipv6_range: ["2001:db8::/128", "1968:db8::/124"]), "match on 2 subnets ipv6 range subnet membership") assert(sg.allow_in_only?(ipv6_range: "2018:db8::/122", position: 2), "exact match on 1 ipv6 range with _only") refute(sg.allow_in_only?(ipv6_range: "2001:db8::20/128", position: 2), "no range membership ipv6 range with _only") # Test _only with a 3-rule group, but omitting position refute(sg.allow_in_only?(port: 22), "_only will fail a multi-rule SG even if it has matching criteria") refute(sg.allow_in_only?(), "_only will fail a multi-rule SG even if it has match-any criteria") # Test _only with a single rule group (ie, omitting position) sg = AwsSecurityGroup.new('sg-22223333') assert_equal(1, sg.inbound_rules.count, "count the number of rules for 1-rule group") assert_equal(1, sg.inbound_rules_count, "Count the number of rule variants for 1-rule group") assert(sg.allow_in_only?(ipv4_range: "0.0.0.0/0"), "Match IP range using _only on 1-rule group") assert(sg.allow_in_only?(protocol: 'any'), "Match protocol using _only on 1-rule group") refute(sg.allow_in_only?(port: 22), "no match port using _only on 1-rule group") # Test _only with a single rule group for IPv6 sg = AwsSecurityGroup.new('sg-33334444') assert_equal(1, sg.inbound_rules.count, "count the number of rules for 1-rule ipv6 group") assert_equal(1, sg.inbound_rules_count, "Count the number of rule variants for 1-rule gipv6 roup") assert(sg.allow_in_only?(ipv6_range: "::/0"), "Match IP range using _only on 1-rule ipv6 group") assert(sg.allow_in_only?(protocol: 'any'), "Match protocol using _only on 1-rule ipv6 group") refute(sg.allow_in_only?(port: 22), "no match port using _only on 1-rule ipv6 group") end end #=============================================================================# # Test Fixtures #=============================================================================# module AwsMESGSB class Empty < AwsBackendBase def describe_security_groups(_query) OpenStruct.new({ security_groups: [], }) end end class Basic < AwsBackendBase def describe_security_groups(query) fixtures = [ OpenStruct.new({ description: 'Some Group', group_id: 'sg-aaaabbbb', group_name: 'alpha', vpc_id: 'vpc-aaaabbbb', ip_permissions: [], ip_permissions_egress: [], }), OpenStruct.new({ description: 'Awesome Group', group_id: 'sg-12345678', group_name: 'beta', vpc_id: 'vpc-12345678', ip_permissions: [ OpenStruct.new({ from_port: 22, to_port: 22, ip_protocol: 'tcp', ip_ranges: [ # Apparently AWS returns these as plain hashes, # nested in two levels of Structs. {cidr_ip:"10.1.2.0/24"}, {cidr_ip:"10.1.3.0/24"}, ], ipv_6_ranges: [ {cidr_ipv_6:"2001:db8::/122"}, {cidr_ipv_6:"1968:db8::/124"}, ], }), OpenStruct.new({ from_port: nil, to_port: nil, ip_protocol: "-1", ip_ranges: [ {cidr_ip:"10.1.4.0/24"}, ], ipv_6_ranges: [ {cidr_ipv_6:"2018:db8::/122"} ] }), OpenStruct.new({ from_port: 9001, to_port: 9003, ip_protocol: "udp", ip_ranges: [ {cidr_ip:"10.2.0.0/16"}, ] }), ], ip_permissions_egress: [ OpenStruct.new({ from_port: 123, to_port: 123, ip_protocol: "udp", ip_ranges: [ {cidr_ip:"128.138.140.44/32"}, ], ipv_6_ranges: [ {cidr_ipv_6:"2001:db8::/122"} ] }), ], }), OpenStruct.new({ description: 'Open Group', group_id: 'sg-22223333', group_name: 'gamma', vpc_id: 'vpc-12345678', ip_permissions: [ OpenStruct.new({ from_port: nil, to_port: nil, ip_protocol: "-1", ip_ranges: [ {cidr_ip:"0.0.0.0/0"}, ] }), ], ip_permissions_egress: [], }), OpenStruct.new({ description: 'Open Group', group_id: 'sg-33334444', group_name: 'delta', vpc_id: 'vpc-12345678', ip_permissions: [ OpenStruct.new({ from_port: nil, to_port: nil, ip_protocol: "-1", ipv_6_ranges: [ {cidr_ipv_6:"::/0"}, ] }), ], ip_permissions_egress: [], }), ] selected = fixtures.select do |sg| query[:filters].all? do |filter| filter[:values].include?(sg[filter[:name].tr('-','_')]) end end OpenStruct.new({ security_groups: selected }) end end end