Skip to content

Commit

Permalink
run and test ndr in D8 mode
Browse files Browse the repository at this point in the history
  • Loading branch information
emlys committed Sep 25, 2024
1 parent 2c430c3 commit cbf8098
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 70 deletions.
2 changes: 2 additions & 0 deletions src/natcap/invest/managed_raster/ManagedRaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ class DownslopeNeighborIterator {

template<typename T_ = T, std::enable_if_t<std::is_same<T_, D8>::value>* = nullptr>
NeighborTuple next() {
flow_dir_sum = 1;
if (n_dir == 8) {
return NeighborTuple(8, -1, -1, -1);
}
Expand Down Expand Up @@ -512,6 +513,7 @@ class DownslopeNeighborIterator {

template<typename T_ = T, std::enable_if_t<std::is_same<T_, D8>::value>* = nullptr>
NeighborTuple next_no_skip() {
flow_dir_sum = 1;
if (n_dir == 8) {
return NeighborTuple(8, -1, -1, -1);
}
Expand Down
190 changes: 130 additions & 60 deletions src/natcap/invest/ndr/ndr.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,15 @@
"reached through subsurface flow. This characterizes the "
"retention due to biochemical degradation in soils. Required "
"if Calculate Nitrogen is selected.")
},
"algorithm": {
"type": "option_string",
"options": {
"D8": {"description": "D8 flow direction"},
"MFD": {"description": "Multiple flow direction"}
},
"about": gettext("Flow direction algorithm to use."),
"name": gettext("flow direction algorithm")
}
},
"outputs": {
Expand Down Expand Up @@ -668,35 +677,6 @@ def execute(args):
target_path_list=[f_reg['filled_dem_path']],
task_name='fill pits')

flow_dir_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_dir_mfd,
args=(
(f_reg['filled_dem_path'], 1), f_reg['flow_direction_path']),
kwargs={'working_dir': intermediate_output_dir},
dependent_task_list=[fill_pits_task],
target_path_list=[f_reg['flow_direction_path']],
task_name='flow dir')

flow_accum_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_accumulation_mfd,
args=(
(f_reg['flow_direction_path'], 1),
f_reg['flow_accumulation_path']),
target_path_list=[f_reg['flow_accumulation_path']],
dependent_task_list=[flow_dir_task],
task_name='flow accum')

stream_extraction_task = task_graph.add_task(
func=pygeoprocessing.routing.extract_streams_mfd,
args=(
(f_reg['flow_accumulation_path'], 1),
(f_reg['flow_direction_path'], 1),
float(args['threshold_flow_accumulation']),
f_reg['stream_path']),
target_path_list=[f_reg['stream_path']],
dependent_task_list=[flow_accum_task],
task_name='stream extraction')

calculate_slope_task = task_graph.add_task(
func=pygeoprocessing.calculate_slope,
args=((f_reg['filled_dem_path'], 1), f_reg['slope_path']),
Expand All @@ -714,6 +694,81 @@ def execute(args):
dependent_task_list=[calculate_slope_task],
task_name='threshold slope')

if args['algorithm'] == 'MFD':
flow_dir_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_dir_mfd,
args=(
(f_reg['filled_dem_path'], 1), f_reg['flow_direction_path']),
kwargs={'working_dir': intermediate_output_dir},
dependent_task_list=[fill_pits_task],
target_path_list=[f_reg['flow_direction_path']],
task_name='flow dir')

flow_accum_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_accumulation_mfd,
args=(
(f_reg['flow_direction_path'], 1),
f_reg['flow_accumulation_path']),
target_path_list=[f_reg['flow_accumulation_path']],
dependent_task_list=[flow_dir_task],
task_name='flow accum')

stream_extraction_task = task_graph.add_task(
func=pygeoprocessing.routing.extract_streams_mfd,
args=(
(f_reg['flow_accumulation_path'], 1),
(f_reg['flow_direction_path'], 1),
float(args['threshold_flow_accumulation']),
f_reg['stream_path']),
target_path_list=[f_reg['stream_path']],
dependent_task_list=[flow_accum_task],
task_name='stream extraction')
s_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_accumulation_mfd,
args=((f_reg['flow_direction_path'], 1), f_reg['s_accumulation_path']),
kwargs={
'weight_raster_path_band': (f_reg['thresholded_slope_path'], 1)},
target_path_list=[f_reg['s_accumulation_path']],
dependent_task_list=[flow_dir_task, threshold_slope_task],
task_name='route s')
else: # D8
flow_dir_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_dir_d8,
args=(
(f_reg['filled_dem_path'], 1), f_reg['flow_direction_path']),
kwargs={'working_dir': intermediate_output_dir},
dependent_task_list=[fill_pits_task],
target_path_list=[f_reg['flow_direction_path']],
task_name='flow dir')

flow_accum_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_accumulation_d8,
args=(
(f_reg['flow_direction_path'], 1),
f_reg['flow_accumulation_path']),
target_path_list=[f_reg['flow_accumulation_path']],
dependent_task_list=[flow_dir_task],
task_name='flow accum')

stream_extraction_task = task_graph.add_task(
func=pygeoprocessing.routing.extract_streams_d8,
kwargs=dict(
flow_accum_raster_path_band=(f_reg['flow_accumulation_path'], 1),
flow_threshold=float(args['threshold_flow_accumulation']),
target_stream_raster_path=f_reg['stream_path']),
target_path_list=[f_reg['stream_path']],
dependent_task_list=[flow_accum_task],
task_name='stream extraction')

s_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_accumulation_d8,
args=((f_reg['flow_direction_path'], 1), f_reg['s_accumulation_path']),
kwargs={
'weight_raster_path_band': (f_reg['thresholded_slope_path'], 1)},
target_path_list=[f_reg['s_accumulation_path']],
dependent_task_list=[flow_dir_task, threshold_slope_task],
task_name='route s')

runoff_proxy_index_task = task_graph.add_task(
func=_normalize_raster,
args=((f_reg['masked_runoff_proxy_path'], 1),
Expand All @@ -722,15 +777,6 @@ def execute(args):
dependent_task_list=[align_raster_task, mask_runoff_proxy_task],
task_name='runoff proxy mean')

s_task = task_graph.add_task(
func=pygeoprocessing.routing.flow_accumulation_mfd,
args=((f_reg['flow_direction_path'], 1), f_reg['s_accumulation_path']),
kwargs={
'weight_raster_path_band': (f_reg['thresholded_slope_path'], 1)},
target_path_list=[f_reg['s_accumulation_path']],
dependent_task_list=[flow_dir_task, threshold_slope_task],
task_name='route s')

s_bar_task = task_graph.add_task(
func=pygeoprocessing.raster_map,
kwargs=dict(
Expand Down Expand Up @@ -762,27 +808,50 @@ def execute(args):
dependent_task_list=[threshold_slope_task],
task_name='s inv')

d_dn_task = task_graph.add_task(
func=pygeoprocessing.routing.distance_to_channel_mfd,
args=(
(f_reg['flow_direction_path'], 1),
(f_reg['stream_path'], 1),
f_reg['d_dn_path']),
kwargs={'weight_raster_path_band': (
f_reg['s_factor_inverse_path'], 1)},
dependent_task_list=[stream_extraction_task, s_inv_task],
target_path_list=[f_reg['d_dn_path']],
task_name='d dn')

dist_to_channel_task = task_graph.add_task(
func=pygeoprocessing.routing.distance_to_channel_mfd,
args=(
(f_reg['flow_direction_path'], 1),
(f_reg['stream_path'], 1),
f_reg['dist_to_channel_path']),
dependent_task_list=[stream_extraction_task],
target_path_list=[f_reg['dist_to_channel_path']],
task_name='dist to channel')
if args['algorithm'] == 'MFD':
d_dn_task = task_graph.add_task(
func=pygeoprocessing.routing.distance_to_channel_mfd,
args=(
(f_reg['flow_direction_path'], 1),
(f_reg['stream_path'], 1),
f_reg['d_dn_path']),
kwargs={'weight_raster_path_band': (
f_reg['s_factor_inverse_path'], 1)},
dependent_task_list=[stream_extraction_task, s_inv_task],
target_path_list=[f_reg['d_dn_path']],
task_name='d dn')

dist_to_channel_task = task_graph.add_task(
func=pygeoprocessing.routing.distance_to_channel_mfd,
args=(
(f_reg['flow_direction_path'], 1),
(f_reg['stream_path'], 1),
f_reg['dist_to_channel_path']),
dependent_task_list=[stream_extraction_task],
target_path_list=[f_reg['dist_to_channel_path']],
task_name='dist to channel')
else: # D8
d_dn_task = task_graph.add_task(
func=pygeoprocessing.routing.distance_to_channel_d8,
args=(
(f_reg['flow_direction_path'], 1),
(f_reg['stream_path'], 1),
f_reg['d_dn_path']),
kwargs={'weight_raster_path_band': (
f_reg['s_factor_inverse_path'], 1)},
dependent_task_list=[stream_extraction_task, s_inv_task],
target_path_list=[f_reg['d_dn_path']],
task_name='d dn')

dist_to_channel_task = task_graph.add_task(
func=pygeoprocessing.routing.distance_to_channel_d8,
args=(
(f_reg['flow_direction_path'], 1),
(f_reg['stream_path'], 1),
f_reg['dist_to_channel_path']),
dependent_task_list=[stream_extraction_task],
target_path_list=[f_reg['dist_to_channel_path']],
task_name='dist to channel')

_ = task_graph.add_task(
func=sdr._calculate_what_drains_to_stream,
Expand Down Expand Up @@ -869,7 +938,8 @@ def execute(args):
args=(
f_reg['flow_direction_path'],
f_reg['stream_path'], eff_path,
crit_len_path, effective_retention_path),
crit_len_path, effective_retention_path,
args['algorithm']),
target_path_list=[effective_retention_path],
dependent_task_list=[
stream_extraction_task, eff_task, crit_len_task],
Expand Down
34 changes: 24 additions & 10 deletions src/natcap/invest/ndr/ndr_core.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ LOGGER = logging.getLogger(__name__)
cdef int STREAM_EFFECTIVE_RETENTION = 0

def ndr_eff_calculation(
mfd_flow_direction_path, stream_path, retention_eff_lulc_path,
crit_len_path, effective_retention_path):
flow_direction_path, stream_path, retention_eff_lulc_path,
crit_len_path, effective_retention_path, algorithm):
"""Calculate flow downhill effective_retention to the channel.
Args:
mfd_flow_direction_path (string): a path to a raster with
pygeoprocessing.routing MFD flow direction values.
flow_direction_path (string): a path to a raster with
pygeoprocessing.routing flow direction values (MFD or D8).
stream_path (string): a path to a raster where 1 indicates a
stream all other values ignored must be same dimensions and
projection as mfd_flow_direction_path.
projection as flow_direction_path.
retention_eff_lulc_path (string): a path to a raster indicating
the maximum retention efficiency that the landcover on that
pixel can accumulate.
Expand All @@ -48,7 +48,7 @@ def ndr_eff_calculation(
"""
cdef float effective_retention_nodata = -1.0
pygeoprocessing.new_raster_from_base(
mfd_flow_direction_path, effective_retention_path, gdal.GDT_Float32,
flow_direction_path, effective_retention_path, gdal.GDT_Float32,
[effective_retention_nodata])
fp, to_process_flow_directions_path = tempfile.mkstemp(
suffix='.tif', prefix='flow_to_process',
Expand All @@ -62,18 +62,32 @@ def ndr_eff_calculation(
result[:] |= ((((mfd_array >> (i*4)) & 0xF) > 0) << i).astype(numpy.uint8)
return result

# create direction raster in bytes
def _d8_to_flow_dir_op(d8_array):
result = numpy.zeros(d8_array.shape, dtype=numpy.uint8)
for i in range(8):
result[d8_array == i] = 1 << i
return result

flow_dir_op = _mfd_to_flow_dir_op if algorithm == 'MFD' else _d8_to_flow_dir_op

# convert mfd raster to binary mfd
# each value is an 8-digit binary number
# where 1 indicates that the pixel drains in that direction
# and 0 indicates that it does not drain in that direction
pygeoprocessing.raster_calculator(
[(mfd_flow_direction_path, 1)], _mfd_to_flow_dir_op,
[(flow_direction_path, 1)], flow_dir_op,
to_process_flow_directions_path, gdal.GDT_Byte, None)

run_effective_retention[MFD](
mfd_flow_direction_path.encode('utf-8'),
args = [
flow_direction_path.encode('utf-8'),
stream_path.encode('utf-8'),
retention_eff_lulc_path.encode('utf-8'),
crit_len_path.encode('utf-8'),
to_process_flow_directions_path.encode('utf-8'),
effective_retention_path.encode('utf-8'))
effective_retention_path.encode('utf-8')]

if algorithm == 'MFD':
run_effective_retention[MFD](*args)
else: # D8
run_effective_retention[D8](*args)
Loading

0 comments on commit cbf8098

Please sign in to comment.