Source code for upoqa.solver

# Copyright (c) 2025, Yichuan Liu and Yingzhou Li
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
UPOQA Solver
============

Maintain the main minimizer. 
"""

import numpy as np
import numpy.linalg as LA
import traceback
from .utils import *
from typing import List, Any, Dict, Tuple, Optional, Callable, Union, Literal
from copy import deepcopy
import warnings

__all__ = [
    "minimize",
]

TERMINATION_MSG = "\n# Optimization terminated: {msg}"


[docs] def minimize( fun: Union[ Callable[[np.ndarray], float], Dict[Any, Callable[[np.ndarray], float]], List[Callable[[np.ndarray], float]], ], x0: np.ndarray, coords: Union[ Dict[Any, Union[List, np.ndarray]], List[Union[List, np.ndarray]] ] = dict(), maxiter: Optional[int] = None, maxfev: Optional[Union[int, Dict[Any, int], List[int]]] = dict(), weights: Union[ Dict[Any, Union[float, int]], List[Union[float, int]], int, float ] = dict(), xforms: Union[ Dict[Any, Optional[List[Callable[[np.ndarray], Union[np.ndarray, float]]]]], List[Optional[List[Callable[[np.ndarray], Union[np.ndarray, float]]]]], ] = dict(), xform_bounds: Union[ Tuple[float, float], Dict[Any, Tuple[float, float]], List[Tuple[float, float]] ] = dict(), extra_fun: Optional[List[Callable[[np.ndarray], Union[np.ndarray, float]]]] = None, npt: Optional[Union[int, Dict[Any, int], List[int]]] = None, radius_init: float = 1.0, radius_final: float = 1e-6, noise_level: int = 0, seek_global_minimum: bool = False, f_target: Optional[float] = None, tr_shape: Literal["structured", "spherical"] = "structured", callback: Optional[ Union[ Callable[[OptimizeResult], Optional[bool]], Callable[[np.ndarray], Optional[bool]], ] ] = None, disp: bool = True, verbose: bool = False, debug: bool = False, return_internals: bool = False, options: Dict[Any, Any] = dict(), **kwargs, ) -> OptimizeResult: r""" Minimize an unconstrained objective function that has a partially-separable structure using a derivative-free model-based trust-region method. Partial separability means the objective can be expressed as: .. math:: \min_{x\in\mathbb{R}^n} \quad \sum_{i=1}^q f_i(U_i x), where :math:`f_i:\mathbb{R}^{|\mathcal{I}_i|} \to \mathbb{R}` are black-box functions (termed as *element function* or *element*) whose gradients and Hessians are unavailable, :math:`U_i:\mathbb{R}^n \to \mathbb{R}^{|\mathcal{I}_i|}` are projection operators, and :math:`\mathcal{I}_i \subset [i]` is an index set satisfying :math:`|\mathcal{I}_i| < n` (element functions depend on small subsets of variables). The solver also supports a more general objective form: .. math:: \min_{x\in\mathbb{R}^n} \quad f_0(x) + \sum_{i=1}^q w_i h_i\left(f_i(U_i x)\right), where: - :math:`f_0: \mathbb{R}^n \to \mathbb{R}` is a white-box component with computable gradients and Hessians - :math:`w_i \in \mathbb{R}` are element weights - :math:`h_i: \mathbb{R}\to\mathbb{R}` are smooth transformations on the outputs of :math:`f_i`, with computable gradients/Hessians Parameters ---------- fun : list of callables, dict of callables, or callable The objective element function(s). Can be: - A dictionary mapping element names to element functions - A list containing all element functions - A single function (not recommended; treated as a single element without exploiting structure) When using dictionaries, element names can be any hashable type. Dictionary input is recommended for better readability, especially when elements have different contexts or lack inherent ordering. For list input, the solver assigns default names as ``"fun[0]"``, ``"fun[1]"``, etc. x0 : ndarray, shape (n,) Initial guess for the optimization variables. coords : dict of array_like or list of array-like, optional Mapping of element names to variable indices. Each value specifies the variables an element depends on. Example for dictionary input: ``{'f1': [0, 1], 'f2': [2, 3]}`` indicates: - Element f1 depends on variables 0 and 1 - Element f2 depends on variables 2 and 3 Example for list input: ``[[0, 1], [2, 3]]`` assigns indices to elements in list order. If not provided, all elements default to depending on all variables. maxiter : int, optional Maximum number of iterations. Default: ``1000 * n``. maxfev : int, dict of int or list of int, optional Maximum function evaluations: - dict: Per-element limits (keys match ``fun``) - list: Per-element limits (order matches ``fun``) - int: Uniform limit for all elements weights : dict or list, optional Nonnegative weights :math:`w_i` for elements. Formats: - dict: Per-element weights (keys match ``fun``) - list: Per-element weights (order matches ``fun``) - scalar: Uniform weight for all elements xforms : dict or list, optional Transformations :math:`h_i` for element outputs. Each entry must be a list of length 3 containing [function, gradient, Hessian] callables. Example for two simple transformations: .. code-block:: python xforms = { 'ele1': [lambda x: x**2, lambda x: 2*x, lambda x: 2], 'ele2': [ lambda x: np.sqrt(x), lambda x: 1/(2*np.sqrt(x)), lambda x: -1/(4*x**(3/2)) ] } Note: Appropriate use of transformations can accelerate convergence speed but may introduce instability. For simple cases, consider incorporating them directly into element functions. xform_bounds : dict of tuple, list of tuple, optional Bounds for the input domain of transformation functions. Each entry must be a tuple of length 2 (lower_bound, upper_bound) defining the valid input range for a transformation. When provided, the solver automatically performs bound checking on transformation inputs, ensuring they remain within specified limits. Example: ``{'log_transform': (1e-8, np.inf)}`` prevents log(0) errors. Note: Even if element function outputs naturally stay within bounds, surrogate model outputs may exceed valid ranges during optimization. If your transformation functions cannot gracefully handle out-of-bound inputs, it is strongly recommended to provide this parameter to prevent algorithm crashes due to invalid transformation evaluations. extra_fun : list of callables, optional White-box component :math:`f_0` of the objective function, which is a list of length 3 specified as ``[function_value, gradient, Hessian]`` callables. npt : int or dict or list, optional Number of interpolation points per element. Formats similar to ``maxfev``. radius_init : float, default=1.0 Initial trust region radius radius_final : float, default=1e-6 Final (minimum) trust region radius noise_level : {0, 1, 2}, default=0 Estimated noise level in objective function: - ``noise_level=0``: No noise - ``noise_level=1``: Moderate noise. Restart mechanisms enabled and the default interpolation points are set to .. math:: \max\{\min\{0.8 n_i ^{1.5} + n_i + 1, (n_i + 1)(n_i + 2) / 2\}, 2 n_i + 1\} where :math:`n_i` are the dimension of the :math:`i`-th element. - ``noise_level=2``: Significant noise. Restart mechanisms enabled and the default interpolation points are set to .. math:: (n_i + 1)(n_i + 2) / 2 for better noise resilience. seek_global_minimum : bool, default=False Whether to enable global optimization restarts. When True, activates restart mechanisms with parameters optimized for global search (similar to Py-BOBYQA [1]_). f_target : float, optional Target on the objective function value. The optimization procedure is terminated when the objective function value at the iterate is less than or equal to this target. tr_shape : {'structured', 'spherical'}, default='structured' Geometry of the trust region: - ``'structured'``: Complex region formed by intersecting cylinders with element-specific radii. In simple cases (e.g., two orthogonal cylinders), resembles a Steinmetz solid. Allows larger steps in well-modeled directions while restricting poorly-modeled ones. - ``'spherical'``: Traditional spherical trust region. Simpler but less adaptive to element-wise model accuracy. Recommended if the ``'structured'`` option incurs excessive runtime overhead (typically when the number of elements is very large). callback : callable, optional A callback executed at each objective function evaluation. The method terminates if a ``StopIteration`` exception is raised by the callback function. Its signature can be one of the following: ``callback(intermediate_result)`` where ``intermediate_result`` is a keyword parameter that contains an instance of :class:`~upoqa.utils.result.OptimizeResult`, with attributes ``x``, ``fun`` and other useful information, being the point at which the objective function is evaluated and the value of the objective function, respectively. The name of the parameter must be ``intermediate_result`` for the callback to be passed an instance of :class:`~upoqa.utils.result.OptimizeResult`. Alternatively, the callback function can have the signature: ``callback(xk)`` where ``xk`` is the point at which the objective function is evaluated. Introspection is used to determine which of the signatures to invoke. disp : bool, default=True Whether to display progress. verbose : bool, default=False Whether to display detailed progress information. debug : bool, default=False Enable debugging features: - ``debug=1``: NaN checking in function evaluations - ``debug=2``: Include solver internals (manager, interp_set, model) in result - ``debug=3``: Enhanced verbosity when both disp and verbose are True return_internals : bool, default=False If True, the returned result will contain the fields ``manager``, ``interp_set`` and ``model`` for debugging purposes even if ``debug=False``. options : dict Advanced algorithm parameters. See the source code of :class:`~upoqa.utils.params.UPOQAParameterList` for available options. Returns ------- :class:`~upoqa.utils.result.OptimizeResult` Result of the optimization procedure, with the following fields: message : str Description of the cause of the termination. success : bool Whether the optimization procedure terminated successfully. fun : float Value of objective function at the solution. funs : dict or list Values of the element functions evaluated at the solution. The type matches the input ``fun``: - If ``fun`` was a dictionary, return a dictionary mapping element names to their function values. - If ``fun`` was a list, return a list of function values in the same order as the input list. extra_fun : float Value of extra function (the white-box component :math:`f_0`) at the solution x : ndarray The solution of the optimization. jac, hess : ndarray Values of objective function's Jacobian and its Hessian at ``x``. The Hessian is an approximation. nit : int Number of iterations performed by the optimizer. nfev : dict or list Number of evaluations of element functions. The type matches the input ``fun``: - If ``fun`` was a dictionary, return a dictionary mapping element names to their numbers of function evaluations. - If ``fun`` was a list, return a list of evaluation numbers in the same order as the input list. max_nfev : int Maximum number of evaluations of element functions. avg_nfev : float Average number of evaluations of element functions. nrun : int Number of runs (when enabling restarts). If ``debug=True`` or ``return_internals=True``, the result also has the following fields: manager : :class:`~upoqa.utils.manager.UPOQAManager` Algorithm manager. interp_set : :class:`~upoqa.utils.interp_set.OverallInterpSet` Interpolation point set. model : :class:`~upoqa.utils.model.OverallSurrogate` Surrogate model. If an exception is raised during the optimization that cannot be handled by the algorithm, the result will also contain the following fields: exception : Exception Exception raised during the optimization that caused the termination, if any. traceback : str Traceback of the exception, if any. References ---------- .. [1] C. Cartis, J. Fiala, B. Marteau, and L. Roberts. Improving the flexibility and robustness of model-based derivative-free optimization solvers. *ACM Trans. Math. Softw.* 45, 3 (2019), 1-41. `doi:10.1145/3338517 <https://doi.org/10.1145/3338517>`_. Examples -------- Here is a simple example of using the solver to minimize a function with two elements. Consider the following problem: .. math:: \min_{x,y,z\in \mathbb{R}} \quad x^2 + 2y^2 + z^2 + 2xy - (y + 1)z let's replace :math:`[x,y,z]` with :math:`\mathbf{x} = [x_1,x_2,x_3]`, and rewrite this problem into .. math:: \min_{\mathbf{x}\in\mathbb{R}^3} \quad f_1(x_1, x_2) + f_2(x_2, x_3), where .. math:: \begin{align*} &f_1(x_1, x_2) = x_1^2 + x_2^2 + 2x_1 x_2, \\ &f_2(x_2, x_3) = x_2^2 + x_3^2 - (x_2 + 1)x_3, \end{align*} then the objective function can be implemented as: >>> def f1(x): # f1(x,y) ... return x[0] ** 2 + x[1] ** 2 + 2 * x[0] * x[1] # x^2 + y^2 + 2xy >>> def f2(x): # f2(y,z) ... return x[0] ** 2 + x[1] ** 2 - (x[0] + 1) * x[1] # y^2 + z^2 - (y+1)z >>> fun = {'xy': f1, 'yz': f2 } >>> coords = {'xy': [0, 1], 'yz': [1, 2]} The problem can be solved by: >>> x0 = [0, 0, 0] >>> res = minimize(fun, x0, coords = coords, disp = False) >>> res.x array([-0.33333332, 0.3333333 , 0.66666665]) >>> res.fun np.float64(-0.33333333333333215) """ ## catch any exception that may break the algorithm try: # Initialize basic parameters x0 = np.array(x0, dtype=np.float64) xk, x_best = x0.copy(), x0.copy() n = x0.size fval_k, f_best, extra_f_best = np.inf, np.inf, np.inf exit_info = None options.update(kwargs) if maxiter is None and ( maxfev is None or (isinstance(maxfev, Iterable) and len(maxfev) == 0) ): maxiter = 1000 * n if disp: disp_level = 1 if verbose: disp_level = 2 if debug: disp_level = 3 else: disp_level = 0 weights_xforms_enabled = bool(xforms or weights) extra_fun_enabled = bool(extra_fun) is_dict_input = isinstance(fun, dict) reorganized_inputs = get_reorganized_inputs_or_exit_info( fun, n, maxiter, maxfev, npt, coords, weights, xforms, xform_bounds, extra_fun, radius_init, radius_final, noise_level, seek_global_minimum, callback, debug, tr_shape, options, ) if isinstance(reorganized_inputs, ExitInfo): return _get_result_of_a_bad_run( exit_info=reorganized_inputs, disp=disp_level, return_internals=debug or return_internals, is_dict_input=is_dict_input, ) # extract parameters from preprocessed inputs ( ele_names, ele_idxs, params, resolution_init, resolution_final, callback_sig, tr_shape, fun, coords, weights, dims, npt, maxfev, maxiter, xforms, xform_bounds, extra_fun, ) = map( reorganized_inputs.get, [ "ele_names", "ele_idxs", "params", "resolution_init", "resolution_final", "callback_sig", "tr_shape", "fun", "coords", "weights", "dims", "npt", "maxfev", "maxiter", "xforms", "xform_bounds", "extra_fun", ], ) def proj_onto_ele(X: np.ndarray, ele_idx: int) -> np.ndarray: """project point x onto the elemental range space""" if X.ndim == 1: return X[coords[ele_idx]] elif X.ndim == 2: return X[:, coords[ele_idx]] return X # The radius size of the enveloping sphere trust region <= envelope_factor * max(radii) ele_num = len(ele_names) envelope_factor: float = min(n, ele_num) ** 0.5 # Bool masks for the elemental variable indices coords_mask = _build_coords_mask(coords, n) # Initialize the algorithm manager manager = UPOQAManager( resolution_init, resolution_final, maxiter, maxfev, proj_onto_ele, coords, xform_bounds, params, ele_names, disp_level, ) # apply bound check for xforms if `bounds` is not None xforms = manager.wrap_xforms_with_bounds(xforms) if xform_bounds else xforms def element_generator(ele_idx: int) -> Callable[[np.ndarray], float]: def ele_fun_eval(x: np.ndarray) -> float: """ Wrapped element function, which internally tracks its evaluation count and raises :class:`~upoqa.utils.manager.MaxEvalNumReached` if the maximum number of evaluations is exceeded, or ``ValueError`` if the function returns an invalid value. """ nonlocal manager if manager.nfev[ele_idx] >= manager.maxfev[ele_idx]: raise MaxEvalNumReached( ele_idx=ele_idx, ele_name=ele_names[ele_idx] ) manager.nfev[ele_idx] += 1 # increase the evaluation counter try: eval_result = fun[ele_idx](x) except Exception as e: raise ValueError( f"(element {ele_names[ele_idx]}) Exception raised by " "the objective function: " f"{e}\n\n{traceback.format_exc()}" ) if params("debug.check_nan_fval") and np.any(np.isnan(eval_result)): raise ValueError( f"(element {ele_names[ele_idx]}) NaN encountered in the return value" f" of the objective function. \nreturn value = {eval_result}" ) return float(eval_result) return ele_fun_eval # Initialize the interpolation sets, the surrogate models and some other parameters. element_funcs: Dict[Any, Callable] = dict() ele_interp_sets: List[InterpSet] = [None for _ in ele_idxs] ele_models: List[QuadSurrogate] = [None for _ in ele_idxs] if disp_level >= 1: import tqdm progress_bar = tqdm.tqdm( total=len(ele_names), desc="Initializing surrogate models" ) for ele_idx in ele_idxs: if disp_level >= 1: progress_bar.set_postfix({"element": ele_names[ele_idx]}) element_funcs[ele_idx] = element_generator(ele_idx) ele_interp_set = InterpSet(n=dims[ele_idx], npt=npt[ele_idx]) ele_interp_sets[ele_idx] = ele_interp_set.init_interp_set( fun=element_funcs[ele_idx], center=proj_onto_ele(x_best, ele_idx), step_size=resolution_init, ) ele_models[ele_idx] = QuadSurrogate( interp_set=ele_interp_set, center=proj_onto_ele(x_best, ele_idx), n=dims[ele_idx], ) if disp_level >= 1: progress_bar.update(1) if disp_level >= 1: progress_bar.close() def fun_eles_eval(x: np.ndarray) -> Tuple[List[float], float]: """Given input x, return [ele1(x), ele2(x), ..., elek(x)] and extra_fun(x).""" fval_eles = [ element_funcs[ele_idx](proj_onto_ele(x, ele_idx)) for ele_idx in ele_idxs ] try: extra_fval = float(extra_fun[0](x)) except Exception as e: raise ValueError( f"Exception raised by the extra smooth objective function: " f"{e}\n\n{traceback.format_exc()}" ) if params("debug.check_nan_fval") and np.any(np.isnan(extra_fval)): raise ValueError( f"NaN encountered in the return value" f" of the extra objective function. \nreturn value = {extra_fval}" ) return fval_eles, extra_fval # Initialize the overall interpolation set overall_interp_set = OverallInterpSet( n=n, set_size=params("init.overall_interp_set_size"), max_size=int( max( params("general.overall_interp_set_size_factor") * n, params("init.overall_interp_set_size"), ) ), ele_interp_sets=ele_interp_sets, proj_onto_ele=proj_onto_ele, ele_names=ele_names, ) # Initialize the overall surrogate model overall_surrogate = OverallSurrogate( n=n, interp_set=overall_interp_set, proj_onto_ele=proj_onto_ele, coords=coords, ele_models=ele_models, extra_fun=extra_fun, ele_names=ele_names, params=params, ) manager.couple_with_utils(overall_interp_set, overall_surrogate) manager.set_weights(weights) manager.set_xforms(xforms) if params("init.search_opt_x0"): # Search the optimal start point from combinations of elemental interpolation points. start_x, start_fval, fval_eles, extra_fval, exit_info = ( manager.find_best_start_point() ) else: # otherwise, start from x0. start_x = x0 fval_eles = [ ele_interp_sets[ele_idx].get_interp_set(0)[1] for ele_idx in ele_idxs ] extra_fval = float(extra_fun[0](x0)) start_fval = manager.build_fval(fval_eles, extra_fval) # Prepare some useful auxiliary functions def update_short_step_count(step_max_ele_norm: float): """Updates the count of short steps and very short steps.""" nonlocal n_short_steps, n_very_short_steps if ( step_max_ele_norm <= params("general.short_step_thresh") * manager.resolution ): if manager.max_radius > manager.resolution: n_short_steps, n_very_short_steps = 0, 0 else: n_short_steps += 1 n_very_short_steps += 1 if ( step_max_ele_norm > params("general.very_short_step_thresh") * manager.resolution ): n_very_short_steps = 0 return True return False ele_fs_best: List[float] = [None for _ in ele_idxs] def update_x_best_in_history( x_new: np.ndarray, f_new: float, ele_fs_new: List[float], extra_f_new: float ) -> Optional[ExitInfo]: """ Updates the best x and its elemental, overall and extra function value in history. """ nonlocal x_best, f_best, ele_fs_best, extra_f_best if f_new < f_best: x_best, f_best, ele_fs_best, extra_f_best = ( x_new.copy(), float(f_new), [float(x) for x in deepcopy(ele_fs_new)], float(extra_f_new), ) # if the current objective function value is already <= f_target, terminate. if f_target is not None and f_best <= f_target: return ExitInfo( flag=ExitStatus.SUCCESS, msg=( f"The current objective function value {f_best:.4} " "is already <= ftaget." ), ) return None def refresh_f_best() -> None: """ refresh the best elemental, overall and extra function value in history in case the weights or xforms are updated by the user. """ nonlocal f_best, ele_fs_best, extra_f_best extra_f_best = float(manager.extra_fun[0](x_best)) f_best = manager.build_fval(ele_fs_best, extra_f_best) def check_and_try_to_restart( exit_info: Optional[ExitInfo] = None, ) -> Tuple[Optional[ExitInfo], bool]: """ Trying to launch a restart, and return the updated ``exit_info`` and a flag indicating whether a restart has been initiated successfully. """ nonlocal nit_in_a_run, n_short_steps, n_very_short_steps, n_alt_models, manager, xk, fval_k if exit_info: if manager.params("restarts.use_restarts"): if disp_level >= 3: print( f" Trying to launch a restart instead of terminating " f"by: {exit_info.message()}" ) exit_info = manager.prepare_for_a_restart() if exit_info: return exit_info, False exit_info = manager.soft_restart(fun_eles_eval) if exit_info: return exit_info, False nit_in_a_run, n_short_steps, n_very_short_steps = 0, 0, 0 n_alt_models = [0 for _ in ele_names] manager.resolution_final *= manager.params( "restarts.resolution_final_scale" ) manager.reset_restart_track_info() xk, fval_k = overall_interp_set.get_opt() return None, True else: return exit_info, False # quit return None, False def wrapped_callback() -> bool: """The wrapped callback function.""" if callback: try: if set(callback_sig.parameters) == {"intermediate_result"}: # callback function with signature `callback(intermediate_result)` state_dict = dict( x=np.copy(xk), fun=fval_k, jac=np.copy(grad_k), funs=deepcopy(ele_fvals_k), extra_fun=extra_fval_k, nit=nit + 1, nrun=manager.nrun + 1, nfev=deepcopy(manager.nfev), avg_nfev=np.mean(manager.nfev), max_nfev=max(manager.nfev), ) if debug or return_internals: state_dict.update( dict( manager=manager, interp_set=overall_interp_set, model=overall_surrogate, ) ) if not is_dict_input: state_dict.update( dict( funs=list(state_dict["funs"].values()), nfev=list(state_dict["nfev"].values()), ) ) state = OptimizeResult(**state_dict) return callback(state) else: # callback function with signature `callback(xk)` return callback(np.copy(xk)) except StopIteration: return True return False if exit_info: return _get_result_of_a_bad_run( exit_info=exit_info, manager=manager, disp=disp_level, return_internals=return_internals or debug, is_dict_input=is_dict_input, ) # Add the start point into the overall interpolation set and get ready to # start the algorithm overall_interp_set.append(start_x, start_fval, fval_eles, extra_fval) manager.shift_center_to(start_x) update_x_best_in_history(*(overall_interp_set.get_opt(verbose=True))) xk, fval_k = overall_interp_set.get_opt() grad_k = overall_surrogate.grad_eval(xk) old_grad, old_hess = grad_k, overall_surrogate.hess_eval(xk) # Initialize the iteration counter and other tracking variables nit_in_a_run, n_short_steps, n_very_short_steps = 0, 0, 0 n_alt_models = [0 for _ in ele_names] # Print header if disp_level >= 2: print( f"# UPOQA ready to start (nrun: {manager.nrun})\n" f" nfev: \n {dict(zip(ele_names, manager.nfev))}\n" f" radii: \n {dict(zip(ele_names, manager.radii))}\n" f" resolution: {manager.resolution:.12}\n" f" obj: {fval_k:.12}" ) elif disp_level == 1: print( "{:^5}{:^7}{:^10}{:^10}{:^11}{:^12}{:^7}".format( "Run", "Iter", "Obj", "Grad", "Delta", "Resolution", "Evals" ) ) # Start the main loop nit = -1 # necessary if maxiter = 0. for nit in range(maxiter): n_want_GI = 0 last_radii = deepcopy(manager.radii) want_reduce_resolution, reach_minimum_resolution_flag = False, False want_improve_geometry = [False for _ in ele_names] nit_in_a_run += 1 xk, fval_k, ele_fvals_k, extra_fval_k = overall_interp_set.get_opt( verbose=True ) grad_k = overall_surrogate.grad_eval(xk) if disp_level >= 2: print(f"\n# Iteration {nit_in_a_run} (nrun: {manager.nrun + 1})") # callback. The user is free to vary the `weights` and `xforms` inside the callback # function `callback(state)` by calling `state.manager.update_weights(new_weights)` # and `state.manager.update_xforms(new_xforms)`. if wrapped_callback(): exit_info = ExitInfo( ExitStatus.SUCCESS, "Terminated by user-defined callback. " ) break # Update f_best in case the user has changed the weights or xforms. refresh_f_best() overall_interp_set.update_fval_with_new_weights_and_xforms( weights=manager.weights, xforms=manager.xforms, extra_fun_eval=manager.extra_fun[0], ) # Detect whether to call a restart restart_detected_flag = manager.decide_to_restart_due_to_exploding_coeff() if restart_detected_flag: exit_info, continue_flag = check_and_try_to_restart( ExitInfo( ExitStatus.AUTO_DETECT_RESTART_WARNING, "Auto-detected restart. ", ) ) if exit_info: break elif continue_flag: continue # Update xk, x_best xk, fval_k, ele_fvals_k, extra_fval_k = overall_interp_set.get_opt( verbose=True ) grad_k = overall_surrogate.grad_eval(xk) exit_info = update_x_best_in_history(xk, fval_k, ele_fvals_k, extra_fval_k) if exit_info: break # Shift model center to xk manager.shift_center_to(xk) # Calculate the trust region step try: surrogate_fvalk = overall_surrogate.fun_eval(xk) if tr_shape == "structured": # Solve the trust region subproblem using gradient projection method envelope_radius = manager.max_radius * envelope_factor sk = conjugate_gradient_proj_steinmetz( fun=lambda s: overall_surrogate.fun_eval(xk + s) - surrogate_fvalk, grad=lambda s: overall_surrogate.grad_eval(xk + s), hess_prod=lambda s, v: overall_surrogate.hess_operator( xk + s, v ), coords_mask=coords_mask, n=n, deltas=np.array(manager.radii), envelope_delta=envelope_radius, ) elif tr_shape == "spherical": # Solve the trust region subproblem using tangential_byrd_omojokun sk = tangential_byrd_omojokun( fun=lambda s: overall_surrogate.fun_eval(xk + s) - surrogate_fvalk, grad=lambda s: overall_surrogate.grad_eval(xk + s), hess_prod=lambda s, v: overall_surrogate.hess_operator( xk + s, v ), delta=min(manager.radii), n=n, ) else: raise RuntimeError(f"Unknown tr_shape: {tr_shape}.") x_hold = xk + sk decrease = surrogate_fvalk - overall_surrogate.fun_eval(x_hold) s_norm = LA.norm(sk) # tr_vio is short for "trust region violation", which is the ratio of the norm # of the trust region step to the radius. tr_vios = get_violation_ratio(sk, coords, np.array(manager.radii)) tr_max_vio: float = tr_vios.max() step_max_ele_norm: float = (tr_vios * np.asarray(manager.radii)).max() if tr_shape == "structured" and not extra_fun_enabled: checked_step_norm = step_max_ele_norm else: checked_step_norm = s_norm except (LA.LinAlgError, ZeroDivisionError) as e: # If there is a numerical error when calculating the trust region step, # try to restart the algorithm. exit_info, continue_flag = check_and_try_to_restart( ExitInfo( flag=ExitStatus.LINALG_ERROR, msg=( "Numerical error encountered when calculating trust region" f" step: {e} " ), exception=e, traceback=traceback.format_exc(), ) ) if exit_info: break elif continue_flag: continue # Decide whether or not to accept the new point `x_hold` depending on the norm # of trust region step. if update_short_step_count(checked_step_norm): # If the norm of trust region step is small if disp_level == 3: print( f" Detect short step (step norm = {s_norm:.6}, " f"max elemental step norm = {step_max_ele_norm:.6}, " f"n_short_steps = {n_short_steps}," f" n_very_short_steps = {n_very_short_steps})" ) elif disp_level == 2: print( f" Detect short step (step norm = {s_norm:.6}, " f"max elemental step norm = {step_max_ele_norm:.6})" ) for ele_idx in ele_idxs: manager.set_radius( params("tr_radius.alpha2") * manager.radii[ele_idx], ele_idx ) if n_short_steps >= params( "general.max_short_steps" ) or n_very_short_steps >= params("general.max_very_short_steps"): # If the number of short steps exceeds the threshold, reduce the trust region resolution. n_short_steps, n_very_short_steps = 0, 0 want_reduce_resolution = True else: try: idx_to_replaced_eles, dist_new_eles, _ = ( manager.get_index_to_remove() ) except (LA.LinAlgError, ZeroDivisionError): exit_info, continue_flag = check_and_try_to_restart( flag=ExitStatus.LINALG_ERROR, msg="Singular matrix when choosing point to replace for " + "geometry-improving procedure (short step). ", exception=e, traceback=traceback.format_exc(), ) if exit_info: break elif continue_flag: continue want_improve_geometry = [ dist_new_eles[ele_idx] > max(manager.radii[ele_idx], 2.0 * manager.resolution) for ele_idx in ele_idxs ] n_want_GI = np.array(want_improve_geometry).sum() want_reduce_resolution = True if (n_want_GI == 0) else False else: # If the norm of trust region step is big enough n_short_steps, n_very_short_steps = 0, 0 try: old_fval_eles = fval_eles fval_eles, extra_fval = fun_eles_eval(x_hold) fval = manager.build_fval(fval_eles, extra_fval) except MaxEvalNumReached as e: exit_info = ExitInfo( flag=ExitStatus.SUCCESS, msg=f"(element {e.ele_name}) Objective has been called MAXFUN times. ", ) break decrease_extra_fval = extra_fval_k - extra_fval # Calculate decreases of element models, and decreases of the value of # `weight * xform(element model or element obj-func value)` decrease_eles, decrease_wx_m_eles, decrease_wx_f_eles, fval_wx_eles = ( manager.calc_detailed_decrease(xk, x_hold, old_fval_eles, fval_eles) ) # Calculate the reduction ratio ratio, ratios, exit_info = manager.get_reduction_ratio( fval, fval_k, decrease, fval_eles, ele_fvals_k, decrease_eles ) exit_info, continue_flag = check_and_try_to_restart(exit_info) if exit_info: break elif continue_flag: continue # Decide which interpolation point to delete try: idx_to_replaced_eles, _, cached_kkt_info_eles = ( manager.get_index_to_remove(x=x_hold, center=xk) ) except (LA.LinAlgError, ZeroDivisionError): exit_info, continue_flag = check_and_try_to_restart( flag=ExitStatus.LINALG_ERROR, msg="Singular matrix when choosing point to replace. ", exception=e, traceback=traceback.format_exc(), ) if exit_info: break elif continue_flag: continue # Check slow iteration if ( manager.params("slow.terminate_when_slow") and ratio > 0 and manager.check_slow_iteration()[1] ): exit_info, continue_flag = check_and_try_to_restart( ExitInfo( flag=ExitStatus.SLOW_WARNING, msg="Maximum slow iterations reached. ", ) ) if exit_info: break elif continue_flag: continue # For each elemental surrogate model, check whether or not to accept the # new point `x_hold` as a new interpolation point. need_update, exit_info = manager.get_update_requests( idx_to_replaced_eles, tr_vios, cached_kkt_info_eles, ) if exit_info: break if disp_level >= 3: for ele_idx in ele_idxs: cached_kkt_info = cached_kkt_info_eles[ele_idx] if need_update[ele_idx]: print( f" (element {ele_names[ele_idx]}) Determinant ratio =" f" {cached_kkt_info[3][idx_to_replaced_eles[ele_idx]]:.4e}, " f"beta = {cached_kkt_info[1]:.4e}" ) else: print( f" (element {ele_names[ele_idx]}) [filtered out]" " Determinant ratio =" f" {cached_kkt_info[3][idx_to_replaced_eles[ele_idx]]:.4e}, " f"beta = {cached_kkt_info[1]:.4e}" ) # Update the interpolation set and the surrogate model overall_interp_set.update_point_on_idx( x_hold, idx_to_replaced_eles, fval_eles, fval, extra_fval, need_update, ) try: overall_surrogate.update(need_update, cached_kkt_info_eles) except SurrogateLinAlgError as e: exit_info, continue_flag = check_and_try_to_restart( ExitInfo( flag=ExitStatus.LINALG_ERROR, msg=( f"(element {e.ele_name}) Singular matrix when " "updating surrogate model. " ), exception=e, traceback=traceback.format_exc(), ) ) if exit_info: break elif continue_flag: continue # Update useful tracking information for the auto detection of restart manager.update_restart_track_info(xk, old_grad, old_hess) old_grad, old_hess = overall_surrogate.grad_eval( xk ), overall_surrogate.hess_eval(xk) grad_k = overall_surrogate.grad_eval(overall_interp_set.get_opt()[0]) # Print information of current iteration if disp_level >= 2: iter_log = "" if disp_level == 3: iter_log += f" point change indices: \n {dict(zip(ele_names, idx_to_replaced_eles))}\n" iter_log += ( f" nfev: \n {dict(zip(ele_names, manager.nfev))}\n" f" reduc-ratio: {ratio:.12}\n" f" step size: {s_norm:.12}\n" f" radii: " ) if tr_shape == "structured": iter_log += f"\n {dict(zip(ele_names, [float(x) for x in manager.radii]))}\n" else: iter_log += f"{manager.radii[0]:.2}\n" iter_log += ( f" resolution: {manager.resolution:.12}\n" f" obj change: {fval_k:.12} --> {fval:.12}\n" ) if extra_fun_enabled: iter_log += f" extra-fun change: {extra_fval_k:.12} --> {extra_fval:.12}\n" iter_log += f" value of element: \n {dict(zip(ele_names, [float(x) for x in fval_eles]))}\n" if weights_xforms_enabled: iter_log += ( f" value of weighted and transformed element on the new point:\n" f" {dict(zip(ele_names, [float(x) for x in fval_wx_eles]))}\n" ) if disp_level == 3: iter_log += ( f" norm(gk): {LA.norm(grad_k):.12}\n" f" tr_max_vio(sk): {tr_max_vio:.12}\n" f" ele-wise reduc-ratio: \n {dict(zip(ele_names, ratios))}\n" ) if weights_xforms_enabled: iter_log += ( f" model decrease (with weights and transformations):\n" f" {dict(zip(ele_names, [float(x) for x in decrease_wx_f_eles]))}\n" ) iter_log += ( f" tr_vios: \n {dict(zip(ele_names, [float(x) for x in tr_vios]))}\n" f" n_alt_models: \n {dict(zip(ele_names, n_alt_models))}\n" f" n_short_steps: {n_short_steps}\n" f" n_very_short_steps: {n_very_short_steps}\n" ) print(iter_log, end="") xk, fval_k = overall_interp_set.get_opt() # Update the trust region radii manager.update_radius( tr_vios, ratio, ratios, decrease, decrease_extra_fval, decrease_wx_m_eles, decrease_wx_f_eles, s_norm / min(manager.radii), (tr_shape == "spherical"), ) if disp_level == 1: print( "{:^5}{:^7}{:^10.2e}{:^10.2e}{:^11.2e}{:^12.2e}{:^7}".format( manager.nrun + 1, nit_in_a_run, fval_k, LA.norm(grad_k), manager.average_radius, manager.resolution, max(manager.nfev), ) ) # Consider reinitializing the surrogate model if the difference of model grad at xk # between current model and the alternative model grows too large. for ele_idx in ele_idxs: if manager.radii[ele_idx] <= manager.resolution and ratios[ ele_idx ] < params("general.alt_model_check_thresh"): surrogate = ele_models[ele_idx] n_alt_models[ele_idx] += 1 xk_ele = proj_onto_ele(xk, ele_idx) grad_norm, grad_alt_norm = ( LA.norm(surrogate.grad_eval(xk_ele)), LA.norm(surrogate.alt_grad_eval(xk_ele)), ) if disp_level == 3: print( f" (element {ele_names[ele_idx]}) Model Grad Norm =" f" {grad_norm}, Alt Model Grad Norm = {grad_alt_norm}." ) if ( grad_norm < params("general.alt_model_thresh") * grad_alt_norm ): n_alt_models[ele_idx] = 0 if n_alt_models[ele_idx] >= params( "general.max_alt_model_steps" ): surrogate.reinit() n_alt_models[ele_idx] = 0 if disp_level >= 2: print( " Reinitialize the surrogate model " f"(element {ele_names[ele_idx]})." ) else: n_alt_models[ele_idx] = 0 try: idx_to_replaced_eles, dist_new_eles, _ = ( manager.get_index_to_remove() ) except (LA.LinAlgError, ZeroDivisionError) as e: exit_info, continue_flag = check_and_try_to_restart( ExitInfo( flag=ExitStatus.LINALG_ERROR, msg=( "Singular matrix when choosing point to replace " "for geometry-improving procedure. " ), exception=e, traceback=traceback.format_exc(), ) ) if exit_info: break elif continue_flag: continue # Check whether or not to launch a geometry-improving step if ratio <= params("tr_radius.eta1"): want_improve_geometry = [ bool( dist_new_eles[ele_idx] > max(manager.radii[ele_idx], 2.0 * manager.resolution) ) for ele_idx in ele_idxs ] n_want_GI = np.array(want_improve_geometry).sum() want_reduce_resolution = (n_want_GI == 0) and ( max(last_radii) <= manager.resolution ) # Reduce the trust region resolution if want_reduce_resolution: if disp_level >= 2: print(f" Reduce resolution from {manager.resolution:.3}", end="") reach_minimum_resolution_flag = manager.reduce_resolution() if disp_level >= 2: print(f" to {manager.resolution:.3}.") # Has reduced the trust region resolution to the minimum? if reach_minimum_resolution_flag: exit_info = ExitInfo( flag=ExitStatus.SUCCESS, msg="The resolution has reached its minimum. ", ) exit_info, continue_flag = check_and_try_to_restart(exit_info) if exit_info: break elif continue_flag: continue # Improve the geometry of the interpolation set if necessary. if n_want_GI > 0: xk, fval_k, ele_fvals_k, extra_fval_k = overall_interp_set.get_opt( verbose=True ) exit_info = update_x_best_in_history( xk, fval_k, ele_fvals_k, extra_fval_k ) if exit_info: break try: step_eles, cached_kkt_info_eles = manager.get_geometry_step( idx_to_replaced_eles, want_improve_geometry ) except (LA.LinAlgError, ZeroDivisionError) as e: exit_info, continue_flag = check_and_try_to_restart( ExitInfo( flag=ExitStatus.LINALG_ERROR, msg="Singular matrix encountered in geometry-improving step. ", exception=e, traceback=traceback.format_exc(), ) ) if exit_info: break elif continue_flag: continue except SurrogateLinAlgError as e: exit_info = ExitInfo( flag=ExitStatus.LINALG_ERROR, msg=f"(element {e.ele_name}) " + str(e), exception=e, traceback=traceback.format_exc(), ) break break_flag = False for ele_idx in ele_idxs: interp_set = ele_interp_sets[ele_idx] if want_improve_geometry[ele_idx]: x_ele_GI = proj_onto_ele(xk, ele_idx) + step_eles[ele_idx] try: ele_fval = element_funcs[ele_idx](x_ele_GI) interp_set.update_point_on_idx( x_ele_GI, idx_to_replaced_eles[ele_idx], ele_fval, ) except MaxEvalNumReached: exit_info = ExitInfo( flag=ExitStatus.SUCCESS, msg=( f"(element {ele_names[ele_idx]}) Objective" " has been called MAXFUN times. " ), ) break_flag = True break if break_flag: break try: overall_surrogate.update( want_improve_geometry, cached_kkt_info_eles ) except SurrogateLinAlgError as e: exit_info, continue_flag = check_and_try_to_restart( ExitInfo( flag=ExitStatus.LINALG_ERROR, msg=f"Singular matrix when updating surrogate model" + f"(element {e.ele_name}) in geometry-improving step. ", exception=e, traceback=traceback.format_exc(), ) ) if exit_info: break elif continue_flag: continue if disp_level >= 2: print(f" Improving Geometry ...") if disp_level == 3: print( f" point change indices: \n {dict(zip(ele_names, idx_to_replaced_eles))}" ) print( f" element improved: \n {dict(zip(ele_names, want_improve_geometry))}" ) # Delete the worst points, so that the number of maintained points does not # exceed `overall_interp_set.max_size``. overall_interp_set.clear_invalid_point() exit_info = exit_info or ExitInfo( flag=ExitStatus.SUCCESS, msg="Maximum iterations reached." ) update_x_best_in_history(*(overall_interp_set.get_opt(verbose=True))) if disp_level >= 1: print(TERMINATION_MSG.format(msg=exit_info.message())) # Prepare the final result result = dict( x=x_best, fun=f_best, funs=dict(zip(ele_names, ele_fs_best)), extra_fun=extra_f_best, jac=overall_surrogate.grad_eval(x_best), hess=overall_surrogate.hess_eval(x_best), success=True if exit_info.flag >= 0 else False, message=exit_info.message(with_stem=True), nit=nit + 1, nfev=dict(zip(ele_names, manager.nfev)), avg_nfev=np.mean(manager.nfev), max_nfev=max(manager.nfev), nrun=manager.nrun + 1, ) if not is_dict_input: result.update( dict( funs=list(result["funs"].values()), nfev=list(result["nfev"].values()), ) ) if debug or return_internals: result.update( dict( manager=manager, interp_set=overall_interp_set, model=overall_surrogate, ) ) if exit_info.exception: # If there is an exception, add it to the result. result.update( dict(exception=exit_info.exception, traceback=exit_info.traceback) ) return OptimizeResult(**result) except Exception as e: tr_e = traceback.format_exc() flag = ( ExitStatus.INVALID_EVAL_ERROR if isinstance(e, ValueError) else ExitStatus.UNKNOWN_ERROR ) exit_info = ExitInfo(flag, str(e), e, tr_e) if "disp_level" in vars() and disp_level >= 1: print(TERMINATION_MSG.format(msg=exit_info.message())) try: res_jac = ( overall_surrogate.grad_eval(x_best) if ("x_best" in vars() and "overall_surrogate" in vars()) else None ) except: res_jac = None try: res_hess = ( overall_surrogate.hess_eval(x_best) if ("x_best" in vars() and "overall_surrogate" in vars()) else None ) except: res_hess = None try: result = dict( x=x_best if "x_best" in vars() else None, fun=f_best if "f_best" in vars() else None, funs=( dict(zip(ele_names, ele_fs_best)) if ("ele_names" in vars() and "ele_fs_best" in vars()) else None ), extra_fun=( extra_f_best if ("x_best" in vars() and "extra_f_best" in vars()) else None ), jac=res_jac, hess=res_hess, success=False, message=exit_info.message(), exception=exit_info.exception, traceback=exit_info.traceback, nit=nit + 1 if "nit" in vars() else None, nrun=manager.nrun + 1 if "manager" in vars() else None, nfev=( dict(zip(ele_names, manager.nfev)) if ("ele_names" in vars() and "manager" in vars()) else None ), avg_nfev=np.mean(manager.nfev) if "manager" in vars() else None, max_nfev=max(manager.nfev) if "manager" in vars() else None, ) if not is_dict_input: result.update( dict( funs=( list(result["funs"].values()) if isinstance(result["funs"], dict) else None ), nfev=( list(result["nfev"].values()) if isinstance(result["nfev"], dict) else None ), ) ) if debug or return_internals: result.update( dict( manager=manager if "manager" in vars() else None, interp_set=( overall_interp_set if "overall_interp_set" in vars() else None ), model=( overall_surrogate if "overall_surrogate" in vars() else None ), ) ) return OptimizeResult(**result) except Exception as e2: warnings.warn( f"During handling the unknown exception: {e}\n\n{tr_e}\n\n" f"Another exception is raised: {e2}\n\n{traceback.format_exc()}\n\n" "Return empty optimization result." ) return OptimizeResult()
def _get_result_of_a_bad_run( exit_info: ExitInfo, manager: Optional[UPOQAManager] = None, disp: int = 1, return_internals: bool = False, is_dict_input: bool = True, ) -> OptimizeResult: """ Return an empty optimization result when the optimization process is terminated abnormally. """ if disp >= 1: print(TERMINATION_MSG.format(msg=exit_info.message())) result = dict( x=None, fun=None, funs=None, extra_fun=None, nit=0, nrun=manager.nrun + 1 if manager and hasattr(manager, "nrun") else 0, nfev=( dict(zip(manager.ele_names, manager.nfev)) if manager and hasattr(manager, "nfev") else None ), avg_nfev=np.mean(manager.nfev) if manager and hasattr(manager, "nfev") else 0, max_nfev=max(manager.nfev) if manager and hasattr(manager, "nfev") else 0, jac=None, hess=None, success=False, message=exit_info.message(), ) if not is_dict_input: result.update( dict( nfev=( list(result["nfev"].values()) if isinstance(result["nfev"], dict) else None ) ) ) if return_internals and manager: result.update( dict(manager=manager, interp_set=manager.interp_set, model=manager.model) ) if exit_info.exception: result.update( dict( exception=exit_info.exception, traceback=exit_info.traceback, ) ) return OptimizeResult(**result) def _build_coords_mask(coords: List[np.ndarray], n: int): q = len(coords) result = np.zeros((q, n), dtype=bool) for i in range(q): result[i][coords[i]] = np.True_ return result