From c20e8974fc45ce443b7acb638c002b3ad2902e88 Mon Sep 17 00:00:00 2001 From: RyanSpies-NOAA Date: Fri, 16 Aug 2024 14:06:33 -0500 Subject: [PATCH] v4.5.5.0 Update SRC optimization w/ ras2fim v2 inputs (#1247) --- docs/CHANGELOG.md | 12 ++++ fim_post_processing.sh | 3 +- src/bash_variables.env | 6 +- src/run_unit_wb.sh | 20 ++++++- src/src_adjust_ras2fim_rating.py | 98 +++++++++++++++++++++----------- src/usgs_gage_unit_setup.py | 39 +++++++------ src/utils/shared_functions.py | 24 ++++++++ 7 files changed, 144 insertions(+), 58 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 537a7d2ee..e385208b5 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,18 @@ All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format. +## v4.5.5.0 - 2024-08-16 - [PR#1247](https://github.com/NOAA-OWP/inundation-mapping/pull/1247) + +Updated the gauge crosswalk and SRC adjustment routine to use the ras2fim v2 files. The v2 ras2fim file structure was changed to organize the data by huc8 - one gpkg and csv per huc8. Addresses #1091 + +### Changes +- `fim_post_processing.sh`: added new input variables for running the `src_adjust_ras2fim_rating.py` +- `src/bash_variables.env`: renamed and reassigned the ras2fim input variables: `ras2fim_input_dir`, `ras_rating_curve_csv_filename`, `ras_rating_curve_gpkg_filename` +- `src/run_unit_wb.sh`: Added logic to check if huc in process has ras2fim input data to process. If yes - copy the ras2fim cross section point gpkg to the huc run directory. +- `src/src_adjust_ras2fim_rating.py`: Updated code logic to use the huc-specific input files containing the ras2fim rating curve data (previous ras2fim input file contained all hucs in one csv) +- `src/utils/shared_functions.py`: Added function to find huc subdirectories with the same name btw two parent folders + +

## v4.5.4.4 - 2024-08-02 - [PR#1238](https://github.com/NOAA-OWP/inundation-mapping/pull/1238) diff --git a/fim_post_processing.sh b/fim_post_processing.sh index 6a58c075c..7dd2a7608 100755 --- a/fim_post_processing.sh +++ b/fim_post_processing.sh @@ -217,7 +217,8 @@ if [ "$src_adjust_ras2fim" = "True" ] && [ "$src_subdiv_toggle" = "True" ] && [ # Run SRC Optimization routine using ras2fim rating curve data (WSE and flow @ NWM recur flow values) python3 $srcDir/src_adjust_ras2fim_rating.py \ -run_dir $outputDestDir \ - -ras_rc $ras_rating_curve_csv \ + -ras_input $ras2fim_input_dir \ + -ras_rc $ras_rating_curve_csv_filename \ -nwm_recur $nwm_recur_file \ -j $jobLimit Tcount diff --git a/src/bash_variables.env b/src/bash_variables.env index 637b24eac..b1318f4e5 100644 --- a/src/bash_variables.env +++ b/src/bash_variables.env @@ -47,9 +47,9 @@ export nwm_recur_file=${inputsDir}/rating_curve/nwm_recur_flo export usgs_rating_curve_csv=${inputsDir}/usgs_gages/usgs_rating_curves.csv # input file locations for ras2fim locations and rating curve data -export ras_rating_curve_csv=${inputsDir}/rating_curve/ras2fim_exports/reformat_ras_rating_curve_table_rel_101.csv -export ras_rating_curve_points_gpkg=${inputsDir}/rating_curve/ras2fim_exports/reformat_ras_rating_curve_points_rel_101.gpkg -export ras_rating_curve_gpkg_filename=reformat_ras_rating_curve_points_rel_101.gpkg +export ras2fim_input_dir=${inputsDir}/rating_curve/ras2fim_exports/v2_0 +export ras_rating_curve_csv_filename=reformat_ras_rating_curve_table.csv +export ras_rating_curve_gpkg_filename=reformat_ras_rating_curve_points.gpkg export fim_obs_pnt_data=${inputsDir}/rating_curve/water_edge_database/usgs_nws_benchmark_points_cleaned.gpkg diff --git a/src/run_unit_wb.sh b/src/run_unit_wb.sh index 60c7a3812..c8a8766a1 100755 --- a/src/run_unit_wb.sh +++ b/src/run_unit_wb.sh @@ -48,9 +48,25 @@ cp -a $pre_clip_huc_dir/$hucNumber/. $tempHucDataDir # For buffer_stream_branches.py cp $huc_input_DEM_domain $tempHucDataDir # For usgs_gage_unit_setup.py -cp $inputsDir/usgs_gages/usgs_gages.gpkg $tempHucDataDir -cp $ras_rating_curve_points_gpkg $tempHucDataDir cp $inputsDir/ahps_sites/nws_lid.gpkg $tempHucDataDir +cp $inputsDir/usgs_gages/usgs_gages.gpkg $tempHucDataDir +# Check if the $hucNumber directory exists in the ras2fim $inputsDir +if [ -d "$ras2fim_input_dir/$hucNumber" ]; then + ras_rating_gpkg="$ras2fim_input_dir/$hucNumber/$ras_rating_curve_gpkg_filename" + ras_rating_csv="$ras2fim_input_dir/$hucNumber/$ras_rating_curve_csv_filename" + if [ -f "$ras_rating_gpkg" ]; then + cp "$ras_rating_gpkg" $tempHucDataDir + echo "Copied $ras_rating_gpkg to $tempHucDataDir" + else + echo "File $ras_rating_gpkg does not exist. Skipping copy." + fi + if [ -f "$ras_rating_csv" ]; then + cp "$ras_rating_csv" $tempHucDataDir + echo "Copied $ras_rating_csv to $tempHucDataDir" + else + echo "File $ras_rating_csv does not exist. Skipping copy." + fi +fi ## DERIVE LEVELPATH ## echo -e $startDiv"Generating Level Paths for $hucNumber" diff --git a/src/src_adjust_ras2fim_rating.py b/src/src_adjust_ras2fim_rating.py index 7f1c20d8a..ec35c8618 100644 --- a/src/src_adjust_ras2fim_rating.py +++ b/src/src_adjust_ras2fim_rating.py @@ -13,7 +13,7 @@ import pandas as pd from src_roughness_optimization import update_rating_curve -from utils.shared_functions import check_file_age, concat_huc_csv +from utils.shared_functions import check_file_age, concat_huc_csv, find_matching_subdirectories ''' @@ -47,23 +47,26 @@ ''' -def create_ras2fim_rating_database(ras_rc_filepath, ras_elev_df, nwm_recurr_filepath, log_dir): +def create_ras2fim_rating_database(huc_ras_input_file, ras_elev_df, nwm_recurr_filepath, log_dir): start_time = dt.datetime.now() print('Reading RAS2FIM rating curves from csv...') log_text = 'Processing database for RAS2FIM flow/WSE at NWM flow recur intervals...\n' - col_filter = ["fid_xs", "flow", "wse"] + # Note that we are using flow_cfs not flow_cms (error in raw data) + col_filter = ["fid_xs", "flow_cfs", "wse_m"] ras_rc_df = pd.read_csv( - ras_rc_filepath, dtype={'fid_xs': object}, usecols=col_filter, encoding="unicode_escape" + huc_ras_input_file, dtype={'fid_xs': object}, usecols=col_filter, encoding="unicode_escape" ) # , nrows=30000) ras_rc_df.rename(columns={'fid_xs': 'location_id'}, inplace=True) # ras_rc_df['location_id'] = ras_rc_df['feature_id'].astype(object) run_time = dt.datetime.now() - start_time print(f"Duration (read ras_rc_csv): {str(run_time).split('.')[0]}") - # convert WSE navd88 values to meters - ras_rc_df.rename( - columns={'wse': 'wse_navd88_m', 'flow': 'discharge_cms'}, inplace=True - ) # assume ras2fim elevation data in feet + # rename WSE column + ras_rc_df.rename(columns={'wse_m': 'wse_navd88_m'}, inplace=True) + + # Need to use the flow_cfs because there is an error in the raw flow_cms + ras_rc_df['discharge_cms'] = ras_rc_df['flow_cfs'] * 0.0283168 + ras_rc_df = ras_rc_df.drop(columns=["flow_cfs"]) # read in the aggregate RAS elev table csv start_time = dt.datetime.now() @@ -196,10 +199,10 @@ def create_ras2fim_rating_database(ras_rc_filepath, ras_elev_df, nwm_recurr_file return final_df -def branch_proc_list(ras_df, run_dir, debug_outputs_option, log_file): +def branch_proc_list(ras_df, huc_run_dir, debug_outputs_option, log_file): procs_list = [] # Initialize list for mulitprocessing. - # loop through all unique level paths that have a USGS gage + # loop through all unique level paths that have a ras2fim data points huc_branch_dict = ras_df.groupby('huc')['levpa_id'].apply(set).to_dict() for huc in sorted( @@ -210,7 +213,7 @@ def branch_proc_list(ras_df, run_dir, debug_outputs_option, log_file): # Define paths to branch HAND data. # Define paths to HAND raster, catchments raster, and synthetic rating curve JSON. # Assumes outputs are for HUC8 (not HUC6) - branch_dir = os.path.join(run_dir, huc, 'branches', branch_id) + branch_dir = os.path.join(huc_run_dir, 'branches', branch_id) hand_path = os.path.join(branch_dir, 'rem_zeroed_masked_' + branch_id + '.tif') catchments_path = os.path.join( branch_dir, 'gw_catchments_reaches_filtered_addedAttributes_' + branch_id + '.tif' @@ -302,14 +305,10 @@ def branch_proc_list(ras_df, run_dir, debug_outputs_option, log_file): # ) -def run_prep(run_dir, ras_rc_filepath, nwm_recurr_filepath, debug_outputs_option, job_number): +def run_prep(run_dir, ras_input_dir, ras_rc_filepath, nwm_recurr_filepath, debug_outputs_option, job_number): ## Check input args are valid assert os.path.isdir(run_dir), 'ERROR: could not find the input fim_dir location: ' + str(run_dir) - ## Create an aggregate dataframe with all ras_elev_table.csv entries for hucs in fim_dir - print('Reading RAS2FIM point loc HAND elevation from ras_elev_table csv files...') - csv_name = 'ras_elev_table.csv' # file name to search for ras location data (in the huc/branch dirs) - available_cores = multiprocessing.cpu_count() if job_number > available_cores: job_number = available_cores - 1 @@ -332,25 +331,52 @@ def run_prep(run_dir, ras_rc_filepath, nwm_recurr_filepath, debug_outputs_option log_file.write('START TIME: ' + str(begin_time) + '\n') log_file.write('#########################################################\n\n') - ras_elev_df = concat_huc_csv(run_dir, csv_name) - - if ras_elev_df is None: - warn_err = 'WARNING: ras_elev_df not created - check that ' + csv_name + ' files exist in fim_dir!' - print(warn_err) - log_file.write(warn_err) + hucs_with_data = find_matching_subdirectories(run_dir, ras_input_dir) + if len(hucs_with_data) == 0: + print('ALERT: Did not find any HUCs with ras2fim data to perform adjustments') + log_file.write('ALERT: Did not find any HUCs with ras2fim data to perform adjustments\n') + return + + log_file.write('RAS2FIM data available and will perform SRC adjustments for hucs:\n') + log_file.write(str(hucs_with_data)) + log_file.write('\n#########################################################\n\n') + for huc in hucs_with_data: + huc_run_dir = os.path.join(run_dir, huc) + huc_ras_input_file = os.path.join(huc_run_dir, ras_rc_filepath) + ## Create an aggregate dataframe with all ras_elev_table.csv entries for hucs in fim_dir + print('Reading RAS2FIM point loc HAND elevation from ras_elev_table csv files...') + csv_elev = 'ras_elev_table.csv' # file name to search for ras location data (in the huc/branch dirs) + # ras_elev_df = concat_huc_csv(huc_run_dir, csv_elev) + ras_elev_df = pd.read_csv( + os.path.join(huc_run_dir, 'ras_elev_table.csv'), + dtype={'HUC8': object, 'location_id': object, 'feature_id': int, 'levpa_id': object}, + ) - elif ras_elev_df.empty: - warn_err = 'WARNING: ras_elev_df is empty - check that ' + csv_name + ' files exist in fim_dir!' - print(warn_err) - log_file.write(warn_err) + ## Create an aggregate dataframe with all ras2fim rating curve csv files + # print('Reading RAS2FIM rating curves csv files from the input directory...') + # ras_rating_df = concat_huc_csv(ras_input_dir, ras_rc_filepath) - else: - print('This may take a few minutes...') - log_file.write("starting create RAS2FIM rating db") - ras_df = create_ras2fim_rating_database(ras_rc_filepath, ras_elev_df, nwm_recurr_filepath, log_dir) + if ras_elev_df is None: + warn_err = ( + 'WARNING: ras_elev_df not created - check that ' + csv_elev + ' files exist in fim_dir!' + ) + print(warn_err) + log_file.write(warn_err) + + elif ras_elev_df.empty: + warn_err = 'WARNING: ras_elev_df is empty - check that ' + csv_elev + ' files exist in fim_dir!' + print(warn_err) + log_file.write(warn_err) + + else: + print('This may take a few minutes...') + log_file.write("starting create RAS2FIM rating db") + ras_df = create_ras2fim_rating_database( + huc_ras_input_file, ras_elev_df, nwm_recurr_filepath, log_dir + ) - ## Create huc proc_list for multiprocessing and execute the update_rating_curve function - branch_proc_list(ras_df, run_dir, debug_outputs_option, log_file) + ## Create huc proc_list for multiprocessing and execute the update_rating_curve function + branch_proc_list(ras_df, huc_run_dir, debug_outputs_option, log_file) ## Record run time and close log file log_file.write('#########################################################\n\n') @@ -369,10 +395,13 @@ def run_prep(run_dir, ras_rc_filepath, nwm_recurr_filepath, debug_outputs_option '(calculated WSE/flow).' ) parser.add_argument('-run_dir', '--run-dir', help='Parent directory of FIM run.', required=True) + parser.add_argument( + '-ras_input', '--ras2fim-dir', help='Path to RAS2FIM rating curve input directory', required=True + ) parser.add_argument( '-ras_rc', '--ras2fim-ratings', - help='Path to RAS2FIM rating curve (reach avg) csv file', + help='CSV file name for RAS2FIM rating curve (reach avg)', required=True, ) parser.add_argument( @@ -394,10 +423,11 @@ def run_prep(run_dir, ras_rc_filepath, nwm_recurr_filepath, debug_outputs_option ## Assign variables from arguments. args = vars(parser.parse_args()) run_dir = args['run_dir'] + ras_input_dir = args['ras2fim_dir'] ras_rc_filepath = args['ras2fim_ratings'] nwm_recurr_filepath = args['nwm_recur'] debug_outputs_option = args['extra_outputs'] job_number = int(args['job_number']) ## Prepare/check inputs, create log file, and spin up the proc list - run_prep(run_dir, ras_rc_filepath, nwm_recurr_filepath, debug_outputs_option, job_number) + run_prep(run_dir, ras_input_dir, ras_rc_filepath, nwm_recurr_filepath, debug_outputs_option, job_number) diff --git a/src/usgs_gage_unit_setup.py b/src/usgs_gage_unit_setup.py index 308952b3a..464fee471 100755 --- a/src/usgs_gage_unit_setup.py +++ b/src/usgs_gage_unit_setup.py @@ -35,24 +35,27 @@ def load_gages(self, huc_CRS): # Read RAS2FIM point locations file # !!! Geopandas is not honoring the dtype arg with this read_file below (huc8 being read as int64). # Need the raw data to store the 'huc8' attribute as an object to avoid issues with integers truncating the leading zero from some hucs - ras_locs = gpd.read_file(self.ras_locs_filename, dtype={'huc8': 'object'}) - ras_locs = ras_locs[['feature_id', 'huc8', 'stream_stn', 'fid_xs', 'source', 'geometry']] - ras_locs['location_id'] = ras_locs['fid_xs'] - - # Convert ras locs crs to match usgs gage crs - ras_locs.to_crs(huc_CRS, inplace=True) - ras_locs = ras_locs.rename(columns={'huc8': 'HUC8'}) - - # Convert Multipoint geometry to Point geometry - ras_locs['geometry'] = ras_locs.representative_point() - - # if ras_locs.huc8.dtype == 'int64': - # ras_locs = ras_locs[ras_locs.huc8 == int(self.huc8)] - # ras_locs['HUC8'] = str(self.huc8) - # ras_locs = ras_locs.drop('huc8', axis=1) - # elif ras_locs.huc8.dtype == 'int64': - # ras_locs = ras_locs.rename(columns={'huc8':'HUC8'}) - + if os.path.exists(self.ras_locs_filename): + ras_columns = ['feature_id', 'huc8', 'stream_stn', 'fid_xs', 'source', 'geometry'] + ras_locs = gpd.read_file(self.ras_locs_filename, dtype={'huc8': 'object'}, usecols=ras_columns) + ras_locs = ras_locs[ras_columns] + ras_locs['location_id'] = ras_locs['fid_xs'] + + # Convert ras locs crs to match usgs gage crs + ras_locs.to_crs(huc_CRS, inplace=True) + ras_locs = ras_locs.rename(columns={'huc8': 'HUC8'}) + + # Convert Multipoint geometry to Point geometry + ras_locs['geometry'] = ras_locs.representative_point() + + # if ras_locs.huc8.dtype == 'int64': + # ras_locs = ras_locs[ras_locs.huc8 == int(self.huc8)] + # ras_locs['HUC8'] = str(self.huc8) + # ras_locs = ras_locs.drop('huc8', axis=1) + # elif ras_locs.huc8.dtype == 'int64': + # ras_locs = ras_locs.rename(columns={'huc8':'HUC8'}) + else: + ras_locs = pd.DataFrame(columns=['feature_id', 'stream_stn', 'fid_xs', 'source', 'geometry']) # Concat USGS points and RAS2FIM points gages_locs = pd.concat([usgs_gages, ras_locs], axis=0, ignore_index=True) # gages_locs.to_crs(PREP_CRS, inplace=True) diff --git a/src/utils/shared_functions.py b/src/utils/shared_functions.py index b30f515e3..f9f51a539 100644 --- a/src/utils/shared_functions.py +++ b/src/utils/shared_functions.py @@ -174,6 +174,30 @@ def check_file_age(file): return modified_date +######################################################################## +# Function to find huc subdirectories with the same name btw two parent folders +######################################################################## +def find_matching_subdirectories(parent_folder1, parent_folder2): + # List all subdirectories in the first parent folder + subdirs1 = { + d + for d in os.listdir(parent_folder1) + if os.path.isdir(os.path.join(parent_folder1, d)) and len(d) == 8 + } + + # List all subdirectories in the second parent folder + subdirs2 = { + d + for d in os.listdir(parent_folder2) + if os.path.isdir(os.path.join(parent_folder2, d)) and len(d) == 8 + } + + # Find common subdirectories with exactly 8 characters + matching_subdirs = list(subdirs1 & subdirs2) + + return matching_subdirs + + ######################################################################## # Function to concatenate huc csv files to a single dataframe/csv ########################################################################