From b7a8c82a6f2c38030895d651b70e3192f7be1679 Mon Sep 17 00:00:00 2001 From: "Kate.Friedman" Date: Wed, 22 May 2024 19:25:30 +0000 Subject: [PATCH] Update stage python scripts Refs #2475 --- scripts/exglobal_stage_ic.py | 23 +++++--- ush/python/pygfs/task/stage.py | 105 +++++++++++++++++++++++---------- 2 files changed, 89 insertions(+), 39 deletions(-) diff --git a/scripts/exglobal_stage_ic.py b/scripts/exglobal_stage_ic.py index 4424948185..da738493fd 100755 --- a/scripts/exglobal_stage_ic.py +++ b/scripts/exglobal_stage_ic.py @@ -17,10 +17,13 @@ def main(): stage = Stage(config) #Pull out all the configuration keys needed to run stage job - keys = ['RUN','MODE','EXP_WARM_START','CDUMP','rCDUMP', + keys = ['RUN','MODE','current_cycle','EXP_WARM_START','CDUMP','rCDUMP', + 'ROTDIR','PARMgfs', 'ntiles', - 'BASE_CPLIC','waveGRD','OCNRES','USE_OCN_PERTURB_FILES', - 'CPL_ATMIC','CPL_ICEIC','CPL_MEDIC','CPL_OCNIC','CPL_WAVIC'] + 'BASE_CPLIC','waveGRD','OCNRES', + #TODO: GEFS only#'USE_OCN_PERTURB_FILES', + #TODO: Need this#'CPL_MEDIC', + 'CPL_ATMIC','CPL_ICEIC','CPL_OCNIC','CPL_WAVIC'] stage_dict = AttrDict() for key in keys: @@ -32,19 +35,23 @@ def main(): stage_dict[key] = stage.task_config[key] #TEST PRINT - #for key in stage_dict: - # print(f'{key} = {stage_dict[key]}') + for key in stage_dict: + print(f'{key} = {stage_dict[key]}') cwd = os.getcwd() os.chdir(config.ROTDIR) # Determine which ICs to stage - stage_set = stage.determine(stage_dict) + #stage_sets = stage.determine_stage(stage_dict) + stage_set = stage.determine_stage(stage_dict) # Stage ICs - # TODO - create and invoke copies - stage.execute(stage_set) + #for stage_set in stage_sets: + # print(f'set = {stage_set}') + # stage.execute_stage(stage_set) + print(f'set = {stage_set}') + stage.execute_stage(stage_set) os.chdir(cwd) diff --git a/ush/python/pygfs/task/stage.py b/ush/python/pygfs/task/stage.py index db26a3a160..bfcc57e6ce 100644 --- a/ush/python/pygfs/task/stage.py +++ b/ush/python/pygfs/task/stage.py @@ -7,23 +7,9 @@ from logging import getLogger from typing import Any, Dict, List -from wxflow import (AttrDict, - FileHandler, - Hsi, - Htar, - Task, - cast_strdict_as_dtypedict, - chgrp, - get_gid, - logit, - mkdir_p, - parse_j2yaml, - rm_p, - strftime, - to_YMD, - to_YMDH, - Template, - TemplateConstants) +from wxflow import (AttrDict, FileHandler, Hsi, Htar, Task, cast_strdict_as_dtypedict, + chgrp, get_gid, logit, mkdir_p, parse_j2yaml, rm_p, strftime, + to_YMD, to_YMDH, Template, TemplateConstants) logger = getLogger(__name__.split('.')[-1]) @@ -83,14 +69,54 @@ def _gen_relative_paths(self, root_path: str) -> Dict: return rel_path_dict + @staticmethod @logit(logger) - def determine(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str, Any]]): + def _create_fileset(stage_set: Dict[str, Any]) -> List: + """ + Collect the list of all available files from the parsed yaml dict. + Globs are expanded and if required files are missing, an error is + raised. + + TODO: expand all globs in the jinja yaml files instead of expanding + them here and issue errors here if globbing patterns (*, ?, []) + are found. + + Parameters + ---------- + stage_set: Dict + Contains full paths for required and optional files to be staged. + """ + + fileset = [] + if "required" in stage_set: + if stage_set.required is not None: + for item in stage_set.required: + glob_set = glob.glob(item) + if len(glob_set) == 0: + raise FileNotFoundError(f"FATAL ERROR: Required file, directory, or glob {item} not found!") + for entry in glob_set: + fileset.append(entry) + + if "optional" in stage_set: + if stage_set.optional is not None: + for item in stage_set.optional: + glob_set = glob.glob(item) + if len(glob_set) == 0: + logger.warning(f"WARNING: optional file/glob {item} not found!") + else: + for entry in glob_set: + fileset.append(entry) + + return fileset + + @logit(logger) + def determine_stage(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[str, Any]]): """Determine which initial condition files need to be placed in ROTDIR. Parameters ---------- stage_dict : Dict[str, Any] - Task specific keys, e.g. runtime options (DO_AERO, DO_ICE, etc) + Task specific keys, e.g. runtime options (DO_WAVE, DO_ICE, etc) Return ------ @@ -100,10 +126,6 @@ def determine(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[st stage_parm = os.path.join(stage_dict.PARMgfs, "stage") - # Add the glob.glob function for capturing log filenames - # TODO remove this kludge once log filenames are explicit - stage_dict['glob'] = glob.glob - # Add the os.path.exists function to the dict for yaml parsing stage_dict['path_exists'] = os.path.exists @@ -114,7 +136,8 @@ def determine(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[st if stage_dict.MODE == "cycled": master_yaml = "master_cycled.yaml.j2" elif stage_dict.MODE == "forecast-only": - master_yaml = "master_forecast_only.yaml.j2" + #master_yaml = "master_forecast_only.yaml.j2" + master_yaml = "fv3_cold.yaml.j2" elif stage_dict.RUN == "gefs": raise NotImplementedError("FATAL ERROR: Staging is not yet set up for GEFS runs") elif stage_dict.RUN == "sfs": @@ -122,16 +145,36 @@ def determine(self, stage_dict: Dict[str, Any]) -> (Dict[str, Any], List[Dict[st else: raise ValueError(f"FATAL ERROR: Staging is not enabled for {stage_dict.RUN} runs") - parsed_sets = parse_j2yaml(os.path.join(stage_parm, master_yaml), stage_dict) + #parsed_sets = parse_j2yaml(os.path.join(stage_parm, master_yaml), stage_dict) + stage_set = parse_j2yaml(os.path.join(stage_parm, master_yaml), stage_dict) + #print(f'parsed_sets = {parsed_sets}') + + #stage_sets = [] - stage_sets = [] + #for dataset in parsed_sets.datasets.values(): - for dataset in parsed_sets.datasets.values(): + # dataset["fileset"] = Stage._create_fileset(dataset) - dataset["fileset"] = Stage._create_fileset(dataset) + # stage_sets.append(dataset) - stage_sets.append(dataset) + #return stage_sets + return stage_set - return stage_sets + @logit(logger) + def execute_stage(self, stage_set: Dict[str, Any]) -> None: + """Perform local staging of initial condition files. + + Parameters + ---------- + stage_set : Dict[str, Any] + FileHandler instructions to populate ROTDIR with + + Return + ------ + None + """ -#TODO - create def for staging + # Copy files to ROTDIR + for key in stage_set.keys(): + # print(f'key = {key}') + FileHandler(stage_set[key]).sync()