diff --git a/src/natcap/invest/managed_raster/ManagedRaster.h b/src/natcap/invest/managed_raster/ManagedRaster.h index 74b8a128e..1b4bedd3d 100644 --- a/src/natcap/invest/managed_raster/ManagedRaster.h +++ b/src/natcap/invest/managed_raster/ManagedRaster.h @@ -471,6 +471,7 @@ class DownslopeNeighborIterator { template::value>* = nullptr> NeighborTuple next() { + flow_dir_sum = 1; if (n_dir == 8) { return NeighborTuple(8, -1, -1, -1); } @@ -512,6 +513,7 @@ class DownslopeNeighborIterator { template::value>* = nullptr> NeighborTuple next_no_skip() { + flow_dir_sum = 1; if (n_dir == 8) { return NeighborTuple(8, -1, -1, -1); } diff --git a/src/natcap/invest/ndr/ndr.py b/src/natcap/invest/ndr/ndr.py index 4958a6252..ccb08202a 100644 --- a/src/natcap/invest/ndr/ndr.py +++ b/src/natcap/invest/ndr/ndr.py @@ -162,6 +162,15 @@ "reached through subsurface flow. This characterizes the " "retention due to biochemical degradation in soils. Required " "if Calculate Nitrogen is selected.") + }, + "algorithm": { + "type": "option_string", + "options": { + "D8": {"description": "D8 flow direction"}, + "MFD": {"description": "Multiple flow direction"} + }, + "about": gettext("Flow direction algorithm to use."), + "name": gettext("flow direction algorithm") } }, "outputs": { @@ -668,35 +677,6 @@ def execute(args): target_path_list=[f_reg['filled_dem_path']], task_name='fill pits') - flow_dir_task = task_graph.add_task( - func=pygeoprocessing.routing.flow_dir_mfd, - args=( - (f_reg['filled_dem_path'], 1), f_reg['flow_direction_path']), - kwargs={'working_dir': intermediate_output_dir}, - dependent_task_list=[fill_pits_task], - target_path_list=[f_reg['flow_direction_path']], - task_name='flow dir') - - flow_accum_task = task_graph.add_task( - func=pygeoprocessing.routing.flow_accumulation_mfd, - args=( - (f_reg['flow_direction_path'], 1), - f_reg['flow_accumulation_path']), - target_path_list=[f_reg['flow_accumulation_path']], - dependent_task_list=[flow_dir_task], - task_name='flow accum') - - stream_extraction_task = task_graph.add_task( - func=pygeoprocessing.routing.extract_streams_mfd, - args=( - (f_reg['flow_accumulation_path'], 1), - (f_reg['flow_direction_path'], 1), - float(args['threshold_flow_accumulation']), - f_reg['stream_path']), - target_path_list=[f_reg['stream_path']], - dependent_task_list=[flow_accum_task], - task_name='stream extraction') - calculate_slope_task = task_graph.add_task( func=pygeoprocessing.calculate_slope, args=((f_reg['filled_dem_path'], 1), f_reg['slope_path']), @@ -714,6 +694,81 @@ def execute(args): dependent_task_list=[calculate_slope_task], task_name='threshold slope') + if args['algorithm'] == 'MFD': + flow_dir_task = task_graph.add_task( + func=pygeoprocessing.routing.flow_dir_mfd, + args=( + (f_reg['filled_dem_path'], 1), f_reg['flow_direction_path']), + kwargs={'working_dir': intermediate_output_dir}, + dependent_task_list=[fill_pits_task], + target_path_list=[f_reg['flow_direction_path']], + task_name='flow dir') + + flow_accum_task = task_graph.add_task( + func=pygeoprocessing.routing.flow_accumulation_mfd, + args=( + (f_reg['flow_direction_path'], 1), + f_reg['flow_accumulation_path']), + target_path_list=[f_reg['flow_accumulation_path']], + dependent_task_list=[flow_dir_task], + task_name='flow accum') + + stream_extraction_task = task_graph.add_task( + func=pygeoprocessing.routing.extract_streams_mfd, + args=( + (f_reg['flow_accumulation_path'], 1), + (f_reg['flow_direction_path'], 1), + float(args['threshold_flow_accumulation']), + f_reg['stream_path']), + target_path_list=[f_reg['stream_path']], + dependent_task_list=[flow_accum_task], + task_name='stream extraction') + s_task = task_graph.add_task( + func=pygeoprocessing.routing.flow_accumulation_mfd, + args=((f_reg['flow_direction_path'], 1), f_reg['s_accumulation_path']), + kwargs={ + 'weight_raster_path_band': (f_reg['thresholded_slope_path'], 1)}, + target_path_list=[f_reg['s_accumulation_path']], + dependent_task_list=[flow_dir_task, threshold_slope_task], + task_name='route s') + else: # D8 + flow_dir_task = task_graph.add_task( + func=pygeoprocessing.routing.flow_dir_d8, + args=( + (f_reg['filled_dem_path'], 1), f_reg['flow_direction_path']), + kwargs={'working_dir': intermediate_output_dir}, + dependent_task_list=[fill_pits_task], + target_path_list=[f_reg['flow_direction_path']], + task_name='flow dir') + + flow_accum_task = task_graph.add_task( + func=pygeoprocessing.routing.flow_accumulation_d8, + args=( + (f_reg['flow_direction_path'], 1), + f_reg['flow_accumulation_path']), + target_path_list=[f_reg['flow_accumulation_path']], + dependent_task_list=[flow_dir_task], + task_name='flow accum') + + stream_extraction_task = task_graph.add_task( + func=pygeoprocessing.routing.extract_streams_d8, + kwargs=dict( + flow_accum_raster_path_band=(f_reg['flow_accumulation_path'], 1), + flow_threshold=float(args['threshold_flow_accumulation']), + target_stream_raster_path=f_reg['stream_path']), + target_path_list=[f_reg['stream_path']], + dependent_task_list=[flow_accum_task], + task_name='stream extraction') + + s_task = task_graph.add_task( + func=pygeoprocessing.routing.flow_accumulation_d8, + args=((f_reg['flow_direction_path'], 1), f_reg['s_accumulation_path']), + kwargs={ + 'weight_raster_path_band': (f_reg['thresholded_slope_path'], 1)}, + target_path_list=[f_reg['s_accumulation_path']], + dependent_task_list=[flow_dir_task, threshold_slope_task], + task_name='route s') + runoff_proxy_index_task = task_graph.add_task( func=_normalize_raster, args=((f_reg['masked_runoff_proxy_path'], 1), @@ -722,15 +777,6 @@ def execute(args): dependent_task_list=[align_raster_task, mask_runoff_proxy_task], task_name='runoff proxy mean') - s_task = task_graph.add_task( - func=pygeoprocessing.routing.flow_accumulation_mfd, - args=((f_reg['flow_direction_path'], 1), f_reg['s_accumulation_path']), - kwargs={ - 'weight_raster_path_band': (f_reg['thresholded_slope_path'], 1)}, - target_path_list=[f_reg['s_accumulation_path']], - dependent_task_list=[flow_dir_task, threshold_slope_task], - task_name='route s') - s_bar_task = task_graph.add_task( func=pygeoprocessing.raster_map, kwargs=dict( @@ -762,27 +808,50 @@ def execute(args): dependent_task_list=[threshold_slope_task], task_name='s inv') - d_dn_task = task_graph.add_task( - func=pygeoprocessing.routing.distance_to_channel_mfd, - args=( - (f_reg['flow_direction_path'], 1), - (f_reg['stream_path'], 1), - f_reg['d_dn_path']), - kwargs={'weight_raster_path_band': ( - f_reg['s_factor_inverse_path'], 1)}, - dependent_task_list=[stream_extraction_task, s_inv_task], - target_path_list=[f_reg['d_dn_path']], - task_name='d dn') - - dist_to_channel_task = task_graph.add_task( - func=pygeoprocessing.routing.distance_to_channel_mfd, - args=( - (f_reg['flow_direction_path'], 1), - (f_reg['stream_path'], 1), - f_reg['dist_to_channel_path']), - dependent_task_list=[stream_extraction_task], - target_path_list=[f_reg['dist_to_channel_path']], - task_name='dist to channel') + if args['algorithm'] == 'MFD': + d_dn_task = task_graph.add_task( + func=pygeoprocessing.routing.distance_to_channel_mfd, + args=( + (f_reg['flow_direction_path'], 1), + (f_reg['stream_path'], 1), + f_reg['d_dn_path']), + kwargs={'weight_raster_path_band': ( + f_reg['s_factor_inverse_path'], 1)}, + dependent_task_list=[stream_extraction_task, s_inv_task], + target_path_list=[f_reg['d_dn_path']], + task_name='d dn') + + dist_to_channel_task = task_graph.add_task( + func=pygeoprocessing.routing.distance_to_channel_mfd, + args=( + (f_reg['flow_direction_path'], 1), + (f_reg['stream_path'], 1), + f_reg['dist_to_channel_path']), + dependent_task_list=[stream_extraction_task], + target_path_list=[f_reg['dist_to_channel_path']], + task_name='dist to channel') + else: # D8 + d_dn_task = task_graph.add_task( + func=pygeoprocessing.routing.distance_to_channel_d8, + args=( + (f_reg['flow_direction_path'], 1), + (f_reg['stream_path'], 1), + f_reg['d_dn_path']), + kwargs={'weight_raster_path_band': ( + f_reg['s_factor_inverse_path'], 1)}, + dependent_task_list=[stream_extraction_task, s_inv_task], + target_path_list=[f_reg['d_dn_path']], + task_name='d dn') + + dist_to_channel_task = task_graph.add_task( + func=pygeoprocessing.routing.distance_to_channel_d8, + args=( + (f_reg['flow_direction_path'], 1), + (f_reg['stream_path'], 1), + f_reg['dist_to_channel_path']), + dependent_task_list=[stream_extraction_task], + target_path_list=[f_reg['dist_to_channel_path']], + task_name='dist to channel') _ = task_graph.add_task( func=sdr._calculate_what_drains_to_stream, @@ -869,7 +938,8 @@ def execute(args): args=( f_reg['flow_direction_path'], f_reg['stream_path'], eff_path, - crit_len_path, effective_retention_path), + crit_len_path, effective_retention_path, + args['algorithm']), target_path_list=[effective_retention_path], dependent_task_list=[ stream_extraction_task, eff_task, crit_len_task], diff --git a/src/natcap/invest/ndr/ndr_core.pyx b/src/natcap/invest/ndr/ndr_core.pyx index 88825cee9..6cdffb2df 100644 --- a/src/natcap/invest/ndr/ndr_core.pyx +++ b/src/natcap/invest/ndr/ndr_core.pyx @@ -22,16 +22,16 @@ LOGGER = logging.getLogger(__name__) cdef int STREAM_EFFECTIVE_RETENTION = 0 def ndr_eff_calculation( - mfd_flow_direction_path, stream_path, retention_eff_lulc_path, - crit_len_path, effective_retention_path): + flow_direction_path, stream_path, retention_eff_lulc_path, + crit_len_path, effective_retention_path, algorithm): """Calculate flow downhill effective_retention to the channel. Args: - mfd_flow_direction_path (string): a path to a raster with - pygeoprocessing.routing MFD flow direction values. + flow_direction_path (string): a path to a raster with + pygeoprocessing.routing flow direction values (MFD or D8). stream_path (string): a path to a raster where 1 indicates a stream all other values ignored must be same dimensions and - projection as mfd_flow_direction_path. + projection as flow_direction_path. retention_eff_lulc_path (string): a path to a raster indicating the maximum retention efficiency that the landcover on that pixel can accumulate. @@ -48,7 +48,7 @@ def ndr_eff_calculation( """ cdef float effective_retention_nodata = -1.0 pygeoprocessing.new_raster_from_base( - mfd_flow_direction_path, effective_retention_path, gdal.GDT_Float32, + flow_direction_path, effective_retention_path, gdal.GDT_Float32, [effective_retention_nodata]) fp, to_process_flow_directions_path = tempfile.mkstemp( suffix='.tif', prefix='flow_to_process', @@ -62,18 +62,32 @@ def ndr_eff_calculation( result[:] |= ((((mfd_array >> (i*4)) & 0xF) > 0) << i).astype(numpy.uint8) return result + # create direction raster in bytes + def _d8_to_flow_dir_op(d8_array): + result = numpy.zeros(d8_array.shape, dtype=numpy.uint8) + for i in range(8): + result[d8_array == i] = 1 << i + return result + + flow_dir_op = _mfd_to_flow_dir_op if algorithm == 'MFD' else _d8_to_flow_dir_op + # convert mfd raster to binary mfd # each value is an 8-digit binary number # where 1 indicates that the pixel drains in that direction # and 0 indicates that it does not drain in that direction pygeoprocessing.raster_calculator( - [(mfd_flow_direction_path, 1)], _mfd_to_flow_dir_op, + [(flow_direction_path, 1)], flow_dir_op, to_process_flow_directions_path, gdal.GDT_Byte, None) - run_effective_retention[MFD]( - mfd_flow_direction_path.encode('utf-8'), + args = [ + flow_direction_path.encode('utf-8'), stream_path.encode('utf-8'), retention_eff_lulc_path.encode('utf-8'), crit_len_path.encode('utf-8'), to_process_flow_directions_path.encode('utf-8'), - effective_retention_path.encode('utf-8')) + effective_retention_path.encode('utf-8')] + + if algorithm == 'MFD': + run_effective_retention[MFD](*args) + else: # D8 + run_effective_retention[D8](*args) diff --git a/tests/test_ndr.py b/tests/test_ndr.py index 5dc1bd456..3774210cf 100644 --- a/tests/test_ndr.py +++ b/tests/test_ndr.py @@ -47,6 +47,7 @@ def generate_base_args(workspace_dir): 'watersheds_path': os.path.join(REGRESSION_DATA, 'input', 'watersheds.shp'), 'workspace_dir': workspace_dir, + 'algorithm': 'MFD' } return args.copy() @@ -202,8 +203,62 @@ def test_base_regression(self): """ from natcap.invest.ndr import ndr + # use predefined directory so test can clean up files during teardown + args = NDRTests.generate_base_args('/Users/emily/Documents/ndr_mfd') + # make an empty output shapefile on top of where the new output + # shapefile should reside to ensure the model overwrites it + with open( + os.path.join(self.workspace_dir, 'watershed_results_ndr.gpkg'), + 'wb') as f: + f.write(b'') + ndr.execute(args) + + result_vector = ogr.Open(os.path.join( + args['workspace_dir'], 'watershed_results_ndr.gpkg')) + result_layer = result_vector.GetLayer() + result_feature = result_layer.GetFeature(1) + result_layer = None + result_vector = None + mismatch_list = [] + # these values were generated by manual inspection of regression + # results + for field, expected_value in [ + ('p_surface_load', 41.921860), + ('p_surface_export', 5.899117), + ('n_surface_load', 2978.519775), + ('n_surface_export', 289.0498), + ('n_subsurface_load', 28.614094), + ('n_subsurface_export', 15.61077), + ('n_total_export', 304.660614)]: + val = result_feature.GetField(field) + if not numpy.isclose(val, expected_value): + mismatch_list.append( + (field, 'expected: %f' % expected_value, + 'actual: %f' % val)) + result_feature = None + if mismatch_list: + raise RuntimeError("results not expected: %s" % mismatch_list) + + # We only need to test that the drainage mask exists. Functionality + # for that raster is tested in SDR. + self.assertTrue( + os.path.exists( + os.path.join( + args['workspace_dir'], 'intermediate_outputs', + 'what_drains_to_stream.tif'))) + + def test_base_regression_d8(self): + """NDR base regression test on sample data in D8 mode. + + Execute NDR with sample data and checks that the output files are + generated and that the aggregate shapefile fields are the same as the + regression case. + """ + from natcap.invest.ndr import ndr + # use predefined directory so test can clean up files during teardown args = NDRTests.generate_base_args(self.workspace_dir) + args['algorithm'] = 'D8' # make an empty output shapefile on top of where the new output # shapefile should reside to ensure the model overwrites it with open( @@ -246,6 +301,7 @@ def test_base_regression(self): args['workspace_dir'], 'intermediate_outputs', 'what_drains_to_stream.tif'))) + def test_regression_undefined_nodata(self): """NDR test when DEM, LULC and runoff proxy have undefined nodata.""" from natcap.invest.ndr import ndr @@ -361,6 +417,7 @@ def test_validation(self): 'k_param', 'watersheds_path', 'subsurface_eff_n', + 'algorithm' ] self.assertEqual(set(invalid_args), set(expected_missing_args))