MESMER-X: decorator to suppress warnings (MESMER-group#467)
mathause authored Jun 14, 2024
1 parent 9fcb6be commit f0bddc4
Showing 1 changed file with 152 additions and 147 deletions.
299 changes: 152 additions & 147 deletions mesmer/mesmer_x/train_l_distrib_mesmerx.py
@@ -7,6 +7,7 @@
"""

import functools
import warnings

import numpy as np
@@ -24,6 +25,19 @@
from mesmer.stats import gaspari_cohn_correlation_matrices


def ignore_warnings(func):
# adapted from https://stackoverflow.com/a/70292317
# TODO: don't suppress all warnings

@functools.wraps(func)
def wrapper(*args, **kwargs):
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
return func(*args, **kwargs)

return wrapper
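
One way the TODO above could be addressed, sketched here purely for illustration (the name ignore_runtime_warnings and the choice of category are assumptions, not part of this commit): keep the same catch_warnings scope but restrict the filter to the RuntimeWarning category that NumPy uses for nan/inf values, so other warnings still surface.

import functools
import warnings


def ignore_runtime_warnings(func):
    # hypothetical, narrower variant of ignore_warnings: only silence the
    # RuntimeWarnings NumPy emits for nan/inf values ("invalid value
    # encountered", "divide by zero encountered", ...)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=RuntimeWarning)
            return func(*args, **kwargs)

    return wrapper

Such a variant would be applied exactly like ignore_warnings below, e.g. as @ignore_runtime_warnings on find_fg or fit.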


def xr_train_distrib(
predictors,
target,
@@ -749,7 +763,8 @@ def test_all(self, coefficients):
# evaluated
return test_coeff, test_param, test_proba, distrib

# FIRST GUESS
# suppress nan & inf warnings
@ignore_warnings
def find_fg(self):
"""
compute first guess of the coefficients, to ensure convergence of the incoming fit.
@@ -838,142 +853,135 @@ def find_fg(self):
situations: variables, grid points, distributions & expressions.
"""

# during optimizations, will encounter many warnings because NaN or inf values
# encountered: avoiding the spam.
with warnings.catch_warnings():
warnings.simplefilter("ignore")

# preparing derivatives to estimate derivatives of data along predictors,
# and infer a very first guess for the coefficients facilitates the
# representation of the trends
self.smooth_data_targ = self.smooth_data(self.data_targ)

m, s = np.mean(self.smooth_data_targ), np.std(self.smooth_data_targ)
ind_targ_low = np.where(self.smooth_data_targ < m - s)[0]
ind_targ_high = np.where(self.smooth_data_targ > m + s)[0]
pred_low = {
p: np.mean(self.data_pred[p][ind_targ_low]) for p in self.data_pred
}
pred_high = {
p: np.mean(self.data_pred[p][ind_targ_high]) for p in self.data_pred
}
deriv_targ = {
p: (
np.mean(self.smooth_data_targ[ind_targ_high])
- np.mean(self.smooth_data_targ[ind_targ_low])
)
/ (pred_high[p] - pred_low[p])
for p in self.data_pred
}
self.fg_info_derivatives = {
"pred_low": pred_low,
"pred_high": pred_high,
"deriv_targ": deriv_targ,
"m": m,
}

# Initialize first guess
if self.first_guess is None:
self.fg_coeffs = np.zeros(self.n_coeffs)

# Step 1: fit coefficients of location (objective: generate an adequate
# first guess for the coefficients of location. proven to be necessary
# in many situations, & accelerate step 2)
globalfit_d01 = basinhopping(
func=self.fg_fun_deriv01, x0=self.fg_coeffs, niter=10
)
# warning, basinhopping tends to introduce non-reproducibility in fits,
# reduced when using 2nd round of fits

self.fg_coeffs = globalfit_d01.x
# preparing derivatives to estimate derivatives of data along predictors,
# and infer a very first guess for the coefficients facilitates the
# representation of the trends
self.smooth_data_targ = self.smooth_data(self.data_targ)

m, s = np.mean(self.smooth_data_targ), np.std(self.smooth_data_targ)
ind_targ_low = np.where(self.smooth_data_targ < m - s)[0]
ind_targ_high = np.where(self.smooth_data_targ > m + s)[0]
pred_low = {p: np.mean(self.data_pred[p][ind_targ_low]) for p in self.data_pred}
pred_high = {
p: np.mean(self.data_pred[p][ind_targ_high]) for p in self.data_pred
}
deriv_targ = {
p: (
np.mean(self.smooth_data_targ[ind_targ_high])
- np.mean(self.smooth_data_targ[ind_targ_low])
)
/ (pred_high[p] - pred_low[p])
for p in self.data_pred
}
self.fg_info_derivatives = {
"pred_low": pred_low,
"pred_high": pred_high,
"deriv_targ": deriv_targ,
"m": m,
}

else:
# Using provided first guess, eg from 1st round of fits
self.fg_coeffs = np.copy(self.first_guess)
# Initialize first guess
if self.first_guess is None:
self.fg_coeffs = np.zeros(self.n_coeffs)

# Step 2: fit coefficients of location (objective: improving the subset of
# location coefficients)
self.fg_ind_loc = np.array(
[
self.expr_fit.coefficients_list.index(c)
for c in self.expr_fit.coefficients_dict["loc"]
]
# Step 1: fit coefficients of location (objective: generate an adequate
# first guess for the coefficients of location. proven to be necessary
# in many situations, & accelerate step 2)
globalfit_d01 = basinhopping(
func=self.fg_fun_deriv01, x0=self.fg_coeffs, niter=10
)
localfit_loc = self.minimize(
func=self.fg_fun_loc,
x0=self.fg_coeffs[self.fg_ind_loc],
fact_maxfev_iter=len(self.fg_ind_loc) / self.n_coeffs,
option_NelderMead="best_run",
)
self.fg_coeffs[self.fg_ind_loc] = localfit_loc.x
# warning, basinhopping tends to introduce non-reproducibility in fits,
# reduced when using 2nd round of fits

# Step 3: fit coefficients of scale (objective: improving the subset of
# scale coefficients)
self.fg_ind_sca = np.array(
[
self.expr_fit.coefficients_list.index(c)
for c in self.expr_fit.coefficients_dict["scale"]
]
)
if self.first_guess is None:
# compared to all 0, better for ref level but worse for trend
x0 = np.std(self.data_targ) * np.ones(
len(self.expr_fit.coefficients_dict["scale"])
)
self.fg_coeffs = globalfit_d01.x

else:
x0 = self.fg_coeffs[self.fg_ind_sca]
localfit_sca = self.minimize(
func=self.fg_fun_sca,
x0=x0,
fact_maxfev_iter=len(self.fg_ind_sca) / self.n_coeffs,
option_NelderMead="best_run",
)
self.fg_coeffs[self.fg_ind_sca] = localfit_sca.x

# Step 4: fit coefficients using NLL (objective: improving all coefficients,
# necessary to get good estimates for shape parameters, and avoid some local minima)
localfit_nll = self.minimize(
func=self.fg_fun_NLL_notests,
x0=self.fg_coeffs,
fact_maxfev_iter=1,
option_NelderMead="best_run",
)
self.fg_coeffs = localfit_nll.x

# Step 5: fit on CDF or LL^n (objective: improving all coefficients,
# necessary to have all points within support. NB: NLL doesn't behave well enough here)
if False:
# TODO: unreachable code?
# fit coefficients on CDFs
fun_opti_prob = self.fg_fun_cdfs
else:
# fit coefficients on log-likelihood to the power n
fun_opti_prob = self.fg_fun_LL_n

localfit_opti = self.minimize(
func=fun_opti_prob,
x0=self.fg_coeffs,
fact_maxfev_iter=1,
option_NelderMead="best_run",
else:
# Using provided first guess, eg from 1st round of fits
self.fg_coeffs = np.copy(self.first_guess)

# Step 2: fit coefficients of location (objective: improving the subset of
# location coefficients)
self.fg_ind_loc = np.array(
[
self.expr_fit.coefficients_list.index(c)
for c in self.expr_fit.coefficients_dict["loc"]
]
)
localfit_loc = self.minimize(
func=self.fg_fun_loc,
x0=self.fg_coeffs[self.fg_ind_loc],
fact_maxfev_iter=len(self.fg_ind_loc) / self.n_coeffs,
option_NelderMead="best_run",
)
self.fg_coeffs[self.fg_ind_loc] = localfit_loc.x

# Step 3: fit coefficients of scale (objective: improving the subset of
# scale coefficients)
self.fg_ind_sca = np.array(
[
self.expr_fit.coefficients_list.index(c)
for c in self.expr_fit.coefficients_dict["scale"]
]
)
if self.first_guess is None:
# compared to all 0, better for ref level but worse for trend
x0 = np.std(self.data_targ) * np.ones(
len(self.expr_fit.coefficients_dict["scale"])
)
self.fg_coeffs = localfit_opti.x

# Step 6: if required, global fit within boundaries
if self.fg_with_global_opti:
# find boundaries on each coefficient
bounds = []
for i_c in np.arange(self.n_coeffs):
vals_bounds = self.find_bound(
i_c=i_c, x0=self.fg_coeffs, fact_coeff=-0.05
), self.find_bound(i_c=i_c, x0=self.fg_coeffs, fact_coeff=0.05)
bounds.append([np.min(vals_bounds), np.max(vals_bounds)])

# global minimization, using the one with the best performances in this
# situation. sobol or halton, observed lower performances with
# simplicial. n=1000, options={'maxiter':10000, 'maxev':10000})
globalfit_all = shgo(self.func_optim, bounds, sampling_method="sobol")
self.fg_coeffs = globalfit_all.x

else:
x0 = self.fg_coeffs[self.fg_ind_sca]
localfit_sca = self.minimize(
func=self.fg_fun_sca,
x0=x0,
fact_maxfev_iter=len(self.fg_ind_sca) / self.n_coeffs,
option_NelderMead="best_run",
)
self.fg_coeffs[self.fg_ind_sca] = localfit_sca.x

# Step 4: fit coefficients using NLL (objective: improving all coefficients,
# necessary to get good estimates for shape parameters, and avoid some local minima)
localfit_nll = self.minimize(
func=self.fg_fun_NLL_notests,
x0=self.fg_coeffs,
fact_maxfev_iter=1,
option_NelderMead="best_run",
)
self.fg_coeffs = localfit_nll.x

# Step 5: fit on CDF or LL^n (objective: improving all coefficients,
# necessary to have all points within support. NB: NLL doesn't behave well enough here)
if False:
# TODO: unreachable code?
# fit coefficients on CDFs
fun_opti_prob = self.fg_fun_cdfs
else:
# fit coefficients on log-likelihood to the power n
fun_opti_prob = self.fg_fun_LL_n

localfit_opti = self.minimize(
func=fun_opti_prob,
x0=self.fg_coeffs,
fact_maxfev_iter=1,
option_NelderMead="best_run",
)
self.fg_coeffs = localfit_opti.x

# Step 6: if required, global fit within boundaries
if self.fg_with_global_opti:
# find boundaries on each coefficient
bounds = []
for i_c in np.arange(self.n_coeffs):
vals_bounds = self.find_bound(
i_c=i_c, x0=self.fg_coeffs, fact_coeff=-0.05
), self.find_bound(i_c=i_c, x0=self.fg_coeffs, fact_coeff=0.05)
bounds.append([np.min(vals_bounds), np.max(vals_bounds)])

# global minimization, using the one with the best performances in this
# situation. sobol or halton, observed lower performances with
# simplicial. n=1000, options={'maxiter':10000, 'maxev':10000})
globalfit_all = shgo(self.func_optim, bounds, sampling_method="sobol")
self.fg_coeffs = globalfit_all.x

def minimize(self, func, x0, fact_maxfev_iter=1, option_NelderMead="dont_run"):
"""
@@ -1206,30 +1214,27 @@ def crps(self, coeffs):
# averaging
return np.sum(self.weights_driver * np.array(tmp_cprs))

# FIT
@ignore_warnings  # suppress nan & inf warnings
def fit(self):
# Before fitting, need a good first guess, using 'find_fg'.
if self.func_first_guess is not None:
self.func_first_guess()
else:
self.find_fg()

# fitting, and avoiding spamming error messages when nan or inf values occur
with warnings.catch_warnings():
warnings.simplefilter("ignore")
m = self.minimize(
func=self.func_optim,
x0=self.fg_coeffs,
fact_maxfev_iter=1,
option_NelderMead="best_run",
)
m = self.minimize(
func=self.func_optim,
x0=self.fg_coeffs,
fact_maxfev_iter=1,
option_NelderMead="best_run",
)

# checking if the fit has failed
if self.error_failedfit and (m.success is False):
raise Exception("Failed fit.")
else:
self.coefficients_fit = m.x
self.eval_quality_fit()
# checking if the fit has failed
if self.error_failedfit and (m.success is False):
raise Exception("Failed fit.")
else:
self.coefficients_fit = m.x
self.eval_quality_fit()
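
To make the effect concrete, here is a minimal standalone sketch, not part of the commit: log_eval and quiet_log_eval are made-up names, and it assumes ignore_warnings is importable from the module changed above. It shows the kind of nan/inf RuntimeWarning the optimizer triggers, and that the decorator only silences warnings for the duration of the wrapped call.

import warnings

import numpy as np

from mesmer.mesmer_x.train_l_distrib_mesmerx import ignore_warnings


def log_eval(x):
    # np.log of a non-positive value emits "RuntimeWarning: invalid value encountered in log"
    return np.log(x)


@ignore_warnings
def quiet_log_eval(x):
    return np.log(x)


log_eval(-1.0)  # emits the RuntimeWarning and returns nan
quiet_log_eval(-1.0)  # same nan result, but the warning is silenced inside the decorated call

warnings.warn("still visible")  # filters are restored once the decorated call returns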

def eval_quality_fit(self):
# initialize