Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 112 additions & 58 deletions pyomo/contrib/doe/doe.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ def run_doe(self, model=None, results_file=None):
results_file: string name of the file path to save the results
to in the form of a .json file
default: None --> don't save

"""
# Check results file name
if results_file is not None:
Expand Down Expand Up @@ -376,6 +375,11 @@ def run_doe(self, model=None, results_file=None):
if self.objective_option == ObjectiveLib.trace:
trace_val = np.trace(np.linalg.pinv(self.get_FIM()))
model.obj_cons.egb_fim_block.outputs["A-opt"].set_value(trace_val)
elif self.objective_option == ObjectiveLib.pseudo_trace:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not see any tests for the pseudo_trace objective in test_greybox.py

pseudo_trace_val = np.trace(np.array(self.get_FIM()))
model.obj_cons.egb_fim_block.outputs["pseudo-A-opt"].set_value(
pseudo_trace_val
)
elif self.objective_option == ObjectiveLib.determinant:
det_val = np.linalg.det(np.array(self.get_FIM()))
model.obj_cons.egb_fim_block.outputs["log-D-opt"].set_value(
Expand All @@ -389,63 +393,8 @@ def run_doe(self, model=None, results_file=None):
cond_number = np.log(np.abs(np.max(eig) / np.min(eig)))
model.obj_cons.egb_fim_block.outputs["ME-opt"].set_value(cond_number)

# If the model has L, initialize it with the solved FIM
if hasattr(model, "L"):
# Get the FIM values
fim_vals = [
pyo.value(model.fim[i, j])
for i in model.parameter_names
for j in model.parameter_names
]
fim_np = np.array(fim_vals).reshape(
(len(model.parameter_names), len(model.parameter_names))
)

# Need to compute the full FIM before
# initializing the Cholesky factorization
if self.only_compute_fim_lower:
fim_np = fim_np + fim_np.T - np.diag(np.diag(fim_np))

# Check if the FIM is positive definite
# If not, add jitter to the diagonal
# to ensure positive definiteness
min_eig = np.min(np.linalg.eigvals(fim_np))

if min_eig < _SMALL_TOLERANCE_DEFINITENESS:
# Raise the minimum eigenvalue to at
# least _SMALL_TOLERANCE_DEFINITENESS
jitter = np.min(
[
-min_eig + _SMALL_TOLERANCE_DEFINITENESS,
_SMALL_TOLERANCE_DEFINITENESS,
]
)
else:
# No jitter needed
jitter = 0

# Add jitter to the diagonal to ensure positive definiteness
L_vals_sq = np.linalg.cholesky(
fim_np + jitter * np.eye(len(model.parameter_names))
)
for i, c in enumerate(model.parameter_names):
for j, d in enumerate(model.parameter_names):
model.L[c, d].value = L_vals_sq[i, j]

# Initialize the inverse of L if it exists
if hasattr(model, "L_inv"):
L_inv_vals = np.linalg.inv(L_vals_sq)

for i, c in enumerate(model.parameter_names):
for j, d in enumerate(model.parameter_names):
if i >= j:
model.L_inv[c, d].value = L_inv_vals[i, j]
else:
model.L_inv[c, d].value = 0.0
# Initialize the cov_trace if it exists
if hasattr(model, "cov_trace"):
initial_cov_trace = np.sum(L_inv_vals**2)
model.cov_trace.value = initial_cov_trace
# Keep Cholesky-related variables synchronized with current FIM values
self._initialize_cholesky_from_fim(model=model)

if hasattr(model, "determinant"):
model.determinant.value = np.linalg.det(np.array(self.get_FIM()))
Expand Down Expand Up @@ -537,6 +486,105 @@ def run_doe(self, model=None, results_file=None):
with open(results_file, "w") as file:
json.dump(self.results, file)

def _compute_cholesky_jitter(self, min_eig):
    """
    Compute diagonal regularization for Cholesky initialization.

    Parameters
    ----------
    min_eig: float
        Minimum eigenvalue of the current FIM estimate.

    Returns
    -------
    float
        Nonnegative diagonal shift needed so the minimum eigenvalue
        is at least ``_SMALL_TOLERANCE_DEFINITENESS``.
    """
    # How far the smallest eigenvalue falls short of the tolerance;
    # no shift is needed when it already meets the threshold.
    shortfall = _SMALL_TOLERANCE_DEFINITENESS - float(min_eig)
    return shortfall if shortfall > 0.0 else 0.0

def _get_fim_numpy(self, model):
    """
    Assemble the current FIM variable values into a NumPy array.

    Parameters
    ----------
    model: ConcreteModel
        DoE model containing variable ``fim``.

    Returns
    -------
    ndarray
        Dense FIM array. If ``only_compute_fim_lower`` is True, the
        returned array is symmetrized from the lower triangle.
    """
    names = list(model.parameter_names)
    # Build the matrix row by row in parameter order.
    fim_np = np.array(
        [[pyo.value(model.fim[r, c]) for c in names] for r in names],
        dtype=float,
    )
    if self.only_compute_fim_lower:
        # Mirror the stored triangle to recover the full symmetric matrix.
        fim_np = fim_np + fim_np.T - np.diag(np.diag(fim_np))
    return fim_np

def _initialize_cholesky_from_fim(self, model=None):
    """
    Synchronize Cholesky-related variables using the current FIM.

    Parameters
    ----------
    model: ConcreteModel, optional
        DoE model to update. Defaults to ``self.model``.

    Returns
    -------
    None
        Updates model values in place for available variables:
        ``L``, ``L_inv``, ``fim_inv``, and ``cov_trace``.
    """
    if model is None:
        model = self.model
    # Nothing to synchronize when the model carries no Cholesky factor.
    if not hasattr(model, "L"):
        return

    fim_np = self._get_fim_numpy(model)

    # Shift the diagonal (if needed) so the factorization cannot fail
    # on an indefinite or barely-PSD FIM estimate.
    min_eig = float(np.min(np.linalg.eigvalsh(fim_np)))
    jitter = self._compute_cholesky_jitter(min_eig)
    fim_pd = fim_np + jitter * np.eye(len(model.parameter_names))

    # Lower-triangular Cholesky factor; zero the upper triangle explicitly.
    L_vals = np.linalg.cholesky(fim_pd)
    for i, c in enumerate(model.parameter_names):
        for j, d in enumerate(model.parameter_names):
            model.L[c, d].value = L_vals[i, j] if i >= j else 0.0

    if hasattr(model, "L_inv"):
        L_inv_vals = np.linalg.inv(L_vals)
        for i, c in enumerate(model.parameter_names):
            for j, d in enumerate(model.parameter_names):
                model.L_inv[c, d].value = L_inv_vals[i, j] if i >= j else 0.0

    # Invert the regularized FIM once and reuse the result below (the
    # previous version inverted it separately for fim_inv and cov_trace).
    # pinv guards the edge case where fim_pd is still numerically
    # singular despite the jitter; it matches inv for well-conditioned
    # matrices.
    if hasattr(model, "fim_inv") or hasattr(model, "cov_trace"):
        fim_inv_vals = np.linalg.pinv(fim_pd)

    if hasattr(model, "fim_inv"):
        for i, c in enumerate(model.parameter_names):
            for j, d in enumerate(model.parameter_names):
                if self.only_compute_fim_lower and i < j:
                    model.fim_inv[c, d].value = 0.0
                else:
                    model.fim_inv[c, d].value = fim_inv_vals[i, j]

    if hasattr(model, "cov_trace"):
        model.cov_trace.value = np.trace(fim_inv_vals)

# Perform multi-experiment doe (sequential, or ``greedy`` approach)
def run_multi_doe_sequential(self, N_exp=1):
    """
    Optimize multiple experiments with a sequential (``greedy``) approach.

    Parameters
    ----------
    N_exp: int, optional
        Number of experiments to design, default 1.

    Raises
    ------
    NotImplementedError
        Always; multi-experiment optimization is not yet supported.
    """
    raise NotImplementedError("Multiple experiment optimization not yet supported.")
Expand Down Expand Up @@ -898,6 +946,7 @@ def create_doe_model(self, model=None):
self.only_compute_fim_lower
and self.objective_option == ObjectiveLib.determinant
and not self.Cholesky_option
and not self.use_grey_box
):
raise ValueError(
"Cannot compute determinant with explicit formula "
Expand Down Expand Up @@ -1663,6 +1712,11 @@ def FIM_egb_cons(m, p1, p2):
model.objective = pyo.Objective(
expr=model.obj_cons.egb_fim_block.outputs["A-opt"], sense=pyo.minimize
)
elif self.objective_option == ObjectiveLib.pseudo_trace:
model.objective = pyo.Objective(
expr=model.obj_cons.egb_fim_block.outputs["pseudo-A-opt"],
sense=pyo.maximize,
)
elif self.objective_option == ObjectiveLib.determinant:
model.objective = pyo.Objective(
expr=model.obj_cons.egb_fim_block.outputs["log-D-opt"],
Expand Down
15 changes: 12 additions & 3 deletions pyomo/contrib/doe/grey_box_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ def __init__(self, doe_object, objective_option="determinant", logger_level=None
def _get_FIM(self):
# Grabs the current FIM subject
# to the input values.
# This function currently assumes
# that we use a lower triangular
# FIM.
# Inputs store one triangular half
# of a symmetric FIM. Reconstruct
# the full symmetric matrix here,
# consistent with manuscript S5.
upt_FIM = self._input_values

# Create FIM in the correct way
Expand Down Expand Up @@ -164,6 +165,8 @@ def output_names(self):

if self.objective_option == ObjectiveLib.trace:
obj_name = "A-opt"
elif self.objective_option == ObjectiveLib.pseudo_trace:
obj_name = "pseudo-A-opt"
elif self.objective_option == ObjectiveLib.determinant:
obj_name = "log-D-opt"
elif self.objective_option == ObjectiveLib.minimum_eigenvalue:
Expand Down Expand Up @@ -196,6 +199,8 @@ def evaluate_outputs(self):

if self.objective_option == ObjectiveLib.trace:
obj_value = np.trace(np.linalg.pinv(M))
elif self.objective_option == ObjectiveLib.pseudo_trace:
obj_value = np.trace(M)
elif self.objective_option == ObjectiveLib.determinant:
sign, logdet = np.linalg.slogdet(M)
obj_value = logdet
Expand Down Expand Up @@ -231,6 +236,8 @@ def finalize_block_construction(self, pyomo_block):
# objective function.
if self.objective_option == ObjectiveLib.trace:
pyomo_block.outputs["A-opt"] = output_value
elif self.objective_option == ObjectiveLib.pseudo_trace:
pyomo_block.outputs["pseudo-A-opt"] = output_value
elif self.objective_option == ObjectiveLib.determinant:
pyomo_block.outputs["log-D-opt"] = output_value
elif self.objective_option == ObjectiveLib.minimum_eigenvalue:
Expand Down Expand Up @@ -268,6 +275,8 @@ def evaluate_jacobian_outputs(self):
# is -inv(FIM) @ inv(FIM). Add reference to
# pyomo.DoE 2.0 manuscript S.I.
jac_M = -Minv @ Minv
elif self.objective_option == ObjectiveLib.pseudo_trace:
jac_M = np.eye(self._n_params, dtype=np.float64)
elif self.objective_option == ObjectiveLib.determinant:
Minv = np.linalg.pinv(M)
# Derivative formula derived using tensor
Expand Down
Loading
Loading