From 0461b976ed9e55e2cda8076685319be32953479c Mon Sep 17 00:00:00 2001 From: gamaleld Date: Thu, 6 May 2021 02:08:48 +0200 Subject: [PATCH 1/3] Create EC2_TERMINATION_PROTECTION.py --- .../EC2_TERMINATION_PROTECTION.py | 422 ++++++++++++++++++ 1 file changed, 422 insertions(+) create mode 100644 python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION.py diff --git a/python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION.py b/python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION.py new file mode 100644 index 00000000..ae74d2c8 --- /dev/null +++ b/python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION.py @@ -0,0 +1,422 @@ +# Copyright 2017-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may +# not use this file except in compliance with the License. A copy of the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for +# the specific language governing permissions and limitations under the License. + +import json +import sys +import datetime +import boto3 +import botocore + +try: + import liblogging +except ImportError: + pass + +############## +# Parameters # +############## + +# Define the default resource to report to Config Rules +DEFAULT_RESOURCE_TYPE = 'AWS::EC2::Instance' + +# Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). +ASSUME_ROLE_MODE = False + +# Other parameters (no change needed) +CONFIG_ROLE_TIMEOUT_SECONDS = 900 + +############# +# Main Code # +############# + +def evaluate_compliance(event, configuration_item, valid_rule_parameters): + """Form the evaluation(s) to be return to Config Rules + + Return either: + None -- when no result needs to be displayed + a string -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE + a dictionary -- the evaluation dictionary, usually built by build_evaluation_from_config_item() + a list of dictionary -- a list of evaluation dictionary , usually built by build_evaluation() + + Keyword arguments: + event -- the event variable given in the lambda handler + configuration_item -- the configurationItem dictionary in the invokingEvent + valid_rule_parameters -- the output of the evaluate_parameters() representing validated parameters of the Config Rule + + Advanced Notes: + 1 -- if a resource is deleted and generate a configuration change with ResourceDeleted status, the Boilerplate code will put a NOT_APPLICABLE on this resource automatically. + 2 -- if a None or a list of dictionary is returned, the old evaluation(s) which are not returned in the new evaluation list are returned as NOT_APPLICABLE by the Boilerplate code + 3 -- if None or an empty string, list or dict is returned, the Boilerplate code will put a "shadow" evaluation to feedback that the evaluation took place properly + """ + + ############################### + # Add your custom logic here. # + ############################### + + # Get instance ID from CI + instance_id = configuration_item['resourceId'] + + # Describe EC2 Attributes for this Instance to get DisableApiTermination Value True|False + # False = NON_COMPLIANT + # True = COMPLIANT + + ec2_client = get_client('ec2', event) + + attributes = ec2_client.describe_instance_attribute( + Attribute='disableApiTermination', + InstanceId=instance_id, + ) + + termination_status = attributes['DisableApiTermination']['Value'] + + # Scenario 1: EC2 Termination Protection is Enabled, Rule is COMPLIANT + if termination_status: + return build_evaluation_from_config_item(configuration_item, 'COMPLIANT') + else: + # Scenario 1: EC2 Termination Protection is Enabled, Rule is COMPLIANT + return build_evaluation_from_config_item(configuration_item, 'NON_COMPLIANT', annotation='This EC2 Instance does not have EC2 Termination Protection enabled.') + + return 'NOT_APPLICABLE' + +def evaluate_parameters(rule_parameters): + """Evaluate the rule parameters dictionary validity. Raise a ValueError for invalid parameters. + + Return: + anything suitable for the evaluate_compliance() + + Keyword arguments: + rule_parameters -- the Key/Value dictionary of the Config Rules parameters + """ + valid_rule_parameters = rule_parameters + return valid_rule_parameters + +#################### +# Helper Functions # +#################### + +# Build an error to be displayed in the logs when the parameter is invalid. +def build_parameters_value_error_response(ex): + """Return an error dictionary when the evaluate_parameters() raises a ValueError. + + Keyword arguments: + ex -- Exception text + """ + return build_error_response(internal_error_message="Parameter value is invalid", + internal_error_details="An ValueError was raised during the validation of the Parameter value", + customer_error_code="InvalidParameterValueException", + customer_error_message=str(ex)) + +# This gets the client after assuming the Config service role +# either in the same AWS account or cross-account. +def get_client(service, event, region=None): + """Return the service boto client. It should be used instead of directly calling the client. + + Keyword arguments: + service -- the service name used for calling the boto.client() + event -- the event variable given in the lambda handler + region -- the region where the client is called (default: None) + """ + if not ASSUME_ROLE_MODE: + return boto3.client(service, region) + credentials = get_assume_role_credentials(get_execution_role_arn(event), region) + return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], + aws_secret_access_key=credentials['SecretAccessKey'], + aws_session_token=credentials['SessionToken'], + region_name=region + ) + +# This generate an evaluation for config +def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): + """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. + + Keyword arguments: + resource_id -- the unique id of the resource to report + compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE + event -- the event variable given in the lambda handler + resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) + annotation -- an annotation to be added to the evaluation (default None). It will be truncated to 255 if longer. + """ + eval_cc = {} + if annotation: + eval_cc['Annotation'] = build_annotation(annotation) + eval_cc['ComplianceResourceType'] = resource_type + eval_cc['ComplianceResourceId'] = resource_id + eval_cc['ComplianceType'] = compliance_type + eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) + return eval_cc + +def build_evaluation_from_config_item(configuration_item, compliance_type, annotation=None): + """Form an evaluation as a dictionary. Usually suited to report on configuration change rules. + + Keyword arguments: + configuration_item -- the configurationItem dictionary in the invokingEvent + compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE + annotation -- an annotation to be added to the evaluation (default None). It will be truncated to 255 if longer. + """ + eval_ci = {} + if annotation: + eval_ci['Annotation'] = build_annotation(annotation) + eval_ci['ComplianceResourceType'] = configuration_item['resourceType'] + eval_ci['ComplianceResourceId'] = configuration_item['resourceId'] + eval_ci['ComplianceType'] = compliance_type + eval_ci['OrderingTimestamp'] = configuration_item['configurationItemCaptureTime'] + return eval_ci + +#################### +# Boilerplate Code # +#################### + +# Get execution role for Lambda function +def get_execution_role_arn(event): + role_arn = None + if 'ruleParameters' in event: + rule_params = json.loads(event['ruleParameters']) + role_name = rule_params.get("ExecutionRoleName") + if role_name: + execution_role_prefix = event["executionRoleArn"].split("/")[0] + role_arn = "{}/{}".format(execution_role_prefix, role_name) + + if not role_arn: + role_arn = event['executionRoleArn'] + + return role_arn + +# Build annotation within Service constraints +def build_annotation(annotation_string): + if len(annotation_string) > 256: + return annotation_string[:244] + " [truncated]" + return annotation_string + +# Helper function used to validate input +def check_defined(reference, reference_name): + if not reference: + raise Exception('Error: ', reference_name, 'is not defined') + return reference + +# Check whether the message is OversizedConfigurationItemChangeNotification or not +def is_oversized_changed_notification(message_type): + check_defined(message_type, 'messageType') + return message_type == 'OversizedConfigurationItemChangeNotification' + +# Check whether the message is a ScheduledNotification or not. +def is_scheduled_notification(message_type): + check_defined(message_type, 'messageType') + return message_type == 'ScheduledNotification' + +# Get configurationItem using getResourceConfigHistory API +# in case of OversizedConfigurationItemChangeNotification +def get_configuration(resource_type, resource_id, configuration_capture_time): + result = AWS_CONFIG_CLIENT.get_resource_config_history( + resourceType=resource_type, + resourceId=resource_id, + laterTime=configuration_capture_time, + limit=1) + configuration_item = result['configurationItems'][0] + return convert_api_configuration(configuration_item) + +# Convert from the API model to the original invocation model +def convert_api_configuration(configuration_item): + for k, v in configuration_item.items(): + if isinstance(v, datetime.datetime): + configuration_item[k] = str(v) + configuration_item['awsAccountId'] = configuration_item['accountId'] + configuration_item['ARN'] = configuration_item['arn'] + configuration_item['configurationStateMd5Hash'] = configuration_item['configurationItemMD5Hash'] + configuration_item['configurationItemVersion'] = configuration_item['version'] + configuration_item['configuration'] = json.loads(configuration_item['configuration']) + if 'relationships' in configuration_item: + for i in range(len(configuration_item['relationships'])): + configuration_item['relationships'][i]['name'] = configuration_item['relationships'][i]['relationshipName'] + return configuration_item + +# Based on the type of message get the configuration item +# either from configurationItem in the invoking event +# or using the getResourceConfigHistiry API in getConfiguration function. +def get_configuration_item(invoking_event): + check_defined(invoking_event, 'invokingEvent') + if is_oversized_changed_notification(invoking_event['messageType']): + configuration_item_summary = check_defined(invoking_event['configurationItemSummary'], 'configurationItemSummary') + return get_configuration(configuration_item_summary['resourceType'], configuration_item_summary['resourceId'], configuration_item_summary['configurationItemCaptureTime']) + if is_scheduled_notification(invoking_event['messageType']): + return None + return check_defined(invoking_event['configurationItem'], 'configurationItem') + +# Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. +def is_applicable(configuration_item, event): + try: + check_defined(configuration_item, 'configurationItem') + check_defined(event, 'event') + except: + return True + status = configuration_item['configurationItemStatus'] + event_left_scope = event['eventLeftScope'] + if status == 'ResourceDeleted': + print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") + + return status in ('OK', 'ResourceDiscovered') and not event_left_scope + + +def get_assume_role_credentials(role_arn, region=None): + sts_client = boto3.client('sts', region) + try: + assume_role_response = sts_client.assume_role(RoleArn=role_arn, + RoleSessionName="configLambdaExecution", + DurationSeconds=CONFIG_ROLE_TIMEOUT_SECONDS) + if 'liblogging' in sys.modules: + liblogging.logSession(role_arn, assume_role_response) + return assume_role_response['Credentials'] + except botocore.exceptions.ClientError as ex: + # Scrub error message for any internal account info leaks + print(str(ex)) + if 'AccessDenied' in ex.response['Error']['Code']: + ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." + else: + ex.response['Error']['Message'] = "InternalError" + ex.response['Error']['Code'] = "InternalError" + raise ex + +# This removes older evaluation (usually useful for periodic rule not reporting on AWS::::Account). +def clean_up_old_evaluations(latest_evaluations, event): + + cleaned_evaluations = [] + + old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule( + ConfigRuleName=event['configRuleName'], + ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'], + Limit=100) + + old_eval_list = [] + + while True: + for old_result in old_eval['EvaluationResults']: + old_eval_list.append(old_result) + if 'NextToken' in old_eval: + next_token = old_eval['NextToken'] + old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule( + ConfigRuleName=event['configRuleName'], + ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'], + Limit=100, + NextToken=next_token) + else: + break + + for old_eval in old_eval_list: + old_resource_id = old_eval['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId'] + newer_founded = False + for latest_eval in latest_evaluations: + if old_resource_id == latest_eval['ComplianceResourceId']: + newer_founded = True + if not newer_founded: + cleaned_evaluations.append(build_evaluation(old_resource_id, "NOT_APPLICABLE", event)) + + return cleaned_evaluations + latest_evaluations + +def lambda_handler(event, context): + if 'liblogging' in sys.modules: + liblogging.logEvent(event) + + global AWS_CONFIG_CLIENT + + #print(event) + check_defined(event, 'event') + invoking_event = json.loads(event['invokingEvent']) + rule_parameters = {} + if 'ruleParameters' in event: + rule_parameters = json.loads(event['ruleParameters']) + + try: + valid_rule_parameters = evaluate_parameters(rule_parameters) + except ValueError as ex: + return build_parameters_value_error_response(ex) + + try: + AWS_CONFIG_CLIENT = get_client('config', event) + if invoking_event['messageType'] in ['ConfigurationItemChangeNotification', 'ScheduledNotification', 'OversizedConfigurationItemChangeNotification']: + configuration_item = get_configuration_item(invoking_event) + if is_applicable(configuration_item, event): + compliance_result = evaluate_compliance(event, configuration_item, valid_rule_parameters) + else: + compliance_result = "NOT_APPLICABLE" + else: + return build_internal_error_response('Unexpected message type', str(invoking_event)) + except botocore.exceptions.ClientError as ex: + if is_internal_error(ex): + return build_internal_error_response("Unexpected error while completing API request", str(ex)) + return build_error_response("Customer error while making API request", str(ex), ex.response['Error']['Code'], ex.response['Error']['Message']) + except ValueError as ex: + return build_internal_error_response(str(ex), str(ex)) + + evaluations = [] + latest_evaluations = [] + + if not compliance_result: + latest_evaluations.append(build_evaluation(event['accountId'], "NOT_APPLICABLE", event, resource_type='AWS::::Account')) + evaluations = clean_up_old_evaluations(latest_evaluations, event) + elif isinstance(compliance_result, str): + if configuration_item: + evaluations.append(build_evaluation_from_config_item(configuration_item, compliance_result)) + else: + evaluations.append(build_evaluation(event['accountId'], compliance_result, event, resource_type=DEFAULT_RESOURCE_TYPE)) + elif isinstance(compliance_result, list): + for evaluation in compliance_result: + missing_fields = False + for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'): + if field not in evaluation: + print("Missing " + field + " from custom evaluation.") + missing_fields = True + + if not missing_fields: + latest_evaluations.append(evaluation) + evaluations = clean_up_old_evaluations(latest_evaluations, event) + elif isinstance(compliance_result, dict): + missing_fields = False + for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'): + if field not in compliance_result: + print("Missing " + field + " from custom evaluation.") + missing_fields = True + if not missing_fields: + evaluations.append(compliance_result) + else: + evaluations.append(build_evaluation_from_config_item(configuration_item, 'NOT_APPLICABLE')) + + # Put together the request that reports the evaluation status + result_token = event['resultToken'] + test_mode = False + if result_token == 'TESTMODE': + # Used solely for RDK test to skip actual put_evaluation API call + test_mode = True + + # Invoke the Config API to report the result of the evaluation + evaluation_copy = [] + evaluation_copy = evaluations[:] + while evaluation_copy: + AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluation_copy[:100], ResultToken=result_token, TestMode=test_mode) + del evaluation_copy[:100] + + # Used solely for RDK test to be able to test Lambda function + return evaluations + +def is_internal_error(exception): + return ((not isinstance(exception, botocore.exceptions.ClientError)) or exception.response['Error']['Code'].startswith('5') + or 'InternalError' in exception.response['Error']['Code'] or 'ServiceError' in exception.response['Error']['Code']) + +def build_internal_error_response(internal_error_message, internal_error_details=None): + return build_error_response(internal_error_message, internal_error_details, 'InternalError', 'InternalError') + +def build_error_response(internal_error_message, internal_error_details=None, customer_error_code=None, customer_error_message=None): + error_response = { + 'internalErrorMessage': internal_error_message, + 'internalErrorDetails': internal_error_details, + 'customerErrorMessage': customer_error_message, + 'customerErrorCode': customer_error_code + } + print(error_response) + return error_response From 9e729cdd39cb60a4c606989dce2dbe0cb876841e Mon Sep 17 00:00:00 2001 From: gamaleld Date: Thu, 6 May 2021 02:09:07 +0200 Subject: [PATCH 2/3] Add files via upload --- .../EC2_TERMINATION_PROTECTION_test.py | 187 ++++++++++++++++++ .../parameters.json | 12 ++ 2 files changed, 199 insertions(+) create mode 100644 python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION_test.py create mode 100644 python/EC2_TERMINATION_PROTECTION/parameters.json diff --git a/python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION_test.py b/python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION_test.py new file mode 100644 index 00000000..339b79cf --- /dev/null +++ b/python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION_test.py @@ -0,0 +1,187 @@ +# Copyright 2017-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may +# not use this file except in compliance with the License. A copy of the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for +# the specific language governing permissions and limitations under the License. + +import sys +import unittest +try: + from unittest.mock import MagicMock +except ImportError: + from mock import MagicMock +import botocore + +############## +# Parameters # +############## + +# Define the default resource to report to Config Rules +DEFAULT_RESOURCE_TYPE = 'AWS::EC2::Instance' + +############# +# Main Code # +############# + +CONFIG_CLIENT_MOCK = MagicMock() +STS_CLIENT_MOCK = MagicMock() +ec2_mock = MagicMock() + +class Boto3Mock(): + @staticmethod + def client(client_name, *args, **kwargs): + if client_name == 'config': + return CONFIG_CLIENT_MOCK + if client_name == 'sts': + return STS_CLIENT_MOCK + if client_name == 'ec2': + return ec2_mock + raise Exception("Attempting to create an unknown client") + +sys.modules['boto3'] = Boto3Mock() + +RULE = __import__('EC2_TERMINATION_PROTECTION') + +class ComplianceTest(unittest.TestCase): + + invoking_event_ec2_scenario1 = '{"configurationItem":{"configurationItemCaptureTime":"2018-07-02T03:37:52.418Z","accountId": "681361479661","configurationItemStatus": "ResourceDiscovered","resourceType": "AWS::EC2::Instance","resourceId": "i-058180476cbe2999e","supplementaryConfiguration":{}},"notificationCreationTime":"2018-07-02T23:05:34.445Z","messageType":"ConfigurationItemChangeNotification"}' + invoking_event_ec2_scenario2 = '{"configurationItem":{"configurationItemCaptureTime":"2018-07-02T03:37:52.418Z","accountId": "681361479661","configurationItemStatus": "ResourceDiscovered","resourceType": "AWS::EC2::Instance","resourceId": "i-08d88c46031cc32e9","supplementaryConfiguration":{}},"notificationCreationTime":"2018-07-02T23:05:34.445Z","messageType":"ConfigurationItemChangeNotification"}' + + # Scenario 1: Termination Protection Enabled on EC2 Instance + def test_scenario_1_ec2_termination_protection_enabled(self): + #RULE.ASSUME_ROLE_MODE = True + + sc1 = {'DisableApiTermination': {'Value': True}, 'InstanceId': 'i-058180476cbe2999e', 'ResponseMetadata': {'RequestId': 'cae447c3-5083-4a92-b447-c51f948397c1', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'cae447c3-5083-4a92-b447-c51f948397c1', 'content-type': 'text/xml;charset=UTF-8', 'content-length': '359', 'date': 'Sat, 11 Jul 2020 12:45:39 GMT', 'server': 'AmazonEC2'}, 'RetryAttempts': 0}} + ec2_mock.describe_instance_attribute = MagicMock(return_value=sc1) + + response = RULE.lambda_handler(build_lambda_configurationchange_event(self.invoking_event_ec2_scenario1), {}) + resp_expected = [] + resp_expected.append(build_expected_response('COMPLIANT', 'i-058180476cbe2999e')) + assert_successful_evaluation(self, response, resp_expected) + + # Scenario 2: Termination Protection Disabled on EC2 Instance + def test_scenario_2_ec2_termination_protection_disabled(self): + #RULE.ASSUME_ROLE_MODE = True + + sc2 = {'DisableApiTermination': {'Value': False}, 'InstanceId': 'i-08d88c46031cc32e9', 'ResponseMetadata': {'RequestId': 'cae447c3-5083-4a92-b447-c51f948397c1', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'cae447c3-5083-4a92-b447-c51f948397c1', 'content-type': 'text/xml;charset=UTF-8', 'content-length': '359', 'date': 'Sat, 11 Jul 2020 12:45:39 GMT', 'server': 'AmazonEC2'}, 'RetryAttempts': 0}} + ec2_mock.describe_instance_attribute = MagicMock(return_value=sc2) + + response = RULE.lambda_handler(build_lambda_configurationchange_event(self.invoking_event_ec2_scenario2), {}) + resp_expected = [] + resp_expected.append(build_expected_response('NON_COMPLIANT', 'i-08d88c46031cc32e9', annotation='This EC2 Instance does not have EC2 Termination Protection enabled.')) + assert_successful_evaluation(self, response, resp_expected) + +#################### +# Helper Functions # +#################### + +def build_lambda_configurationchange_event(invoking_event, rule_parameters=None): + event_to_return = { + 'configRuleName':'myrule', + 'executionRoleArn':'roleArn', + 'eventLeftScope': False, + 'invokingEvent': invoking_event, + 'accountId': '123456789012', + 'configRuleArn': 'arn:aws:config:us-east-1:123456789012:config-rule/config-rule-8fngan', + 'resultToken':'token' + } + if rule_parameters: + event_to_return['ruleParameters'] = rule_parameters + return event_to_return + +def build_lambda_scheduled_event(rule_parameters=None): + invoking_event = '{"messageType":"ScheduledNotification","notificationCreationTime":"2017-12-23T22:11:18.158Z"}' + event_to_return = { + 'configRuleName':'myrule', + 'executionRoleArn':'roleArn', + 'eventLeftScope': False, + 'invokingEvent': invoking_event, + 'accountId': '123456789012', + 'configRuleArn': 'arn:aws:config:us-east-1:123456789012:config-rule/config-rule-8fngan', + 'resultToken':'token' + } + if rule_parameters: + event_to_return['ruleParameters'] = rule_parameters + return event_to_return + +def build_expected_response(compliance_type, compliance_resource_id, compliance_resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): + if not annotation: + return { + 'ComplianceType': compliance_type, + 'ComplianceResourceId': compliance_resource_id, + 'ComplianceResourceType': compliance_resource_type + } + return { + 'ComplianceType': compliance_type, + 'ComplianceResourceId': compliance_resource_id, + 'ComplianceResourceType': compliance_resource_type, + 'Annotation': annotation + } + +def assert_successful_evaluation(test_class, response, resp_expected, evaluations_count=1): + if isinstance(response, dict): + test_class.assertEquals(resp_expected['ComplianceResourceType'], response['ComplianceResourceType']) + test_class.assertEquals(resp_expected['ComplianceResourceId'], response['ComplianceResourceId']) + test_class.assertEquals(resp_expected['ComplianceType'], response['ComplianceType']) + test_class.assertTrue(response['OrderingTimestamp']) + if 'Annotation' in resp_expected or 'Annotation' in response: + test_class.assertEquals(resp_expected['Annotation'], response['Annotation']) + elif isinstance(response, list): + test_class.assertEquals(evaluations_count, len(response)) + for i, response_expected in enumerate(resp_expected): + test_class.assertEquals(response_expected['ComplianceResourceType'], response[i]['ComplianceResourceType']) + test_class.assertEquals(response_expected['ComplianceResourceId'], response[i]['ComplianceResourceId']) + test_class.assertEquals(response_expected['ComplianceType'], response[i]['ComplianceType']) + test_class.assertTrue(response[i]['OrderingTimestamp']) + if 'Annotation' in response_expected or 'Annotation' in response[i]: + test_class.assertEquals(response_expected['Annotation'], response[i]['Annotation']) + +def assert_customer_error_response(test_class, response, customer_error_code=None, customer_error_message=None): + if customer_error_code: + test_class.assertEqual(customer_error_code, response['customerErrorCode']) + if customer_error_message: + test_class.assertEqual(customer_error_message, response['customerErrorMessage']) + test_class.assertTrue(response['customerErrorCode']) + test_class.assertTrue(response['customerErrorMessage']) + if "internalErrorMessage" in response: + test_class.assertTrue(response['internalErrorMessage']) + if "internalErrorDetails" in response: + test_class.assertTrue(response['internalErrorDetails']) + +def sts_mock(): + assume_role_response = { + "Credentials": { + "AccessKeyId": "string", + "SecretAccessKey": "string", + "SessionToken": "string"}} + STS_CLIENT_MOCK.reset_mock(return_value=True) + STS_CLIENT_MOCK.assume_role = MagicMock(return_value=assume_role_response) + +################## +# Common Testing # +################## + +class TestStsErrors(unittest.TestCase): + + def test_sts_unknown_error(self): + RULE.ASSUME_ROLE_MODE = True + RULE.evaluate_parameters = MagicMock(return_value=True) + STS_CLIENT_MOCK.assume_role = MagicMock(side_effect=botocore.exceptions.ClientError( + {'Error': {'Code': 'unknown-code', 'Message': 'unknown-message'}}, 'operation')) + response = RULE.lambda_handler(build_lambda_configurationchange_event('{}'), {}) + assert_customer_error_response( + self, response, 'InternalError', 'InternalError') + + def test_sts_access_denied(self): + RULE.ASSUME_ROLE_MODE = True + RULE.evaluate_parameters = MagicMock(return_value=True) + STS_CLIENT_MOCK.assume_role = MagicMock(side_effect=botocore.exceptions.ClientError( + {'Error': {'Code': 'AccessDenied', 'Message': 'access-denied'}}, 'operation')) + response = RULE.lambda_handler(build_lambda_configurationchange_event('{}'), {}) + assert_customer_error_response( + self, response, 'AccessDenied', 'AWS Config does not have permission to assume the IAM role.') diff --git a/python/EC2_TERMINATION_PROTECTION/parameters.json b/python/EC2_TERMINATION_PROTECTION/parameters.json new file mode 100644 index 00000000..9fe91fec --- /dev/null +++ b/python/EC2_TERMINATION_PROTECTION/parameters.json @@ -0,0 +1,12 @@ +{ + "Version": "1.0", + "Parameters": { + "RuleName": "EC2_TERMINATION_PROTECTION", + "SourceRuntime": "python3.7", + "CodeKey": "EC2_TERMINATION_PROTECTION.zip", + "InputParameters": "{}", + "OptionalParameters": "{}", + "SourceEvents": "AWS::EC2::Instance" + }, + "Tags": "[]" +} \ No newline at end of file From c14e46752645158aa604ee574f9757630bde4373 Mon Sep 17 00:00:00 2001 From: gamaleld Date: Fri, 7 May 2021 12:28:43 +0200 Subject: [PATCH 3/3] Update EC2_TERMINATION_PROTECTION_test.py remove accountID generated by RDK and replaced with 123456789012 --- .../EC2_TERMINATION_PROTECTION_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION_test.py b/python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION_test.py index 339b79cf..c597196b 100644 --- a/python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION_test.py +++ b/python/EC2_TERMINATION_PROTECTION/EC2_TERMINATION_PROTECTION_test.py @@ -49,8 +49,8 @@ def client(client_name, *args, **kwargs): class ComplianceTest(unittest.TestCase): - invoking_event_ec2_scenario1 = '{"configurationItem":{"configurationItemCaptureTime":"2018-07-02T03:37:52.418Z","accountId": "681361479661","configurationItemStatus": "ResourceDiscovered","resourceType": "AWS::EC2::Instance","resourceId": "i-058180476cbe2999e","supplementaryConfiguration":{}},"notificationCreationTime":"2018-07-02T23:05:34.445Z","messageType":"ConfigurationItemChangeNotification"}' - invoking_event_ec2_scenario2 = '{"configurationItem":{"configurationItemCaptureTime":"2018-07-02T03:37:52.418Z","accountId": "681361479661","configurationItemStatus": "ResourceDiscovered","resourceType": "AWS::EC2::Instance","resourceId": "i-08d88c46031cc32e9","supplementaryConfiguration":{}},"notificationCreationTime":"2018-07-02T23:05:34.445Z","messageType":"ConfigurationItemChangeNotification"}' + invoking_event_ec2_scenario1 = '{"configurationItem":{"configurationItemCaptureTime":"2018-07-02T03:37:52.418Z","accountId": "123456789012","configurationItemStatus": "ResourceDiscovered","resourceType": "AWS::EC2::Instance","resourceId": "i-058180476cbe2999e","supplementaryConfiguration":{}},"notificationCreationTime":"2018-07-02T23:05:34.445Z","messageType":"ConfigurationItemChangeNotification"}' + invoking_event_ec2_scenario2 = '{"configurationItem":{"configurationItemCaptureTime":"2018-07-02T03:37:52.418Z","accountId": "123456789012","configurationItemStatus": "ResourceDiscovered","resourceType": "AWS::EC2::Instance","resourceId": "i-08d88c46031cc32e9","supplementaryConfiguration":{}},"notificationCreationTime":"2018-07-02T23:05:34.445Z","messageType":"ConfigurationItemChangeNotification"}' # Scenario 1: Termination Protection Enabled on EC2 Instance def test_scenario_1_ec2_termination_protection_enabled(self):