diff --git a/CHANGES.md b/CHANGES.md
index d6032dd..94f0944 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,10 +7,12 @@
 ### Changed

 * [#348]: Bumped default Spark to 3.2; dropped support for Python 3.6; added CI build for Python 3.10.
-* [#361]: Migrated from AdoptOpenJDK, which is deprecated, to Adoptium Open JDK.
+* [#361]: Migrated from AdoptOpenJDK, which is deprecated, to Adoptium OpenJDK.
+* [#362]: Improved Flintrock's ability to clean up after launch failures.

 [#348]: https://github.com/nchammas/flintrock/pull/348
 [#361]: https://github.com/nchammas/flintrock/pull/361
+[#362]: https://github.com/nchammas/flintrock/pull/362

 ## [2.0.0] - 2021-06-10

diff --git a/flintrock/ec2.py b/flintrock/ec2.py
index 2bbb5f1..f7d01ed 100644
--- a/flintrock/ec2.py
+++ b/flintrock/ec2.py
@@ -7,6 +7,7 @@
 import logging
 from ipaddress import IPv4Network
 from datetime import datetime
+from typing import List, Tuple

 # External modules
 import boto3
@@ -82,9 +83,13 @@ def instances(self):

     @property
     @functools.lru_cache()
-    def private_network(self):
+    def private_network(self) -> bool:
         ec2 = boto3.resource(service_name='ec2', region_name=self.region)
-        return not ec2.Subnet(self.master_instance.subnet_id).map_public_ip_on_launch
+        if self.master_instance:
+            reference_instance = self.master_instance
+        else:
+            reference_instance = self.slave_instances[0]
+        return not ec2.Subnet(reference_instance.subnet_id).map_public_ip_on_launch

     @property
     def master_ip(self):
@@ -190,6 +195,7 @@ def destroy(self):
         for instance in self.instances:
            instance.modify_attribute(
                 Groups=[flintrock_base_group.id])
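+            # A brief pause here, presumably to give the security group change
+            # time to propagate before the cluster's own group is deleted below.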
+            time.sleep(1)

         # TODO: Centralize logic to get cluster security group name from cluster name.
         cluster_group = list(
@@ -263,16 +269,17 @@ def add_slaves_check(self):

     @timeit
     def add_slaves(
-            self,
-            *,
-            user: str,
-            identity_file: str,
-            num_slaves: int,
-            spot_price: float,
-            spot_request_duration: str,
-            min_root_ebs_size_gb: int,
-            tags: list,
-            assume_yes: bool):
+        self,
+        *,
+        user: str,
+        identity_file: str,
+        num_slaves: int,
+        spot_price: float,
+        spot_request_duration: str,
+        min_root_ebs_size_gb: int,
+        tags: list,
+        assume_yes: bool,
+    ):
         security_group_ids = [
             group['GroupId']
             for group in self.master_instance.security_groups]
@@ -328,19 +335,10 @@ def add_slaves(
             instance_profile_arn=instance_profile_arn,
             ebs_optimized=self.master_instance.ebs_optimized,
             instance_initiated_shutdown_behavior=instance_initiated_shutdown_behavior,
-            user_data=user_data)
-
-        slave_tags = [
-            {'Key': 'flintrock-role', 'Value': 'slave'},
-            {'Key': 'Name', 'Value': '{c}-slave'.format(c=self.name)}]
-        slave_tags += tags
-
-        (ec2.instances
-            .filter(
-                Filters=[
-                    {'Name': 'instance-id', 'Values': [i.id for i in new_slave_instances]}
-                ])
-            .create_tags(Tags=slave_tags))
+            user_data=user_data,
+            tag_specifications=_tag_specs(self.name, 'slave', tags),
+        )
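+        # Brief pause to work around AWS metadata eventual consistency.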
+        time.sleep(3)

         existing_slaves = self.slave_ips

@@ -702,29 +700,53 @@ def get_ec2_block_device_mappings(


 def _create_instances(
-        *,
-        num_instances,
-        region,
-        spot_price,
-        spot_request_valid_until,
-        ami,
-        assume_yes,
-        key_name,
-        instance_type,
-        block_device_mappings,
-        availability_zone,
-        placement_group,
-        tenancy,
-        security_group_ids,
-        subnet_id,
-        instance_profile_arn,
-        ebs_optimized,
-        instance_initiated_shutdown_behavior,
-        user_data) -> 'List[boto3.resources.factory.ec2.Instance]':
+    *,
+    num_instances,
+    region,
+    spot_price,
+    spot_request_valid_until,
+    ami,
+    assume_yes,
+    key_name,
+    instance_type,
+    block_device_mappings,
+    availability_zone,
+    placement_group,
+    tenancy,
+    security_group_ids,
+    subnet_id,
+    instance_profile_arn,
+    ebs_optimized,
+    instance_initiated_shutdown_behavior,
+    user_data,
+    tag_specifications,
+) -> 'List[boto3.resources.factory.ec2.Instance]':
     ec2 = boto3.resource(service_name='ec2', region_name=region)
     cluster_instances = []
     spot_requests = []

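+    # Launch parameters shared by the spot and on-demand request paths below.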
+    common_launch_specs = {
+        'ImageId': ami,
+        'KeyName': key_name,
+        'InstanceType': instance_type,
+        'BlockDeviceMappings': block_device_mappings,
+        'Placement': {
+            'AvailabilityZone': availability_zone,
+            'Tenancy': tenancy,
+            'GroupName': placement_group,
+        },
+        'SecurityGroupIds': security_group_ids,
+        'SubnetId': subnet_id,
+        'IamInstanceProfile': {'Arn': instance_profile_arn},
+        'EbsOptimized': ebs_optimized,
+        'UserData': user_data,
+        'TagSpecifications': [
+            {
+                'ResourceType': 'instance',
+                'Tags': tag_specifications,
+            },
+        ],
+    }

     try:
         if spot_price:
@@ -736,20 +758,8 @@ def _create_instances(
             SpotPrice=str(spot_price),
             InstanceCount=num_instances,
             ValidUntil=spot_request_valid_until,
-            LaunchSpecification={
-                'ImageId': ami,
-                'KeyName': key_name,
-                'InstanceType': instance_type,
-                'BlockDeviceMappings': block_device_mappings,
-                'Placement': {
-                    'AvailabilityZone': availability_zone,
-                    'GroupName': placement_group},
-                'SecurityGroupIds': security_group_ids,
-                'SubnetId': subnet_id,
-                'IamInstanceProfile': {
-                    'Arn': instance_profile_arn},
-                'EbsOptimized': ebs_optimized,
-                'UserData': user_data})['SpotInstanceRequests']
+            LaunchSpecification=common_launch_specs,
+        )['SpotInstanceRequests']

         request_ids = [r['SpotInstanceRequestId'] for r in spot_requests]
         pending_request_ids = request_ids
@@ -783,32 +793,13 @@ def _create_instances(
                     {'Name': 'instance-id', 'Values': [r['InstanceId'] for r in spot_requests]}
                 ]))
         else:
-            # Move this to flintrock.py?
-            logger.info("Launching {c} instance{s}...".format(
-                c=num_instances,
-                s='' if num_instances == 1 else 's'))
-
-            # TODO: If an exception is raised in here, some instances may be
-            # left stranded.
             cluster_instances = ec2.create_instances(
                 MinCount=num_instances,
                 MaxCount=num_instances,
-                ImageId=ami,
-                KeyName=key_name,
-                InstanceType=instance_type,
-                BlockDeviceMappings=block_device_mappings,
-                Placement={
-                    'AvailabilityZone': availability_zone,
-                    'Tenancy': tenancy,
-                    'GroupName': placement_group},
-                SecurityGroupIds=security_group_ids,
-                SubnetId=subnet_id,
-                IamInstanceProfile={
-                    'Arn': instance_profile_arn},
-                EbsOptimized=ebs_optimized,
+                # Shutdown Behavior is specific to on-demand instances.
                 InstanceInitiatedShutdownBehavior=instance_initiated_shutdown_behavior,
-                UserData=user_data)
-            time.sleep(10)  # AWS metadata eventual consistency tax.
+                **common_launch_specs,
+            )
         return cluster_instances
     except (Exception, KeyboardInterrupt) as e:
         if not isinstance(e, KeyboardInterrupt):
@@ -917,59 +908,52 @@ def launch(
     else:
         instance_profile_arn = ''

-    num_instances = num_slaves + 1
     if user_data is not None:
         user_data = user_data.read()
     else:
         user_data = ''

-    try:
-        cluster_instances = _create_instances(
-            num_instances=num_instances,
-            region=region,
-            spot_price=spot_price,
-            spot_request_valid_until=duration_to_expiration(spot_request_duration),
-            ami=ami,
-            assume_yes=assume_yes,
-            key_name=key_name,
-            instance_type=instance_type,
-            block_device_mappings=block_device_mappings,
-            availability_zone=availability_zone,
-            placement_group=placement_group,
-            tenancy=tenancy,
-            security_group_ids=security_group_ids,
-            subnet_id=subnet_id,
-            instance_profile_arn=instance_profile_arn,
-            ebs_optimized=ebs_optimized,
-            instance_initiated_shutdown_behavior=instance_initiated_shutdown_behavior,
-            user_data=user_data)
-
-        master_instance = cluster_instances[0]
-        slave_instances = cluster_instances[1:]
-
-        master_tags = [
-            {'Key': 'flintrock-role', 'Value': 'master'},
-            {'Key': 'Name', 'Value': '{c}-master'.format(c=cluster_name)}]
-        master_tags += tags
-
-        (ec2.instances
-            .filter(
-                Filters=[
-                    {'Name': 'instance-id', 'Values': [master_instance.id]}
-                ])
-            .create_tags(Tags=master_tags))
+    common_instance_spec = {
+        'region': region,
+        'spot_price': spot_price,
+        'spot_request_valid_until': duration_to_expiration(spot_request_duration),
+        'ami': ami,
+        'assume_yes': assume_yes,
+        'key_name': key_name,
+        'instance_type': instance_type,
+        'block_device_mappings': block_device_mappings,
+        'availability_zone': availability_zone,
+        'placement_group': placement_group,
+        'tenancy': tenancy,
+        'security_group_ids': security_group_ids,
+        'subnet_id': subnet_id,
+        'instance_profile_arn': instance_profile_arn,
+        'ebs_optimized': ebs_optimized,
+        'instance_initiated_shutdown_behavior': instance_initiated_shutdown_behavior,
+        'user_data': user_data,
+    }
+
+    # We initialize these like this so that if the launch operation fails we have
+    # references we can use for cleanup.
+    master_instance = None
+    slave_instances = []
+    cluster = None

-        slave_tags = [
-            {'Key': 'flintrock-role', 'Value': 'slave'},
-            {'Key': 'Name', 'Value': '{c}-slave'.format(c=cluster_name)}]
-        slave_tags += tags
+    master_tags = _tag_specs(cluster_name, 'master', tags)
+    slave_tags = _tag_specs(cluster_name, 'slave', tags)

-        (ec2.instances
-            .filter(
-                Filters=[
-                    {'Name': 'instance-id', 'Values': [i.id for i in slave_instances]}
-                ])
-            .create_tags(Tags=slave_tags))
+    try:
+        master_instance = _create_instances(
+            num_instances=1,
+            tag_specifications=master_tags,
+            **common_instance_spec,
+        )[0]
+        slave_instances = _create_instances(
+            num_instances=num_slaves,
+            tag_specifications=slave_tags,
+            **common_instance_spec,
+        )
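+        # As in add_slaves, a brief pause for AWS metadata eventual consistency.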
+        time.sleep(3)

         cluster = EC2Cluster(
             name=cluster_name,
@@ -990,15 +974,19 @@ def launch(

         return cluster
     except (Exception, KeyboardInterrupt) as e:
-        if isinstance(e, InterruptedEC2Operation):
-            cleanup_instances = e.instances
-        else:
-            # TODO: There is no guarantee that cluster_instances is
-            # defined.
-            # See: https://github.com/nchammas/flintrock/issues/183
-            cleanup_instances = cluster_instances
+        # If the interruption happens right after a request to create instances is
+        # made, we may not find all cluster nodes here. There is a small delay between
+        # when a create request is sent and when a subsequent call will see the results.
+        # This sleep works around that small delay. Is there a way to guarantee
+        # read-after-write consistency here?
+        time.sleep(1)
+        cluster = get_cluster(
+            cluster_name=cluster_name,
+            region=region,
+            vpc_id=vpc_id,
+        )
         _cleanup_instances(
-            instances=cleanup_instances,
+            instances=cluster.instances,
             assume_yes=assume_yes,
             region=region,
         )
@@ -1028,17 +1016,21 @@ def get_clusters(*, cluster_names: list=[], region: str, vpc_id: str) -> list:
     if not vpc_id:
         vpc_id = get_default_vpc(region=region).id

+    # Since tags are assigned on creation and never removed by us (in contrast to how we
+    # remove security groups during a destroy operation), we can rely on them to find
+    # clusters.
     if cluster_names:
-        group_name_filter = ['flintrock-' + cn for cn in cluster_names]
+        cluster_name_filter = [{'Name': 'tag:flintrock-name', 'Values': cluster_names}]
     else:
-        group_name_filter = ['flintrock']
+        cluster_name_filter = []

     all_clusters_instances = list(
         ec2.instances.filter(
             Filters=[
-                {'Name': 'instance-state-name', 'Values': ['pending', 'running', 'stopping', 'stopped']},
-                {'Name': 'instance.group-name', 'Values': group_name_filter},
                 {'Name': 'vpc-id', 'Values': [vpc_id]},
+                {'Name': 'instance-state-name', 'Values': ['pending', 'running', 'stopping', 'stopped']},
+                {'Name': 'instance.group-name', 'Values': ['flintrock']},
+                *cluster_name_filter,
             ]))

     found_cluster_names = {
@@ -1117,16 +1109,33 @@ def _get_cluster_name(instance: 'boto3.resources.factory.ec2.Instance') -> str:
     """
     Given an EC2 instance, get the name of the Flintrock cluster it belongs to.
     """
-    for group in instance.security_groups:
-        if group['GroupName'].startswith('flintrock-'):
-            return group['GroupName'].replace('flintrock-', '', 1)
-    else:
-        raise Exception("Could not extract cluster name from instance: {i}".format(
-            i=instance.id))
+    instance_tags = _ec2_tags_to_dict(instance.tags)
+    if 'flintrock-name' not in instance_tags:
+        raise Exception(
+            f"Could not extract cluster name from instance: {instance.id}"
+        )
+    return instance_tags['flintrock-name']
+
+
+def _tag_specs(cluster_name: str, role: str, user_tags: list) -> list:
+    return [
+        {'Key': 'flintrock-name', 'Value': cluster_name},
+        {'Key': 'flintrock-role', 'Value': role},
+        {'Key': 'Name', 'Value': f'{cluster_name}-{role}'},
+        *user_tags,
+    ]
+
+
+def _ec2_tags_to_dict(ec2_tags: list) -> dict:
+    return {
+        tag['Key']: tag['Value']
+        for tag in ec2_tags
+    }


 def _get_cluster_master_slaves(
-        instances: list) -> ('boto3.resources.factory.ec2.Instance', list):
+    instances: list
+) -> 'Tuple[boto3.resources.factory.ec2.Instance, List[boto3.resources.factory.ec2.Instance]]':
     """
     Get the master and slave instances from a set of raw EC2 instances representing a
     Flintrock cluster.
@@ -1135,25 +1144,17 @@ def _get_cluster_master_slaves(
     master_instance = None
     slave_instances = []

     for instance in instances:
-        if not instance.tags:
-            # TODO: Better handle malformed clusters with missing tags.
-            # See: https://github.com/nchammas/flintrock/issues/183
-            continue
-        for tag in instance.tags:
-            if tag['Key'] == 'flintrock-role':
-                if tag['Value'] == 'master':
-                    if master_instance is not None:
-                        raise Exception("More than one master found.")
-                    else:
-                        master_instance = instance
-                    break
-                elif tag['Value'] == 'slave':
-                    slave_instances.append(instance)
-
-    # if not master_instance:
-    #     print("Warning: No master found.", file=sys.stderr)
-    # elif not slave_instances:
-    #     print("Warning: No slaves found.", file=sys.stderr)
+        tags = _ec2_tags_to_dict(instance.tags)
+        role = tags['flintrock-role']
+        if role == 'master':
+            if master_instance is not None:
+                raise Exception("More than one master found.")
+            else:
+                master_instance = instance
+        elif role == 'slave':
+            slave_instances.append(instance)
+        else:
+            raise Exception(f"Unrecognized Flintrock role: {role}")

     return (master_instance, slave_instances)
diff --git a/flintrock/flintrock.py b/flintrock/flintrock.py
index 9c0fe67..38e1188 100644
--- a/flintrock/flintrock.py
+++ b/flintrock/flintrock.py
@@ -490,6 +490,11 @@ def launch(
         )
         services += [spark]

+    logger.info(
+        "Launching 1 master and {n} slave{s}...".format(
+            n=num_slaves,
+            s='' if num_slaves == 1 else 's',
+        ))
     if provider == 'ec2':
         cluster = ec2.launch(
             cluster_name=cluster_name,
@@ -822,6 +827,11 @@ def add_slaves(
             '--ec2-user'],
         scope=locals())

+    logger.info(
+        "Launching {n} slave{s}...".format(
+            n=num_slaves,
+            s='' if num_slaves == 1 else 's',
+        ))
     if provider == 'ec2':
         cluster = ec2.get_cluster(
             cluster_name=cluster_name,