diff --git a/.neurocaas_contrib_dataconfig.json b/.neurocaas_contrib_dataconfig.json new file mode 100644 index 00000000..bf302cf3 --- /dev/null +++ b/.neurocaas_contrib_dataconfig.json @@ -0,0 +1 @@ +{"configpath": "s3://bucketname/groupname/configs/config.json", "datapath": "s3://bucketname/groupname/inputs/data.txt"} \ No newline at end of file diff --git a/experiments/__pycache__/calculate_cost.cpython-36.pyc b/experiments/__pycache__/calculate_cost.cpython-36.pyc index 0e4026b4..b1b268e1 100644 Binary files a/experiments/__pycache__/calculate_cost.cpython-36.pyc and b/experiments/__pycache__/calculate_cost.cpython-36.pyc differ diff --git a/ncap_iac/ncap_blueprints/__pycache__/__init__.cpython-36.pyc b/ncap_iac/ncap_blueprints/__pycache__/__init__.cpython-36.pyc index 9052c645..f9ffe0ce 100644 Binary files a/ncap_iac/ncap_blueprints/__pycache__/__init__.cpython-36.pyc and b/ncap_iac/ncap_blueprints/__pycache__/__init__.cpython-36.pyc differ diff --git a/ncap_iac/ncap_blueprints/autolfads-torch/stack_config_template.json b/ncap_iac/ncap_blueprints/autolfads-torch/stack_config_template.json index 9a376166..c4286fe0 100644 --- a/ncap_iac/ncap_blueprints/autolfads-torch/stack_config_template.json +++ b/ncap_iac/ncap_blueprints/autolfads-torch/stack_config_template.json @@ -4,10 +4,10 @@ "STAGE": "websubstack", "Lambda": { "CodeUri": "../../protocols", - "Handler": "submit_start.handler_develop", + "Handler": "submit_start.handler_multisession", "Launch": true, "LambdaConfig": { - "AMI": "ami-08f59b6fdb19d0344", + "AMI": "ami-0cebcfea203997fcf", "INSTANCE_TYPE": "p2.8xlarge", "REGION": "us-east-1", "IAM_ROLE": "SSMRole", diff --git a/ncap_iac/ncap_blueprints/utils_stack/print_privatekey.sh b/ncap_iac/ncap_blueprints/utils_stack/print_privatekey.sh old mode 100644 new mode 100755 diff --git a/ncap_iac/protocols/__pycache__/__init__.cpython-36.pyc b/ncap_iac/protocols/__pycache__/__init__.cpython-36.pyc index f4c0cfa6..97ce6ea9 100644 Binary files a/ncap_iac/protocols/__pycache__/__init__.cpython-36.pyc and b/ncap_iac/protocols/__pycache__/__init__.cpython-36.pyc differ diff --git a/ncap_iac/protocols/__pycache__/log.cpython-36.pyc b/ncap_iac/protocols/__pycache__/log.cpython-36.pyc index 2f99bff5..a46054bf 100644 Binary files a/ncap_iac/protocols/__pycache__/log.cpython-36.pyc and b/ncap_iac/protocols/__pycache__/log.cpython-36.pyc differ diff --git a/ncap_iac/protocols/__pycache__/postprocess.cpython-36.pyc b/ncap_iac/protocols/__pycache__/postprocess.cpython-36.pyc index 6959a889..f2e013d9 100644 Binary files a/ncap_iac/protocols/__pycache__/postprocess.cpython-36.pyc and b/ncap_iac/protocols/__pycache__/postprocess.cpython-36.pyc differ diff --git a/ncap_iac/protocols/__pycache__/submit_start.cpython-36.pyc b/ncap_iac/protocols/__pycache__/submit_start.cpython-36.pyc index 3ee6923e..ab066ef2 100644 Binary files a/ncap_iac/protocols/__pycache__/submit_start.cpython-36.pyc and b/ncap_iac/protocols/__pycache__/submit_start.cpython-36.pyc differ diff --git a/ncap_iac/protocols/submit_start.py b/ncap_iac/protocols/submit_start.py index 35ba79a7..666542ba 100644 --- a/ncap_iac/protocols/submit_start.py +++ b/ncap_iac/protocols/submit_start.py @@ -168,6 +168,7 @@ def __init__(self,bucket_name,key,time): raise ValueError("[JOB TERMINATE REASON] 'configname' field not given in submit.json file") msg = " [Internal (init)] Analysis request with dataset(s): {}, config file {}".format(self.data_name,self.config_name) + self.logger.append(msg) self.logger.printlatest() self.logger.write() @@ -385,7 
+386,7 @@ def get_costmonitoring(self): message = " [Internal (get_costmonitoring)] Customized budget not found. Using default budget value of {}".format(budget) self.logger.append(message) self.logger.printlatest() - except: + except: raise Exception(" [Internal (get_costmonitoring)] Unexpected Error: Unable to get budget.") except Exception: raise Exception(" [Internal (get_costmonitoring)] Unexpected Error: Unable to get budget.") @@ -613,6 +614,259 @@ def compute_volumesize(self): else: self.full_volumesize = default_size + +class Submission_multisession(Submission_dev): + """ + Lambda for launching multisession runs. + + :param bucket_name: name of the S3 bucket that this is a submission for (corresponds to an analysis). + :param key: key of submit file within this bucket. + :param time: some unique identifier that distinguishes this job from all others. + :ivar bucket_name: initial_value: bucket_name + :ivar path: name of the group responsible for this job. + :ivar time: initial value: time ## TODO Remove this field. + :ivar jobname: "job_{}_{}_{}".format(submit_name,bucket_name,self.timestamp) + :ivar jobpath: os.path.join(path,"outputs",jobname) + :ivar logger: s3.Logger object + :ivar instance_type: either given in submit file, or default option of analysis. + :ivar data_name: submit file's dataname field. + :ivar config_name: submit file's configname field. + """ + def __init__(self,bucket_name,key,time): + super().__init__(bucket_name,key,time) + + # Edit self.data_name_list to directory name in case of multisession run via website + try: + configfile = utilsparams3.load_yaml(bucket_name, self.config_name) + except Exception: + raise Exception("Config must be a valid YAML file.") + + + msg = f" [Internal (init)] Multisession in config file: {str('multisession' in configfile)}. Configfile \"multisession\" value: {configfile.get('multisession')}. Bucket name: {str(self.bucket_name)}." + self.logger.append(msg) + self.logger.printlatest() + self.logger.write() + + if "multisession" in configfile and configfile["multisession"] == True and self.bucket_name == "autolfads-torch": + # Assumes upload from website => list of data paths. Result is [path/to/dataDirPrefix] + self.data_name_list = ["/".join(self.data_name_list[0].split("/")[:-1])] + self.data_name = self.data_name_list[0] + + msg = " [Internal (init)] UPDATED: Multisession mode. Analysis request with dataset(s): {}, config file {}".format(self.data_name,self.config_name) + self.logger.append(msg) + self.logger.printlatest() + self.logger.write() + + def get_costmonitoring(self): + """ + Gets the cost incurred by a given group so far by looking at the logs folder of the appropriate s3 bucket. + + """ + ## first get the path to the log folder we should be looking at. + group_name = self.path + assert len(group_name) > 0, "[JOB TERMINATE REASON] Can't locate the group that triggered analysis, making it impossible to determine incurred cost."
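+ ## Spend so far is reconstructed from the per-instance compute reports under logs/<group>/; an upper bound for this job and for other active instances on the same AMI is then added before comparing against the group budget.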
+ logfolder_path = "logs/{}/".format(group_name) + full_reportpath = os.path.join(logfolder_path,"i-") + ## now get all of the computereport filenames: + all_files = utilsparams3.ls_name(self.bucket_name,full_reportpath) + + ## for each, we extract the contents: + jobdata = {} + cost = 0 + ## now calculate the cost: + for jobfile in all_files: + instanceid = jobfile.split(full_reportpath)[1].split(".json")[0] + jobdata = utilsparams3.load_json(self.bucket_name,jobfile) + price = jobdata["price"] + start = jobdata["start"] + end = jobdata["end"] + try: + starttime = datetime.strptime(start, "%Y-%m-%dT%H:%M:%SZ") + endtime = datetime.strptime(end, "%Y-%m-%dT%H:%M:%SZ") + diff = endtime-starttime + duration = abs(diff.seconds) + instcost = price*duration/3600. + except TypeError: + ## In rare cases it seems one or the other of these things doesn't actually have entries. This is a problem. For now, charge for the hour: + message = " [Internal (get_costmonitoring)] Duration of past jobs not found. Pricing for an hour" + self.logger.append(message) + self.logger.printlatest() + instcost = price + cost+= instcost + + ## Now compare against the cost of the job you're currently running: + ## need duration from config (self.parse_config) and self.instance_type + ## By assuming they're all standard instances we upper bound the cost. + try: + price = utilsparampricing.get_price(utilsparampricing.get_region_name(utilsparampricing.region_id),self.instance_type,os = "Linux") + if self.jobduration is None: + duration = defaultduration/60 ## in hours. + else: + duration = self.jobduration/60 + jobpricebound = duration*price + cost += jobpricebound + except Exception as e: + print(e) + raise Exception(" [Internal (get_costmonitoring)] Unexpected Error: Unable to estimate cost of current job.") + + ## Now compare against the expected cost of instances with the current ami: + try: + ami = os.environ["AMI"] + total_activeprice = self.prices_active_instances_ami(ami) + + except Exception as e: + print(e) + try: + activeprice = utilsparampricing.get_price(utilsparampricing.get_region_name(utilsparampricing.region_id),self.instance_type,os = "Linux") + number = len([i for i in utilsparamec2.get_active_instances_ami(ami)]) + activeduration = defaultduration*number/60 ## default to the default duration instead if not given. + total_activeprice = activeprice*activeduration + except Exception as e: + print(e) + raise Exception(" [Internal (get_costmonitoring)] Unexpected Error: Unable to estimate cost of active jobs.") + + cost += total_activeprice + + ## Now compare with budget: + try: + budget = float(utilsparamssm.get_budget_parameter(self.path,self.bucket_name)) + except ClientError as e: + try: + assert e.response["Error"]["Code"] == "ParameterNotFound" + budget = float(os.environ["MAXCOST"]) + message = " [Internal (get_costmonitoring)] Customized budget not found. Using default budget value of {}".format(budget) + self.logger.append(message) + self.logger.printlatest() + except: + raise Exception(" [Internal (get_costmonitoring)] Unexpected Error: Unable to get budget.") + except Exception: + raise Exception(" [Internal (get_costmonitoring)] Unexpected Error: Unable to get budget.") + + if cost < budget: + message = " [Internal (get_costmonitoring)] Projected total costs: ${}. 
Remaining budget: ${}".format(cost,budget-cost) + self.logger.append(message) + self.logger.printlatest() + self.logger.write() + validjob = True + elif cost >= budget: + message = " [Internal (get_costmonitoring)] Projected total costs: ${}. Over budget (${}), cancelling job. Contact administrator.".format(cost,budget) + self.logger.append(message) + self.logger.printlatest() + self.logger.write() + validjob = False + return validjob + + def acquire_instances(self): + """ + Streamlines acquisition and setup of the single instance used for a multisession run. Better exception handling when instances cannot be launched, and spot instances with a defined duration when available. + + """ + nb_instances = 1 #all data files will be used to train a single core model + + ## Check how many instances are running. + active = utilsparamec2.count_active_instances(self.instance_type) + ## Ensure that we have enough bandwidth to support this request: + if active + nb_instances < int(os.environ['DEPLOY_LIMIT']): + pass + else: + self.logger.append(" [Internal (acquire_instances)] RESOURCE ERROR: Instance requests greater than pipeline bandwidth. Please contact NeuroCAAS admin.") + self.logger.printlatest() + self.logger.write() + raise ValueError("[JOB TERMINATE REASON] Instance requests greater than pipeline bandwidth. Too many simultaneously deployed analyses.") + + instances = utilsparamec2.launch_new_instances_with_tags_additional( + instance_type=self.instance_type, + ami=os.environ['AMI'], + logger= self.logger, + number = nb_instances, + add_size = self.full_volumesize, + duration = self.jobduration, + group = self.path, + analysis = self.bucket_name, + job = self.jobname + ) + #instances = utilsparamec2.launch_new_instances_with_tags( + #instance_type=self.instance_type, + #ami=os.environ['AMI'], + #logger= self.logger, + #number = nb_instances, + #add_size = self.full_volumesize, + #duration = self.jobduration + #) + + ## Even though we have a check in place, also check how many were launched: + try: + assert len(instances) > 0 + except AssertionError: + self.logger.append(" [Internal (acquire_instances)] RESOURCE ERROR: Instances not launched. AWS capacity reached. Please contact NeuroCAAS admin.") + self.logger.printlatest() + self.logger.write() + raise AssertionError("[JOB TERMINATE REASON] Instance requests greater than pipeline bandwidth (base AWS capacity). Too many simultaneously deployed analyses") + + self.instances = instances + + return instances + + def process_inputs(self,dryrun=False): + """ Initiates Processing On Previously Acquired EC2 Instance. This version requires that you include a config (fourth) argument """ + try: + os.environ['COMMAND'].format("a","b","c","d") + except IndexError as ie: + msg = " [Internal (process_inputs)] INPUT ERROR: not enough arguments in the COMMAND argument." + self.logger.append(msg) + self.logger.printlatest() + self.logger.write() + raise ValueError("[JOB TERMINATE REASON] Not the correct format for arguments. Protocols for job manager are misformatted.") + + ## input bucket: + if self.bypass_data["input"]["bucket"] is not None: + input_bucket = self.bypass_data["input"]["bucket"] + data_check_paths = self.bypass_data["input"]["datapath"] #use the path to the directory instead of the file paths + config_check_path = self.bypass_data["input"]["configpath"] + else: + input_bucket = self.bucket_name + data_check_paths = self.data_name_list #use the path to the directory instead of the file paths. 
+ config_check_path = self.config_name + + if self.bypass_data["output"]["bucket"] is not None: + output_bucket = self.bypass_data["output"]["bucket"] + result_check_paths = self.bypass_data["output"]["resultpath"] + outpath_full = "s3://{}/{}".format(output_bucket,os.path.join(result_check_paths,self.jobname)) + else: + outpath_full = os.path.join(os.environ['OUTDIR'],self.jobname) + + + ## Bypass: + #self.bucket_name -> input_bucket + #self.filenames -> data_check_paths + #outpath_full -> resultpath(FULL) + #self.config_name -> config_check_path + + ## Should we vectorize the log here? + #outpath_full = os.path.join(os.environ['OUTDIR'],self.jobname) + commands = [os.environ['COMMAND'].format( + input_bucket, filename, outpath_full, config_check_path + ) for filename in data_check_paths] + + print(commands,"command to send") + if not dryrun: + for f,dirname in enumerate(data_check_paths): #there will only be one + response = utilsparamssm.execute_commands_on_linux_instances( + commands=[os.environ['COMMAND'].format( + input_bucket, dirname, outpath_full, config_check_path + )], # TODO: variable outdir as option + instance_ids=[self.instances[f].instance_id], + working_dirs=[os.environ['WORKING_DIRECTORY']], + log_bucket_name=input_bucket, + log_path=os.path.join(self.jobpath,'internal_ec2_logs') + ) + self.logger.initialize_datasets_dev(dirname,self.instances[f].instance_id,response["Command"]["CommandId"]) + self.logger.append(" [Internal (process_inputs)] Starting analysis {} with parameter set {}".format(f+1,os.path.basename(dirname))) + self.logger.printlatest() + self.logger.write() + self.logger.append(" [Internal (process_inputs)] All jobs submitted. Processing...") + return commands + ## We are no longer using this function. It depends upon automation documents that can be found in the cfn utils_stack template. Consider using this as a reference when switching to automation documents instead of pure runcommand. def add_volumes(self): """ @@ -699,8 +953,6 @@ def process_inputs(self): self.logger.write() self.logger.append(" [Internal (process_inputs)] All jobs submitted. Processing...") pass - - def process_upload_dev(bucket_name, key,time): """ @@ -1079,6 +1331,167 @@ def process_upload_deploy(bucket_name, key,time): submission.inputlogger.write() submission.submitlogger.write() +def process_upload_multisession(bucket_name,key,time): + exitcode = 99 + + donemessage = "[Job Manager] {s}: DONE" + awserrormessage = "[Job Manager] {s}: AWS ERROR. {e}\n[Job Manager] Shutting down job." + internalerrormessage = "[Job Manager] {s}: INTERNAL ERROR. {e}\n[Job Manager] Shutting down job." + + + ## Step 1: Initialization. Most basic checking for submit file. If this fails, will not generate a certificate. + step = "STEP 1/4 (Initialization)" + try: + if os.environ['LAUNCH'] == 'true': + ## Make submission object + print("creating submission object") + submission = Submission_multisession(bucket_name, key, time) + print("created submission object") + elif os.environ["LAUNCH"] == 'false': + raise NotImplementedError("This option not available for configs. ") + submission.logger.append(donemessage.format(s = step)) + submission.logger.printlatest() + submission.logger.write() + except ClientError as ce: + e = ce.response["Error"] + print(awserrormessage.format(s= step,e = e)) + return exitcode + except Exception: + e = traceback.format_exc() + print(internalerrormessage.format(s= step,e = e)) + return exitcode + + ## Step 2: Validation. 
If the data does not exist, or we are over the cost limit, this will fail. + step = "STEP 2/4 (Validation)" + try: + submission.check_existence() + submission.parse_config() + valid = submission.get_costmonitoring() + assert valid + submission.logger.append(donemessage.format(s = step)) + submission.logger.printlatest() + submission.logger.write() + except AssertionError as e: + print(e) + e = "Error: Job is not covered by budget. Contact NeuroCAAS administrator." + submission.logger.append(internalerrormessage.format(s= step,e = e)) + submission.logger.printlatest() + submission.logger.write() + return exitcode + except ClientError as ce: + e = ce.response["Error"] + submission.logger.append(awserrormessage.format(s = step,e = e)) + submission.logger.printlatest() + submission.logger.write() + return exitcode + except Exception: + e = traceback.format_exc() + submission.logger.append(internalerrormessage.format(s = step,e = e)) + submission.logger.printlatest() + submission.logger.write() + return exitcode + + # Step 3: Setup: Getting the volumesize, hardware specs of immutable analysis environments. + step = "STEP 3/4 (Environment Setup)" + try: + submission.compute_volumesize() + submission.logger.append(donemessage.format(s = step)) + submission.logger.printlatest() + submission.logger.write() + except ClientError as ce: + e = ce.response["Error"] + submission.logger.append(awserrormessage.format(s = step,e = e)) + submission.logger.printlatest() + submission.logger.write() + utilsparams3.write_endfile(submission.bucket_name,submission.jobpath) + return exitcode + except Exception: + e = traceback.format_exc() + submission.logger.append(internalerrormessage.format(s = step,e = e)) + submission.logger.printlatest() + submission.logger.write() + utilsparams3.write_endfile(submission.bucket_name,submission.jobpath) + return exitcode + + # Step 4: Processing: Creating the immutable analysis environments, sending the commands to them. + step = "STEP 4/4 (Initialize Processing)" + try: + ## From here on out, if something goes wrong we will terminate all created instances. + instances=submission.acquire_instances() + submission.logger.printlatest() + submission.logger.write() + jobs = submission.log_jobs() + submission.logger.printlatest() + submission.logger.write() + ## NOTE: IN LAMBDA, JSON BOOLEANS ARE CONVERTED TO STRING + if os.environ["MONITOR"] == "true": + submission.put_instance_monitor_rule() + elif os.environ["MONITOR"] == "false": + submission.logger.append(" [Internal (monitoring)] Skipping monitor.") + submission.logger.write() + submission.start_instance() + submission.logger.write() + submission.process_inputs() + submission.logger.append(donemessage.format(s = step)) + submission.logger.printlatest() + submission.logger.write() + submission.logger.append("JOB MONITOR LOG COMPLETE. SEE TOP FOR LIVE PER-DATASET MONITORING") + submission.logger.initialize_monitor() + ## should be a success at this point. + exitcode = 0 + except ClientError as ce: + e = ce.response["Error"] + ## We occasionally get "Invalid Instance Id" calls due to AWS-side errors. + if e["Code"] == "InvalidInstanceId": + e = "Transient AWS Communication Error. Please Try Again" + submission.logger.append(awserrormessage.format(s = step,e = e)) + submission.logger.printlatest() + submission.logger.write() + ## We need to separately attempt all of the relevant cleanup steps. 
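+ ## Cleanup runs in independent try blocks below: delete any active monitor logs, remove the monitor rule, then terminate each launched instance, so a failure in one step does not block the others.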
+ try: + ## In this case we need to delete the monitor log: + [utilsparams3.delete_active_monitorlog(submission.bucket_name,"{}.json".format(inst.id)) for inst in instances] + except Exception: + se = traceback.format_exc() + message = "While cleaning up from AWS Error, another error occurred: {}".format(se) + submission.logger.append(internalerrormessage.format(s = step,e = message)) + submission.logger.printlatest() + submission.logger.write() + try: + ## We also need to delete the monitor rule: + utilsparamevents.full_delete_rule(submission.rulename) + except Exception: + se = traceback.format_exc() + message = "While cleaning up from AWS Error, another error occurred: {}".format(se) + submission.logger.append(internalerrormessage.format(s = step,e = message)) + submission.logger.printlatest() + submission.logger.write() + ## We finally need to terminate the relevant instances: + for inst in instances: + try: + inst.terminate() + except Exception: + se = traceback.format_exc() + message = "While cleaning up from AWS Error, another error occurred: {}".format(se) + submission.logger.append(internalerrormessage.format(s = step,e = message)) + submission.logger.printlatest() + submission.logger.write() + continue + except Exception: + e = traceback.format_exc() + try: + [inst.terminate() for inst in instances] + except UnboundLocalError: + submission.logger.append("No instances to terminate") + submission.logger.printlatest() + submission.logger.write() + + submission.logger.append(internalerrormessage.format(s = step,e = e)) + submission.logger.printlatest() + submission.logger.write() + + return exitcode + ## Actual lambda handlers. def handler_develop(event,context): """ @@ -1124,3 +1537,31 @@ def handler_ensemble(event,context): print("processing returned exit code {}".format(exitcode)) return exitcode +def handler_multisession(event,context): + """ + Handler for multisession modeling. 
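+ If the config referenced by the submit file sets "multisession" to true, a single instance is launched for all sessions via process_upload_multisession; otherwise the request falls through to the standard process_upload_dev path.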
+ """ + for record in event['Records']: + time = record['eventTime'] + bucket_name = record['s3']['bucket']['name'] + key = record['s3']['object']['key'] + submit_file = utilsparams3.load_json(bucket_name, key) + configpath = submit_file["configname"] + try: + configfile = utilsparams3.load_yaml(bucket_name, configpath) + except Exception: + raise Exception("Config must be a valid YAML file.") + print(f"#########\nDEBUGGING MULTISESSION:\nmultisession in config file: {'multisession' in configfile}\nconfig[\"multisession\"]: {configfile['multisession']}") + try: + if "multisession" in configfile and configfile["multisession"] == True: + print("Creating a single instance for multisession modeling.") + exitcode = process_upload_multisession(bucket_name, key, time) + print("process returned exit code {}".format(exitcode)) + else: + print("DEBUG: REACHED DEV PATH") + exitcode = process_upload_dev(bucket_name, key, time) + print("process returned with exit code {}".format(exitcode)) + except KeyError: + raise Exception("Config file does not specify \"multisession\" param.") + + return exitcode \ No newline at end of file diff --git a/tests/protocol_tests/test_submit_start.py b/tests/protocol_tests/test_submit_start.py index 4f7cabe3..865d88d3 100644 --- a/tests/protocol_tests/test_submit_start.py +++ b/tests/protocol_tests/test_submit_start.py @@ -21,9 +21,12 @@ with open(os.path.join(loc,"../../ncap_iac/global_params_initialized.json")) as f: gpdict = json.load(f) bucket_name = "test-submitlambda-analysis" +bucket_name_multi = "test-submitlambda-analysis-multi" sep_bucket = "independent" +sep_bucket_multi = "independent-multi" bucket_name_legacy = "test-submitlambda-analysis-legacy" key_name = "test_user/submissions/submit.json" +key_name_single = "test_user/submissions/singlesubmit.json" fakedatakey_name = "test_user/submissions/fakedatasubmit.json" fakeconfigkey_name = "test_user/submissions/fakeconfigsubmit.json" notimestampkey_name = "test_user/submissions/notimestampconfigsubmit.json" @@ -241,6 +244,109 @@ def setup_testing_bucket(monkeypatch): logging.error(e) raise yield bucket_name,os.path.join(user_name,"submissions/submit.json") + +@pytest.fixture +def setup_testing_bucket_multisession(monkeypatch): + """Sets up a localstack bucket called test-submitlambda-analysis with the following directory structure: + / + |-test_user + |-inputs + |-data.json + |-configs + |-config.json + |-submissions + |-submit.json + |-logs + |-test_user + |-joblog1 + |-joblog2 + ... 
+ + Additionally sets up a separate bucket, "{sep_bucket}", with the following structure: + / + |-sep_inputs %% these for "bucket bypass" + |-data.json + |-sep_configs + |-config.json + |-sep_results + |- + + """ + + subkeys = { + "inputs/data1.json":{"data":"value"}, + "inputs/data2.json":{"data":"value"}, + "configs/config.json":{"param":"p1"}, + "configs/fullconfig.json":{"param":"p1","__duration__":360,"__dataset_size__":20,"ensemble_size":5}, + "configs/duration10config.json":{"param":"p1","__duration__":10,"__dataset_size__":20}, + "configs/duration600config.json":{"param":"p1","__duration__":600,"__dataset_size__":20}, + "configs/durationnoneconfig.json":{"param":"p1","__dataset_size__":20}, + "submissions/submit.json":{ + "dataname":os.path.join(user_name,"inputs"), + "configname":os.path.join(user_name,"configs","config.json"), + "timestamp":"testtimestamp"}, + "submissions/{}".format(os.path.basename(fakedatakey_name)):{ + "dataname":os.path.join(user_name,"inputs"), + "configname":os.path.join(user_name,"configs","config.json"), + "timestamp":"testtimestamp"}, + "submissions/{}".format(os.path.basename(fakeconfigkey_name)):{ + "dataname":os.path.join(user_name,"inputs"), + "configname":os.path.join(user_name,"configs","config22.json"), + "timestamp":"testtimestamp"}, + "submissions/{}".format(os.path.basename(nodatakey_name)):{ + "configname":os.path.join(user_name,"configs","config22.json"), + "timestamp":"testtimestamp"}, + "submissions/{}".format(os.path.basename(noconfigkey_name)):{ + "dataname":os.path.join(user_name,"inputs"), + "timestamp":"testtimestamp"}, + "submissions/{}".format(os.path.basename(notimestampkey_name)):{ + "dataname":os.path.join(user_name,"inputs"), + "configname":os.path.join(user_name,"configs","config22.json")}, + ## new: bucket skip. + "submissions/bucketskipsubmit.json":{ + "dataname":os.path.join("s3://{}".format(sep_bucket_multi),"sep_inputs"), + "configname":os.path.join("s3://{}".format(sep_bucket_multi),"sep_configs","configsep.json"), + "timestamp":"testtimestamp", + "resultpath":"s3://{}/sep_results".format(sep_bucket_multi)} + } + + session = localstack_client.session.Session() + s3_client = session.client("s3") + s3_resource = session.resource("s3") + monkeypatch.setattr(s3, "s3_client", session.client("s3")) ## TODO I don't think these are scoped correctly w/o a context manager. 
+ monkeypatch.setattr(s3, "s3_resource", session.resource("s3")) + try: + for sk in subkeys: + obj = s3_client.get_object(Bucket = bucket_name_multi,Key = os.path.join(user_name,sk)) + s3_client.get_object(Bucket = bucket_name_multi,Key="logs/test_user/i-0ff308d5c9b5786f4.json") + except ClientError: + ## Write data files + s3_client.create_bucket(Bucket = bucket_name_multi) + for sk in subkeys: + key = os.path.join(user_name,sk) + writeobj = s3_resource.Object(bucket_name_multi,key) + content = bytes(json.dumps(subkeys[sk]).encode("UTF-8")) + writeobj.put(Body = content) + ## Test storage bypass with additional subkeys + s3_client.create_bucket(Bucket = sep_bucket_multi) + ind_data = { + "sep_inputs/datasep1.json":{"data":"value"}, + "sep_inputs/datasep2.json":{"data2":"value2"}, + "sep_configs/configsep.json":{"param":"p1"}} + for d in ind_data: + writeobj = s3_resource.Object(sep_bucket_multi,d) + content = bytes(json.dumps(ind_data[d]).encode("UTF-8")) + writeobj.put(Body=content) + + ## Write logs + log_paths = get_paths(test_log_mats) + try: + for f in log_paths: + s3_client.upload_file(os.path.join(test_log_mats,f),bucket_name_multi,Key = f) + except ClientError as e: + logging.error(e) + raise + yield bucket_name_multi,os.path.join(user_name,"submissions/submit.json") @pytest.fixture def setup_testing_bucket_legacy(monkeypatch): @@ -341,6 +447,37 @@ def check_instances(): for instance in instances: pytest.fail("Uncleaned instances!") + +@pytest.fixture +def set_ssm_budget_under_multi(monkeypatch): + """Use SSM to set budget lower than the tolerable value. + + """ + session = localstack_client.session.Session() + ssm_client = session.client("ssm") + monkeypatch.setattr(ssm, "ssm_client", session.client("ssm")) ## TODO I don't think these are scoped correctly w/o a context manager. + ssm_client.put_parameter(Name = ssm.budgetname.format(g =user_name,a=bucket_name_multi), + Overwrite = False, + Value = "0", + Type = "String") + yield + ssm_client.delete_parameter(Name = ssm.budgetname.format(g=user_name,a = bucket_name_multi)) + +@pytest.fixture +def set_ssm_budget_over_multi(monkeypatch): + """Use SSM to set budget higher than the tolerable value. + + """ + session = localstack_client.session.Session() + ssm_client = session.client("ssm") + monkeypatch.setattr(ssm, "ssm_client", session.client("ssm")) ## TODO I don't think these are scoped correctly w/o a context manager. + ssm_client.put_parameter(Name = ssm.budgetname.format(g=user_name,a=bucket_name_multi), + Overwrite = False, + Value = "1300", + Type = "String") + yield + ssm_client.delete_parameter(Name = ssm.budgetname.format(g=user_name,a=bucket_name_multi)) + @pytest.fixture def set_ssm_budget_under(monkeypatch): """Use SSM to set budget lower than the tolerable value. 
@@ -503,12 +640,12 @@ def test_Submission_dev_get_costmonitoring_fail_active(self,setup_lambda_env,set assert not sd.get_costmonitoring() def test_Submission_dev_get_costmonitoring_ssm_fail(self,setup_lambda_env,setup_testing_bucket,check_instances,monkeypatch,set_ssm_budget_under,set_price,patch_boto3_ec2): - bucket_name,submit_path = setup_testing_bucket[0],setup_testing_bucket[1] - sd = submit_start.Submission_dev(bucket_name,key_name,"111111111") - monkeypatch.setenv("MAXCOST",str(1300)) - sd.check_existence() - sd.parse_config() - assert not sd.get_costmonitoring() + bucket_name,submit_path = setup_testing_bucket[0],setup_testing_bucket[1] + sd = submit_start.Submission_dev(bucket_name,key_name,"111111111") + monkeypatch.setenv("MAXCOST",str(1300)) + sd.check_existence() + sd.parse_config() + assert not sd.get_costmonitoring() def test_Submission_dev_get_costmonitoring_ssm(self,setup_lambda_env,setup_testing_bucket,check_instances,monkeypatch,set_ssm_budget_over,set_price,patch_boto3_ec2): bucket_name,submit_path = setup_testing_bucket[0],setup_testing_bucket[1] @@ -786,3 +923,246 @@ def test_Submission_Launch_Monitor_log_jobs(self,setup_lambda_env,setup_testing_ sd.acquire_instance() sd.log_jobs() + + +class Test_Submission_multisession(): + def test_Submission_multisession(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances): + ## set up the os environment correctly. + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,submit_path,"111111111") + + def test_Submission_multisession_nobucket(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances): + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + with pytest.raises(FileNotFoundError): + sd = submit_start.Submission_multisession("fakebucket",submit_path,"111111111") + + def test_Submission_multisession_nofile(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances): + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + with pytest.raises(FileNotFoundError): + sd = submit_start.Submission_multisession(bucket_name,os.path.join(os.path.dirname(submit_path),"fakefilesubmit.json"),"111111111") + + def test_Submission_multisession_file_misconfig(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances): + """If unable to find a group name:""" + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + with pytest.raises(FileNotFoundError): + sd = submit_start.Submission_multisession(bucket_name,"fakefilesubmit.json","111111111") + + def test_Submission_multisession_no_datakey(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances): + """If unable to find a group name:""" + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + with pytest.raises(ValueError): + sd = submit_start.Submission_multisession(bucket_name,nodatakey_name,"111111111") + + def test_Submission_multisession_no_configkey(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances): + """If unable to find a group name:""" + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + with pytest.raises(ValueError): + sd = submit_start.Submission_multisession(bucket_name,noconfigkey_name,"111111111") + + def 
test_Submission_multisession_no_timestamp(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances): + """If no timestamp provided:""" + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + with pytest.raises(ValueError): + sd = submit_start.Submission_multisession(bucket_name,notimestampkey_name,"111111111") + + +### Testing check_existence. + def test_Submission_multisession_check_existence(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances): + """check existence of data in s3.""" + submitname="submit.json" + path=["test_user/inputs"] + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + submit_path = os.path.join(user_name,"submissions",submitname) + sd = submit_start.Submission_multisession(bucket_name,submit_path,"111111111") + sd.check_existence() + assert sd.filenames == path + + @pytest.mark.parametrize("dataname,error",[(1,TypeError),("fake",ValueError),(["fake","fake"],ValueError)]) + def test_Submission_multisession_check_existence_wrongdata(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances,dataname,error): + """check existence of data in s3.""" + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,submit_path,"111111111") + sd.data_name = dataname + if type(dataname)!= list: + sd.data_name_list = [dataname] + elif type(dataname) == list: + sd.data_name_list = dataname + with pytest.raises(error): + sd.check_existence() + + def test_Submission_multisession_check_existence_wrongconfig(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances): + """check existence of data in s3.""" + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,submit_path,"111111111") + sd.config_name = "trash.yaml" + with pytest.raises(ValueError): + sd.check_existence() + +## Testing function get_costmonitoring + def test_Submission_multisession_get_costmonitoring(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances,monkeypatch,set_price,patch_boto3_ec2): + session = localstack_client.session.Session() + ssm_client = session.client("ssm") + monkeypatch.setattr(ssm, "ssm_client", session.client("ssm")) + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,key_name,"111111111") + monkeypatch.setenv("MAXCOST",str(1300)) + sd.check_existence() ## check existence of dataset files. Necessary bc we project costs of this job. 
+ sd.parse_config() + assert sd.get_costmonitoring() + + def test_Submission_multisession_get_costmonitoring_fail(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances,monkeypatch,set_price,patch_boto3_ec2): + session = localstack_client.session.Session() + ssm_client = session.client("ssm") + monkeypatch.setattr(ssm, "ssm_client", session.client("ssm")) + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,key_name,"111111111") + monkeypatch.setenv("MAXCOST",str(1200)) + sd.check_existence() + sd.parse_config() + assert not sd.get_costmonitoring() + + def test_Submission_multisession_get_costmonitoring_fail_active(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances,monkeypatch,set_price,patch_boto3_ec2): + session = localstack_client.session.Session() + ssm_client = session.client("ssm") + monkeypatch.setattr(ssm, "ssm_client", session.client("ssm")) + #def raiser(ami,other): + # raise Exception + #monkeypatch.setattr(submit_start.Submission_dev,"prices_active_instances_ami",raiser) + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,key_name,"111111111") + monkeypatch.setenv("MAXCOST",str(1200)) + sd.check_existence() + sd.parse_config() + assert not sd.get_costmonitoring() + + def test_Submission_multisession_get_costmonitoring_ssm_fail(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances,monkeypatch,set_ssm_budget_under_multi,set_price,patch_boto3_ec2): + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,key_name,"111111111") + monkeypatch.setenv("MAXCOST",str(1300)) + sd.check_existence() + sd.parse_config() + assert not sd.get_costmonitoring() + + def test_Submission_multisession_get_costmonitoring_ssm(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances,monkeypatch,set_ssm_budget_over_multi,set_price,patch_boto3_ec2): + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,key_name,"111111111") + monkeypatch.setenv("MAXCOST",str(1200)) + sd.check_existence() + sd.parse_config() + assert sd.get_costmonitoring() + + + def test_Submission_multisession_get_costmonitoring_ssm_default_fail(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances,monkeypatch,set_ssm_budget_other,set_price,patch_boto3_ec2): + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,key_name,"111111111") + monkeypatch.setenv("MAXCOST",str(1200)) + sd.check_existence() + sd.parse_config() + assert not sd.get_costmonitoring() + + def test_Submission_multisession_get_costmonitoring_ssm_default(self,setup_lambda_env,setup_testing_bucket_multisession,check_instances,monkeypatch,set_ssm_budget_other,set_price,patch_boto3_ec2): + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,key_name,"111111111") + monkeypatch.setenv("MAXCOST",str(1300)) + sd.check_existence() + sd.parse_config() + assert sd.get_costmonitoring() + + + def 
test_Submission_multisession_parse_config(self,setup_lambda_env,setup_testing_bucket_multisession): + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,key_name,"111111111") + sd.parse_config() + assert sd.jobduration is None + assert sd.jobsize is None + + def test_Submission_multisession_parse_config_full(self,setup_lambda_env,setup_testing_bucket_multisession): + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + sd = submit_start.Submission_multisession(bucket_name,key_name,"111111111") + sd.config_name = os.path.join(user_name,"configs","fullconfig.json") + sd.parse_config() + assert sd.jobduration == 360 + assert sd.jobsize == 20 + + def test_Submission_multisession_acquire_instances(self,create_securitygroup,create_instance_profile,monkeypatch,setup_lambda_env,setup_testing_bucket_multisession,create_ami,kill_instances): + """For this test, we generate fake instances. We need to monkeypatch into ec2 in order to do so. + + """ + bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + ami = create_ami + sg = create_securitygroup + + session = localstack_client.session.Session() + monkeypatch.setattr(ec2,"ec2_client",session.client("ec2")) + monkeypatch.setenv("SECURITY_GROUPS",sg) + monkeypatch.setattr(ec2,"ec2_resource",session.resource("ec2")) + + sd = submit_start.Submission_multisession(bucket_name,submit_path,"111111111") + monkeypatch.setenv("AMI",ami) + sd.config_name = os.path.join(user_name,"configs","fullconfig.json") + sd.check_existence() + sd.parse_config() + sd.compute_volumesize() + sd.acquire_instances() + info= ec2_client.describe_instances(InstanceIds = [i.id for i in sd.instances]) + for instanceinfo in info["Reservations"][0]["Instances"]: + tags = instanceinfo["Tags"] + assert {"Key":"PriceTracking","Value":"On"} in tags + assert {"Key":"Timeout","Value":"360"} in tags + assert {"Key":"group","Value":sd.path} in tags + assert {"Key":"job","Value":sd.jobname} in tags + assert {"Key":"analysis","Value":sd.bucket_name} in tags + + def test_Submission_multisession_skip(self,create_securitygroup,monkeypatch,create_instance_profile,setup_lambda_env,setup_testing_bucket_multisession,create_ami,kill_instances): + """Like the test directly above, but assuming we run in "storage skip" mode. + + """ + + ## set up the os environment correctly. 
+ bucket_name,submit_path = setup_testing_bucket_multisession[0],setup_testing_bucket_multisession[1] + ami = create_ami + sg = create_securitygroup + + ## we need the s3 and ec2 relevant modules patched: + session = localstack_client.session.Session() + monkeypatch.setattr(s3,"s3_client",session.client("s3")) + monkeypatch.setattr(s3,"s3_resource",session.resource("s3")) + monkeypatch.setattr(ec2,"ec2_client",session.client("ec2")) + monkeypatch.setenv("SECURITY_GROUPS",sg) + monkeypatch.setattr(ec2,"ec2_resource",session.resource("ec2")) + skipsubmit = os.path.join(os.path.dirname(submit_path),"bucketskipsubmit.json") + + sd = submit_start.Submission_multisession(bucket_name,skipsubmit,"111111111") + assert sd.bypass_data["input"]["bucket"] == sep_bucket_multi + assert sd.bypass_data["output"]["bucket"] == sep_bucket_multi + assert sd.bypass_data["input"]["datapath"] == ["sep_inputs"] + assert sd.bypass_data["input"]["configpath"] =="sep_configs/configsep.json" + assert sd.bypass_data["output"]["resultpath"] =="sep_results" + monkeypatch.setenv("AMI",ami) + ## get submit file + submit= s3.load_json(bucket_name,skipsubmit) + + ## Check that data and config exist at non-traditional location given full path: + sd.check_existence() + ## Check that output directory exists: + assert len(s3.ls_name(sep_bucket_multi,os.path.join("sep_results","job__test-submitlambda-analysis-multi_testtimestamp","logs"))) > 0 + + ## Check bucket name and path are not altered for all other processing: + assert sd.bucket_name == bucket_name + assert sd.path == re.findall('.+?(?=/'+os.environ["SUBMITDIR"]+')',skipsubmit)[0] + sd.parse_config() + sd.compute_volumesize() + sd.acquire_instances() + #sd.start_instance() + commands = sd.process_inputs(dryrun=True) + reference_command = os.environ["COMMAND"].format(sep_bucket_multi,"sep_inputs","s3://independent-multi/sep_results/job__test-submitlambda-analysis-multi_testtimestamp","sep_configs/configsep.json") + print(f"\n\n\n\n\n commands 0: {commands[0]} \n\n\n\n the command: {reference_command} \n\n\n\n\n") + assert commands[0] == reference_command + info= ec2_client.describe_instances(InstanceIds = [i.id for i in sd.instances]) + #for instanceinfo in info["Reservations"][0]["Instances"]: + # tags = instanceinfo["Tags"] + # assert {"Key":"PriceTracking","Value":"On"} in tags + # assert {"Key":"Timeout","Value":"360"} in tags + # assert {"Key":"group","Value":sd.path} in tags + # assert {"Key":"job","Value":sd.jobname} in tags + # assert {"Key":"analysis","Value":sd.bucket_name} in tags