inspec/lib/resources/aws/aws_security_group.rb
Timothy van Zadelhoff 5739cb2d6b aws_security_group: Query against other security group ids in allow_* matchers (#3576)
* add security-group to security-group rules
* update docs
* Add integration tests for security-group to security-group rules
* rubocop fix
*     Add one security group rule, with position.
* make control fit description

Signed-off-by: Timothy van Zadelhoff <timothy.inspec@theothersolution.nl>
2018-11-13 13:25:33 -05:00

299 lines
9.6 KiB
Ruby

require 'set'
require 'ipaddr'
class AwsSecurityGroup < Inspec.resource(1)
name 'aws_security_group'
desc 'Verifies settings for an individual AWS Security Group.'
example "
describe aws_security_group('sg-12345678') do
it { should exist }
end
"
supports platform: 'aws'
include AwsSingularResourceMixin
attr_reader :description, :group_id, :group_name, :vpc_id, :inbound_rules, :outbound_rules, :inbound_rules_count, :outbound_rules_count
def to_s
"EC2 Security Group #{@group_id}"
end
# Matcher predicate: does at least one inbound rule satisfy +criteria+?
#
# The criteria hash is copied before use because the matching helpers
# remove and add keys on the hash they are handed.
def allow_in?(criteria = {})
  wanted = criteria.dup
  allow(inbound_rules, wanted)
end
# Register `allow_in` as an RSpec matcher alias for `be_allow_in`.
RSpec::Matchers.alias_matcher :allow_in, :be_allow_in
# Matcher predicate: does at least one outbound rule satisfy +criteria+?
#
# The criteria hash is copied before use because the matching helpers
# remove and add keys on the hash they are handed.
def allow_out?(criteria = {})
  wanted = criteria.dup
  allow(outbound_rules, wanted)
end
# Register `allow_out` as an RSpec matcher alias for `be_allow_out`.
RSpec::Matchers.alias_matcher :allow_out, :be_allow_out
# Matcher predicate: strict form of the inbound check, delegating to
# allow_only with the inbound rule list.
#
# The criteria hash is copied first so the caller's hash is never modified.
def allow_in_only?(criteria = {})
  wanted = criteria.dup
  allow_only(inbound_rules, wanted)
end
# Register `allow_in_only` as an RSpec matcher alias for `be_allow_in_only`.
RSpec::Matchers.alias_matcher :allow_in_only, :be_allow_in_only
# Matcher predicate: strict form of the outbound check, delegating to
# allow_only with the outbound rule list.
#
# The criteria hash is copied first so the caller's hash is never modified.
def allow_out_only?(criteria = {})
  wanted = criteria.dup
  allow_only(outbound_rules, wanted)
end
RSpec::Matchers.alias_matcher :allow_out_only, :be_allow_out_only
private
# Strict variant of #allow, used by the allow_in_only / allow_out_only matchers.
#
# Passes only when exactly one rule is under consideration (the group has
# a single rule, or :position picked one out) and that rule matches
# +criteria+ in exact mode. When :security_group is among the criteria the
# rule must additionally reference exactly one security group.
#
# @param rules [Array<Hash>] candidate rules
# @param criteria [Hash] match criteria; modified in place (:position is
#   consumed by allow__focus_on_position, :exact is set to true)
# @return [Boolean]
def allow_only(rules, criteria)
  # Narrows to the selected rule and removes :position from criteria.
  rules = allow__focus_on_position(rules, criteria)

  # :position has been consumed above, so from here on the only acceptable
  # situation is a single remaining rule (always index 0).
  return false unless rules.count == 1

  if criteria.key?(:security_group)
    pairs = rules.first[:user_id_group_pairs]
    return false unless pairs && pairs.count == 1
  end

  criteria[:exact] = true
  allow(rules, criteria)
end
# Core matcher: true when any rule (after optional :position narrowing)
# satisfies every supplied criterion.
#
# Criteria are validated first; unknown keys raise ArgumentError there.
# The individual checks run in a fixed order and stop at the first failure
# for a given rule.
def allow(rules, criteria)
  criteria = allow__check_criteria(criteria)
  candidates = allow__focus_on_position(rules, criteria)

  candidates.any? do |rule|
    allow__match_port(rule, criteria) &&
      allow__match_protocol(rule, criteria) &&
      allow__match_ipv4_range(rule, criteria) &&
      allow__match_ipv6_range(rule, criteria) &&
      allow__match_security_group(rule, criteria)
  end
end
# Validates the criteria hash given to the allow_* matchers.
#
# Every recognised key is moved out of +raw_criteria+ (which is therefore
# emptied as a side effect) into a new hash, in the order the permitted keys
# are listed below. Anything left behind is an unknown criterion.
#
# @param raw_criteria [Hash] caller-supplied criteria; modified in place
# @return [Hash] the recognised criteria
# @raise [ArgumentError] if unrecognised keys remain
def allow__check_criteria(raw_criteria)
  # :exact is internal; it is set by allow_only rather than by users.
  permitted = %i[from_port ipv4_range ipv6_range security_group port position protocol to_port exact]

  accepted = permitted.each_with_object({}) do |name, kept|
    kept[name] = raw_criteria.delete(name) if raw_criteria.key?(name)
  end
  return accepted if raw_criteria.empty?

  raise ArgumentError, "Unrecognized security group rule 'allow' criteria '#{raw_criteria.keys.join(',')}'. Expected criteria: #{permitted.join(', ')}"
end
# Narrows +rules+ to the single rule selected by criteria[:position].
#
# :position is 1-based and may be an Integer, a numeric String, or the
# symbols :first / :last. The key is removed from +criteria+ so later
# stages never see it. Without a :position key the rules are returned as-is.
#
# @param rules [Array<Hash>] rule list to select from
# @param criteria [Hash] match criteria; :position is deleted when present
# @return [Array<Hash>] the original list, or a one-element list
# @raise [ArgumentError] if :position has an unsupported type or does not
#   refer to an existing rule
def allow__focus_on_position(rules, criteria)
  return rules unless criteria.key?(:position)

  position = criteria.delete(:position)

  # Normalize to a zero-based numeric index.
  idx =
    case position
    when :first then 0
    when :last then rules.count - 1
    when String then position.to_i - 1 # documented as 1-based
    when Numeric then position - 1 # documented as 1-based
    else
      raise ArgumentError, "aws_security_group 'allow' 'position' criteria must be an integer or the symbols :first or :last"
    end

  # A negative index would silently count from the end of the array (or
  # yield nil for an empty list), so reject it along with too-large values.
  if idx < 0 || idx >= rules.count
    raise ArgumentError, "aws_security_group 'allow' 'position' criteria #{idx + 1} is out of range - there are only #{rules.count} rules for security group #{group_id}."
  end

  [rules[idx]]
end
# Port check for a single rule.
#
# Two modes:
# * exact mode (criteria has :exact, :from_port or :to_port): the rule's
#   from/to ports must equal whichever of the requested bounds were given;
#   :port is shorthand for from_port == to_port and is written back into
#   +criteria+ as such.
# * range mode (only :port given): the port must lie inside the rule's
#   range, with missing bounds defaulting to 0 and 65535.
#
# With no port criteria at all, every rule matches.
def allow__match_port(rule, criteria) # rubocop: disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/AbcSize
  port = criteria[:port]
  exact_mode = criteria[:exact] || criteria[:from_port] || criteria[:to_port]

  unless exact_mode
    # No port requested: anything goes.
    return true unless port

    lower = rule[:from_port] || 0
    upper = rule[:to_port] || 65535
    return (lower..upper).cover?(port.to_i)
  end

  # :port is shorthand for a single-valued port range.
  if port
    criteria[:from_port] = port
    criteria[:to_port] = port
  end

  wanted_from = criteria[:from_port]
  wanted_to = criteria[:to_port]
  return true if wanted_from.nil? && wanted_to.nil?

  wanted_from = wanted_from.to_i unless wanted_from.nil?
  wanted_to = wanted_to.to_i unless wanted_to.nil?

  # A bound that was not requested does not constrain the rule.
  from_ok = wanted_from.nil? || rule[:from_port] == wanted_from
  to_ok = wanted_to.nil? || rule[:to_port] == wanted_to
  from_ok && to_ok
end
# Protocol check for a single rule.
#
# Matches everything when no :protocol criterion was given. The string
# 'any' is accepted as a readable alias for '-1'.
def allow__match_protocol(rule, criteria)
  if criteria.key?(:protocol)
    requested = criteria[:protocol]
    rule[:ip_protocol] == (requested == 'any' ? '-1' : requested)
  else
    true
  end
end
# Compares a rule's CIDR ranges with the :ipv4_range or :ipv6_range criterion.
#
# In exact mode (criteria[:exact]) the requested set of ranges must equal
# the rule's set of ranges. Otherwise every requested range must fall inside
# at least one of the rule's ranges.
#
# @param rule [Hash] a rule hash; a missing :ip_ranges / :ipv_6_ranges key is
#   treated as an empty list
# @param criteria [Hash] match criteria
# @param family [Symbol, nil] :ipv4 or :ipv6 to choose which criterion is
#   checked. When nil, IPv4 is checked if :ipv4_range is present, otherwise
#   IPv6 (the historic two-argument behaviour).
# @return [Boolean]
def match_ipv4_or_6_range(rule, criteria, family = nil)
  family ||= criteria.key?(:ipv4_range) ? :ipv4 : :ipv6

  if family == :ipv4
    query = criteria[:ipv4_range]
    ranges = (rule[:ip_ranges] || []).map { |rng| rng[:cidr_ip] }
  else
    query = criteria[:ipv6_range]
    ranges = (rule[:ipv_6_ranges] || []).map { |rng| rng[:cidr_ipv_6] }
  end
  query = [query] unless query.is_a?(Array)

  return Set.new(query) == Set.new(ranges) if criteria[:exact]

  # CIDR subset mode:
  # each of the provided IP ranges must be a member of one of the rule's
  # listed IP ranges.
  query.all? do |candidate|
    wanted = IPAddr.new(candidate)
    ranges.any? { |range| IPAddr.new(range).include?(wanted) }
  end
end

# IPv4 range check for a single rule; matches everything when no
# :ipv4_range criterion was given.
def allow__match_ipv4_range(rule, criteria)
  return true unless criteria.key?(:ipv4_range)
  match_ipv4_or_6_range(rule, criteria, :ipv4)
end

# IPv6 range check for a single rule; matches everything when no
# :ipv6_range criterion was given. The family is passed explicitly so the
# IPv6 criterion is still checked when an :ipv4_range criterion is also present.
def allow__match_ipv6_range(rule, criteria)
  return true unless criteria.key?(:ipv6_range)
  match_ipv4_or_6_range(rule, criteria, :ipv6)
end
# Security-group check for a single rule.
#
# Matches everything when no :security_group criterion was given. Otherwise
# the rule must list the requested group id among its :user_id_group_pairs;
# a rule without that entry never matches.
def allow__match_security_group(rule, criteria)
  if criteria.key?(:security_group)
    wanted = criteria[:security_group]
    pairs = rule[:user_id_group_pairs]
    pairs ? pairs.any? { |pair| wanted == pair[:group_id] } : false
  else
    true
  end
end
# Validates the parameters passed to aws_security_group(...).
#
# Name checking is delegated to check_resource_param_names (defined outside
# this file; presumably supplied by AwsSingularResourceMixin - confirm).
# :id is accepted as an alias for :group_id. Group and VPC ids must start
# with the expected prefix followed by at least 8 hexadecimal characters;
# longer ids are accepted.
#
# @param raw_params resource parameters as given by the user
# @return [Hash] the validated parameters
# @raise [ArgumentError] on a malformed id or when no parameters remain
def validate_params(raw_params)
  recognized_params = check_resource_param_names(
    raw_params: raw_params,
    allowed_params: [:id, :group_id, :group_name, :vpc_id],
    allowed_scalar_name: :group_id,
    allowed_scalar_type: String,
  )

  # id is an alias for group_id
  recognized_params[:group_id] = recognized_params.delete(:id) if recognized_params.key?(:id)

  # \A anchors at the start of the whole string; ^ would also accept an id
  # that merely begins some later line of a multi-line value.
  if recognized_params.key?(:group_id) && recognized_params[:group_id] !~ /\Asg-[0-9a-f]{8}/
    raise ArgumentError, 'aws_security_group security group ID must be in the format "sg-" followed by 8 hexadecimal characters.'
  end

  if recognized_params.key?(:vpc_id) && recognized_params[:vpc_id] !~ /\Avpc-[0-9a-f]{8}/
    raise ArgumentError, 'aws_security_group VPC ID must be in the format "vpc-" followed by 8 hexadecimal characters.'
  end

  if recognized_params.empty?
    raise ArgumentError, 'You must provide parameters to aws_security_group, such as group_name, group_id, or vpc_id.'
  end

  recognized_params
end
# Counts individual rules across a list of IP permission hashes.
#
# Each permission contributes the lengths of its :ip_ranges, :ipv_6_ranges
# and :user_id_group_pairs lists; keys that are absent contribute nothing.
#
# @param ip_permissions [Array<Hash>] permission hashes
# @return [Integer] total number of range / group-pair entries
def count_sg_rules(ip_permissions)
  countable = [:ip_ranges, :ipv_6_ranges, :user_id_group_pairs]

  ip_permissions.inject(0) do |total, permission|
    countable.inject(total) do |sum, key|
      permission.key?(key) ? sum + permission[key].length : sum
    end
  end
end
# Looks the security group up via the backend and populates this resource.
#
# Every search attribute currently set on the instance (@description,
# @group_id, @group_name, @vpc_id) becomes one filter, with underscores in
# the name turned into dashes. If nothing is returned the resource is marked
# as non-existent with empty rule lists; otherwise the first returned group
# supplies the attributes, rule lists and rule counts.
def fetch_from_api # rubocop: disable Metrics/AbcSize
  backend = BackendFactory.create(inspec_runner)

  # Transform the set attributes into the filter format passed to the backend.
  search_keys = [:description, :group_id, :group_name, :vpc_id]
  filters = search_keys.each_with_object([]) do |criterion_name, collected|
    ivar = "@#{criterion_name}".to_sym
    next unless instance_variable_defined?(ivar)

    value = instance_variable_get(ivar)
    next if value.nil?

    collected << { name: criterion_name.to_s.tr('_', '-'), values: [value] }
  end

  found = backend.describe_security_groups(filters: filters).security_groups
  @exists = !found.empty?

  unless @exists
    @inbound_rules = []
    @outbound_rules = []
    return
  end

  group = found[0]
  @description = group.description
  @group_id = group.group_id
  @group_name = group.group_name
  @vpc_id = group.vpc_id

  ingress = group.ip_permissions.map(&:to_h)
  egress = group.ip_permissions_egress.map(&:to_h)
  @inbound_rules = ingress
  @inbound_rules_count = count_sg_rules(ingress)
  @outbound_rules = egress
  @outbound_rules_count = count_sg_rules(egress)
end
# Container for the backend class(es) this resource uses to reach the AWS API.
class Backend
# Backend built on AwsBackendBase that talks to the EC2 client.
class AwsClientApi < AwsBackendBase
# Hand this class to BackendFactory as its default backend.
BackendFactory.set_default_backend self
# Client class for this backend; presumably AwsBackendBase uses it to build
# aws_service_client - confirm in the base class.
self.aws_client_class = Aws::EC2::Client
# Passes +query+ through unchanged to the service client's
# describe_security_groups call and returns whatever it returns.
def describe_security_groups(query)
aws_service_client.describe_security_groups(query)
end
end
end
end