Update runAnalysis.py
matamadio committed Sep 19, 2023
1 parent e579718 commit dff7aac
Showing 1 changed file with 21 additions and 18 deletions.
39 changes: 21 additions & 18 deletions Top-down/parallelization/runAnalysis.py
@@ -26,7 +26,7 @@ def zonal_stats_parallel(args):
 def run_analysis(country: str, haz_cat: str, valid_RPs: list[int],
                  min_haz_threshold: float, exp_cat: str, exp_nam: str, adm_name: str,
                  analysis_type: str, class_edges: list[float],
-                 save_check_raster: bool):
+                 save_check_raster: bool, n_cores=None):
     """
     Run specified analysis.
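A side note on the signature above: the built-in generic annotations list[int] and list[float] parse only on Python 3.9+. On an older interpreter the module would need a fallback along these lines (a sketch, not code from the repository):

from __future__ import annotations  # PEP 563: annotations stay unevaluated, so list[int] works on 3.7+
# ...or, equivalently, typing.List[int] / typing.List[float]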
@@ -109,7 +109,11 @@ def run_analysis(country: str, haz_cat: str, valid_RPs: list[int],
     all_adm_codes = adm_cols[all_adm_codes].to_list()
     all_adm_names = adm_cols[all_adm_names].to_list()
 
-    cores = mp.cpu_count()
+    #if n_cores is None:
+    #    cores = min(len(adm_data.index), mp.cpu_count())
+    #else:
+    #    cores = n_cores
+    cores = min(len(adm_data.index), mp.cpu_count())
     with mp.Pool(cores) as p:
         # Get total exposure for each ADM region
         func = partial(zonal_stats_partial, raster=exp_ras, stats="sum")
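The pool-sizing change above caps the worker count at the number of ADM rows, so small admin tables no longer spawn idle processes. A minimal sketch of the pattern, assuming only the standard library (the helper name pool_size is illustrative, not from the repository):

import multiprocessing as mp

def pool_size(n_tasks, n_cores=None):
    # Honor an explicit core count if the caller pins one;
    # otherwise never start more workers than there are tasks.
    return n_cores if n_cores is not None else min(n_tasks, mp.cpu_count())

Note that this first pool keeps the cap hard-coded (its n_cores branch is left commented out); the per-RP pool in the next hunk does honor n_cores.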
@@ -127,18 +131,22 @@ def run_analysis(country: str, haz_cat: str, valid_RPs: list[int],
 
     # Defining the list of valid prob_RPs - probability of return period
     prob_RPs = 1./np.array(valid_RPs)
-    prob_RPs_LB = np.append(-np.diff(prob_RPs),prob_RPs[-1]).tolist() # Lower bound
-    prob_RPs_UB = np.insert(-np.diff(prob_RPs),0,0.).tolist() # Upper bound
+    prob_RPs_LB = np.append(-np.diff(prob_RPs),prob_RPs[-1]).tolist() # Lower bound - alternative --> prob_RPs_LB = np.append(1-prob_RPs[0],-np.diff(prob_RPs)).tolist() # Lower bound [0-1]
+    prob_RPs_UB = np.insert(-np.diff(prob_RPs),0,0.).tolist() # Upper bound - alternative --> prob_RPs_UB = np.append(1-prob_RPs[0],-np.diff(prob_RPs)).tolist() # Upper bound [0-1]
     prob_RPs_Mean = ((np.array(prob_RPs_LB)+np.array(prob_RPs_UB))/2).tolist() # Mean value
     prob_RPs_df = pd.DataFrame({'RPs':valid_RPs,
                                 'prob_RPs':prob_RPs,
                                 'prob_RPs_LB':prob_RPs_LB,
                                 'prob_RPs_UB':prob_RPs_UB,
                                 'prob_RPs_Mean':prob_RPs_Mean})
     prob_RPs_df.to_csv(os.path.join(OUTPUT_DIR, f"{country}_{haz_cat}_prob_RPs.csv"), index=False)
 
     # Computing the results for each RP
     n_valid_RPs_gt_1 = len(valid_RPs) > 1
-    cores = min(len(valid_RPs), mp.cpu_count())
+    if n_cores is None:
+        cores = min(len(valid_RPs), mp.cpu_count())
+    else:
+        cores = n_cores
     with mp.Pool(cores) as p:
         params = {
             "haz_folder": haz_folder,
@@ -255,28 +263,23 @@ def calc_imp_RPs(RPs, haz_folder, analysis_type, country, haz_cat, exp_cat, exp_
         # Checking the analysis_type
         if analysis_type == "Function":
             # Assign impact factor (this is F_i in the equations)
-            impact_rst = damage_factor(haz_data)
+            haz_data = damage_factor(haz_data)
             if save_check_raster:
-                impact_rst.rio.to_raster(os.path.join(OUTPUT_DIR, f"{country}_{haz_cat}_{rp}_{exp_cat}_haz_imp_factor.tif"))
+                haz_data.rio.to_raster(os.path.join(OUTPUT_DIR, f"{country}_{haz_cat}_{rp}_{exp_cat}_haz_imp_factor.tif"))
 
         elif analysis_type == "Classes":
-            # Assign bin values to raster data
-            # Follows: x_{i-1} <= x_{i} < x_{i+1}
+            # Assign bin values to raster data - Follows: x_{i-1} <= x_{i} < x_{i+1}
             bin_idx = np.digitize(haz_data, bin_seq)
-            impact_rst = haz_data
 
         # Calculate affected exposure in ADM
         # Filter down to valid affected areas which have people
-        affected_exp = exp_data.where(impact_rst.data > 0, np.nan)
+        affected_exp = exp_data.where(haz_data.data > 0, np.nan)
 
         if save_check_raster:
             affected_exp.rio.to_raster(os.path.join(OUTPUT_DIR, f"{country}_{haz_cat}_{exp_cat}_{rp}_affected.tif"))
 
-        del haz_data
         gc.collect()
 
         # Conduct analyses for classes
         if analysis_type == "Classes":
+            del haz_data
             for bin_x in reversed(range(num_bins)):
                 # Compute the impact for this class
                 impact_class = gen_zonal_stats(vectors=adm_data["geometry"],
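Two idioms carry the logic of the hunk above: np.digitize assigns each cell the index of its hazard class (lower edge inclusive, upper exclusive, as the comment notes), and DataArray.where masks exposure down to cells with a positive hazard/impact value, mirroring exp_data.where(haz_data.data > 0, np.nan). A small sketch with made-up values (arrays and edges are illustrative only):

import numpy as np
import xarray as xr

class_edges = [0.5, 1.0, 1.5]                      # hypothetical hazard class edges
haz = xr.DataArray(np.array([0.0, 0.5, 0.7, 1.2, 2.0]))
bin_idx = np.digitize(haz, class_edges)            # -> [0, 1, 1, 2, 3]

exp = xr.DataArray(np.array([10., 5., 0., 3., 8.]))
affected = exp.where(haz.data > 0, np.nan)         # -> [nan, 5., 0., 3., 8.]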
@@ -295,15 +298,15 @@ def calc_imp_RPs(RPs, haz_folder, analysis_type, country, haz_cat, exp_cat, exp_
                                                    stats=["sum"], affine=affected_exp.rio.transform(), nodata=np.nan)
             result_df[f"RP{rp}_{exp_cat}_exp"] = [x['sum'] for x in affected_exp_per_ADM]
             # Calculate impacted exposure in affected areas
-            impact_exp = affected_exp.data * impact_rst
+            impact_exp = affected_exp.data * haz_data
             # If saving intermediate rasters to disk is enabled
             if save_check_raster:
                 impact_exp.rio.to_raster(os.path.join(OUTPUT_DIR, f"{country}_{haz_cat}_{exp_cat}_{rp}_impact.tif"))
             # Compute the impact per ADM level
             impact_exp_per_ADM = gen_zonal_stats(vectors=adm_data["geometry"], raster=impact_exp.data, stats=["sum"],
                                                  affine=impact_exp.rio.transform(), nodata=np.nan)
             result_df[f"RP{rp}_{exp_cat}_imp"] = [x['sum'] for x in impact_exp_per_ADM]
-            del (impact_exp, impact_exp_per_ADM, affected_exp_per_ADM)
+            del (haz_data, impact_exp, impact_exp_per_ADM, affected_exp_per_ADM)
 
         del affected_exp
         gc.collect()
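Taken together, the commit threads an optional n_cores argument through run_analysis so that callers can pin the worker count rather than always taking mp.cpu_count(), and it drops the intermediate impact_rst raster in favour of reusing haz_data, freeing memory earlier. A hedged usage sketch of the updated signature (argument values are illustrative, not from the repository):

run_analysis(country="KEN", haz_cat="FL", valid_RPs=[10, 100, 1000],
             min_haz_threshold=0.01, exp_cat="POP", exp_nam="population",
             adm_name="ADM2", analysis_type="Function", class_edges=[],
             save_check_raster=False, n_cores=4)

Leaving n_cores=None keeps the previous behaviour of sizing the per-RP pool to min(len(valid_RPs), mp.cpu_count()).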
