diff --git a/parm/config/gfs/config.resources b/parm/config/gfs/config.resources index 851acb2e0d..b50e1c5fbb 100644 --- a/parm/config/gfs/config.resources +++ b/parm/config/gfs/config.resources @@ -911,7 +911,7 @@ case ${step} in ;; "verfozn") - walltime="00:05:00" + walltime="00:10:00" ntasks=1 threads_per_task=1 tasks_per_node=1 diff --git a/workflow/applications/applications.py b/workflow/applications/applications.py index d6d7453c3c..a694129e38 100644 --- a/workflow/applications/applications.py +++ b/workflow/applications/applications.py @@ -3,7 +3,6 @@ from typing import Dict, List, Any from datetime import timedelta from hosts import Host -from pathlib import Path from wxflow import Configuration, to_timedelta from abc import ABC, ABCMeta, abstractmethod @@ -32,57 +31,50 @@ def __init__(self, conf: Configuration) -> None: self.scheduler = Host().scheduler - # Save the configuration so we can source the config files when - # determining task resources - self.conf = conf + base = conf.parse_config('config.base') - _base = self.conf.parse_config('config.base') - # Define here so the child __init__ functions can use it; will - # be overwritten later during _init_finalize(). - self._base = _base - - self.mode = _base['MODE'] + self.mode = base['MODE'] if self.mode not in self.VALID_MODES: - raise NotImplementedError(f'{self.mode} is not a valid application mode.\n' + - 'Valid application modes are:\n' + - f'{", ".join(self.VALID_MODES)}') - - self.net = _base['NET'] - self.model_app = _base.get('APP', 'ATM') - self.do_atm = _base.get('DO_ATM', True) - self.do_wave = _base.get('DO_WAVE', False) - self.do_wave_bnd = _base.get('DOBNDPNT_WAVE', False) - self.do_ocean = _base.get('DO_OCN', False) - self.do_ice = _base.get('DO_ICE', False) - self.do_aero = _base.get('DO_AERO', False) - self.do_prep_obs_aero = _base.get('DO_PREP_OBS_AERO', False) - self.do_bufrsnd = _base.get('DO_BUFRSND', False) - self.do_gempak = _base.get('DO_GEMPAK', False) - self.do_awips = _base.get('DO_AWIPS', False) - self.do_verfozn = _base.get('DO_VERFOZN', True) - self.do_verfrad = _base.get('DO_VERFRAD', True) - self.do_vminmon = _base.get('DO_VMINMON', True) - self.do_tracker = _base.get('DO_TRACKER', True) - self.do_genesis = _base.get('DO_GENESIS', True) - self.do_genesis_fsu = _base.get('DO_GENESIS_FSU', False) - self.do_metp = _base.get('DO_METP', False) - self.do_upp = not _base.get('WRITE_DOPOST', True) - self.do_goes = _base.get('DO_GOES', False) - self.do_mos = _base.get('DO_MOS', False) - self.do_extractvars = _base.get('DO_EXTRACTVARS', False) - - self.do_hpssarch = _base.get('HPSSARCH', False) - - self.nens = _base.get('NMEM_ENS', 0) - self.fcst_segments = _base.get('FCST_SEGMENTS', None) + raise NotImplementedError(f'{self.mode} is not a valid application mode.\n' + f'Valid application modes are:\n' + f'{", ".join(self.VALID_MODES)}\n') + + self.net = base['NET'] + self.model_app = base.get('APP', 'ATM') + self.do_atm = base.get('DO_ATM', True) + self.do_wave = base.get('DO_WAVE', False) + self.do_wave_bnd = base.get('DOBNDPNT_WAVE', False) + self.do_ocean = base.get('DO_OCN', False) + self.do_ice = base.get('DO_ICE', False) + self.do_aero = base.get('DO_AERO', False) + self.do_prep_obs_aero = base.get('DO_PREP_OBS_AERO', False) + self.do_bufrsnd = base.get('DO_BUFRSND', False) + self.do_gempak = base.get('DO_GEMPAK', False) + self.do_awips = base.get('DO_AWIPS', False) + self.do_verfozn = base.get('DO_VERFOZN', True) + self.do_verfrad = base.get('DO_VERFRAD', True) + self.do_vminmon = base.get('DO_VMINMON', True) + self.do_tracker = base.get('DO_TRACKER', True) + self.do_genesis = base.get('DO_GENESIS', True) + self.do_genesis_fsu = base.get('DO_GENESIS_FSU', False) + self.do_metp = base.get('DO_METP', False) + self.do_upp = not base.get('WRITE_DOPOST', True) + self.do_goes = base.get('DO_GOES', False) + self.do_mos = base.get('DO_MOS', False) + self.do_extractvars = base.get('DO_EXTRACTVARS', False) + + self.do_hpssarch = base.get('HPSSARCH', False) + + self.nens = base.get('NMEM_ENS', 0) + self.fcst_segments = base.get('FCST_SEGMENTS', None) if not AppConfig.is_monotonic(self.fcst_segments): raise ValueError(f'Forecast segments do not increase monotonically: {",".join(self.fcst_segments)}') self.wave_runs = None if self.do_wave: - wave_run = _base.get('WAVE_RUN', 'BOTH').lower() + wave_run = base.get('WAVE_RUN', 'BOTH').lower() if wave_run in ['both']: self.wave_runs = ['gfs', 'gdas'] elif wave_run in ['gfs', 'gdas']: @@ -91,45 +83,52 @@ def __init__(self, conf: Configuration) -> None: self.aero_anl_runs = None self.aero_fcst_runs = None if self.do_aero: - aero_anl_run = _base.get('AERO_ANL_RUN', 'BOTH').lower() + aero_anl_run = base.get('AERO_ANL_RUN', 'BOTH').lower() if aero_anl_run in ['both']: self.aero_anl_runs = ['gfs', 'gdas'] elif aero_anl_run in ['gfs', 'gdas']: self.aero_anl_runs = [aero_anl_run] - aero_fcst_run = _base.get('AERO_FCST_RUN', None).lower() + aero_fcst_run = base.get('AERO_FCST_RUN', None).lower() if aero_fcst_run in ['both']: self.aero_fcst_runs = ['gfs', 'gdas'] elif aero_fcst_run in ['gfs', 'gdas']: self.aero_fcst_runs = [aero_fcst_run] - def _init_finalize(self, *args): + def _init_finalize(self, conf: Configuration): print("Finalizing initialize") # Get a list of all possible config_files that would be part of the application self.configs_names = self._get_app_configs() - # Source the config_files for the jobs in the application - self.configs = self.source_configs() + # Source the config files for the jobs in the application without specifying a RUN + self.configs = {'_no_run': self._source_configs(conf)} - # Update the base config dictionary base on application - self.configs['base'] = self.update_base(self.configs['base']) + # Update the base config dictionary based on application + self.configs['_no_run']['base'] = self._update_base(self.configs['_no_run']['base']) # Save base in the internal state since it is often needed - self._base = self.configs['base'] + base = self.configs['_no_run']['base'] # Get more configuration options into the class attributes - self.gfs_cyc = self._base.get('gfs_cyc') + self.gfs_cyc = base.get('gfs_cyc') - # Finally get task names for the application + # Get task names for the application self.task_names = self.get_task_names() + # Finally, source the configuration files for each valid `RUN` + for run in self.task_names.keys(): + self.configs[run] = self._source_configs(conf, run=run, log=False) + + # Update the base config dictionary based on application and RUN + self.configs[run]['base'] = self._update_base(self.configs[run]['base']) + @abstractmethod def _get_app_configs(self): pass @staticmethod @abstractmethod - def update_base(base_in: Dict[str, Any]) -> Dict[str, Any]: + def _update_base(base_in: Dict[str, Any]) -> Dict[str, Any]: ''' Make final updates to base and return an updated copy @@ -146,7 +145,7 @@ def update_base(base_in: Dict[str, Any]) -> Dict[str, Any]: ''' pass - def source_configs(self, run: str = "gfs", log: bool = True) -> Dict[str, Any]: + def _source_configs(self, conf: Configuration, run: str = "gfs", log: bool = True) -> Dict[str, Any]: """ Given the configuration object used to initialize this application, source the configurations for each config and return a dictionary @@ -156,7 +155,7 @@ def source_configs(self, run: str = "gfs", log: bool = True) -> Dict[str, Any]: configs = dict() # Return config.base as well - configs['base'] = self.conf.parse_config('config.base') + configs['base'] = conf.parse_config('config.base', RUN=run) # Source the list of all config_files involved in the application for config in self.configs_names: @@ -180,12 +179,12 @@ def source_configs(self, run: str = "gfs", log: bool = True) -> Dict[str, Any]: files += [f'config.{config}'] print(f'sourcing config.{config}') if log else 0 - configs[config] = self.conf.parse_config(files, RUN=run) + configs[config] = conf.parse_config(files, RUN=run) return configs @abstractmethod - def get_task_names(self) -> Dict[str, List[str]]: + def get_task_names(self, run="_no_run") -> Dict[str, List[str]]: ''' Create a list of task names for each RUN valid for the configuation. diff --git a/workflow/applications/gefs.py b/workflow/applications/gefs.py index c1e001c171..1db3c51287 100644 --- a/workflow/applications/gefs.py +++ b/workflow/applications/gefs.py @@ -10,6 +10,9 @@ class GEFSAppConfig(AppConfig): def __init__(self, conf: Configuration): super().__init__(conf) + base = conf.parse_config('config.base') + self.run = base.get('RUN', 'gefs') + def _get_app_configs(self): """ Returns the config_files that are involved in gefs @@ -36,7 +39,7 @@ def _get_app_configs(self): return configs @staticmethod - def update_base(base_in): + def _update_base(base_in): base_out = base_in.copy() base_out['INTERVAL_GFS'] = AppConfig.get_gfs_interval(base_in['gfs_cyc']) @@ -81,4 +84,4 @@ def get_task_names(self): tasks += ['arch'] - return {f"{self._base['RUN']}": tasks} + return {f"{self.run}": tasks} diff --git a/workflow/applications/gfs_cycled.py b/workflow/applications/gfs_cycled.py index b8aa2dba3a..4bb473f454 100644 --- a/workflow/applications/gfs_cycled.py +++ b/workflow/applications/gfs_cycled.py @@ -11,20 +11,21 @@ class GFSCycledAppConfig(AppConfig): def __init__(self, conf: Configuration): super().__init__(conf) - self.do_hybvar = self._base.get('DOHYBVAR', False) - self.do_fit2obs = self._base.get('DO_FIT2OBS', True) - self.do_jediatmvar = self._base.get('DO_JEDIATMVAR', False) - self.do_jediatmens = self._base.get('DO_JEDIATMENS', False) - self.do_jediocnvar = self._base.get('DO_JEDIOCNVAR', False) - self.do_jedisnowda = self._base.get('DO_JEDISNOWDA', False) - self.do_mergensst = self._base.get('DO_MERGENSST', False) - self.do_vrfy_oceanda = self._base.get('DO_VRFY_OCEANDA', False) + base = conf.parse_config('config.base') + self.do_hybvar = base.get('DOHYBVAR', False) + self.do_fit2obs = base.get('DO_FIT2OBS', True) + self.do_jediatmvar = base.get('DO_JEDIATMVAR', False) + self.do_jediatmens = base.get('DO_JEDIATMENS', False) + self.do_jediocnvar = base.get('DO_JEDIOCNVAR', False) + self.do_jedisnowda = base.get('DO_JEDISNOWDA', False) + self.do_mergensst = base.get('DO_MERGENSST', False) + self.do_vrfy_oceanda = base.get('DO_VRFY_OCEANDA', False) self.lobsdiag_forenkf = False self.eupd_runs = None if self.do_hybvar: - self.lobsdiag_forenkf = self._base.get('lobsdiag_forenkf', False) - eupd_run = self._base.get('EUPD_CYC', 'gdas').lower() + self.lobsdiag_forenkf = base.get('lobsdiag_forenkf', False) + eupd_run = base.get('EUPD_CYC', 'gdas').lower() if eupd_run in ['both']: self.eupd_runs = ['gfs', 'gdas'] elif eupd_run in ['gfs', 'gdas']: @@ -125,7 +126,7 @@ def _get_app_configs(self): return configs @staticmethod - def update_base(base_in): + def _update_base(base_in): return GFSCycledAppConfig.get_gfs_cyc_dates(base_in) diff --git a/workflow/applications/gfs_forecast_only.py b/workflow/applications/gfs_forecast_only.py index 680588e4ca..93551ac0cc 100644 --- a/workflow/applications/gfs_forecast_only.py +++ b/workflow/applications/gfs_forecast_only.py @@ -10,6 +10,11 @@ class GFSForecastOnlyAppConfig(AppConfig): def __init__(self, conf: Configuration): super().__init__(conf) + base = conf.parse_config('config.base') + self.aero_fcst_run = base.get('AERO_FCST_RUN', 'BOTH').lower() + self.run = base.get('RUN', 'gfs') + self.exp_warm_start = base.get('EXP_WARM_START', False) + def _get_app_configs(self): """ Returns the config_files that are involved in the forecast-only app @@ -25,7 +30,7 @@ def _get_app_configs(self): configs += ['atmos_products'] if self.do_aero: - if not self._base['EXP_WARM_START']: + if not self.exp_warm_start: configs += ['aerosol_init'] if self.do_tracker: @@ -70,11 +75,10 @@ def _get_app_configs(self): return configs @staticmethod - def update_base(base_in): + def _update_base(base_in): base_out = base_in.copy() base_out['INTERVAL_GFS'] = AppConfig.get_gfs_interval(base_in['gfs_cyc']) - base_out['RUN'] = 'gfs' return base_out @@ -88,9 +92,9 @@ def get_task_names(self): tasks = ['stage_ic'] if self.do_aero: - aero_fcst_run = self._base.get('AERO_FCST_RUN', 'BOTH').lower() - if self._base['RUN'] in aero_fcst_run or aero_fcst_run == "both": - if not self._base['EXP_WARM_START']: + aero_fcst_run = self.aero_fcst_run + if self.run in aero_fcst_run or aero_fcst_run == "both": + if not self.exp_warm_start: tasks += ['aerosol_init'] if self.do_wave: @@ -153,4 +157,4 @@ def get_task_names(self): tasks += ['arch', 'cleanup'] # arch and cleanup **must** be the last tasks - return {f"{self._base['RUN']}": tasks} + return {f"{self.run}": tasks} diff --git a/workflow/rocoto/tasks.py b/workflow/rocoto/tasks.py index d8d5edb5e6..df2b0467db 100644 --- a/workflow/rocoto/tasks.py +++ b/workflow/rocoto/tasks.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 -import copy import numpy as np from applications.applications import AppConfig import rocoto.rocoto as rocoto @@ -39,15 +38,16 @@ class Tasks: def __init__(self, app_config: AppConfig, run: str) -> None: - self.app_config = copy.deepcopy(app_config) + self.app_config = app_config self.run = run - # Re-source the configs with RUN specified - print(f"Source configs with RUN={run}") - self._configs = self.app_config.source_configs(run=run, log=False) + + # Get the configs for the specified RUN + self._configs = self.app_config.configs[run] # Update the base config for the application - self._configs['base'] = self.app_config.update_base(self._configs['base']) - # Save dict_configs and base in the internal state (never know where it may be needed) + self._configs['base'] = self.app_config._update_base(self._configs['base']) + + # Save base in the internal state (never know where it may be needed) self._base = self._configs['base'] self.HOMEgfs = self._base['HOMEgfs'] @@ -134,7 +134,6 @@ def _template_to_rocoto_cycstring(self, template: str, subs_dict: dict = {}) -> def _get_forecast_hours(run, config, component='atmos') -> List[str]: # Make a local copy of the config to avoid modifying the original local_config = config.copy() - # Ocean/Ice components do not have a HF output option like the atmosphere if component in ['ocean', 'ice']: local_config['FHMAX_HF_GFS'] = 0 diff --git a/workflow/rocoto/workflow_xml.py b/workflow/rocoto/workflow_xml.py index d9ca4fb961..3ad7c4bd91 100644 --- a/workflow/rocoto/workflow_xml.py +++ b/workflow/rocoto/workflow_xml.py @@ -18,7 +18,8 @@ def __init__(self, app_config: AppConfig, rocoto_config: Dict) -> None: self._app_config = app_config self.rocoto_config = rocoto_config - self._base = self._app_config.configs['base'] + # Use the generic config.base (without RUN specified) + self._base = self._app_config.configs['_no_run']['base'] self.preamble = self._get_preamble() self.definitions = self._get_definitions()