Skip to content

Commit

Permalink
Refactor staging job for mem loop and COM declare
Browse files Browse the repository at this point in the history
- Move rRUN, mem list, and COM declares inside python/yaml
- Move mem loop inside python
- Add path_exists checks to some initial conditions
- Create dicts for replace_tmpl COM declares

Refs NOAA-EMC#2475
  • Loading branch information
KateFriedman-NOAA committed Aug 12, 2024
1 parent 2913b58 commit b74e696
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 76 deletions.
55 changes: 2 additions & 53 deletions jobs/JGLOBAL_STAGE_IC
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@
source "${HOMEgfs}/ush/preamble.sh"
source "${HOMEgfs}/ush/jjob_header.sh" -e "stage_ic" -c "base stage_ic"

# Restart conditions for GFS cycle come from GDAS
# shellcheck disable=SC2153
rRUN=${RUN}
# shellcheck disable=SC2153
[[ ${RUN} == "gfs" ]] && rRUN="gdas"

# Define significant cycles
# shellcheck disable=SC2153
half_window=$(( assim_freq / 2 ))
Expand All @@ -30,56 +24,11 @@ else
fi
export current_cycle previous_cycle current_cycle_offset model_start_date_current_cycle

# Define MEMDIR_ARRAY
MEMDIR_ARRAY=()
if [[ "${RUN:-}" = "enkfgdas" || "${RUN:-}" = "gefs" ]]; then
if [[ "${RUN:-}" = "gefs" ]]; then
ii_start=0
elif [[ "${RUN:-}" = "enkfgdas" ]]; then
ii_start=1
fi
# Populate the member_dirs array based on the value of NMEM_ENS
for ((ii = "${ii_start}"; ii <= "${NMEM_ENS:-0}"; ii++)); do
MEMDIR_ARRAY+=("mem$(printf "%03d" "${ii}")")
done
else
MEMDIR_ARRAY+=("")
fi

# Initialize return code
err=0

###############################################################
for MEMDIR in "${MEMDIR_ARRAY[@]}"; do

# Declare COMs
if [[ "${MODE}" = "cycled" && "${RUN}" = "gdas" ]]; then
YMD=${current_cycle:0:8} HH=${current_cycle:8:2} declare_from_tmpl -x COMOUT_ATMOS_ANALYSIS:COM_ATMOS_ANALYSIS_TMPL
fi

if [[ ${EXP_WARM_START:-".false."} = ".true." ]]; then
MEMDIR=${MEMDIR} RUN=${rRUN} YMD=${previous_cycle:0:8} HH=${previous_cycle:8:2} declare_from_tmpl -x COMOUT_ATMOS_RESTART_PREV:COM_ATMOS_RESTART_TMPL
MEMDIR=${MEMDIR} RUN=${rRUN} YMD=${previous_cycle:0:8} HH=${previous_cycle:8:2} declare_from_tmpl -x COMOUT_MED_RESTART_PREV:COM_MED_RESTART_TMPL
else
MEMDIR=${MEMDIR} YMD=${current_cycle:0:8} HH=${current_cycle:8:2} declare_from_tmpl -x COMOUT_ATMOS_INPUT:COM_ATMOS_INPUT_TMPL
fi
if [[ "${DO_OCN:-}" = "YES" ]]; then
MEMDIR=${MEMDIR} RUN=${rRUN} YMD=${previous_cycle:0:8} HH=${previous_cycle:8:2} declare_from_tmpl -x COMOUT_OCEAN_RESTART_PREV:COM_OCEAN_RESTART_TMPL
if [[ "${REPLAY_ICS:-NO}" = "YES" ]]; then
MEMDIR=${MEMDIR} YMD=${current_cycle:0:8} HH=${current_cycle:8:2} declare_from_tmpl -x COMOUT_OCEAN_ANALYSIS:COM_OCEAN_ANALYSIS_TMPL
fi
fi
if [[ "${DO_ICE:-}" = "YES" ]]; then
MEMDIR=${MEMDIR} RUN=${rRUN} YMD=${previous_cycle:0:8} HH=${previous_cycle:8:2} declare_from_tmpl -x COMOUT_ICE_RESTART_PREV:COM_ICE_RESTART_TMPL
fi
if [[ "${DO_WAVE:-}" = "YES" ]]; then
MEMDIR=${MEMDIR} YMD=${previous_cycle:0:8} HH=${previous_cycle:8:2} declare_from_tmpl -x COMOUT_WAVE_RESTART_PREV:COM_WAVE_RESTART_TMPL
fi

# Execute staging
"${SCRgfs}/exglobal_stage_ic.py"

done # for MEMDIR in "${MEMDIR_ARRAY[@]}"; do
# Execute staging
"${SCRgfs}/exglobal_stage_ic.py"

###############################################################
# Check for errors and exit if any of the above failed
Expand Down
77 changes: 61 additions & 16 deletions parm/stage/stage.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,77 @@
#############################################################

# Set variables used below
{% set cycle_HH = current_cycle | strftime("%H") %}
{% set current_cycle_YMD = current_cycle | to_YMD %}
{% set current_cycle_HH = current_cycle | strftime("%H") %}
{% set previous_cycle_YMD = previous_cycle | to_YMD %}
{% set previous_cycle_HH = previous_cycle | strftime("%H") %}
{% set m_prefix = model_start_date_current_cycle | to_YMD + "." + model_start_date_current_cycle | strftime("%H") + "0000" %}
{% set o_prefix = current_cycle_offset | to_YMD + "." + current_cycle_offset | strftime("%H") + "0000" %}
{% set p_prefix = previous_cycle | to_YMD + "." + previous_cycle | strftime("%H") + "0000" %}

#############################################################
####################################################################
# Initial condition to stage
####################################################################

# Declare a dict of search and replace terms to run on each template

{% if mem >= 0 %}
{% set mem_char = 'mem%03d' | format(mem) %}
{% else %}
{% set mem_char = '' %}
{% endif %}
{% set current_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':RUN,
'${YMD}':current_cycle_YMD,
'${HH}':current_cycle_HH,
'${MEMDIR}': mem_char }) %}
{% set previous_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':rRUN,
'${YMD}':previous_cycle_YMD,
'${HH}':previous_cycle_HH,
'${MEMDIR}': mem_char }) %}
{% set previous_run_dict = ({ '${ROTDIR}':ROTDIR,
'${RUN}':RUN,
'${YMD}':previous_cycle_YMD,
'${HH}':previous_cycle_HH,
'${MEMDIR}': mem_char }) %}

# Initial condition definitions

{% if MODE == "cycled" and RUN == "gdas" %}
{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_dict) %}
analysis:
mkdir:
- "{{ COMOUT_ATMOS_ANALYSIS }}"
copy:
{% for ftype in ["abias", "abias_air", "abias_int", "abias_pc", "atminc.nc", "radstat"] %}
{% if path_exists(ICSDIR ~ "/" ~ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) ~ "/" ~ RUN ~ ".t" ~ cycle_HH ~ "z." ~ ftype) %}
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ RUN }}.t{{ cycle_HH }}z.{{ ftype }}", "{{ COMOUT_ATMOS_ANALYSIS }}"]
{% if path_exists(ICSDIR ~ "/" ~ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) ~ "/" ~ RUN ~ ".t" ~ current_cycle_HH ~ "z." ~ ftype) %}
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ RUN }}.t{{ current_cycle_HH }}z.{{ ftype }}", "{{ COMOUT_ATMOS_ANALYSIS }}"]
{% endif %}
{% endfor %}
{% endif %}

{% if EXP_WARM_START == True %}
{% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_dict) %}
atmosphere_warm:
mkdir:
- "{{ COMOUT_ATMOS_RESTART_PREV }}"
copy:
{% for ftype in ["coupler.res", "fv_core.res.nc"] %}
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ o_prefix }}.{{ ftype }}", "{{ COMOUT_ATMOS_RESTART_PREV }}"]
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ m_prefix }}.{{ ftype }}", "{{ COMOUT_ATMOS_RESTART_PREV }}"]
{% endfor %}
{% for ftype in ["ca_data", "fv_core.res", "fv_srf_wnd.res", "fv_tracer.res", "phy_data", "sfc_data"] %}
{% for ntile in range(1, ntiles + 1) %}
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ o_prefix }}.{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_RESTART_PREV }}"]
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ m_prefix }}.{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_RESTART_PREV }}"]
{% endfor %} # ntile
{% endfor %} # ftype
{% for ntile in range(1, ntiles + 1) %}
{% if path_exists(ICSDIR ~ "/" ~ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) ~ "/" ~ p_prefix ~ ".sfcanl_data.tile" ~ ntile ~ ".nc") %}
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ p_prefix }}.sfcanl_data.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_RESTART_PREV }}"]
{% endif %} # path_exists
{% endfor %} # ntile
{% else %}
{% else %} # cold start
{% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_dict) %}
atmosphere_cold:
mkdir:
- "{{ COMOUT_ATMOS_INPUT }}"
Expand All @@ -69,31 +103,38 @@ atmosphere_cold:
{% endif %}

{% if REPLAY_ICS == "YES" %}
{% set COMOUT_ATMOS_ANALYSIS = COM_ATMOS_ANALYSIS_TMPL | replace_tmpl(current_dict) %}
atmosphere_perturbation:
mkdir:
- "{{ COMOUT_ATMOS_ANALYSIS }}"
copy:
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ m_prefix }}.fv3_perturbation.nc", "{{ COMOUT_ATMOS_ANALYSIS }}/{{ RUN }}.t{{ cycle_HH }}z.atminc.nc"]
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ m_prefix }}.fv3_perturbation.nc", "{{ COMOUT_ATMOS_ANALYSIS }}/{{ RUN }}.t{{ current_cycle_HH }}z.atminc.nc"]
{% endif %}

{% if DO_NEST %}
atmosphere_nest:
{% set ntile = 7 %}
{% set ntile = 7 %}
{% if EXP_WARM_START == True %}
{% set COMOUT_ATMOS_RESTART_PREV = COM_ATMOS_RESTART_TMPL | replace_tmpl(previous_dict) %}
mkdir:
- "{{ COMOUT_ATMOS_RESTART_PREV }}"
copy:
{% if EXP_WARM_START == True %}
{% for ftype in ["ca_data", "fv_core.res", "fv_srf_wnd.res", "fv_tracer.res", "phy_data", "sfc_data"] %}
{% for ftype in ["ca_data", "fv_core.res", "fv_srf_wnd.res", "fv_tracer.res", "phy_data", "sfc_data"] %}
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_RESTART_PREV | relpath(ROTDIR) }}/{{ o_prefix }}.{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_RESTART_PREV }}/{{ o_prefix }}.{{ ftype }}.nest0{{ ntile-5 }}.tile{{ ntile }}.nc"]
{% endfor %} # ftype
{% else %}
{% for ftype in ["gfs_data", "sfc_data"] %}
{% endfor %}
{% else %} # cold start
{% set COMOUT_ATMOS_INPUT = COM_ATMOS_INPUT_TMPL | replace_tmpl(current_dict) %}
mkdir:
- "{{ COMOUT_ATMOS_INPUT }}"
copy:
{% for ftype in ["gfs_data", "sfc_data"] %}
- ["{{ COMOUT_ATMOS_INPUT }}/{{ ftype }}.tile{{ ntile }}.nc", "{{ COMOUT_ATMOS_INPUT }}/{{ ftype }}.nest0{{ ntile-5 }}.tile{{ ntile }}.nc"]
{% endfor %} # ftype
{% endif %} # cold-start
{% endfor %}
{% endif %}
{% endif %}

{% if DO_ICE %}
{% set COMOUT_ICE_RESTART_PREV = COM_ICE_RESTART_TMPL | replace_tmpl(previous_dict) %}
ice:
mkdir:
- "{{ COMOUT_ICE_RESTART_PREV }}"
Expand All @@ -102,6 +143,7 @@ ice:
{% endif %}

{% if DO_OCN %}
{% set COMOUT_OCEAN_RESTART_PREV = COM_OCEAN_RESTART_TMPL | replace_tmpl(previous_dict) %}
ocean:
mkdir:
- "{{ COMOUT_OCEAN_RESTART_PREV }}"
Expand All @@ -114,6 +156,7 @@ ocean:
{% endif %}

{% if REPLAY_ICS == "YES" %}
{% set COMOUT_OCEAN_ANALYSIS = COM_OCEAN_ANALYSIS_TMPL | replace_tmpl(current_dict) %}
replay:
mkdir:
- "{{ COMOUT_OCEAN_ANALYSIS }}"
Expand All @@ -122,6 +165,7 @@ replay:
{% endif %}

{% if EXP_WARM_START == True %}
{% set COMOUT_MED_RESTART_PREV = COM_MED_RESTART_TMPL | replace_tmpl(previous_dict) %}
{% if path_exists(ICSDIR ~ "/" ~ COMOUT_MED_RESTART_PREV | relpath(ROTDIR) ~ "/" ~ m_prefix ~ ".ufs.cpld.cpl.r.nc") %}
mediator:
mkdir:
Expand All @@ -134,6 +178,7 @@ mediator:
{% endif %} # DO_OCN=YES

{% if DO_WAVE %}
{% set COMOUT_WAVE_RESTART_PREV = COM_WAVE_RESTART_TMPL | replace_tmpl(previous_run_dict) %}
wave:
mkdir:
- "{{ COMOUT_WAVE_RESTART_PREV }}"
Expand Down
2 changes: 1 addition & 1 deletion scripts/exglobal_stage_ic.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def main():
stage = Stage(config)

# Pull out all the configuration keys needed to run stage job
keys = ['RUN', 'MODE', 'EXP_WARM_START',
keys = ['RUN', 'MODE', 'EXP_WARM_START', 'NMEM_ENS',
'previous_cycle', 'current_cycle',
'current_cycle_offset', 'model_start_date_current_cycle',
'ROTDIR', 'ICSDIR', 'STAGE_IC_YAML_TMPL',
Expand Down
34 changes: 28 additions & 6 deletions ush/python/pygfs/task/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,31 @@ def execute_stage(self, stage_dict: Dict[str, Any]) -> None:
# Add the os.path.exists function to the dict for yaml parsing
stage_dict['path_exists'] = os.path.exists

stage_set = parse_j2yaml(self.task_config.STAGE_IC_YAML_TMPL, stage_dict,
allow_missing=False)

# Copy files to ROTDIR
for key in stage_set.keys():
FileHandler(stage_set[key]).sync()
# Determine restart RUN
rRUN = self.task_config.RUN
if self.task_config.RUN == "gfs":
rRUN = "gdas"
stage_dict['rRUN'] = rRUN

# Determine ensemble member settings
MEM_START = -1 # Deterministic default, no members
if self.task_config.NMEM_ENS > 0:
if self.task_config.RUN == "gefs":
MEM_START = 0
elif self.task_config.RUN == "enkfgdas":
MEM_START = 1

if MEM_START >= 0: # Ensemble RUN
first_mem = MEM_START
last_mem = self.task_config.NMEM_ENS
else: # Deteministic RUN
first_mem = MEM_START
last_mem = MEM_START

# Loop over members
for mem in range(first_mem, last_mem + 1):
stage_dict['mem'] = mem
stage_set = parse_j2yaml(self.task_config.STAGE_IC_YAML_TMPL, stage_dict, allow_missing=False)
# Copy files to ROTDIR
for key in stage_set.keys():
FileHandler(stage_set[key]).sync()

0 comments on commit b74e696

Please sign in to comment.