Skip to content

Commit

Permalink
Add capability to run forecast in segments (NOAA-EMC#2795)
Browse files Browse the repository at this point in the history
Adds the ability to run a forecast in segments instead of all at once.
To accomplish this, a new local `checkpnts` variable is introduced to
`config.base` to contain a comma-separated list of intermediate stopping
points for the forecast. This is combined with `FHMIN_GFS` and
`FHMAX_GFS` to create a comma-separated string `FCST_SEGMENTS` with all
the start/end points that is used by `config.fcst` and rocoto workflow.
Capability to parse these into python lists was added to wxflow in an
accompanying PR. If `checkpnts` is an empty string, this will result in
a single-segment forecast.

To accommodate the new segment metatasks that must be run serially, the
capability of `create_task()` was expanded to allow a dictionary key of
`is_serial`, which controls whether a metatask is parallel or serial
using pre-existing capability in rocoto. The default when not given is
parallel (i.e. most metatasks).

Resolves NOAA-EMC#2274
Refs NOAA-EMC/wxflow#39
Refs NOAA-EMC/wxflow#40
  • Loading branch information
WalterKolczynski-NOAA authored Aug 12, 2024
1 parent e2c0f06 commit 1d53953
Show file tree
Hide file tree
Showing 26 changed files with 209 additions and 120 deletions.
1 change: 1 addition & 0 deletions ci/cases/yamls/gfs_extended_ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ base:
DO_AWIPS: "NO"
DO_NPOESS: "YES"
DO_GENESIS_FSU: "NO"
FCST_BREAKPOINTS: 192
FHMAX_GFS: 384
FHMAX_HF_GFS: 120
13 changes: 12 additions & 1 deletion jobs/JGLOBAL_FORECAST
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ fi
# Remove the Temporary working directory
##########################################
cd "${DATAROOT}" || true
[[ "${KEEPDATA}" == "NO" ]] && rm -rf "${DATA}" "${DATArestart}" # do not remove DATAjob. It contains DATAoutput
# do not remove DATAjob. It contains DATAoutput
if [[ "${KEEPDATA}" == "NO" ]]; then
rm -rf "${DATA}"

# Determine if this is the last segment
commas="${FCST_SEGMENTS//[^,]}"
n_segs=${#commas}
if (( n_segs - 1 == ${FCST_SEGMENT:-0} )); then
# Only delete temporary restarts if it is the last segment
rm -rf "${DATArestart}"
fi
fi

exit 0
4 changes: 2 additions & 2 deletions parm/archive/enkf.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ enkf:
{% endfor %}
- "logs/{{ cycle_YMDH }}/{{ RUN }}echgres.log"
- "logs/{{ cycle_YMDH }}/{{ RUN }}esfc.log"
{% for grp in range(iaufhrs | length) %}
{% for grp in range(IAUFHRS | length) %}
- "logs/{{ cycle_YMDH }}/{{ RUN }}ecen{{ '%03d' % grp }}.log"
{% endfor %}

Expand Down Expand Up @@ -68,7 +68,7 @@ enkf:
{% if DOIAU %}
# IAU increments/analyses

{% for fhr in iaufhrs if fhr != 6 %}
{% for fhr in IAUFHRS if fhr != 6 %}
{% if do_calc_increment %}
# Store analyses instead of increments
- "{{ COMIN_ATMOS_ANALYSIS_ENSSTAT | relpath(ROTDIR) }}/{{ head }}atma{{ '%03d' % fhr }}.ensmean.nc"
Expand Down
4 changes: 2 additions & 2 deletions parm/archive/enkf_restarta_grp.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ enkf_restarta_grp:
{% endif %}

# Member increments
{% for iaufhr in iaufhrs if iaufhr != 6 %}
{% for iaufhr in IAUFHRS if iaufhr != 6 %}
{% set iaufhr = iaufhr %}
{% if do_calc_increment %}
- "{{ COMIN_ATMOS_ANALYSIS_MEM | relpath(ROTDIR) }}/{{ head }}atma{{ '%03d' % iaufhr }}.nc"
{% else %}
- "{{ COMIN_ATMOS_ANALYSIS_MEM | relpath(ROTDIR) }}/{{ head }}ratmi{{ '%03d' % iaufhr }}.nc"
{% endif %}
{% endfor %} # iaufhr in iaufhrs
{% endfor %} # iaufhr in IAUFHRS

# Conventional data
{% if not lobsdiag_forenkf and not DO_JEDIATMENS %}
Expand Down
4 changes: 2 additions & 2 deletions parm/archive/gdas.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ gdas:
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atmanl.ensres.nc"
{% if DOIAU %}
# Ensemble IAU analysis residuals
{% for fhr in iaufhrs if fhr != 6 %}
{% for fhr in IAUFHRS if fhr != 6 %}
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atma{{ '%03d' % fhr }}.ensres.nc"
{% endfor %}
{% endif %}
Expand Down Expand Up @@ -108,7 +108,7 @@ gdas:
{% endif %} # End of cycled data

# Forecast and post logs
- "logs/{{ cycle_YMDH }}/{{ RUN }}fcst.log"
- "logs/{{ cycle_YMDH }}/{{ RUN }}fcst_seg0.log"

{% for fhr in range(0, FHMAX + 1, 3) %}
{% set fhr3 = '%03d' % fhr %}
Expand Down
2 changes: 1 addition & 1 deletion parm/archive/gdas_restarta.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ gdas_restarta:
# Deterministic analysis increments
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atminc.nc"
# IAU increments
{% for iaufhr in iaufhrs if iaufhr != 6 %}
{% for iaufhr in IAUFHRS if iaufhr != 6 %}
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atmi{{ "%03d" % iaufhr }}.nc"
{% endfor %}

Expand Down
2 changes: 1 addition & 1 deletion parm/archive/gfs_netcdfa.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ gfs_netcdfa:
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atmanl.nc"
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}sfcanl.nc"
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atminc.nc"
{% for iauhr in iaufhrs if iauhr != 6 %}
{% for iauhr in IAUFHRS if iauhr != 6 %}
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}atmi{{ "%03d" % iauhr }}.nc"
{% endfor %}
optional:
Expand Down
22 changes: 0 additions & 22 deletions parm/archive/master_enkf.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,6 @@
{% set cycle_YMDH = current_cycle | to_YMDH %}
{% set head = RUN + ".t" + cycle_HH + "z." %}

# Split IAUFHRS into a list; typically either "3,6,9" or 6 (integer)
{% if IAUFHRS is string %}
# "3,6,9"
{% set iaufhrs = [] %}
{% for iaufhr in IAUFHRS.split(",") %}
{% do iaufhrs.append(iaufhr | int) %}
{% endfor %}
{% else %}
# 6 (integer)
{% set iaufhrs = [IAUFHRS] %}
{% endif %}

# Repeat for IAUFHRS_ENKF
{% if IAUFHRS_ENKF is string %}
{% set iaufhrs_enkf = [] %}
{% for iaufhr in IAUFHRS_ENKF.split(",") %}
{% do iaufhrs_enkf.append(iaufhr | int) %}
{% endfor %}
{% else %}
{% set iaufhrs_enkf = [IAUFHRS_ENKF] %}
{% endif %}

# Determine which data to archive
datasets:
{% if ENSGRP == 0 %}
Expand Down
10 changes: 0 additions & 10 deletions parm/archive/master_gdas.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,6 @@
{% set cycle_YMDH = current_cycle | to_YMDH %}
{% set head = "gdas.t" + cycle_HH + "z." %}

# Split IAUFHRS into a list; typically either "3,6,9" or 6 (integer)
{% if IAUFHRS is string %}
{% set iaufhrs = [] %}
{% for iaufhr in IAUFHRS.split(",") %}
{% do iaufhrs.append(iaufhr | int) %}
{% endfor %}
{% else %}
{% set iaufhrs = [IAUFHRS] %}
{% endif %}

datasets:
# Always archive atmosphere forecast/analysis data
{% filter indent(width=4) %}
Expand Down
12 changes: 0 additions & 12 deletions parm/archive/master_gfs.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,6 @@
{% set cycle_YMD = current_cycle | to_YMD %}
{% set cycle_YMDH = current_cycle | to_YMDH %}

# Split IAUFHRS into a list; typically either "3,6,9" or 6 (integer)
{% if IAUFHRS is string %}
# "3,6,9"
{% set iaufhrs = [] %}
{% for iaufhr in IAUFHRS.split(",") %}
{% do iaufhrs.append(iaufhr | int) %}
{% endfor %}
{% else %}
# 6 (integer)
{% set iaufhrs = [IAUFHRS] %}
{% endif %}

# Determine which data to archive
datasets:
# Always archive atmosphere forecast/analysis data
Expand Down
7 changes: 5 additions & 2 deletions parm/config/gefs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,11 @@ export gfs_cyc=@gfs_cyc@ # 0: no GFS cycle, 1: 00Z only, 2: 00Z and 12Z only, 4:

# GFS output and frequency
export FHMIN_GFS=0
export FHMIN=${FHMIN_GFS}
export FHMAX_GFS=@FHMAX_GFS@
export FHMAX_GFS="@FHMAX_GFS@"
# Intermediate times to stop forecast when running in segments
breakpnts="@FCST_BREAKPOINTS@"
export FCST_SEGMENTS="${FHMIN_GFS},${breakpnts:+${breakpnts},}${FHMAX_GFS}"

export FHOUT_GFS=6
export FHMAX_HF_GFS=@FHMAX_HF_GFS@
export FHOUT_HF_GFS=1
Expand Down
13 changes: 9 additions & 4 deletions parm/config/gefs/config.fcst
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,19 @@ string="--fv3 ${CASE}"
# shellcheck disable=SC2086
source "${EXPDIR}/config.ufs" ${string}

# shellcheck disable=SC2153
export FHMAX=${FHMAX_GFS}
# Convert comma-separated string into bash array
IFS=', ' read -ra segments <<< "${FCST_SEGMENTS}"
# Determine MIN and MAX based on the forecast segment
export FHMIN=${segments[${FCST_SEGMENT}]}
export FHMAX=${segments[${FCST_SEGMENT}+1]}
# Cap other FHMAX variables at FHMAX for the segment
export FHMAX_HF=$(( FHMAX_HF_GFS > FHMAX ? FHMAX : FHMAX_HF_GFS ))
export FHMAX_WAV=$(( FHMAX_WAV > FHMAX ? FHMAX : FHMAX_WAV ))
# shellcheck disable=SC2153
export FHOUT=${FHOUT_GFS}
export FHMAX_HF=${FHMAX_HF_GFS}
export FHOUT_HF=${FHOUT_HF_GFS}
export FHOUT_OCN=${FHOUT_OCN_GFS}
export FHOUT_ICE=${FHOUT_ICE_GFS}
export FHOUT_ICE=${FHOUT_ICE_GFS}

# Get task specific resources
source "${EXPDIR}/config.resources" fcst
Expand Down
1 change: 1 addition & 0 deletions parm/config/gefs/yaml/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ base:
DO_EXTRACTVARS: "NO"
FHMAX_GFS: 120
FHMAX_HF_GFS: 0
FCST_BREAKPOINTS: "48"
REPLAY_ICS: "NO"
USE_OCN_PERTURB_FILES: "false"
2 changes: 1 addition & 1 deletion parm/config/gfs/config.aeroanl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ if [[ "${DOIAU}" == "YES" ]]; then
export aero_bkg_times="3,6,9"
export JEDIYAML="${PARMgfs}/gdas/aero/variational/3dvar_fgat_gfs_aero.yaml.j2"
else
export aero_bkg_times="6"
export aero_bkg_times="6," # Trailing comma is necessary so this is treated as a list
export JEDIYAML="${PARMgfs}/gdas/aero/variational/3dvar_gfs_aero.yaml.j2"
fi

Expand Down
13 changes: 8 additions & 5 deletions parm/config/gfs/config.base
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ export gfs_cyc=@gfs_cyc@ # 0: no GFS cycle, 1: 00Z only, 2: 00Z and 12Z only, 4:

# GFS output and frequency
export FHMIN_GFS=0
export FHMAX_GFS=@FHMAX_GFS@
export FHMAX_GFS="@FHMAX_GFS@"
# Intermediate times to stop forecast when running in segments
breakpnts="@FCST_BREAKPOINTS@"
export FCST_SEGMENTS="${FHMIN_GFS},${breakpnts:+${breakpnts},}${FHMAX_GFS}"
export FHOUT_GFS=3 # 3 for ops
export FHMAX_HF_GFS=@FHMAX_HF_GFS@
export FHOUT_HF_GFS=1
Expand Down Expand Up @@ -384,10 +387,10 @@ fi

# if 3DVAR and IAU
if [[ ${DOHYBVAR} == "NO" && ${DOIAU} == "YES" ]]; then
export IAUFHRS="6"
export IAUFHRS="6,"
export IAU_FHROT="3"
export IAU_FILTER_INCREMENTS=".true."
export IAUFHRS_ENKF="6"
export IAUFHRS_ENKF="6,"
fi

# Generate post-processing ensemble spread files
Expand All @@ -397,10 +400,10 @@ export ENKF_SPREAD="YES"
if [[ "${MODE}" = "cycled" && "${SDATE}" = "${PDY}${cyc}" && ${EXP_WARM_START} = ".false." ]] || [[ "${DOIAU}" = "NO" ]] || [[ "${MODE}" = "forecast-only" && ${EXP_WARM_START} = ".false." ]] ; then
export IAU_OFFSET=0
export IAU_FHROT=0
export IAUFHRS="6"
export IAUFHRS="6,"
fi

if [[ "${DOIAU_ENKF}" = "NO" ]]; then export IAUFHRS_ENKF="6"; fi
if [[ "${DOIAU_ENKF}" = "NO" ]]; then export IAUFHRS_ENKF="6,"; fi

# Determine restart intervals
# For IAU, write restarts at beginning of window also
Expand Down
11 changes: 8 additions & 3 deletions parm/config/gfs/config.fcst
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,16 @@ source "${EXPDIR}/config.ufs" ${string}
# Forecast length for GFS forecast
case ${RUN} in
*gfs)
# shellcheck disable=SC2153
export FHMAX=${FHMAX_GFS}
# Convert comma-separated string into bash array
IFS=', ' read -ra segments <<< "${FCST_SEGMENTS}"
# Determine MIN and MAX based on the forecast segment
export FHMIN=${segments[${FCST_SEGMENT}]}
export FHMAX=${segments[${FCST_SEGMENT}+1]}
# Cap other FHMAX variables at FHMAX for the segment
export FHMAX_HF=$(( FHMAX_HF_GFS > FHMAX ? FHMAX : FHMAX_HF_GFS ))
export FHMAX_WAV=$(( FHMAX_WAV > FHMAX ? FHMAX : FHMAX_WAV ))
# shellcheck disable=SC2153
export FHOUT=${FHOUT_GFS}
export FHMAX_HF=${FHMAX_HF_GFS}
export FHOUT_HF=${FHOUT_HF_GFS}
export FHOUT_OCN=${FHOUT_OCN_GFS}
export FHOUT_ICE=${FHOUT_ICE_GFS}
Expand Down
1 change: 1 addition & 0 deletions parm/config/gfs/yaml/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ base:
DO_METP: "YES"
FHMAX_GFS: 120
FHMAX_HF_GFS: 0
FCST_BREAKPOINTS: ""
DO_VRFY_OCEANDA: "NO"
GSI_SOILANAL: "NO"
EUPD_CYC: "gdas"
Expand Down
2 changes: 1 addition & 1 deletion sorc/wxflow
3 changes: 2 additions & 1 deletion ush/calcanl_gfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import gsi_utils
from collections import OrderedDict
import datetime
from wxflow import cast_as_dtype

python2fortran_bool = {True: '.true.', False: '.false.'}

Expand Down Expand Up @@ -358,7 +359,7 @@ def calcanl_gfs(DoIAU, l4DEnsVar, Write4Danl, ComOut, APrefix,
ExecAnl = os.getenv('CALCANLEXEC', './calc_analysis.x')
ExecChgresInc = os.getenv('CHGRESINCEXEC', './interp_inc.x')
NEMSGet = os.getenv('NEMSIOGET', 'nemsio_get')
IAUHrs = list(map(int, os.getenv('IAUFHRS', '6').split(',')))
IAUHrs = cast_as_dtype(os.getenv('IAUFHRS', '6,'))
Run = os.getenv('RUN', 'gdas')
JEDI = gsi_utils.isTrue(os.getenv('DO_JEDIATMVAR', 'YES'))

Expand Down
2 changes: 1 addition & 1 deletion ush/forecast_predet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ common_predet(){

CDATE=${CDATE:-"${PDY}${cyc}"}
ENSMEM=${ENSMEM:-000}
MEMBER=$(( 10#${ENSMEM:-"-1"} )) # -1: control, 0: ensemble mean, >0: ensemble member $MEMBER

# Define significant cycles
half_window=$(( assim_freq / 2 ))
Expand Down Expand Up @@ -154,7 +155,6 @@ FV3_predet(){
FV3_OUTPUT_FH="${FV3_OUTPUT_FH} $(seq -s ' ' "${fhr}" "${FHOUT}" "${FHMAX}")"

# Other options
MEMBER=$(( 10#${ENSMEM:-"-1"} )) # -1: control, 0: ensemble mean, >0: ensemble member $MEMBER
PREFIX_ATMINC=${PREFIX_ATMINC:-""} # allow ensemble to use recentered increment

# IAU options
Expand Down
2 changes: 1 addition & 1 deletion ush/python/pygfs/task/aero_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(self, config):
'npz_anl': self.task_config['LEVS'] - 1,
'AERO_WINDOW_BEGIN': _window_begin,
'AERO_WINDOW_LENGTH': f"PT{self.task_config['assim_freq']}H",
'aero_bkg_fhr': map(int, str(self.task_config['aero_bkg_times']).split(',')),
'aero_bkg_fhr': self.task_config['aero_bkg_times'],
'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.",
'APREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.",
'GPREFIX': f"gdas.t{self.task_config.previous_cycle.hour:02d}z.",
Expand Down
2 changes: 1 addition & 1 deletion ush/python/pygfs/task/aero_prepobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, config: Dict[str, Any]) -> None:
{
'window_begin': _window_begin,
'window_end': _window_end,
'sensors': str(self.task_config['SENSORS']).split(','),
'sensors': self.task_config['SENSORS'],
'data_dir': self.task_config['VIIRS_DATA_DIR'],
'input_files': '',
'OPREFIX': f"{self.task_config.RUN}.t{self.task_config.cyc:02d}z.",
Expand Down
27 changes: 27 additions & 0 deletions workflow/applications/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def __init__(self, conf: Configuration) -> None:
self.do_hpssarch = _base.get('HPSSARCH', False)

self.nens = _base.get('NMEM_ENS', 0)
self.fcst_segments = _base.get('FCST_SEGMENTS', None)

if not AppConfig.is_monotonic(self.fcst_segments):
raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}')

self.wave_runs = None
if self.do_wave:
Expand Down Expand Up @@ -208,3 +212,26 @@ def get_gfs_interval(gfs_cyc: int) -> timedelta:
return to_timedelta(gfs_internal_map[str(gfs_cyc)])
except KeyError:
raise KeyError(f'Invalid gfs_cyc = {gfs_cyc}')

@staticmethod
def is_monotonic(test_list: List, check_decreasing: bool = False) -> bool:
"""
Determine if an array is monotonically increasing or decreasing
TODO: Move this into wxflow somewhere
Inputs
test_list: List
A list of comparable values to check
check_decreasing: bool [default: False]
Check whether list is monotonically decreasing
Returns
bool: Whether the list is monotonically increasing (if check_decreasing
if False) or decreasing (if check_decreasing is True)
"""
if check_decreasing:
return all(x > y for x, y in zip(test_list, test_list[1:]))
else:
return all(x < y for x, y in zip(test_list, test_list[1:]))
Loading

0 comments on commit 1d53953

Please sign in to comment.